• Springboot整合RocketMQ之生产者


    上一篇rocketMQ文章介绍相关概念与理论,接下来看下整合springboot使用

    1、普通消息发送


    如果是集群就先创建Topic

    • 向集群中创建 Topic
      发送消息前,需要确保目标主题已经被创建和初始化。可以利用 RocketMQ Admin 工具创建目标 Topic 。
      RocketMQ 部署安装包默认开启了 autoCreateTopicEnable 配置,会自动为发送的消息创建 Topic,但该特性仅推荐在初期测试时使用。
      生产环境强烈建议管理所有主题的生命周期,关闭自动创建参数,以避免生产集群出现大量无效主题,无法管理和回收,造成集群注册压力增大,影响生产集群的稳定性。
    > sh bin/mqadmin updateTopic -c DefaultCluster -t TopicTest -n 127.0.0.1:9876
    create topic to 127.0.0.1:10911 success.
    TopicConfig [topicName=TopicTest, readQueueNums=8, writeQueueNums=8, perm=RW-, topicFilterType=SINGLE_TAG, topicSysFlag=0, order=false, attributes=null]
    
    • 1
    • 2
    • 3
    • 添加客户端依赖
    
    			<dependency>
                    <groupId>org.apache.rocketmqgroupId>
                    <artifactId>rocketmq-spring-boot-starterartifactId>
                    <version>2.2.0version>
                dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 配置信息
    server:
      port: 8086
    #rocketmq配置信息
    rocketmq:
      #nameservice服务器地址(多个以英文逗号隔开)
      name-server: 服务器ip:9876
      #生产者配置 【这些值应放在.properties文件中!】
      producer:
    #    #组名
        group: shen-producer-group
    #    send-message-timeout: 3000 # 发送消息超时时间,单位:毫秒。默认为 3000 。
    #    compress-message-body-threshold: 4096 # 消息压缩阀值,当消息体的大小超过该阀值后,进行消息压缩。默认为 4 * 1024B
    #    max-message-size: 4194304 # 消息体的最大允许大小。。默认为 4 * 1024 * 1024B
    #    retry-times-when-send-failed: 2 # 同步发送消息时,失败重试次数。默认为 2 次。
    #    retry-times-when-send-async-failed: 2 # 异步发送消息时,失败重试次数。默认为 2 次。
    #    retry-next-server: false # 发送消息给 Broker 时,如果发送失败,是否重试另外一台 Broker 。默认为 false
    #    #目的地(topic:tag)
    #    #topic
    #    topic: shen-topic
    #    #sync tag(同步消息tag)
    #    sync-tag: shen-sync-tags
    #    #async tag(异步消息tag)
    #    async-tag: shen-async-tags
    #    #oneway tag(单向消息tag)
    #    oneway-tag: shen-oneway-tags
    
    logging:
      file:
        path: E:\workspace\log\mq_product\
    #    path: /usr/log/mqproductservice/mqproductservice.log
      level:
        root: INFO
        com.anran.projectmanage.mapper: DEBUG
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    # 不同类型的消息使用不同的topic
    rocketmq.producer.topic: shen-topic
    rocketmq.producer.group: shen-producer-group
    # sync tag(同步消息tag)
    rocketmq.producer.sync-tag: TagS
    #async tag(异步消息tag)
    rocketmq.producer.async-tag: TagA
    #oneway tag(单向消息tag)
    rocketmq.producer.oneway-tag: TagO
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    @RestController
    @RequestMapping("/rocket")
    public class RocketController {
        @Value(value = "${rocketmq.producer.topic}:${rocketmq.producer.sync-tag}")
        private String syncDestination;
        @Value(value = "${rocketmq.producer.topic}")
        private String syncTopic;
        @Value(value = "${rocketmq.producer.sync-tag}")
        private String syncTag;
    
        @Autowired
        private RocketMQTemplate rocketMQTemplate;
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 消息发送
      Apache RocketMQ可用于以三种方式发送消息:同步、异步和单向传输。前两种消息类型是可靠的,因为无论它们是否成功发送都有响应。

    1.1 同步发送

    同步发送是最常用的方式,是指消息发送方发出一条消息后,会在收到服务端同步响应之后才发下一条消息的通讯方式,可靠的同步传输被广泛应用于各种场景,如重要的通知消息、短消息通知等。
    在这里插入图片描述
    同步发送的整个代码流程如下:

    1. 首先会创建一个producer。普通消息可以创建DefaultMQProducer,创建时需要填写生产组的名称,生产者组是指同一类Producer的集合,这类Producer发送同一类消息且发送逻辑一致。
    2. 设置 NameServer 的地址。Apache RocketMQ很多方式设置NameServer地址(客户端配置中有介绍),这里是在代码中调用producer的API setNamesrvAddr进行设置,如果有多个NameServer,中间以分号隔开,比如"127.0.0.2:9876;127.0.0.3:9876"。
    3. 第三步是构建消息。指定topic、tag、body等信息,tag可以理解成标签,对消息进行再归类,RocketMQ可以在消费端对tag进行过滤。
    4. 最后调用send接口将消息发送出去。同步发送等待结果最后返回SendResult,SendResut包含实际发送状态还包括SEND_OK(发送成功),FLUSH_DISK_TIMEOUT(刷盘超时), FLUSH_SLAVE_TIMEOUT(同步到备超时),
      SLAVE_NOT_AVAILABLE(备不可用),如果发送失败会抛出异常。
        /**
         * 发送同步消息
         * 消息发送方发出一条消息后,会在收到服务端同步响应之后才发下一条消息的通讯方式
         * 可靠的同步传输被广泛应用于各种场景,如重要的通知消息、短消息通知等。
         */
        @PostMapping("/syncSend")
        public String syncSend(@RequestBody UserContent user) {
            // 发送消息前插入数据库
            // 发送消息 tags在topic之后用:拼接 自动解析
            SendResult sendResult = rocketMQTemplate.syncSend(syncTopic + ":" + syncTag, JSONObject.toJSONString(user));
            System.out.printf("同步发送字符串%s,发送结果:%s", user.toString(), sendResult);
            // 发送消息后插入数据库
            // 解析发送结果
    //        if (sendResult.getSendStatus() == SendStatus.SEND_OK) {
    //            response = sendResult.getMsgId() + " : " + sendResult.getSendStatus();
    //        }
            return sendResult.getSendStatus().toString();
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    DefaultMQProducer 方式发送

    public class SyncProducer {
      public static void main(String[] args) throws Exception {
        // 初始化一个producer并设置Producer group name
        DefaultMQProducer producer = new DefaultMQProducer("sync_group"); //(1)
        // 设置NameServer地址
        producer.setNamesrvAddr("localhost:9876");  //(2)
        // 启动producer
        producer.start();
        for (int i = 0; i < 100; i++) {
          // 创建一条消息,并指定topic、tag、body等信息,tag可以理解成标签,对消息进行再归类,RocketMQ可以在消费端对tag进行过滤
          Message msg = new Message("sync_topic" /* Topic */,
            "TagA" /* Tag */,
            ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
            );   //(3)
          // 利用producer进行发送,并同步等待发送结果
          SendResult sendResult = producer.send(msg);   //(4)
          System.out.printf("%s%n", sendResult);
        }
        // 一旦producer不再使用,关闭producer
        producer.shutdown();
      }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22

    同步发送方式请务必捕获发送异常,并做业务侧失败兜底逻辑,如果忽略异常则可能会导致消息未成功发送的情况。

    1.2 异步发送

    异步发送是指发送方发出一条消息后,不等服务端返回响应,接着发送下一条消息的通讯方式。
    在这里插入图片描述

    异步发送需要实现异步发送回调接口(SendCallback)。

    消息发送方在发送了一条消息后,不需要等待服务端响应即可发送第二条消息,发送方通过回调接口接收服务端响应,并处理响应结果。异步发送一般用于链路耗时较长,对响应时间较为敏感的业务场景。例如,视频上传后通知启动转码服务,转码完成后通知推送转码结果等。

    RocketMQTemplate方式发送

        /**
         * 异步发送
         * 发送方发出一条消息后,不等服务端返回响应,接着发送下一条消息
         */
        @PostMapping("asyncSend")
        public void asyncSend(@RequestBody UserContent user) {
            System.out.printf("async send begin %s%n", LocalDateTime.now());
            rocketMQTemplate.asyncSend("shen-asyncSend:TagAsync", JSONObject.toJSONString(user), new SendCallback() {
                @Override
                public void onSuccess(SendResult sendResult) {
                    // 记录到数据库 成功
    //                MqLogSuccess mqLogSuccess = bulidMqLogSuccess(message, sendResult);
    //                mqLogSuccessMapper.insert(mqLogSuccess);
                    System.out.printf("异步发送成功%s%n", sendResult);
                }
    
                @Override
                public void onException(Throwable e) {
                    // 记录到数据库 失败
    //                MqLogFail mqLogFail = bulidMqLogFail(message);
    //                mqLogFailMapper.insert(mqLogFail);
                    System.out.printf("异步发送失败%s%n", e.getMessage());
                }
            });
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25

    DefaultMQProducer 方式发送

    public class AsyncProducer {
      public static void main(String[] args) throws Exception {
        // 初始化一个producer并设置Producer group name
        DefaultMQProducer producer = new DefaultMQProducer("shen-asyncSend");
        // 设置NameServer地址
        producer.setNamesrvAddr("localhost:9876");
        // 启动producer
        producer.start();
        producer.setRetryTimesWhenSendAsyncFailed(0);
        for (int i = 0; i < 100; i++) {
          final int index = i;
          // 创建一条消息,并指定topic、tag、body等信息,tag可以理解成标签,对消息进行再归类,RocketMQ可以在消费端对tag进行过滤
          Message msg = new Message("async_topic",
            "TagA",
            "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
          // 异步发送消息, 发送结果通过callback返回给客户端
          producer.send(msg, new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
              System.out.printf("%-10d OK %s %n", index,
                sendResult.getMsgId());
            }
            @Override
            public void onException(Throwable e) {
              System.out.printf("%-10d Exception %s %n", index, e);
              e.printStackTrace();
            }
          });
        }
        // 一旦producer不再使用,关闭producer
        producer.shutdown();
      }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33

    异步发送与同步发送代码唯一区别在于调用send接口的参数不同,异步发送不会等待发送返回,取而代之的是send方法需要传入 SendCallback 的实现,SendCallback 接口主要有onSuccess 和 onException 两个方法,表示消息发送成功和消息发送失败。

    1.3 单向模式发送

    在这里插入图片描述
    发送方只负责发送消息,不等待服务端返回响应且没有回调函数触发,即只发送请求不等待应答。此方式发送消息的过程耗时非常短,一般在微秒级别。适用于某些耗时非常短,但对可靠性要求并不高的场景,例如日志收集。

        /**
         * 单向模式发送
         * 发送方只负责发送消息,不等待服务端返回响应且没有回调函数触发,即只发送请求不等待应答
         * 此方式发送消息的过程耗时非常短,一般在微秒级别。适用于某些耗时非常短,但对可靠性要求并不高的场景,例如日志收集。
         */
        @PostMapping("/sendOneWay")
        public void sendOneWay(@RequestBody UserContent user){
            // 消息埋点
    //        MqLogAll mqLogAll = bulidMqLogAll(message);
    //        mqLogAllMapper.insert(mqLogAll);记录到数据库
            // 发送消息 tags在topic之后用:拼接
            rocketMQTemplate.sendOneWay("oneway_topic:TagOneWay", JSONObject.toJSONString(user));
            System.out.println("sendOneWay发送成功");
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    DefaultMQProducer 方式发送

    public class OnewayProducer {
      public static void main(String[] args) throws Exception{
        // 初始化一个producer并设置Producer group name
        DefaultMQProducer producer = new DefaultMQProducer("shen-oneWay");
        // 设置NameServer地址
        producer.setNamesrvAddr("localhost:9876");
        // 启动producer
        producer.start();
        for (int i = 0; i < 100; i++) {
          // 创建一条消息,并指定topic、tag、body等信息,tag可以理解成标签,对消息进行再归类,RocketMQ可以在消费端对tag进行过滤
          Message msg = new Message("TopicTest" /* Topic */,
            "TagA" /* Tag */,
            ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
          );
          // 由于在oneway方式发送消息时没有请求应答处理,如果出现消息发送失败,则会因为没有重试而导致数据丢失。若数据不可丢,建议选用可靠同步或可靠异步发送方式。
          producer.sendOneway(msg);
        }
         // 一旦producer不再使用,关闭producer
         producer.shutdown();
      }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    单向模式调用sendOneway,不会对返回结果有任何等待和处理。

    2、顺序消息发送


    2.1 顺序消息介绍

    顺序消息是一种对消息发送和消费顺序有严格要求的消息。

    对于一个指定的Topic,消息严格按照先进先出(FIFO)的原则进行消息发布和消费,即先发布的消息先消费,后发布的消息后消费。在 Apache RocketMQ 中支持分区顺序消息,如下图所示。我们可以按照某一个标准对消息进行分区(比如图中的ShardingKey),同一个ShardingKey的消息会被分配到同一个队列中,并按照顺序被消费。

    需要注意的是 RocketMQ 消息的顺序性分为两部分,生产顺序性和消费顺序性。只有同时满足了生产顺序性和消费顺序性才能达到上述的FIFO效果。

    生产顺序性: RocketMQ 通过生产者和服务端的协议保障单个生产者串行地发送消息,并按序存储和持久化。如需保证消息生产的顺序性,则必须满足以下条件:

    • 单一生产者: 消息生产的顺序性仅支持单一生产者,不同生产者分布在不同的系统,即使设置相同的分区键,不同生产者之间产生的消息也无法判定其先后顺序。
    • 串行发送:生产者客户端支持多线程安全访问,但如果生产者使用多线程并行发送,则不同线程间产生的消息将无法判定其先后顺序。

    满足以上条件的生产者,将顺序消息发送至服务端后,会保证设置了同一分区键的消息,按照发送顺序存储在同一队列中。服务端顺序存储逻辑如下:
    在这里插入图片描述
    顺序消息的应用场景也非常广泛,在有序事件处理、撮合交易、数据实时增量同步等场景下,异构系统间需要维持强一致的状态同步,上游的事件变更需要按照顺序传递到下游进行处理。

    例如创建订单的场景,需要保证同一个订单的生成、付款和发货,这三个操作被顺序执行。如果是普通消息,订单A的消息可能会被轮询发送到不同的队列中,不同队列的消息将无法保持顺序,而顺序消息发送时将ShardingKey相同(同一订单号)的消息序路由到一个逻辑队列中。
    在这里插入图片描述
    顺序消息示例代码

    class Producer {
        public static void main(String[] args) throws UnsupportedEncodingException {
            try {
                DefaultMQProducer producer = new DefaultMQProducer("sequence_group");
                producer.setNamesrvAddr(Constant.ROCKETMQ_SERVER_HOST_PORT);
                producer.start();
    
                String[] tags = new String[]{"TagA", "TagB", "TagC", "TagD", "TagE"};
                for (int i = 0; i < 100; i++) {
                    int orderId = i % 10;
                    Message msg =
                            new Message("sequence_topic", tags[i % tags.length], "KEY" + i,
                                    ("Hello RocketMQ ".concat(i + "~~" + Utils.getFormatTime()).getBytes(RemotingHelper.DEFAULT_CHARSET)));
                    SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
                        @Override
                        public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                            Integer id = (Integer) arg;
                            int index = id % mqs.size();
                            return mqs.get(index);
                        }
                    }, orderId);
    
                    System.out.printf("%s%n", sendResult);
                }
    
                producer.shutdown();
            } catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31

    2.2顺序消息的一致性

    如果一个Broker掉线,那么此时队列总数是否会发化?

    如果发生变化,那么同一个 ShardingKey 的消息就会发送到不同的队列上,造成乱序。如果不发生变化,那消息将会发送到掉线Broker的队列上,必然是失败的。因此 Apache RocketMQ 提供了两种模式,如果要保证严格顺序而不是可用性,创建 Topic 是要指定 -o 参数(–order)为true,表示顺序消息:

    > sh bin/mqadmin updateTopic -c DefaultCluster -t TopicTest -o true -n 127.0.0.1:9876
    create topic to 127.0.0.1:10911 success.
    TopicConfig [topicName=TopicTest, readQueueNums=8, writeQueueNums=8, perm=RW-, topicFilterType=SINGLE_TAG, topicSysFlag=0, order=true, attributes=null]
    
    • 1
    • 2
    • 3

    其次要保证NameServer中的配置 orderMessageEnablereturnOrderTopicConfigToBroker 必须是 true。如果上述任意一个条件不满足,则是保证可用性而不是严格顺序。

    3、 延时消息发送


    3.1 延时消息介绍

    延迟消息发送是指消息发送到Apache RocketMQ后,并不期望立马投递这条消息,而是延迟一定时间后才投递到Consumer进行消费。

    在分布式定时调度触发、任务超时处理等场景,需要实现精准、可靠的延时事件触发。使用 RocketMQ 的延时消息可以简化定时调度任务的开发逻辑,实现高性能、可扩展、高可靠的定时触发能力。
    在这里插入图片描述

    3.2 延时消息约束

    Apache RocketMQ 一共支持18个等级的延迟投递,具体时间如下:
    在这里插入图片描述

    3.2 延时消息的使用场景

    比如电商里,提交了一个订单就可以发送一个延时消息,1h后去检查这个订单的状态,如果还是未付款就取消订单释放库存。

    public class ScheduledMessageProducer {
        public static void main(String[] args) throws Exception {
            // 实例化一个生产者来产生延时消息
            DefaultMQProducer producer = new DefaultMQProducer("ExampleProducerGroup");
            // 设置NameServer的地址
            producer.setNamesrvAddr(Constant.ROCKETMQ_SERVER_HOST_PORT);
            // 启动生产者
            producer.start();
            int totalMessagesToSend = 5;
            for (int i = 0; i < totalMessagesToSend; i++) {
                Message message = new Message("TestTopic", (LocalDateTime.now()+",Hello scheduled message " + i).getBytes());
                // 设置延时等级3,这个消息将在10s之后发送(现在只支持固定的几个时间[18个等级])
                message.setDelayTimeLevel(3);
                // 发送消息
                producer.send(message);
            }
            // 关闭生产者
            producer.shutdown();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20

    DelayTimeLevel:消息延时级别,0 表示不延时,大于 0 会延时特定的时间才会被消费
    延时消息的实现逻辑需要先经过定时存储等待触发,延时时间到达后才会被投递给消费者。因此,如果将大量延时消息的定时时间设置为同一时刻,则到达该时刻后会有大量消息同时需要被处理,会造成系统压力过大,导致消息分发延迟,影响定时精度。


    4、批量消息发送


    在对吞吐率有一定要求的情况下,Apache RocketMQ可以将一些消息聚成一批以后进行发送,可以增加吞吐率,并减少API和网络调用次数。
    在这里插入图片描述
    示例代码:

    public class SimpleBatchProducer {
    
        public static void main(String[] args) throws Exception {
            DefaultMQProducer producer = new DefaultMQProducer("BatchProducerGroupName");
            producer.start();
    
            //If you just send messages of no more than 1MiB at a time, it is easy to use batch
            //Messages of the same batch should have: same topic, same waitStoreMsgOK and no schedule support
            String topic = "BatchTest";
            List<Message> messages = new ArrayList<>();
            messages.add(new Message(topic, "Tag", "OrderID001", "Hello world 0".getBytes()));
            messages.add(new Message(topic, "Tag", "OrderID002", "Hello world 1".getBytes()));
            messages.add(new Message(topic, "Tag", "OrderID003", "Hello world 2".getBytes()));
    
            producer.send(messages);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    这里调用非常简单,将消息打包成 Collection msgs 传入方法中即可,需要注意的是批量消息的大小不能超过 1MiB(否则需要自行分割),其次同一批 batch 中 topic 必须相同

    批量发送消息能显著提高传递小消息的性能。限制是这些批量消息应该有相同的topic,相同的waitStoreMsgOK,而且不能是延时消息。此外,这一批消息的总大小不应超过4MB。

    • 消息分割代码演示【支持单条超过大小限制】:
    /**
     * 消息列表分割器:其只会处理每条消息的大小不超4M的情况【将一个消息集合中的消息分割为多个消息列表(不超4M)】
     * 若存在某条消息,其本身大小大于4M,这个分割器无法处理,其直接将这条消息构成一个子列表返回。并没有再进行分割
     * 本地SplitterProducer代码不支持单条超过大小限制,若只有一条并超过的限制大小会抛数组越界异常;若多条中其中有一条超过限制,则会丢失一条消息
     */
    public class MessageListSplitter implements Iterator<List<Message>> {
        // 指定极限值为4M
        private final int SIZE_LIMIT = 4 * 1024 * 1024;
        // private final int SIZE_LIMIT = 1024;
        // 存放所有要发送的消息
        private final List<Message> messages;
        // 要进行批量发送消息的小集合起始索引
        private int currIndex;
    
        public MessageListSplitter(List<Message> messages) {
            this.messages = messages;
        }
    
        @Override
        public boolean hasNext() {
            // 判断当前开始遍历的消息索引要小于消息总数
            return currIndex < messages.size();
        }
    
        @Override
        public List<Message> next() {
            int nextIndex = currIndex;
            // 记录当前要发送的这一小批次消息列表的大小
            int totalSize = 0;
            for (; nextIndex < messages.size(); nextIndex++) {
                // 获取当前遍历的消息
                Message message = messages.get(nextIndex);
                // 统计当前遍历的message的大小
                int tmpSize = message.getTopic().length() + message.getBody().length;
                Map<String, String> properties = message.getProperties();
                for (Map.Entry<String, String> entry : properties.entrySet()) {
                    tmpSize += entry.getKey().length() + entry.getValue().length();
                }
                // Producer发送消息Message的结构:Topic + Body + Log(固定20字节) + Properties
                tmpSize = tmpSize + 20;
    
                // 判断当前消息本身是否大于4M
                if (tmpSize > SIZE_LIMIT) {
                    if (nextIndex - currIndex == 0) {
                        nextIndex++;
                    }
                    break;
                }
                // 当前消息的大小 + 之前统计要发送这一小批次消息列表的大小 》极限值4M
                if (tmpSize + totalSize > SIZE_LIMIT) {
                    break;
                } else {
                    // 统计要发送这一小批次消息列表的大小
                    totalSize += tmpSize;
                }
    
            } // end-for
    
            // 获取当前messages列表的子集合[currIndex, nextIndex)
            List<Message> subList = messages.subList(currIndex, nextIndex);
            // 下次遍历的开始索引
            currIndex = nextIndex;
            return subList;
        }
    }
    
    // 定义批量消息生产者
    class BatchProducer {
        public static void main(String[] args) throws Exception {
            DefaultMQProducer producer = new DefaultMQProducer("pg");
            producer.setNamesrvAddr(Constant.ROCKETMQ_SERVER_HOST_PORT);
            // 指定要发送的消息的最大大小,默认是4M
            // 不过,仅修改该属性是不行的,还需要同时修改broker加载的配置文件中的
            // maxMessageSize属性
            // producer.setMaxMessageSize(8 * 1024 * 1024);
            producer.start();
    
            // 定义要发送的消息集合
            List<Message> messages = new ArrayList<>();
            for (int i = 0; i < 5; i++) {
            	if(i==3)
                    messages.add(new Message("TopicD", "TagA", "OrderID000", "Hello,RocketMQ 可以试着让该条消息超过限制大小~".getBytes()));
                byte[] body = (LocalDateTime.now() + " Hi,RocketMQ " + i).getBytes();
                Message msg = new Message("TopicD", "someTag", body);
                messages.add(msg);
            }
    
            // 定义消息列表分割器,将消息列表分割为多个不超出4M大小的小列表
            MessageListSplitter splitter = new MessageListSplitter(messages);
            while (splitter.hasNext()) {
                try {
                    List<Message> listItem = splitter.next();
                    producer.send(listItem);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
            producer.shutdown();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100

    5、事务消息发送

    5.1 事务消息介绍

    在一些对数据一致性有强需求的场景,可以用 Apache RocketMQ 事务消息来解决,从而保证上下游数据的一致性。
    在这里插入图片描述
    以电商交易场景为例,用户支付订单这一核心操作的同时会涉及到下游物流发货、积分变更、购物车状态清空等多个子系统的变更。当前业务的处理分支包括:

    • 主分支订单系统状态更新:由未支付变更为支付成功。
    • 物流系统状态新增:新增待发货物流记录,创建订单物流记录。
    • 积分系统状态变更:变更用户积分,更新用户积分表。
    • 购物车系统状态变更:清空购物车,更新用户购物车记录。
      在这里插入图片描述
      使用普通消息和订单事务无法保证一致的原因,本质上是由于普通消息无法像单机数据库事务一样,具备提交、回滚和统一协调的能力。 而基于 RocketMQ 的分布式事务消息功能,在普通消息基础上,支持二阶段的提交能力。将二阶段提交和本地事务绑定,实现全局提交结果的一致性。
      在这里插入图片描述
      【可先看下图↓】事务消息发送分为两个阶段。第一阶段会发送一个半事务消息,半事务消息是指暂不能投递的消息,生产者已经成功地将消息发送到了 Broker,但是Broker 未收到生产者对该消息的二次确认,此时该消息被标记成“暂不能投递”状态,如果发送成功则执行本地事务,并根据本地事务执行成功与否,向 Broker 半事务消息状态(commit或者rollback),半事务消息只有 commit 状态才会真正向下游投递。如果由于网络闪断、生产者应用重启等原因,导致某条事务消息的二次确认丢失,Broker 端会通过扫描发现某条消息长期处于“半事务消息”时,需要主动向消息生产者询问该消息的最终状态(Commit或是Rollback)。这样最终保证了本地事务执行成功,下游就能收到消息,本地事务执行失败,下游就收不到消息。总而保证了上下游数据的一致性。

    整个事务消息的详细交互流程如下图所示:
    在这里插入图片描述

    5.2 事务消息步骤

    1. 生产者将半事务消息发送至 RocketMQ Broker。 RocketMQ Broker 将消息持久化成功之后,向生产者返回 Ack
    2. 确认消息已经发送成功,此时消息暂不能投递,为半事务消息。
    3. 生产者开始执行本地事务逻辑。
    4. 生产者根据本地事务执行结果向服务端提交二次确认结果(Commit或是Rollback),服务端收到确认结果后处理逻辑如下:
      • 二次确认结果为Commit:服务端将半事务消息标记为可投递,并投递给消费者。
      • 二次确认结果为Rollback:服务端将回滚事务,不会将半事务消息投递给消费者。
    5. 在断网或者是生产者应用重启的特殊情况下,若服务端未收到发送者提交的二次确认结果,或服务端收到的二次确认结果为Unknown未知状态,经过固定时间后,服务端将对消息生产者即生产者集群中任一生产者实例发起消息回查。
    6. 需要注意的是,服务端仅仅会按照参数尝试指定次数,超过次数后事务会强制回滚,因此未决事务的回查时效性非常关键,需要按照业务的实际风险来设置
      事务消息回查步骤如下:
    7. 生产者收到消息回查后,需要检查对应消息的本地事务执行的最终结果。
    8. 生产者根据检查得到的本地事务的最终状态再次提交二次确认,服务端仍按照步骤4对半事务消息进行处理。

    5.3 事务消息示例代码

    import com.formiss.common.Constant;
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.client.producer.LocalTransactionState;
    import org.apache.rocketmq.client.producer.SendResult;
    import org.apache.rocketmq.client.producer.TransactionListener;
    import org.apache.rocketmq.client.producer.TransactionMQProducer;
    import org.apache.rocketmq.common.message.Message;
    import org.apache.rocketmq.common.message.MessageExt;
    import org.apache.rocketmq.remoting.common.RemotingHelper;
    
    import java.io.UnsupportedEncodingException;
    import java.time.LocalDateTime;
    import java.util.concurrent.*;
    import java.util.concurrent.atomic.AtomicInteger;
    
    public class TransactionProducer {
        public static void main(String[] args) throws MQClientException, InterruptedException {
            TransactionListener transactionListener = new TransactionListenerImpl();
            TransactionMQProducer producer = new TransactionMQProducer("transaction_producer_group");
            ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
                @Override
                public Thread newThread(Runnable r) {
                    Thread thread = new Thread(r);
                    thread.setName("client-transaction-msg-check-thread");
                    return thread;
                }
            });
    
            producer.setExecutorService(executorService);
            producer.setTransactionListener(transactionListener);
            producer.setNamesrvAddr(Constant.ROCKETMQ_SERVER_HOST_PORT);
            producer.start();
    
            String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
            for (int i = 0; i < 10; i++) {
                try {
                    Message msg =
                            new Message("TopicTest", tags[i % tags.length], "KEY" + i,
                                    (LocalDateTime.now()+" Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
                    SendResult sendResult = producer.sendMessageInTransaction(msg, null);
                    System.out.printf("%s%n", sendResult);
    
                    Thread.sleep(10);
                } catch (MQClientException | UnsupportedEncodingException e) {
                    e.printStackTrace();
                }
            }
    
            for (int i = 0; i < 100000; i++) {
                Thread.sleep(1000);
            }
            producer.shutdown();
        }
    
        static class TransactionListenerImpl implements TransactionListener {
            private AtomicInteger transactionIndex = new AtomicInteger(0);
    
            private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();
    
            @Override
            public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
                int value = transactionIndex.getAndIncrement();
                int status = value % 3;
                localTrans.put(msg.getTransactionId(), status);
                return LocalTransactionState.UNKNOW;
            }
    
            @Override
            public LocalTransactionState checkLocalTransaction(MessageExt msg) {
                Integer status = localTrans.get(msg.getTransactionId());
                if (null != status) {
                    switch (status) {
                        case 0:
                            return LocalTransactionState.UNKNOW;
                        case 1:
                            return LocalTransactionState.COMMIT_MESSAGE;
                        case 2:
                            return LocalTransactionState.ROLLBACK_MESSAGE;
                        default:
                            return LocalTransactionState.COMMIT_MESSAGE;
                    }
                }
                return LocalTransactionState.COMMIT_MESSAGE;
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86

    事务消息的发送不再使用 DefaultMQProducer,而是使用 TransactionMQProducer 进行发送,上述的例子中设置了事务回查的线程池,如果不设置也会默认生成一个,最重要的是需要实现 TransactionListener 接口,并传入 TransactionMQProducer

    executeLocalTransaction 是半事务消息发送成功后,执行本地事务的方法,具体执行完本地事务后,可以在该方法中返回以下三种状态:

    • LocalTransactionState.COMMIT_MESSAGE:提交事务,允许消费者消费该消息
    • LocalTransactionState.ROLLBACK_MESSAGE:回滚事务,消息将被丢弃不允许消费。
    • LocalTransactionState.UNKNOW:暂时无法判断状态,等待固定时间以后Broker端根据回查规则向生产者进行消息回查。

    checkLocalTransaction是由于二次确认消息没有收到,Broker端回查事务状态的方法。回查规则:本地事务执行完成后,若Broker端收到的本地事务返回状态为LocalTransactionState.UNKNOW,或生产者应用退出导致本地事务未提交任何状态。则Broker端会向消息生产者发起事务回查,第一次回查后仍未获取到事务状态,则之后每隔一段时间会再次回查。

    此外,需要注意的是事务消息的生产组名称 ProducerGroupName不能随意设置。事务消息有回查机制,回查时Broker端如果发现原始生产者已经崩溃崩溃,则会联系同一生产者组的其他生产者实例回查本地事务执行情况以Commit或Rollback半事务消息。

    下一篇介绍消费者使用













    RocketMQ消息丢失如何排查?

    死信队列

    git

  • 相关阅读:
    浅析数据迁移工具Sqoop
    火热报名中|墨菲安全发起首届 OSCS 软件供应链安全技术论坛
    「干货」从动态的角度分析DDR的时序结构
    云服务器利用Docker搭建sqli-labs靶场环境
    判断js中变量的类型的方法
    服务网格技术的发展与趋势
    Linux C语言基础 day10
    扫码挪车小程序源码专业版上线了
    GAT-图注意力模型
    sql分词查询,实现类似ES的效果
  • 原文地址:https://blog.csdn.net/JemeryShen/article/details/126765530