• RocketMQ 发送顺序消息


    顺序消息应用场景

    在有序事件处理、撮合交易、数据实时增量同步等场景下,异构系统间需要维持强一致的状态同步,上游的事件变更需要按照顺序传递到下游进行处理。在这类场景下使用 RocketMQ 的顺序消息可以有效保证数据传输的顺序性。比如:同一个用户的操作,一定是先生成订单、再进行支付、扣减库存、生成物流信息等。

    消息组(MessageGroup)

    RocketMQ 顺序消息的顺序关系通过消息组(MessageGroup)判定和识别,发送顺序消息时需要为每条消息设置归属的消息组,相同消息组的多条消息之间遵循先进先出的顺序关系,不同消息组、无消息组的消息之间不涉及顺序性。

    基于消息组的顺序判定逻辑,支持按照业务逻辑做细粒度拆分,可以在满足业务局部顺序的前提下提高系统的并行度和吞吐能力。

    顺序性

    RocketMQ 的消息的顺序性分为两部分,生产顺序性和消费顺序性。

    生产的顺序性

    生产的顺序性就是必须保证每个消息在生成时是顺序的,且顺序的发送到 MQ 服务器。要保证生产的顺序,需要满足以下条件

    • 单一生产者:消息生产的顺序性仅支持单一生产者,不同生产者分布在不同的系统,即使设置相同的消息组,不同生产者之间产生的消息也无法判定其先后顺序。
    • 串行发送:Apache RocketMQ 生产者客户端支持多线程安全访问,但如果生产者使用多线程并行发送,则不同线程间产生的消息将无法判定其先后顺序。

    MQ 存储的顺序性

    MQ 按顺序收到消息后,会保证设置了同一消息组的消息,按照发送顺序存储在同一队列中。服务端顺序存储逻辑如下:

    • 相同消息组的消息按照先后顺序被存储在同一个队列。
    • 不同消息组的消息可以混合在同一个队列中,且不保证连续。
      在这里插入图片描述

    消费的顺序性

    消费的顺序性,是消费者在消费的时候要严格按照 MQ 中的存储顺序来执行。

    • 消费者保证执行的顺序
      • PushConsumer 类型消费者,RocketMQ 会保证消息按照存储顺序一条一条投递给消费者
      • SimpleConsumer 类型消费者,需要业务实现方自行保证消费的顺序。消费消息时需要严格按照接收—处理—应答的语义处理消息,避免因异步处理导致消息乱序。
    • 重试策略

    Apache RocketMQ 顺序消息投递仅在重试次数限定范围内,即一条消息如果一直重试失败,超过最大重试次数后将不再重试,跳过这条消息消费,不会一直阻塞后续消息处理。

    所以对于需要严格保证消费顺序的场景,请务设置合理的重试次数,避免参数不合理导致消息乱序。

    rocketmq-client-java 示例(gRPC 协议)

    1. 创建 FIFO 主题

    本示例,我们模拟多个用户的一系列操作,并多个消息组区分不同的顺序消息。要求每个用户的消息按顺序执行,不同用户的消息之间不做必要关联。

    $> ./mqadmin updatetopic -n localhost:9876 -c DefaultCluster -t MY_FIFO_TOPIC -o true -a +message.type=FIFO
    
    • 1

    注意:这里比普通消息和顺序消息多了一个 -o 参数,表示 order 的意思。

    生产者代码

    import com.yyoo.mq.rocket.MyMQProperties;
    import org.apache.rocketmq.client.apis.ClientConfiguration;
    import org.apache.rocketmq.client.apis.ClientException;
    import org.apache.rocketmq.client.apis.ClientServiceProvider;
    import org.apache.rocketmq.client.apis.message.Message;
    import org.apache.rocketmq.client.apis.producer.Producer;
    import org.apache.rocketmq.client.apis.producer.SendReceipt;
    
    import java.io.IOException;
    
    public class FifoProducerDemo {
    
        public static void main(String[] args) throws ClientException, IOException {
    
            // 用于提供:生产者、消费者、消息对应的构建类 Builder
            ClientServiceProvider provider = ClientServiceProvider.loadService();
    
            // 构建配置类(包含端点位置、认证以及连接超时等的配置)
            ClientConfiguration configuration = ClientConfiguration.newBuilder()
                    // endpoints 即为 proxy 的地址,多个用分号隔开。如:xxx:8081;xxx:8081
                    .setEndpoints(MyMQProperties.ENDPOINTS)
                    .build();
    
            // 构建生产者
            Producer producer = provider.newProducerBuilder()
                    // Topics 列表:生产者和主题是多对多的关系,同一个生产者可以向多个主题发送消息
                    .setTopics("MY_FIFO_TOPIC")
                    .setClientConfiguration(configuration)
                    // 构建生产者,此方法会抛出 ClientException 异常
                    .build();
    
            for(int i = 1; i <= 10;i++) {
    
                String msgGroup = "user" ; // 表示有两个用户
                String keys = "key_" + i;
                // 构建消息类
                Message message = provider.newMessageBuilder()
                        // 设置消息发送到的主题
                        .setTopic("MY_FIFO_TOPIC")
                        // 设置消息索引键,可根据关键字精确查找某条消息。其一般为业务上的唯一值。如:订单id
                        .setKeys(keys)
                        // 设置消息Tag,表示为创建订单
                        .setTag("ORDER_CREATE")
                        // 设置消息组
                        .setMessageGroup(msgGroup)
                        // 消息体,单条消息的传输负载不宜过大。所以此处的字节大小最好有个限制
                        .setBody(("{\"success\":true,\"msg\":\""+ msgGroup + ":" + keys +"\"}").getBytes())
                        .build();
    
                // 发送消息(此处最好进行异常处理,对消息的状态进行一个记录)
                try {
                    SendReceipt sendReceipt = producer.send(message);
                    System.out.println(keys);
                    System.out.println("Send message successfully, messageId=" + sendReceipt.getMessageId());
                } catch (ClientException e) {
                    System.out.println("Failed to send message");
                }
            }
    
    
            // 发送完,关闭生产者
            // producer.close();
    
    
        }
    
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68

    发送顺序消息时,消息一定要设置消息组,同一消息组的消息将会按服务器接收的顺序进行消费。

    注:发送顺序消息前需要设置 NameServer 中的配置 orderMessageEnable 和 returnOrderTopicConfigToBroker 为 true。特别是 orderMessageEnable 默认为 false。建议在启动 namesrv 的时候使用自定义配置,在自定义配置中配置选项为true即可。

    # namesrv.conf 为我们自定义的配置文件
    nohup sh bin/mqnamesrv -c conf/namesrv.conf &
    
    • 1
    • 2

    消费者代码

    import com.yyoo.mq.rocket.MyMQProperties;
    import org.apache.rocketmq.client.apis.ClientConfiguration;
    import org.apache.rocketmq.client.apis.ClientException;
    import org.apache.rocketmq.client.apis.ClientServiceProvider;
    import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
    import org.apache.rocketmq.client.apis.consumer.FilterExpression;
    import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;
    import org.apache.rocketmq.client.apis.consumer.PushConsumer;
    
    import java.nio.ByteBuffer;
    import java.util.Collections;
    
    public class FifoConsumerDemo {
    
        public static void main(String[] args) throws ClientException {
    
    
    
            // 用于提供:生产者、消费者、消息对应的构建类 Builder
            ClientServiceProvider provider = ClientServiceProvider.loadService();
    
            // 构建配置类(包含端点位置、认证以及连接超时等的配置)
            ClientConfiguration configuration = ClientConfiguration.newBuilder()
                    // endpoints 即为 proxy 的地址,多个用分号隔开。如:xxx:8081;xxx:8081
                    .setEndpoints(MyMQProperties.ENDPOINTS)
                    .build();
    
    
            // 设置过滤条件(这里为使用 tag 进行过滤)
            String tag = "ORDER_CREATE";
            FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);
    
            // 构建消费者
            PushConsumer pushConsumer = provider.newPushConsumerBuilder()
                    .setClientConfiguration(configuration)
                    // 设置消费者分组
                    .setConsumerGroup("MY_FIFO_GROUP")
                    // 设置主题与消费者之间的订阅关系
                    .setSubscriptionExpressions(Collections.singletonMap("MY_FIFO_TOPIC", filterExpression))
                    .setMessageListener(messageView -> {
                        System.out.println(messageView);
                        ByteBuffer rs = messageView.getBody();
                        byte[] rsByte = new byte[rs.limit()];
                        rs.get(rsByte);
    
                        System.out.println("Message body:" + new String(rsByte));
                        // 处理消息并返回消费结果。
                        System.out.println("Consume message successfully, messageId=" + messageView.getMessageId());
    
                        return ConsumeResult.SUCCESS;
                    }).build();
    
            System.out.println(pushConsumer);
    
            // 如果不需要再使用 PushConsumer,可关闭该实例。
            // pushConsumer.close();
    
        }
    
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61

    注:多验证几次后会发现,消费执行并没有严格的按照顺序执行,查找源码后发现,PushConsumer 的 builder 在构建 PushConsumer 的时候有个 Settings 对象,该对象的主题配置信息是从服务器获取,获取后有一个 isFifo 参数,此参数对应是否顺序消费,但是目前此值一直为false。此问题为消费者分组的问题,Remoting 协议方式无此问题,因为两种 Client 的实现是不一样的。

    解决办法

    在 MQ bin目录执行如下命令即可,具体的相关说明,我们将在后续章节中(《RocketMQ 消费者分类与分组》)详细说明。

    $> ./mqadmin updateSubGroup -n 127.0.0.1:9876 -g MY_FIFO_GROUP -o true -c DefaultCluster
    
    • 1

    解决后执行结果

    MessageViewImpl{messageId=010456E5ECA6F32F6C051313D700000001, topic=MY_FIFO_TOPIC, bornHost=DESKTOP-S1DMOAD, bornTimestamp=1694595543268, endpoints=ipv4:192.168.1.1:8081, deliveryAttempt=1, tag=ORDER_CREATE, keys=[key_2], messageGroup=user1, deliveryTimestamp=null, properties={__SHARDINGKEY=user1}}
    MessageViewImpl{messageId=010456E5ECA6F32F6C051313D700000000, topic=MY_FIFO_TOPIC, bornHost=DESKTOP-S1DMOAD, bornTimestamp=1694595543178, endpoints=ipv4:192.168.1.1:8081, deliveryAttempt=1, tag=ORDER_CREATE, keys=[key_1], messageGroup=user2, deliveryTimestamp=null, properties={__SHARDINGKEY=user2}}
    Message body:{"success":true,"msg":"user1:key_2"}
    Message body:{"success":true,"msg":"user2:key_1"}
    Consume message successfully, messageId=010456E5ECA6F32F6C051313D700000000
    Consume message successfully, messageId=010456E5ECA6F32F6C051313D700000001
    MessageViewImpl{messageId=010456E5ECA6F32F6C051313D700000002, topic=MY_FIFO_TOPIC, bornHost=DESKTOP-S1DMOAD, bornTimestamp=1694595543279, endpoints=ipv4:192.168.1.1:8081, deliveryAttempt=1, tag=ORDER_CREATE, keys=[key_3], messageGroup=user1, deliveryTimestamp=null, properties={__SHARDINGKEY=user1}}
    Message body:{"success":true,"msg":"user1:key_3"}
    Consume message successfully, messageId=010456E5ECA6F32F6C051313D700000002
    MessageViewImpl{messageId=010456E5ECA6F32F6C051313D700000004, topic=MY_FIFO_TOPIC, bornHost=DESKTOP-S1DMOAD, bornTimestamp=1694595543294, endpoints=ipv4:192.168.1.1:8081, deliveryAttempt=1, tag=ORDER_CREATE, keys=[key_5], messageGroup=user2, deliveryTimestamp=null, properties={__SHARDINGKEY=user2}}
    Message body:{"success":true,"msg":"user2:key_5"}
    Consume message successfully, messageId=010456E5ECA6F32F6C051313D700000004
    MessageViewImpl{messageId=010456E5ECA6F32F6C051313D700000003, topic=MY_FIFO_TOPIC, bornHost=DESKTOP-S1DMOAD, bornTimestamp=1694595543288, endpoints=ipv4:192.168.1.1:8081, deliveryAttempt=1, tag=ORDER_CREATE, keys=[key_4], messageGroup=user1, deliveryTimestamp=null, properties={__SHARDINGKEY=user1}}
    MessageViewImpl{messageId=010456E5ECA6F32F6C051313D700000005, topic=MY_FIFO_TOPIC, bornHost=DESKTOP-S1DMOAD, bornTimestamp=1694595543301, endpoints=ipv4:192.168.1.1:8081, deliveryAttempt=1, tag=ORDER_CREATE, keys=[key_6], messageGroup=user2, deliveryTimestamp=null, properties={__SHARDINGKEY=user2}}
    Message body:{"success":true,"msg":"user1:key_4"}
    Message body:{"success":true,"msg":"user2:key_6"}
    Consume message successfully, messageId=010456E5ECA6F32F6C051313D700000005
    Consume message successfully, messageId=010456E5ECA6F32F6C051313D700000003
    MessageViewImpl{messageId=010456E5ECA6F32F6C051313D700000006, topic=MY_FIFO_TOPIC, bornHost=DESKTOP-S1DMOAD, bornTimestamp=1694595543313, endpoints=ipv4:192.168.1.1:8081, deliveryAttempt=1, tag=ORDER_CREATE, keys=[key_7], messageGroup=user1, deliveryTimestamp=null, properties={__SHARDINGKEY=user1}}
    Message body:{"success":true,"msg":"user1:key_7"}
    Consume message successfully, messageId=010456E5ECA6F32F6C051313D700000006
    MessageViewImpl{messageId=010456E5ECA6F32F6C051313D700000007, topic=MY_FIFO_TOPIC, bornHost=DESKTOP-S1DMOAD, bornTimestamp=1694595543320, endpoints=ipv4:192.168.1.1:8081, deliveryAttempt=1, tag=ORDER_CREATE, keys=[key_8], messageGroup=user1, deliveryTimestamp=null, properties={__SHARDINGKEY=user1}}
    Message body:{"success":true,"msg":"user1:key_8"}
    Consume message successfully, messageId=010456E5ECA6F32F6C051313D700000007
    MessageViewImpl{messageId=010456E5ECA6F32F6C051313D700000008, topic=MY_FIFO_TOPIC, bornHost=DESKTOP-S1DMOAD, bornTimestamp=1694595543331, endpoints=ipv4:192.168.1.1:8081, deliveryAttempt=1, tag=ORDER_CREATE, keys=[key_9], messageGroup=user1, deliveryTimestamp=null, properties={__SHARDINGKEY=user1}}
    Message body:{"success":true,"msg":"user1:key_9"}
    Consume message successfully, messageId=010456E5ECA6F32F6C051313D700000008
    MessageViewImpl{messageId=010456E5ECA6F32F6C051313D700000009, topic=MY_FIFO_TOPIC, bornHost=DESKTOP-S1DMOAD, bornTimestamp=1694595543340, endpoints=ipv4:192.168.1.1:8081, deliveryAttempt=1, tag=ORDER_CREATE, keys=[key_10], messageGroup=user1, deliveryTimestamp=null, properties={__SHARDINGKEY=user1}}
    Message body:{"success":true,"msg":"user1:key_10"}
    Consume message successfully, messageId=010456E5ECA6F32F6C051313D700000009
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30

    注意:user1 和 user2 的操作顺序是一致的。因为我们不需要保证 user1 的操作必须在 user2 之前,只需要保证他们各自的操作为顺序的就可以。

    rocketmq-client 示例(Remoting 协议)

    生产者

    import com.yyoo.mq.rocket.MyMQProperties;
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.client.producer.DefaultMQProducer;
    import org.apache.rocketmq.client.producer.MessageQueueSelector;
    import org.apache.rocketmq.client.producer.SendResult;
    import org.apache.rocketmq.common.message.Message;
    import org.apache.rocketmq.common.message.MessageQueue;
    import org.apache.rocketmq.shaded.commons.lang3.RandomUtils;
    
    import java.util.List;
    
    public class FifoProducerDemo {
    
        /**
         * 生产者分组
         */
        private static final String PRODUCER_GROUP = "FIFO_PRODUCER_GROUP";
    
        /**
         * 主题
         */
        private static final String TOPIC = "MY_FIFO_TOPIC";
    
        public static void main(String[] args) throws MQClientException {
    
    
            /*
             * 创建生产者,并使用生产者分组初始化
             */
            DefaultMQProducer producer = new DefaultMQProducer(PRODUCER_GROUP);
    
            /*
             * NamesrvAddr 的地址,多个用分号隔开。如:xxx:9876;xxx:9876
             */
            producer.setNamesrvAddr(MyMQProperties.NAMESRV_ADDR);
    
            /*
             * 发送消息超时时间,默认即为 3000
             */
            producer.setSendMsgTimeout(3000);
    
            /*
             * 启动生产者,此方法抛出 MQClientException
             */
            producer.start();
    
    
            /*
             * 发送消息
             */
            for (int i = 1; i <= 10; i++) {
    
                try {
                    Message msg = new Message();
                    msg.setTopic(TOPIC);
                    // 设置消息索引键,可根据关键字精确查找某条消息。
                    msg.setKeys("messageKey");
                    // 设置消息Tag,用于消费端根据指定Tag过滤消息。
                    msg.setTags("ORDER_CREATE");
                    // 设置消息体
                    msg.setBody(("顺序消息" + i).getBytes());
    
                    // 这里 userId 取值为 1,2,3(模拟有3个用户的顺序操作)
                    int userId = RandomUtils.nextInt(1,4);
    
                    SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
                        @Override
                        public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                            // 这个arg就是对应userId
                            Integer userId = (Integer)arg;
                            // 我们按队列的数量,对每个user进行分组
                            int index = userId % mqs.size();
                            // 同一个user的消息放入同一个队列
                            return mqs.get(index);
                        }
                    },userId);
    
                    System.out.printf("%s%n", sendResult);
    
                } catch (Exception e) {
                    e.printStackTrace();
                    System.out.println("消息发送失败!i = " + i);
                }
    
            }
    
    
            // 如果生产者不再使用,则调用关闭
            // 异步发送消息注意:异步发送消息,建议此处不关闭或者在sleep一段时间后再关闭
            // 因为异步 SendCallback 执行的时候,shutdow可能已经执行了,生产者被关闭了
            // producer.shutdown();
        }
    
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95

    MessageQueueSelector 详解

    public interface MessageQueueSelector {
        MessageQueue select(final List<MessageQueue> mqs, final Message msg, final Object arg);
    }
    
    • 1
    • 2
    • 3

    mqs:队列列表,我们前面说了,默认 8 个队列
    msg:当前消息
    arg:为我们 send 方法传的第三个参数,示例中就是 userId

    MessageQueueSelector 意为队列选择器,Remoting 协议客户端中没有 消息组的概念,所以需要我们手动的为消息进行分组(将需要严格顺序的消息放在同一个队列),这个接口就是完成此任务的,而且分组的逻辑需要我们自己实现。实际应用中我们可以使用 用户id、订单id等来为顺序消息分组。

    消费者

    import com.yyoo.mq.rocket.MyMQProperties;
    import org.apache.rocketmq.client.apis.ClientException;
    import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
    import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
    import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
    import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
    import org.apache.rocketmq.common.message.MessageExt;
    
    import java.util.List;
    
    
    public class FifoConsumerDemo {
    
        public static void main(String[] args) throws MQClientException {
    
            // 初始化 consumer
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("REMOTING_FIFO_CONSUMER_GROUP");
    
            // 设置 namesrv 地址
            consumer.setNamesrvAddr(MyMQProperties.NAMESRV_ADDR);
            // 设置从开头开始读取消息
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
    
            // 设置订阅的主题,以及过滤tag
            consumer.subscribe("MY_FIFO_TOPIC", "ORDER_CREATE || TagA || TagD || messageTag");
    
            consumer.registerMessageListener(new MessageListenerOrderly() {
    
                @Override
                public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                    System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
    
                    for(MessageExt msg : msgs){
                        System.out.println(new String(msg.getBody()));
                    }
    
                    return ConsumeOrderlyStatus.SUCCESS;
                }
            });
    
            consumer.start();
            System.out.printf("Consumer Started.%n");
    
        }
    
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49

    注意:顺序消息消费者的监听类型为 MessageListenerOrderly ,注意与我们前面的示例 MessageListenerConcurrently 进行区分。

  • 相关阅读:
    HarmonyOS原子化服务开发-应用签名保存好的重要性
    python写的 自定义连点器 的开发全过程(抢票、信息轰炸等、游戏连招等)——思路及解析【内附完整源码】
    CCF领航计划第二期:英文学术论文日常积累+学习论文方法
    【MCAL_CANDriver】-1.3-FullCAN和BasicCAN的差异及配置使用
    GEE16: 区域日均降水量计算
    Maven依赖冲突
    奇妙的跨域错误-Access-Control-Request-Private-Network
    调用静态方法
    【Rust】使用HashMap解决官方文档中的闭包限制
    Flink集群运行模式
  • 原文地址:https://blog.csdn.net/forlinkext/article/details/132672445