• RocketMQ实践与原理分析(Docker安装RocketMQ)


    前言

    QBM之前使用的消息中间件是ActiveMQ,后续需要升级为RocketMQ。
    MQ广泛应用于很多业务场景中,主要的作用

    • 异步解耦
    • 削峰

    常用MQ中间件对比,参考官方文档:https://rocketmq.apache.org/zh/docs/4.x/introduction/03whatis

    协议和特点消息有序性定时消息批量消息广播消息消息过滤服务器触发的重新投递消息存储
    ActiveMQPush model, support OpenWire, STOMP, AMQP, MQTT, JMSExclusive (独自)Consumer or Exclusive Queues can ensure orderingSupportedNot SupporedSupportedSupportedNot SupportedSupports very fast persistence using JDBC along with a high performance journal,such as levelDB, kahaDB
    KafkaPull model, support TCPEnsure ordering of messages within a partitionNot SupportedSupported, with async producerNot SupportedSupported, you can use Kafka Streams to filter messagesNot SupportedHigh performance file storage
    RocketMQPull model, support TCP, JMS, OpenMessagingEnsure strict ordering of messages,and can scale out gracefullySupportedSupported, with sync mode to avoid message lossSupportedSupported, property filter expressions based on SQL92SupportedHigh performance and low latency file storage

    通过学习并结合业务知识,重点思考的问题:

    • 顺序性消费?顺序消费场景某个消息失败导致消息挤压?
    • 消息的挤压?如何根据业务划分topic和tag?相同tag分group?,同一业务消息有相同的key?
    • 消息消费的多线程问题?某个业务场景比如财务需要单线程消费?

    目录

    • 概述
    • 实践
    • 原理分析

    一、概述

    RocketMQ是由阿里捐赠给Apache的一款低延迟、高并发、高可用、高可靠的分布式消息中间件。经历了淘宝双十一的洗礼。RocketMQ既可为分布式应用系统提供异步解耦和削峰填谷的能力,同时也具备互联网应用所需的海量消息堆积、高吞吐、可靠重试等特性。

    基于最基础的发布订阅模型,而在实际的应用中,结构会更复杂。例如为了支持高并发和水平扩展,中间的消息主题需要进行分区(对应Message Queue),同一个Topic会有多个生产者,同一个信息会有多个消费者,消费者之间要进行负载均衡等。
    image.png

    ps:存储消息Topic的 代理服务器( Broker ),是实际部署过程对应的代理服务器。

    核心概念

    • Group:一类生产者或者消费者,每个包含GroupID
    • Producer:消息发布者,RocketMQ提供多种发送方式,同步发送、异步发送、顺序发送、单向发送。
    • Consumer:消息订阅者,从Broker服务器拉取消息、并将其提供给应用程序。从用户应用的角度而言提供了两种消费形式:拉取式消费、推动式消费
    • Message Queue:消息存储和传输的实际容器,也是消息的最小存储单元。每个Topic下会由一个到多个队列来存储消息,RocketMQ 对 Topic 进行了分区,这种操作被称为队列(MessageQueue)。队列可以提升消息发送和消费的并发度。
      • 死信队列:用于处理无法被正常消息的消息,当一条信息初次消费失败,消息队列RocketMQ会自动进行消息重试,达到重试最大次数仍然失败的话,若消费仍然失败,该消息不会被丢弃,而是直接发到设置的该Consumer对应的死信队列里面。
    • Topic:消息主题
    • Message:生产者向Topic发送并最终传给消费者的数据消息载体,生产者为消息定义的属性成为消息属性,包含Message Key和Tag,MQ本身会生成一个Message ID。

    消息类型分类

    • 一次性消息:Exactly-Once:Consumer消费一次仅能消费一次。
    • 集群消息:相同的ConsumerGroup下的消息消费,每个Consumer按照GroupID均分消费。
    • 广播消息:相同的ConsumerGroup下的消息消费,每个Consumer按照GroupID全量消费。
    • 定时消息:Producer将消息发送到RocketMQ服务端,不期望消息被马上投递,而是推迟到当前时间点之后某一个时间点才发给Consumer。
    • 延时消息:Producer将消息发送到RocketMQ服务端,不期望消息被马上投递,而是推迟一定时间才发给Consumer。
    • 事务消息:类似X/Open XA的分布式事务功能。
    • 顺序消息:一种按照顺序进行发布和消费的消息模型,分为全局顺序消息和分区顺序消息。
      • 全局顺序消息:对于指定的一个Topic,所有的消息按照严格的FIFO的顺序进行发布和消费。
      • 分区顺序消息:对于指定的一个Topic,所有消息根据Sharding Key进行区块分区,同一分区内的消息按照严格的FIFO顺序进行发布的消费。Sharding Key是顺序消息中用来区分不同分区的关键字段,和普通消息的Message Key是完全不同的概念。

    其他消息相关

    • ConsumerOffset消费位点:一条消息被某个消费者消费完成后不会立即从队列中删除,Apache RocketMQ 会基于每个消费者分组记录消费过的最新一条消息的位点,即消费位点。
      • 重置消费位点:以时间轴为坐标,在消息持久化存储的时间范围内(默认3天),重新设置Consumer对已订阅Topic的消费进度,设置完成后Consumer将接受设定时间点之后由Producer发送到消息队列服务端的消息。
    • MessageQueueOffset消息位点:消息是按到达Apache RocketMQ 服务端的先后顺序存储在指定主题的多个队列中,每条消息在队列中都有一个唯一的Long类型坐标,这个坐标被定义为消息位点
    • MessageKey消息索引:通过设置的消息索引可以快速查找到对应的消息内容。
    • 消息堆积:消息被堆积在了RocketMQ的服务端,Consumer没有能力消费来不及消费。
    • 消息过滤:Consumer可以根据Tag对消息进行过滤,确保Consumer最终只能接受到被过滤后的消息,过滤操作是在服务端完成的。
    • 消息轨迹:消息从生产到消费过程的链路追钟,方便定位排查问题。
    • 消息视图:面向开发者提供的消息只读性接口,通过消息视图可以读取消息内部的多个属性和负载信息,但是不能对消息本身修改。

    参考:

    二、实践

    安装RocketMQ参考:https://rocketmq.apache.org/docs/quickStart/01quickstart
    容器安装RocketMQ,需要分开安装Nameserver容器和Broker容器以及控制台Console容器,其中Nameserver和Broker的连接通过broker.conf
    这样做是为了解耦和方便管理:https://juejin.cn/post/7218438764100108325
    开发测试直接使用docker安装

    # 拉取镜像
    docker pull rocketmqinc/rocketmq
    
    # 一、启动NameServer容器,创建一个新的容器并指定 RocketMQ 的镜像
    docker run -d \
    --name rmqnamesrv \
    -p 9876:9876 \
    -v /home/docker/mydata/rocketmq/conf:/root/config \
    -v /home/docker/mydata/rocketmq/logs:/root/logs \
    -e "JAVA_OPTS=-Duser.home=/opt" \
    rocketmqinc/rocketmq \
    sh mqnamesrv 
    
    # 参数说明:
    -d 以守护线程方式启动
    --name rmqnamesrv 设置容器名称
    -p 9876:9876 端口映射
    -v 把容器内的/root/logs日志路径挂载到宿主机的自定义路径中(需根据自己的路径自行创建)
    -v 把容器内的/root/store数据存储目录挂载到宿主机的自定义目录(需根据自己的路径自行创建)
    rocketmqinc/rocketmq 使用镜像的名称
    sh mqnamesrv 执行name server脚本
    
    
    # 进入容器
    docker exec -it d60b /bin/bash
    
    # 修改broker.conf文件,设置通信的brokerIP
    vi ... /conf/broker.conf,然后添加brokerIP1 = xxx.xxx.xxx.xxx,内容为宿主机的IP
    
    # broker.conf的其他配置项
    
    # 所属集群名称,如果节点较多可以配置多个
    brokerClusterName = DefaultCluster
    #broker名称,master和slave使用相同的名称,表明他们的主从关系
    brokerName = broker-a
    #0表示Master,大于0表示不同的slave
    brokerId = 0
    #表示几点做消息删除动作,默认是凌晨4点
    deleteWhen = 04
    #在磁盘上保留消息的时长,单位是小时
    fileReservedTime = 48
    #有三个值:SYNC_MASTER,ASYNC_MASTER,SLAVE;同步和异步表示Master和Slave之间同步数据的机制;
    brokerRole = ASYNC_MASTER
    #刷盘策略,取值为:ASYNC_FLUSH,SYNC_FLUSH表示同步刷盘和异步刷盘;SYNC_FLUSH消息写入磁盘后才返回成功状态,ASYNC_FLUSH不需要;
    flushDiskType = ASYNC_FLUSH
    
    # 回到宿主机,将broker.conf拷贝到宿主机
    # nameserver容器内配置文件/opt/rocketmq-4.4.0/conf
    
    docker cp d60b:/opt/rocketmq-4.4.0/conf/broker.conf /home/docker/mydata/rocketmq/conf/broker.conf
    
    
    
    
    
    
    # 二、启动Broker容器
    
    docker run -d  \
    --name rmqbroker \
    --link rmqnamesrv:namesrv \
    -p 10911:10911 \
    -p 10909:10909 \
    -v  /home/docker/mydata/rocketmq/broker/logs:/root/logs \
    -v  /home/docker/mydata/rocketmq/broker/store:/root/store \
    -v /home/docker/mydata/rocketmq/conf/broker.conf:/opt/rocketmq-4.4.0/conf/broker.conf \
    -e "NAMESRV_ADDR=namesrv:9876" \
    -e "MAX_POSSIBLE_HEAP=200000000" \
    rocketmqinc/rocketmq \
    sh mqbroker -c  ../conf/broker.conf 
    
    
    # 参数说明
    --link rmqnamesrv:namesrv  和rmqnamesrv容器通信
    -p 10911:10911 把容器的非vip通道端口挂载到宿主机
    -p 10909:10909 把容器的vip通道端口挂载到宿主机
    -e “NAMESRV_ADDR=namesrv:9876”  指定namesrv的地址为本机namesrv的ip地址:9876
    -e “MAX_POSSIBLE_HEAP=200000000” rocketmqinc/rocketmq sh mqbroker 指定broker服务的最大堆内存(暂未配置)
    sh mqbroker -c  ../conf/broker.conf  读取../conf/broker.conf配置并启动broker
    
    
    # 三、安装控制台
    docker pull styletang/rocketmq-console-ng
    
    
    docker run -d \
    -p 8081:8080 \
    -e "JAVA_OPTS=-Drocketmq.config.namesrvAddr=120.46.82.131:9876 -Drocketmq.config.isVIPChannel=false" \
    styletang/rocketmq-console-ng
    
    # 四、访问控制台(别忘了开8081防火墙)
    xxx.xxx.xxx.xxx:8081
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    2.1、整合SpringBoot使用
    2.2、消息概念

    RocketMQ消息构成分为四部分

    • topic
    • body
    • properties:消息属性
      • keys:消息的业务关键词,方面MQ根据该值创建哈希索引,便于console查找消息。
      • delayTimeLevel:消息延时级别,0表示不延时
      • waitStoreMsgOK:消息是否在服务器落盘后才会应答
    • transactionId:消息事务id,会在事务消息中使用。
    • flag:业务设置的标识

    企业微信截图_16949357341722.png

    什么时候该用Topic?什么时候该用Tag?从下面四点考虑

    • 消息类型是否一致:如事务消息、顺序消息、普通消息一般Topic不同
    • 业务是否关联:没有之间关联的一般使用Topic
    • 消息优先级:不同优先级的一般使用Topic
    • 消息量级:有些业务消息虽然量小但是实时性要求高,不能和量级大的用一个Topic,否则可能会被饿死。

    总的来说,针对消息分类,您可以选择创建多个 Topic,或者在同一个 Topic 下创建多个 Tag。但通常情况下,不同的 Topic 之间的消息没有必然的联系,而 Tag 则用来区分同一个 Topic 下相互关联的消息,例如全集和子集的关系、流程先后的关系。

    2.3、消息发送方式demo
    • 同步发送:发送消息收到上一条发送成功再发送下一条。可靠的同步传输被广泛应用于各种场景,如重要的通知消息、短消息通知等。
    • 异步发送:发送消息不必等待结果进而发下一条消息。异步发送一般用于链路耗时较长,对响应时间较为敏感的业务场景。例如,视频上传后通知启动转码服务,转码完成后通知推送转码结果等。send方法传递回调函数处理发送结果。
    • 单向模式发送:只发消息,不等待结果,无回调。一般在微秒级别,一般用在日志场景。
    • 顺序发送:消息按照FIFO发送和消费,发送的时候按照分区key将消息投放到某一个队列,消费按照顺序消费。在有序事件处理、撮合交易、数据实时增量同步等场景下,异构系统间需要维持强一致的状态同步。需要同时保证生产和消费有序。
      • 生产顺序性:需要保证单一生产者串行发送两个条件。
      • 消费顺序性:按照FIFO消费。
    • 延时发送:消息到达之后延时一段时间再投递到Consumer消费。支持18个等级的延迟投递,每个等级对应着不同的延时时间。
    • 批量发送:将消息聚成一批以后发送,可以增加吞吐率,并减少API和网络调用的次数。注意批量消息大小不能超1MB
    • 事务发送:TCC保证数据强一致的消息,在对一些对数据有着一致性强要求的场景选用,保证上下游数据一致。本质上是由于普通消息无法像单机数据库事务一样,具备提交、回滚和统一协调的能力。事务消息支持二阶段的提交能力。将二阶段提交和本地事务绑定,实现全局提交结果的一致性。

    顺序消息场景举例
    例如创建订单的场景,需要保证同一个订单的生成、付款和发货,这三个操作被顺序执行。如果是普通消息,订单A的消息可能会被轮询发送到不同的队列中,不同队列的消息将无法保持顺序,而顺序消息发送时将ShardingKey相同(同一订单号)的消息序路由到一个逻辑队列中。

    顺序消息如何保证严格的消息顺序性?
    如果某Broker掉线,此时队列总数会发生变化,如何保证消息顺序性?如果队列数量变化,同一个分片key的消息可能会被发送到不同的队列上,也可能发送到掉线的Broker队列,会发送失败,针对第一种情况,就不能保证消息的顺序性。RocketMQ提供两种模式,如果需要保证严格顺序性而不是可用性创建Topic指定 --order = true。其次要保证NameServer中的配置 orderMessageEnable 和 returnOrderTopicConfigToBroker 必须是 true。如果上述任意一个条件不满足,则是保证可用性而不是严格顺序。

    事务消息如何保证事务性的?

    • 消息的发送分为两个阶段,也就是TCC,第一个阶段会发送一个半事务消息(暂时不能投递给Consumer的消息,消息已经存在Broker了),第二阶段是判断是否收到生产者对该消息的二次确认,执行Commit or Rollback。
    • 由于网络闪断、生产者应用重启等原因,导致某事务消息的二次确认丢失,为防止一直卡在暂不能投递的状态,Broker端会扫描发现,需要主动向生产者询问该消息的最终状态(Commit or Rollback),询问也是固定次数,超过默认回滚。

    image.png

    ps:此外,需要注意的是事务消息的生产组名称 ProducerGroupName不能随意设置。事务消息有回查机制,回查时Broker端如果发现原始生产者已经崩溃,则会联系同一生产者组的其他生产者实例回查本地事务执行情况以Commit或Rollback半事务消息。

    package scl.controller;
    
    import org.apache.rocketmq.client.exception.MQBrokerException;
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.client.producer.*;
    import org.apache.rocketmq.common.message.MessageExt;
    import org.apache.rocketmq.common.message.MessageQueue;
    import org.apache.rocketmq.remoting.common.RemotingHelper;
    import org.apache.rocketmq.remoting.exception.RemotingException;
    import org.apache.rocketmq.spring.autoconfigure.RocketMQProperties;
    import org.apache.rocketmq.spring.core.RocketMQTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.messaging.Message;
    import org.springframework.messaging.support.MessageBuilder;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.RestController;
    import scl.mq.RocketMqConstants;
    
    import java.io.UnsupportedEncodingException;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.*;
    import java.util.concurrent.atomic.AtomicInteger;
    
    /**
     * @projectName: learn-projects-demo
     * @package: scl.controller
     * @className: TestController
     * @author: sichaolong
     * @description: TODO
     * @date: 2023/9/17 12:38
     * @version: 1.0
     */
    @RestController
    public class TestController {
        @Autowired
        private RocketMQTemplate rocketmqTemplate;
    
        @GetMapping("/template/test")
        public SendResult testRocketMqTemplate() {
            Message<String> msg = MessageBuilder.withPayload("Hello,RocketMQ").build();
            rocketmqTemplate.send(RocketMqConstants.TOPIC_TEST, msg);
            SendResult sendResult = rocketmqTemplate.syncSend(RocketMqConstants.TOPIC_TEST_SYNC, msg);
            System.out.println(sendResult);
            return sendResult;
        }
    
    
        /**
         * 同步发送消息
         * @throws Exception
         */
        @GetMapping("/client/sync/test")
        public void testRocketMqClientSync() throws Exception {
            // 初始化一个producer并设置Producer group name
            DefaultMQProducer producer = new DefaultMQProducer("test-client-producer-group");
            //(1)设置NameServer地址
            producer.setNamesrvAddr("120.46.82.131:9876");
            //(2)启动producer
            producer.start();
            for (int i = 0; i < 100; i++) {
                // 创建一条消息,并指定topic、tag、body等信息,tag可以理解成标签,对消息进行再归类,RocketMQ可以在消费端对tag进行过滤
                org.apache.rocketmq.common.message.Message msg = new org.apache.rocketmq.common.message.Message("TopicTest"
                        /* Topic */,
                        "TagA"
                        /* Tag */,
                        ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
                        /* Message body */
                );
                //(3)利用producer进行发送,并同步等待发送结果
                SendResult sendResult = producer.send(msg);
                //(4)
                System.out.printf("%s%n", sendResult);
            }
            // 一旦producer不再使用,关闭producer
            producer.shutdown();
        }
    
    
        /**
         * 异步发送消息
         * @throws Exception
         */
        @GetMapping("/client/async/test")
        public void testRocketMqClientAsync() throws Exception {
            // 初始化一个producer并设置Producer group name
            DefaultMQProducer producer = new DefaultMQProducer("test-client-producer-group");
            //(1)设置NameServer地址
            producer.setNamesrvAddr("120.46.82.131:9876");
            //(2)启动producer
            producer.start();
            producer.setRetryTimesWhenSendAsyncFailed(0);
            int messageCount = 100;
            final CountDownLatch countDownLatch = new CountDownLatch(messageCount);
            for (int i = 0; i < messageCount; i++) {
                try {
                    final int index = i;
                    // 创建一条消息,并指定topic、tag、body等信息,tag可以理解成标签,对消息进行再归类,RocketMQ可以在消费端对tag进行过滤
                    org.apache.rocketmq.common.message.Message msg = new org.apache.rocketmq.common.message.Message("TopicTest"
                            /* Topic */,
                            "TagA"
                            /* Tag */,
                            ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
                            /* Message body */
                    );
                    // 异步发送消息, 发送结果通过callback返回给客户端
                    producer.send(msg, new SendCallback() {
                        @Override
                        public void onSuccess(SendResult sendResult) {
                            System.out.printf("%-10d OK %s %n", index,
                                    sendResult.getMsgId());
                            countDownLatch.countDown();
                        }
    
                        @Override
                        public void onException(Throwable e) {
                            System.out.printf("%-10d Exception %s %n", index, e);
                            e.printStackTrace();
                            countDownLatch.countDown();
                        }
                    });
                } catch (Exception e) {
                    e.printStackTrace();
                    countDownLatch.countDown();
                }
            }
            //异步发送,如果要求可靠传输,必须要等回调接口返回明确结果后才能结束逻辑,否则立即关闭Producer可能导致部分消息尚未传输成功
            countDownLatch.await(5, TimeUnit.SECONDS);
            // 一旦producer不再使用,关闭producer
            producer.shutdown();
        }
    
        /**
         * 发送单一消息
         * @throws Exception
         */
    
        @GetMapping("/client/once/test")
        public void testRocketMqClientOnce() throws Exception {
            // 初始化一个producer并设置Producer group name
            DefaultMQProducer producer = new DefaultMQProducer("test-client-producer-group");
            //(1)设置NameServer地址
            producer.setNamesrvAddr("120.46.82.131:9876");
            //(2)启动producer
            producer.start();
            for (int i = 0; i < 100; i++) {
                // 创建一条消息,并指定topic、tag、body等信息,tag可以理解成标签,对消息进行再归类,RocketMQ可以在消费端对tag进行过滤
                org.apache.rocketmq.common.message.Message msg = new org.apache.rocketmq.common.message.Message("TopicTest"
                        /* Topic */,
                        "TagA"
                        /* Tag */,
                        ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
                        /* Message body */
                );
                // 由于在oneway方式发送消息时没有请求应答处理,如果出现消息发送失败,则会因为没有重试而导致数据丢失。若数据不可丢,建议选用可靠同步或可靠异步发送方式。
                producer.sendOneway(msg);
            }
            // 一旦producer不再使用,关闭producer
            producer.shutdown();
        }
    
    
        /**
         * 发送顺序消息
         * @throws Exception
         */
        @GetMapping("/client/ordered/test")
        public void testRocketMqClientOrdered() throws Exception {
            try {
                DefaultMQProducer producer = new DefaultMQProducer("test-client-producer-group");
                //(1)设置NameServer地址
                producer.setNamesrvAddr("120.46.82.131:9876");
                producer.start();
    
                String[] tags = new String[]{"TagA", "TagB", "TagC", "TagD", "TagE"};
                for (int i = 0; i < 100; i++) {
                    int orderId = i % 10;
                    // 创建一条消息,并指定topic、tag、body等信息,tag可以理解成标签,对消息进行再归类,RocketMQ可以在消费端对tag进行过滤
                    org.apache.rocketmq.common.message.Message msg =
                            new org.apache.rocketmq.common.message.Message(
                                    "TopicTest",
                                    tags[i % tags.length],
                                    "KEY" + i,
                                    ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
                    SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
                        @Override
                        public MessageQueue select(List<MessageQueue> mqs, org.apache.rocketmq.common.message.Message msg, Object arg) {
                            Integer id = (Integer) arg;
                            int index = id % mqs.size();
                            return mqs.get(index);
                        }
                        // 这里传递的orderId,作为分区key
                    }, orderId);
    
                    System.out.printf("%s%n", sendResult);
                }
    
                producer.shutdown();
            } catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {
                e.printStackTrace();
            }
        }
    
        /**
         * 发送延时消息
         * @throws Exception
         */
        @GetMapping("/client/delay/test")
        public void testRocketMqClientDelay() throws Exception {
    
            // Instantiate a producer to send scheduled messages
            DefaultMQProducer producer = new DefaultMQProducer("test-client-producer-group");
            //(1)设置NameServer地址
            producer.setNamesrvAddr("120.46.82.131:9876");
            producer.start();
            // Launch producer
            producer.start();
            int totalMessagesToSend = 100;
            for (int i = 0; i < totalMessagesToSend; i++) {
                org.apache.rocketmq.common.message.Message msg = new org.apache.rocketmq.common.message.Message("TopicTest"
                        /* Topic */,
                        "TagA"
                        /* Tag */,
                        ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
                        /* Message body */
                );
                // This message will be delivered to consumer 10 seconds later.
                msg.setDelayTimeLevel(3);
                // Send the message
                producer.send(msg);
            }
    
            // Shutdown producer after use.
            producer.shutdown();
        }
    
    
        /**
         * 发送批量消息
         * @throws Exception
         */
        @GetMapping("/client/batch/test")
        public void testRocketMqClientBatch() throws Exception {
    
            DefaultMQProducer producer = new DefaultMQProducer("test-client-producer-group");
            producer.setNamesrvAddr("120.46.82.131:9876");
    
            producer.start();
    
            //If you just send messages of no more than 1MiB at a time, it is easy to use batch
            //Messages of the same batch should have: same topic, same waitStoreMsgOK and no schedule support
            String topic = "BatchTest";
            List< org.apache.rocketmq.common.message.Message> messages = new ArrayList<>();
            messages.add(new  org.apache.rocketmq.common.message.Message(topic, "Tag", "OrderID001", "Hello world 0".getBytes()));
            messages.add(new  org.apache.rocketmq.common.message.Message(topic, "Tag", "OrderID002", "Hello world 1".getBytes()));
            messages.add(new  org.apache.rocketmq.common.message.Message(topic, "Tag", "OrderID003", "Hello world 2".getBytes()));
    
            producer.send(messages);
        }
    
    
        /**
         * 发送事务消息
         * @throws Exception
         */
        @GetMapping("/client/transaction/test")
        public void testRocketMqClientTransaction() throws Exception {
    
            // 线程池
            ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
                @Override
                public Thread newThread(Runnable r) {
                    Thread thread = new Thread(r);
                    thread.setName("client-transaction-msg-check-thread");
                    return thread;
                }
            });
    
            // 事务管理器
            TransactionListener transactionListener = new TransactionListenerImpl();
            TransactionMQProducer producer = new TransactionMQProducer("test-client-producer-group");
            producer.setExecutorService(executorService);
            producer.setTransactionListener(transactionListener);
            producer.start();
    
    
            for (int i = 0; i < 10; i++) {
                try {
    
                    org.apache.rocketmq.common.message.Message msg = new org.apache.rocketmq.common.message.Message("TopicTest"
                            /* Topic */,
                            "TagA"
                            /* Tag */,
                            ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
                            /* Message body */
                    );
                    SendResult sendResult = producer.sendMessageInTransaction(msg, null);
                    System.out.printf("%s%n", sendResult);
    
                    Thread.sleep(10);
                } catch (MQClientException | UnsupportedEncodingException e) {
                    e.printStackTrace();
                }
            }
    
            for (int i = 0; i < 100000; i++) {
                Thread.sleep(1000);
            }
            producer.shutdown();
        }
    
    
        /**
         * 实现事务管理器
         */
        static class TransactionListenerImpl implements TransactionListener {
            private AtomicInteger transactionIndex = new AtomicInteger(0);
    
            private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();
    
            /**
             * 执行事务
             * @param msg
             * @param arg
             * @return
             */
            @Override
            public LocalTransactionState executeLocalTransaction(org.apache.rocketmq.common.message.Message msg, Object arg) {
                int value = transactionIndex.getAndIncrement();
                int status = value % 3;
                localTrans.put(msg.getTransactionId(), status);
                return LocalTransactionState.UNKNOW;
            }
    
            /**
             * 检查事务状态
             * @param msg
             * @return
             */
            @Override
            public LocalTransactionState checkLocalTransaction(MessageExt msg) {
                Integer status = localTrans.get(msg.getTransactionId());
                if (null != status) {
                    switch (status) {
                        case 0:
                            return LocalTransactionState.UNKNOW;
                        case 1:
                            return LocalTransactionState.COMMIT_MESSAGE;
                        case 2:
                            return LocalTransactionState.ROLLBACK_MESSAGE;
                        default:
                            return LocalTransactionState.COMMIT_MESSAGE;
                    }
                }
                return LocalTransactionState.COMMIT_MESSAGE;
            }
        }
    
    
    
    
    
    
    
    }
    
    
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133
    • 134
    • 135
    • 136
    • 137
    • 138
    • 139
    • 140
    • 141
    • 142
    • 143
    • 144
    • 145
    • 146
    • 147
    • 148
    • 149
    • 150
    • 151
    • 152
    • 153
    • 154
    • 155
    • 156
    • 157
    • 158
    • 159
    • 160
    • 161
    • 162
    • 163
    • 164
    • 165
    • 166
    • 167
    • 168
    • 169
    • 170
    • 171
    • 172
    • 173
    • 174
    • 175
    • 176
    • 177
    • 178
    • 179
    • 180
    • 181
    • 182
    • 183
    • 184
    • 185
    • 186
    • 187
    • 188
    • 189
    • 190
    • 191
    • 192
    • 193
    • 194
    • 195
    • 196
    • 197
    • 198
    • 199
    • 200
    • 201
    • 202
    • 203
    • 204
    • 205
    • 206
    • 207
    • 208
    • 209
    • 210
    • 211
    • 212
    • 213
    • 214
    • 215
    • 216
    • 217
    • 218
    • 219
    • 220
    • 221
    • 222
    • 223
    • 224
    • 225
    • 226
    • 227
    • 228
    • 229
    • 230
    • 231
    • 232
    • 233
    • 234
    • 235
    • 236
    • 237
    • 238
    • 239
    • 240
    • 241
    • 242
    • 243
    • 244
    • 245
    • 246
    • 247
    • 248
    • 249
    • 250
    • 251
    • 252
    • 253
    • 254
    • 255
    • 256
    • 257
    • 258
    • 259
    • 260
    • 261
    • 262
    • 263
    • 264
    • 265
    • 266
    • 267
    • 268
    • 269
    • 270
    • 271
    • 272
    • 273
    • 274
    • 275
    • 276
    • 277
    • 278
    • 279
    • 280
    • 281
    • 282
    • 283
    • 284
    • 285
    • 286
    • 287
    • 288
    • 289
    • 290
    • 291
    • 292
    • 293
    • 294
    • 295
    • 296
    • 297
    • 298
    • 299
    • 300
    • 301
    • 302
    • 303
    • 304
    • 305
    • 306
    • 307
    • 308
    • 309
    • 310
    • 311
    • 312
    • 313
    • 314
    • 315
    • 316
    • 317
    • 318
    • 319
    • 320
    • 321
    • 322
    • 323
    • 324
    • 325
    • 326
    • 327
    • 328
    • 329
    • 330
    • 331
    • 332
    • 333
    • 334
    • 335
    • 336
    • 337
    • 338
    • 339
    • 340
    • 341
    • 342
    • 343
    • 344
    • 345
    • 346
    • 347
    • 348
    • 349
    • 350
    • 351
    • 352
    • 353
    • 354
    • 355
    • 356
    • 357
    • 358
    • 359
    • 360
    • 361
    • 362
    • 363
    • 364
    • 365
    • 366
    • 367
    • 368
    2.4、消息消费相关

    消息消费分配策略:某一个消费者组在集群消费模式下(广播模式下无效),可以配置消息的分配策略, 通过设置消费者的consumer.setAllocateMessageQueueStrategy(new AllocateMessageQueueAveragely()),常见的策略

    • 平均分配:此种策略可以通过增加消费者的数量来提高并行度,通过增加消费者提高消费能力。
    • 机房优先分配
    • 一致性hash分配

    消息消费位点:每个队列会记录最小位点、最大位点。对于消费组还有消费位点的概念

    • 在集群模式下:某一个消费者组的消费位点由客户端提交给服务端保存。
    • 在广播模式下:消费位点是由客户端自己保存的。

    一般情况下消费位点正常更新,不会存在消息重复,如果某个消费者宕机 or 新的消费者 加入消费者组,在集群模式下,就会触发重平衡,每个消费者之前消费的队列可能发生改变,针对某个队列的最大位点值是由客户端消费之后同步到队列的,过程存在延迟,导致消费者在重新更换消费队列的时候读取该队列脏的消费位点,导致该队列消息消费少量重复。

    **消费方式推、拉:**MQ的消费模式可以大致分为两种,一种是Push、一种是Pull。

    • Push:服务端推消息给客户端,及时性好,但是如果客户端没做好流控,可能会导致消息挤压或者崩溃。
    • Pull:客户端主动拉取消息,客户端可以根据自己能力消费,但是客户端拉取的频率需要控制好,太高导致服务端客户端压力,太低导致消息消费不及时。
    2.4.1、Push模式消费消息

    (1)Push模式并发消费API

    // 设置消费者组的当前消费者 为 集群模式(默认)
    consumer.setMessageModel(MessageModel.CLUSTERING);
    // 设置消费者组的当前消费者 为 广播模式
    consumer.setMessageModel(MessageModel.BROADCASTING);
    
    // Push 模式
    public class Consumer {
      public static void main(String[] args) throws InterruptedException, MQClientException {
        // 初始化consumer,并设置consumer group name
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");
       
        // 设置NameServer地址 
        consumer.setNamesrvAddr("localhost:9876");
        //订阅一个或多个topic,并指定tag过滤条件,这里指定*表示接收所有tag的消息
        consumer.subscribe("TopicTest", "*");
        //注册回调接口来处理从Broker中收到的消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {
          @Override
          public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
            System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
            // 返回消息消费状态,ConsumeConcurrentlyStatus.CONSUME_SUCCESS为消费成功
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
          }
        });
        // 启动Consumer
        consumer.start();
        System.out.printf("Consumer Started.%n");
      }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29

    (2)Push模式顺序消费API:集群模式下,同一消费组的消费者存在并发消费的时候,可能会有多个线程同时消费一个队列的消息,因此即使发送端通过发送顺序消息保证消息在同一个队列中按照FIFO的顺序,也无法保证消息实际被顺序消费。上面代码实现MessageListenerConcurrently接口,保证并发消费,有序性消费如何实现?

    consumer.registerMessageListener(new MessageListenerOrderly() {
        AtomicLong consumeTimes = new AtomicLong(0);
        @Override
        public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
            System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
            this.consumeTimes.incrementAndGet();
            if ((this.consumeTimes.get() % 2) == 0) {
                return ConsumeOrderlyStatus.SUCCESS;
            } else if ((this.consumeTimes.get() % 5) == 0) {
                context.setSuspendCurrentQueueTimeMillis(3000);
                return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
            }
            return ConsumeOrderlyStatus.SUCCESS;
        }
    });
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    (3)Push模式的消息重试:只在集群模式下某个Consumer消费失败的消息会被Broker重新投递给其他消费者消费,若达到最大重试次数还没被成功消费,则消息将被投递到死信队列。

    // 重试次数
    consumer.setMaxReconsumeTimes(10);
    // 重试间隔
    consumer.setSuspendCurrentQueueTimeMillis(5000);
    
    • 1
    • 2
    • 3
    • 4

    顺序消费和并发消费重试机制不同,

    • 顺序消费需要保证顺序,即使某个消息消费失败,需要在客户端本地完成最大重试次数,这样可以避免消息被跳过导致消息FIFO被打乱。
    • 并发消费无需保证顺序,某个消息消费失败则消息被重新投递到Broker,等待Broker重新投递回来,后面的消息不影响消费。这里也并不是投递到Broker的原Topic,而是放到了一个特殊的Topic,其命名为%RETRY%ConsumerGroupName,集群模式下并发消费每一个ConsumerGroup会对应一个特殊Topic,并会订阅该Topic。

    (4)Push模式的死信队列:消息重试至一定次数仍不被成功消费,被发送到特殊的队列中,称为死信队列Dead-Letter Queue,死信队列是死信Topic下分区数唯一的单独队列。如果产生了死信消息,那对应的ConsumerGroup的死信Topic名称为%DLQ%ConsumerGroupName,死信队列的消息将不会再被消费。可以利用RocketMQ Admin工具或者RocketMQ Dashboard上查询到对应死信消息的信息。

    疑问:顺序消息进入死信队列影响后续的消息消费吗?
    当顺序消息进入死信队列时,RocketMQ会将它标记为无法正常消费的消息,并将其存储在DLQ中。这意味着这些消息将不再参与后续的正常消费流程,但它们并不会直接影响后续的消息。

    2.4.2、Pull模式消费消息

    存在两种Pull模式,一种是比较原始的Pull Consumer,另外一种是Lite Pull Consumer

    • Pull Consumer:它不提供相关的订阅方法,需要调用pull方法时指定队列进行拉取,并需要自己更新位点
    • Lite Pull Consumer:它提供了Subscribe和Assign两种方式,使用起来更加方便。RocketMQ 4.6.0推出的新的Pull Consumer

    普通的Pull Consumer API

    public class PullConsumerTest {
      public static void main(String[] args) throws MQClientException {
        DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("please_rename_unique_group_name_5");
        consumer.setNamesrvAddr("127.0.0.1:9876");
        consumer.start();
        try {
        // Pull Consumer 模式
        // 指定需要pull的队列,获取topic下的全部队列:Set queueSet =  consumer.fetchSubscribeMessageQueues("TopicTest");
        // 同一个消费组下的多个LitePullConsumer会负载均衡消费,与PushConsumer一致。
          MessageQueue mq = new MessageQueue();
          mq.setQueueId(0);
          mq.setTopic("TopicTest");
          mq.setBrokerName("jinrongtong-MacBook-Pro.local");
          long offset = 26;
          PullResult pullResult = consumer.pull(mq, "*", offset, 32);
          if (pullResult.getPullStatus().equals(PullStatus.FOUND)) {
            System.out.printf("%s%n", pullResult.getMsgFoundList());
            consumer.updateConsumeOffset(mq, pullResult.getNextBeginOffset());
          }
        } catch (Exception e) {
          e.printStackTrace();
        }
        consumer.shutdown();
      }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25

    Lite Pull Consumer 的Subscribe API

    public class LitePullConsumerSubscribe {
        public static volatile boolean running = true;
        public static void main(String[] args) throws Exception {
            DefaultLitePullConsumer litePullConsumer = new DefaultLitePullConsumer("lite_pull_consumer_test");
            litePullConsumer.subscribe("TopicTest", "*");
            litePullConsumer.setPullBatchSize(20);
            litePullConsumer.start();
            try {
                while (running) {
                    // litePullConsumer默认是自动提交位点
                    List<MessageExt> messageExts = litePullConsumer.poll();
                    System.out.printf("%s%n", messageExts);
                }
            } finally {
                litePullConsumer.shutdown();
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    Lite Pull Consumer 的Assign API

    三、原理分析

    参卡:4.x官网文档:https://rocketmq.apache.org/zh/docs/4.x/introduction/03whatis

    3.1、RocketMQ部署模型

    Producer、Consumer又是如何找到Topic和Broker的地址呢?消息的具体发送和接收又是怎么进行的呢?

    RocketMQ部署架构上主要分为四个部分
    image.png

    • 生产者Producer:通过 MQ 的负载均衡模块选择相应的 Broker 集群队列进行消息投递,投递的过程支持快速失败和重试。
    • 消费者Consumer:消费消息角色,支持推Push、拉Pull两种模式对消息消费,同时也支持集群方式和广播方式的消费。
    • 域名服务器NameServer:一个简单的Topic路由注册中心,支持Topic、Broker的动态注册与发现。一般集群部署,各实例间相互不进行信息通讯,集群中的每个NameServer都全量的保存完整的路由信息,某个NameServer下线也不影响可用性。主要包括两个功能
      • Broker管理:接受Broker集群的注册信息并保存下来作为路由信息,然后提供心跳检测机制,检测Broker是否还存活。
      • 路由信息管理:保存关于Broker集群的整个路由信息 和 用于客户端查询的队列信息。
    • **代理服务器Broker:**主要负责消息的存储、投递和查询以及服务高可用保证。因为各个Broker中的信息不一样,不能简单像NameServer那样直接集群部署,而是需要采取主从模式集群架构。Broker采取Master-Slave结构,通过指定相同的BrokerName、不同的BrokerId来区分主(BrokerId = 0)、从(BrokerId = 1)

    小结

    • Broker注册:每个 Broker 与 NameServer 集群中的所有节点建立长连接,定时注册 Topic 信息到所有 NameServer。
    • Producer注册:Producer 与 NameServer 集群中的其中一个节点建立长连接,定期从 NameServer 获取Topic路由信息,并向提供 Topic 服务的 Master 建立长连接,且定时向 Master 发送心跳。Producer 完全无状态。
    • **Consumer注册:**Consumer 与 NameServer 集群中的其中一个节点建立长连接,定期从 NameServer 获取 Topic 路由信息,并向提供 Topic 服务的 Master、Slave 建立长连接,且定时向 Master、Slave发送心跳。Consumer 既可以从 Master 订阅消息,也可以从Slave订阅消息。

    // TODO

  • 相关阅读:
    数据结构与算法(三) 深度优先搜索
    windows+Reids可视化工具RESP.app的安装以及使用
    c++学习day3 c++指针
    Debezium的基本使用(以MySQL为例)
    模型降阶方法之 POD
    Java学习笔记——Collection之Queue
    深信服-逆向笔试复盘 9月1日
    Mysql概念
    【django问题集】django.db.utils.OperationalError: (1040, ‘Too many connections‘)
    SpringMVC 学习(六)乱码问题
  • 原文地址:https://blog.csdn.net/qq_24654501/article/details/132926825