• zookeeper应用之分布式队列


    队列这种数据结构都不陌生,特点就是先进先出。有很多常用的消息中间件可以有现成的该部分功能,这里使用zookeeper基于发布订阅模式来实现分布式队列。对应的会有一个生产者和一个消费者。

    这里理论上还是使用顺序节点。生产者不断产生新的顺序子节点,消费者watcher监听节点新增事件来消费消息。

    生产者:

    CuratorFramework client = ...
    client.start();
    String path = "/testqueue";
    client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT_SEQUENTIAL).forPath(path,"11".getBytes())
    • 1
    • 2
    • 3
    • 4
    • 5

    消费者:

    CuratorFramework client = ...
    client.start();
    String path = "/testqueue";
    PathChildrenCache pathCache = new PathChildrenCache(client,path,true);
    pathCache.getListenable().addListener(new PathChildrenCacheListener() {
        @Override
        public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
            if(event.getType() == PathChildrenCacheEvent.Type.CHILD_ADDED){
                ChildData data = event.getData();
    
                //handle msg
    
                client.delete().forPath(data.getPath());
            }
        }
    });
    pathCache.start();
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    使用curator queue:

    先来使用基本的队列类DistributedQueue。

    DistributedQueue的初始化需要提交准备几个参数:

    client连接就不多说了:

    CuratorFramework client = ...
    
    • 1

    QueueSerializer:这个主要是用来指定对消息data进行序列化和反序列化

    这里就搞一个简单的字符串类型:

    QueueSerializer<String> serializer = new QueueSerializer<String>() {
        @Override
        public byte[] serialize(String item) {
            return item.getBytes();
        }
    
        @Override
        public String deserialize(byte[] bytes) {
            return new String(bytes);
        }
    };
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    QueueConsumer消息consumer,当有新消息来的时候会调用consumer.consumeMessage()来处理消息

    这里也搞个简单的string类型的处理consumer

    QueueConsumer<String> consumer = new QueueConsumer<String>() {
        @Override
        public void consumeMessage(String s) throws Exception {
            System.out.println("receive msg:"+s);
        }
    
        @Override
        public void stateChanged(CuratorFramework curatorFramework, ConnectionState connectionState) {
    	//TODO
        }
    };
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    队列消息发布:

    //队列节点路径
    String queuePath = "/queue";
    //使用上面准备的几个参数构造DistributedQueue对象
    DistributedQueue<String> queue =  QueueBuilder.builder(client,consumer,serializer,queuePath).buildQueue();
    queue.start();
    //调用put方法生产消息
    queue.put("hello");
    queue.put("msg");
    Thread.sleep(2000);
    queue.put("3");
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    这样在启动测试程序在,consumer的consumeMessage方法就会收到queue.put的消息。

    这里有个问题有没有发现,在初始化queue的时候需要指定consumer,那岂不是只能同一个程序中生产消费,何来的分布式?

    其实这里在queue对象创建的时候consumer可以为null,这个时候queue就只生产消息。具体的逻辑需要看下DistributedQueue类的源码。

    在DistributedQueue类的构造函数有一步设置isProducerOnly属性

    isProducerOnly = (consumer == null);
    
    • 1

    然后在start()方法会根据isProducerOnly来判断启动方式

    if ( !isProducerOnly || (maxItems != QueueBuilder.NOT_SET) )
    {
        childrenCache.start();
    }
    
    if ( !isProducerOnly )
    {
        service.submit
            (
                new Callable<Object>()
                {
                    @Override
                    public Object call()
                    {
                        runLoop();
                        return null;
                    }
                }
            );
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20

    这里看到consumer为空,两个if不成立,不会初始化对那个的消息消费逻辑wather监听。只需要在另一个程序里创建queue启动时指定consumer即可。

    源码分析

    先从消息的发布也就是put方法

    首先调用makeItemPath()获取创建节点路径:

    ZKPaths.makePath(queuePath, QUEUE_ITEM_NAME);
    
    • 1

    这里QUEUE_ITEM_NAME=“queue-”。

    然后调用internalPut()方法来创建节点路径

    //先累加消息数量putCount
    putCount.incrementAndGet();
    //使用serializer序列化消息数据
    byte[]              bytes = ItemSerializer.serialize(multiItem, serializer);
    //根据background来创建节点
    if ( putInBackground )
    {
        doPutInBackground(item, path, givenMultiItem, bytes);
    }
    else
    {
        doPutInForeground(item, path, givenMultiItem, bytes);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    看doPutInForeground里就是具体的创建节点了

    //创建节点
    client.create().withMode(CreateMode.PERSISTENT_SEQUENTIAL).forPath(path, bytes);
    //哦,错了这里putCount不是总消息数,是正在创建消息数,创建完再回减
    synchronized(putCount)
    {
        putCount.decrementAndGet();
        putCount.notifyAll();
    }//如果有对应的lisener依次调用
    putListenerContainer.forEach(listener -> {
        if ( item != null )
        {
            listener.putCompleted(item);
        }
        else
        {
            listener.putMultiCompleted(givenMultiItem);
        }
    });
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    消息的发布就完成了。

    然后是消息的consumer,这里肯定是使用的watcher。这里还是回到前面start方法处根据isProducerOnly属性判断有两步操作:

    1、childrenCache.start();

    childrenCache初始化是在queue的构造函数里

    childrenCache = new ChildrenCache(client, queuePath)
    
    • 1

    其start方法会调用

    private final CuratorWatcher watcher = new CuratorWatcher()
    {
      @Override
      public void process(WatchedEvent event) throws Exception
      {
        if ( !isClosed.get() )
        {
          sync(true);
        }
      }
    };
        private final BackgroundCallback  callback = new BackgroundCallback()
        {
            @Override
            public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
            {
                if ( event.getResultCode() == KeeperException.Code.OK.intValue() )
                {
                    setNewChildren(event.getChildren());
                }
            }
        };
        void start() throws Exception
        {
            sync(true);
        }
            private synchronized void sync(boolean watched) throws Exception
        {
            if ( watched )
            {//走这里
                client.getChildren().usingWatcher(watcher).inBackground(callback).forPath(path);
            }
            else
            {
                client.getChildren().inBackground(callback).forPath(path);
            }
        }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38

    这里先把代码都贴上,看到内部定义了一个watcher和callback。这里inBackground就是watcher到事件使用callback进行处理,最后是调用到setNewChildren方法

    private synchronized void setNewChildren(List newChildren)
    {
        if ( newChildren != null )
        {
            Data currentData = children.get();
            //将数据设置到children变量里,消息版本+1
            children.set(new Data(newChildren, currentData.version + 1));
            //notifyAll() 等待线程获取消息
            notifyFromCallback();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    这里有引入了一个children变量,然后将数据设置到了该变量里。

    private final AtomicReference<Data> children = new AtomicReference<Data>(new Data(Lists.<String>newArrayList(), 0));
    
    • 1

    children其实是线程间通信一个共享数据容器变量。这里设置了数据,然后具体的数据消费在下一步。

    2、线程池里丢了个任务去执行runLoop();方法。

    回到DistributedQueue.start的第二步,执行runLoop()方法,看名字就应该知道了一直轮询获取消息。

    还是来看代码吧

    private void runLoop()
    {
        long         currentVersion = -1;
        long         maxWaitMs = -1;
            //while一直轮询
            while ( state.get() == State.STARTED  )
            {
                try
                {//从childrenCache里获取数据
                    ChildrenCache.Data      data = (maxWaitMs > 0) ? childrenCache.blockingNextGetData(currentVersion, maxWaitMs, TimeUnit.MILLISECONDS) : childrenCache.blockingNextGetData(currentVersion);
                    currentVersion = data.version;
    
                    List<String>        children = Lists.newArrayList(data.children);
                    sortChildren(children); // makes sure items are processed in the correct order
    
                    if ( children.size() > 0 )
                    {
                        maxWaitMs = getDelay(children.get(0));
                        if ( maxWaitMs > 0 )
                        {
                            continue;
                        }
                    }
                    else
                    {
                        continue;
                    }
                    /**处理数据 这里取出消息后会删除节点,然后使用serializer反序列化节点数据,
                    调用consumer.consumeMessage来处理消息
                    **/
                    processChildren(children, currentVersion);
                }
    
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36

    这里获取数据使用了childrenCache.blockingNextGetData

    synchronized Data blockingNextGetData(long startVersion, long maxWait, TimeUnit unit) throws InterruptedException
    {
        long            startMs = System.currentTimeMillis();
        boolean         hasMaxWait = (unit != null);
        long            maxWaitMs = hasMaxWait ? unit.toMillis(maxWait) : -1;
        //数据版本没变一直wait等待
        while ( startVersion == children.get().version )
        {
            if ( hasMaxWait )
            {
                long        elapsedMs = System.currentTimeMillis() - startMs;
                long        thisWaitMs = maxWaitMs - elapsedMs;
                if ( thisWaitMs <= 0 )
                {
                    break;
                }
                wait(thisWaitMs);
            }
            else
            {
                wait();
            }
        }
        return children.get();
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25

    这里就有wait阻塞等消息,当消息来时候会被唤醒。

    其它类型队列:

    curator对优先队列(DistributedPriorityQueue)、延迟队列(DistributedDelayQueue)都有对应的实现,有兴趣的自己看吧。

  • 相关阅读:
    算法笔记(3)—— 快速 I/O 算法:快速输入算法、快速输出算法
    Netty入门——ByteBuf
    有哪些高质量的自学网站?
    微服务入门之某硅谷商城
    3D点云数据集制作实录【LiDAR】
    Docker 的数据管理 端口映射 容器互联 镜像的创建
    L2-041 插松枝
    2022年“网络安全”赛项驻马店市赛选拔赛 任务书
    图解MySQL中的JOIN类型
    表格与表单
  • 原文地址:https://blog.csdn.net/sinat_16493273/article/details/134511977