• canal rocketmq


    上篇文章canal 消费进度说到直接使用ClusterCanalConnector并发消费是有问题的,可以先用单点将canal事件发送到mq中,再由mq并发处理,另外mq还可以做到削峰的作用,让canal数据不至于阻塞。

    使用队列,可以自己起一个单实例服务使用ClusterCanalConnector将消息丢队列里,也可以直接使用canal server, canal server原生支持几种队列:Kafka, RocketMQ ,RabbitMQ, PulsarMQ, 下面了解一下canal sever具体的处理过程。

    canal server将消息投递到mq中

    在canal server中,如果检测到配置了mq, 就会启动线程来读取bin log事件,并投递到mq中:
    CanalMQStarter

    while (running && destinationRunning.get()) {
         Message message;
         if (getTimeout != null && getTimeout > 0) {
             message = canalServer.getWithoutAck(clientIdentity,
                 getBatchSize,
                 getTimeout.longValue(),
                 TimeUnit.MILLISECONDS);
         } else {
             message = canalServer.getWithoutAck(clientIdentity, getBatchSize);
         }
    
         final long batchId = message.getId();
         int size = message.isRaw() ? message.getRawEntries().size() : message.getEntries().size();
         if (batchId != -1 && size != 0) {
             canalMQProducer.send(canalDestination, message, new Callback() {
                 @Override
                 public void commit() {
                     canalServer.ack(clientIdentity, batchId); // 提交确认
                 }
                 @Override
                 public void rollback() {
                     canalServer.rollback(clientIdentity, batchId);
                 }
             }); // 发送message到topic
         } else {
             try {
                 Thread.sleep(100);
             } catch (InterruptedException e) {
                 // ignore
             }
         }
     }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32

    从代码可以看到,首先调用getWithoutAck从实例获取事件,然后调用canalMQProducer.send将消息投递到队列中,如果投递成功就执行ack,否则执行rollback, 因为投递消息到队列是非常快的操作,所以这就降低了阻塞的风险。

    最终发送mq消息的代码如下(CanalRocketMQProducer):

        private void sendMessage(Message message, int partition) {
        	//...
                SendResult sendResult = this.defaultMQProducer.send(message, (mqs, msg, arg) -> {
                    if (partition >= mqs.size()) {
                        return mqs.get(partition % mqs.size());
                    } else {
                        return mqs.get(partition);
                    }
                }, null);
    		//...
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    这里有个分区的概念,对于RocketMQ来说就是队列选择,这关系到顺序消费。

    业务代码使用RocketMQCanalConnector消费数据

        while (running) {
            try {
                connector.connect();
                connector.subscribe();
                while (running) {
                    List<Message> messages = connector.getListWithoutAck(1000L, TimeUnit.MILLISECONDS); // 获取message
                    for (Message message : messages) {
                        long batchId = message.getId();
                        int size = message.getEntries().size();
                        if (batchId == -1 || size == 0) {
                            // try {
                            // Thread.sleep(1000);
                            // } catch (InterruptedException e) {
                            // }
                        } else {
                            printSummary(message, batchId, size);
                            printEntry(message.getEntries());
                            // logger.info(message.toString());
                        }
                    }
                    connector.ack(); // 提交确认
                }
            } catch (Exception e) {
                logger.error(e.getMessage(), e);
            }
        }
    
        connector.unsubscribe();
        // connector.stopRunning();
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30

    可以看到这和之前ClusterCanalConnector一样的处理方法,只是底层实现不一样,在subscribe的时候,调用了mq的subscribe:

        public synchronized void subscribe(String filter) throws CanalClientException {
               //...
                rocketMQConsumer.subscribe(this.topic, "*");
                rocketMQConsumer.registerMessageListener(new MessageListenerOrderly() {
                    @Override
                    public ConsumeOrderlyStatus consumeMessage(List<MessageExt> messageExts, ConsumeOrderlyContext context) {
                        context.setAutoCommit(true);
                        boolean isSuccess = process(messageExts);
                        if (isSuccess) {
                            return ConsumeOrderlyStatus.SUCCESS;
                        } else {
                            return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
                        }
                    }
                });
                rocketMQConsumer.start();
            //...
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    可以看到这里使用了MessageListenerOrderly来进行顺序消费, 使用process来处理消息

    private boolean process(List<MessageExt> messageExts) {
            //...
            for (MessageExt messageExt : messageExts) {
    			//...
                        if (!flatMessage) {
                            Message message = CanalMessageDeserializer.deserializer(data);
                            messageList.add(message);
                        } else {
                            FlatMessage flatMessage = JSON.parseObject(data, FlatMessage.class);
                            messageList.add(flatMessage);
                        }
                    
            ConsumerBatchMessage batchMessage;
            if (!flatMessage) {
                batchMessage = new ConsumerBatchMessage<Message>(messageList);
            } else {
                batchMessage = new ConsumerBatchMessage<FlatMessage>(messageList);
            }
            try {
                messageBlockingQueue.put(batchMessage);
            } catch (InterruptedException e) {
                logger.error("Put message to queue error", e);
                throw new RuntimeException(e);
            }
            boolean isCompleted;
            try {
                isCompleted = batchMessage.waitFinish(batchProcessTimeout);
            } catch (InterruptedException e) {
                logger.error("Interrupted when waiting messages to be finished.", e);
                throw new RuntimeException(e);
            }
            boolean isSuccess = batchMessage.isSuccess();
            return isCompleted && isSuccess;
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34

    这里将数据放到了messageBlockingQueue中,然后等待消息执行完成, ConsumerBatchMessage内置了一个CountDownLatch, batchMessage.waitFinish会阻塞在这里。
    客户端使用getFlatList/getFlatListWithoutAck取数据时,就是从messageBlockingQueue取出数据,调用ack时,会释放ConsumerBatchMessage中的CountDownLatch, 这样mq消费者就可以继续从队列中拿数据了。

        @Override
        public List<Message> getListWithoutAck(Long timeout, TimeUnit unit) throws CanalClientException {
                if (this.lastGetBatchMessage != null) {
                    throw new CanalClientException("mq get/ack not support concurrent & async ack");
                }
                ConsumerBatchMessage batchMessage = messageBlockingQueue.poll(timeout, unit);
               //...
        }
        @Override
        public void ack() throws CanalClientException {
                if (this.lastGetBatchMessage != null) {
                    this.lastGetBatchMessage.ack();
                }
            //...
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    对于MessageListenerOrderly来说,是一个消费线程对应一个mq队列的,从而实现多线程消费,而这里把不同mq队列的消息在messageBlockingQueue中排队,并且使用getListWithoutAck/ack也不支持并发,又变成了单线程模式,这可能对性能造成影响,建议生产环境对性能有要求时,采用自己写代码来实现mq的消费。

    配置

    mq相关参数说明

  • 相关阅读:
    systemverilog:interface中的modport用法
    非素数模下的二次剩余
    ssm手机销售网站的设计与实现毕业设计-附源码
    2023年建筑电工(建筑特殊工种)证考试题库及建筑电工(建筑特殊工种)试题解析
    LeetCode7-整数反转
    Unigram,Bigram,N-gram介绍
    集线器与交换机、虚拟局域网(3.3)
    redis缓存穿透、击穿、雪崩
    6. 项目管理之进度管理
    【对比】Gemini:听说GPT-4你小子挺厉害
  • 原文地址:https://blog.csdn.net/crazyman2010/article/details/133778262