• 消息的并发处理


    看一下实现消息并发处理的代码,并发处理会增大实现流量控制、保证消息顺序方面的难度。

    1 并发处理过程

    处理效率的高低是反应Consumer实现好坏的重要指标,本节以ConsumeMessageConcurrentlyService类为例来分析RocketMQ的实现方式。ConsumeMessageConcurrentlyService类在org.apache.rocketmq.client.impl.consumer包中。

    这个类定义了三个线程池,一个主线程池用来正常执行收到的消息,用户可以自定义通过consumeThreadMin和consumeThreadMax来自定义线程个数。另外两个都是单线程的线程池,一个用来执行推迟消费的消息,另一个用来定期清理超时消息(15分钟),如代码清单11-8所示。

    代码清单11-8 三个线程池

    this.consumeExecutor = new ThreadPoolExecutor(
        this.defaultMQPushConsumer.getConsumeThreadMin(),
        this.defaultMQPushConsumer.getConsumeThreadMax(), 1000 * 60,
        TimeUnit.MILLISECONDS, this.consumeRequestQueue,
        new ThreadFactoryImpl("ConsumeMessageThread_"));
    this.scheduledExecutorService =
        Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl(
            "ConsumeMessageScheduledThread_"));
    this.cleanExpireMsgExecutors =
        Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl(
            "CleanExpireMsgScheduledThread_"));

     

    从Broker获取到一批消息以后,根据BatchSize的设置,把一批消息封装到一个ConsumeRequest中,然后把这个ConsumeRequest提交到consumeExecutor线程池中执行,如代码清单11-9所示。

    代码清单11-9 任务分发逻辑

    if (msgs.size() <= consumeBatchSize) {
        ConsumeRequest consumeRequest = new ConsumeRequest(msgs,
            processQueue, messageQueue);
        try {
            this.consumeExecutor.submit(consumeRequest);
        } catch (RejectedExecutionException e) {
            this.submitConsumeRequestLater(consumeRequest);
        }
    } else {
        for (int total = 0; total < msgs.size(); ) {
            List msgThis = new ArrayList
                (consumeBatchSize);
            for (int i = 0; i < consumeBatchSize; i++, total++) {
                if (total < msgs.size()) {
                    msgThis.add(msgs.get(total));
                } else {
                    break;
                }
            }
            ConsumeRequest consumeRequest = new ConsumeRequest(msgThis,
                processQueue, messageQueue);
            try {
                this.consumeExecutor.submit(consumeRequest);
            } catch (RejectedExecutionException e) {
                for (; total < msgs.size(); total++) {
                    msgThis.add(msgs.get(total));
                }

                this.submitConsumeRequestLater(consumeRequest);
            }
        }
    }

     

    消息的处理结果可能有不同的值,主要的两个是CONSUME_SUCCESS和RECONSUME_LATER。如果消费不成功,要把消息提交到上面说的scheduledExecutorService线程池中,5秒后再执行;如果消费模式是CLUSTERING模式,未消费成功的消息会先被发送回Broker,供这个ConsumerGroup里的其他Consumer消费,如果发送回Broker失败,再调用RECONSUME_LATER,消息消费的Status处理逻辑如代码清单11-10所示。

    代码清单11-10 消息消费的Status处理逻辑

    switch (this.defaultMQPushConsumer.getMessageModel()) {
        case BROADCASTING:
            for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size
                (); i++) {
                MessageExt msg = consumeRequest.getMsgs().get(i);
                log.warn("BROADCASTING, the message consume failed, drop " +
                    "it, {}", msg.toString());
            }
            break;
        case CLUSTERING:
            List msgBackFailed = new ArrayList
                (consumeRequest.getMsgs().size());
            for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size
                (); i++) {
                MessageExt msg = consumeRequest.getMsgs().get(i);
                boolean result = this.sendMessageBack(msg, context);
                if (!result) {
                    msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
                    msgBackFailed.add(msg);
                }
            }
            if (!msgBackFailed.isEmpty()) {
                consumeRequest.getMsgs().removeAll(msgBackFailed);
                this.submitConsumeRequestLater(msgBackFailed,
                    consumeRequest.getProcessQueue(), consumeRequest
                        .getMessageQueue());
            }
            break;
        default:
            break;
    }

     

    处理逻辑是用户自定义的,当消息量大的时候,处理逻辑执行效率的高低影响系统的吞吐量。可以把多条消息组合起来处理,或者提高线程数,以提高系统的吞吐量。

    2 ProcessQueue对象

    在前面的源码中,有个ProcessQueue类型的对象,这个对象的功能是什么呢?从Broker获得的消息,因为是提交到线程池里并行执行,很难监控和控制执行状态,比如如何获得当前消息堆积的数量,如何解决处理超时情况等。RocketMQ定义了一个快照类ProcessQueue来解决这些问题,在PushConsumer运行的时候,每个Message Queue都会有一个对应的ProcessQueue对象,保存了这个Message Queue消息处理状态的快照,如代码清单11-11所示。ProcessQueue对象里主要的内容是一个TreeMap和一个读写锁。TreeMap里以Message Queue的Offset作为Key,以消息内容的引用为Value,保存了所有从MessageQueue获取到但是还未被处理的消息,读写锁控制着多个线程对TreeMap对象的并发访问。

    代码清单11-11 保存消息消费的状态

    private final ReadWriteLock lockTreeMap = new ReentrantReadWriteLock();
    private final TreeMap msgTreeMap = new TreeMap();
    private final AtomicLong msgCount = new AtomicLong();
    private final AtomicLong msgSize = new AtomicLong();
    private final Lock lockConsume = new ReentrantLock();

     

    有了ProcessQueue对象,可以随时停止、启动消息的消费,同时也可用于帮助实现顺序消费消息。顺序消息是通过ConsumeMessageOrderlyService类实现的,主要流程和ConsumeMessageConcurrentlyService类似,区别只是在对并发消费的控制上。

  • 相关阅读:
    RxJava(四)-过滤操作符
    【计算机毕设选题】计算机毕业设计选题,500个热门选题推荐
    MySQL 定时清空某表
    C语言:二维数组的传递
    LeetCode·每日一题·754.到达终点数字·数学
    NeurIPS 2023 | MQ-Det: 首个支持多模态查询的开放世界目标检测大模型
    Java内存马相关文章
    JavaScript -- Map对象及常用方法介绍
    系统集成|第二十章(笔记)
    Spring学习(2) Spring的IOC底层实现
  • 原文地址:https://blog.csdn.net/zhao_god/article/details/134545345