• 分布式消息队列RocketMQ的应用(代码实例1)


    一、普通消息
    1 .消息发送分类
    Producer对于消息的发送方式也有多种选择,不同的方式会产生不同的系统效果。

    同步发送消息:
    同步发送消息是指,Producer发出一条消息后,会在收到MQ返回的ACK之后才发下一条消息。该方式的消息可靠性最高,但消息发送效率太低。
    输入图片说明
    异步发送消息:
    异步发送消息是指,Producer发出消息后无需等待MQ返回响应,直接发送下一条消息。该方式的消息可靠性可以得到保障,消息发送效率也可以。
    输入图片说明
    单向发送消息
    单向发送消息是指,Producer仅负责发送消息,不等待、不处理MQ的响应。该发送方式时MQ也不返回响应。该方式的消息发送效率最高,但消息可靠性较差。
    输入图片说明
    2.代码实例:
    创建一个Maven的Java工程rocketmq-test01
    导入rocketmq的client依赖如下:
    注意:导入的rocketmq依赖必须和使用的rocketmq版本一致。

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.8.0</version>
        </dependency>
    </dependencies>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    同步消息发送 生产者

    package cn.myrocketmq.general;
    
    import org.apache.rocketmq.client.exception.MQBrokerException;
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.client.producer.DefaultMQProducer;
    import org.apache.rocketmq.client.producer.SendResult;
    import org.apache.rocketmq.common.message.Message;
    import org.apache.rocketmq.remoting.exception.RemotingException;
    
    /**
     * 同步消息发送  生产者
     */
    public class SyncProducer {
        public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
            //创建一个生产者,参数producerGroup01为生产者组名称
            DefaultMQProducer producer = new DefaultMQProducer("producerGroup01");
            //指定RocketMQ的nameServer地址
            producer.setNamesrvAddr("127.0.0.1:9876");
            //设置当发送失败时重试发送的次数为3次,默认为2次
            producer.setRetryTimesWhenSendFailed(3);
            //设置发送超时时限为5s,默认3s
            producer.setSendMsgTimeout(5000);
            //开启生产者
            producer.start();
            //生产并发送100条消息
            for (int i = 0; i < 100; i++){
                //创建要发送的消息体
                byte[] body = ("Hi," + i).getBytes();
                //创建消息,参数someTopic为主题topic,someTag为tag,body为需要发送的消息
                Message msg = new Message("someTopic", "someTag", body);
                // 为消息指定key
                msg.setKeys("key-" + i);
                // 发送消息
                SendResult sendResult = producer.send(msg);
                System.out.println(sendResult);
            }
            //关闭producer
            producer.shutdown();
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41

    **加粗样式**
    在这里插入图片描述

    在这里插入图片描述
    异步消息发送 生产者

    package cn.myrocketmq.general;
    
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.client.producer.DefaultMQProducer;
    import org.apache.rocketmq.client.producer.SendCallback;
    import org.apache.rocketmq.client.producer.SendResult;
    import org.apache.rocketmq.common.message.Message;
    import org.apache.rocketmq.remoting.exception.RemotingException;
    
    import java.util.concurrent.TimeUnit;
    
    public class AsyncProducer {
        public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException {
            //创建一个生产者,参数producerGroupAsync为生产者组名称
            DefaultMQProducer producer = new DefaultMQProducer("producerGroupAsync");
            //指定RocketMQ的nameServer地址
            producer.setNamesrvAddr("127.0.0.1:9876");
            //指定异步发送失败后不进行重试发送
            producer.setRetryTimesWhenSendAsyncFailed(0);
            //指定新创建的Topic的Queue数量为2,默认为4
            producer.setDefaultTopicQueueNums(2);
            //开启生产者
            producer.start();
    
            for (int i = 0; i < 100; i++){
                //创建发送的消息
                byte[] body = ("Hi,"+i).getBytes();
                try{
                    Message message = new Message("myTopicA", "myTag", body);
                    //异步发送,指定回调
                    producer.send(message, new SendCallback() {
                        // 当producer接收到MQ发送来的响应后就会触发该回调方法的执行
                        //成功时触发
                        @Override
                        public void onSuccess(SendResult sendResult) {
                            System.out.println(sendResult);
                        }
    
                        //失败时触发
                        @Override
                        public void onException(Throwable throwable) {
                            throwable.printStackTrace();
                        }
                    });
                }catch (Exception e){
                    e.printStackTrace();
                }
            }
    
            // sleep一会儿
            // 由于采用的是异步发送,所以若这里不sleep,消息还未发送就会将producer给关闭,会报错
            TimeUnit.SECONDS.sleep( 3 );
            producer.shutdown();
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56

    在这里插入图片描述
    单向消息发送 生产者

    package cn.myrocketmq.general;
    
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.client.producer.DefaultMQProducer;
    import org.apache.rocketmq.common.message.Message;
    import org.apache.rocketmq.remoting.exception.RemotingException;
    
    public class OnewayProducer {
        public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException {
            //创建一个生产者,参数producerGroupOneway为生产者组名称
            DefaultMQProducer producer = new DefaultMQProducer("producerGroupOneway");
            //指定RocketMQ的nameServer地址
            producer.setNamesrvAddr("127.0.0.1:9876");
            //开启生产者
            producer.start();
            for (int i = 0 ; i < 10 ; i++) {
                //创建发送的消息
                byte[] body = ("Hi," + i).getBytes();
                Message msg = new Message("single", "someTag", body);
                // 单向发送(没有返回值)
                producer.sendOneway(msg);
            }
            //关闭producer
            producer.shutdown();
            System.out.println("producer shutdown");
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28

    在这里插入图片描述
    消费者 消费

    package cn.myrocketmq.general;
    
    import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
    import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
    import org.apache.rocketmq.client.consumer.MessageQueueListener;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
    import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
    import org.apache.rocketmq.common.message.MessageExt;
    import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
    
    import java.util.List;
    
    /**
     * 消费者 消费
     */
    public class SomeConsumer {
        public static void main(String[] args) throws MQClientException {
    
            //定义一个pull消费者
    //        DefaultLitePullConsumer consumer = new DefaultLitePullConsumer("consumerGroupA");
            //定义一个push消费者
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumerGroupB");
            //指定RocketMQ的nameServer地址
            consumer.setNamesrvAddr("127.0.0.1:9876");
            //指定从第一条消息开始消费
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
            //指定消费的主题topic和tag,消费主题名字为someTopic的主题和所有的tag
            consumer.subscribe("someTopic","*");
            //指定采用"广播模式"进行消费,默认为"集群模式"
            consumer.setMessageModel(MessageModel.BROADCASTING);
            //注册消息监听器
            consumer.registerMessageListener(new MessageListenerConcurrently() {
                // 一旦broker中有了其订阅的消息就会触发该方法的执行,其返回值为当前consumer消费的状态
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                    //逐条消费消息
                    for (MessageExt messageExt : list) {
                        System.out.print(messageExt);
                    }
                    //返回消费状态,消费成功
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });
            // 开启消费者消费
            consumer.start();
            System.out.println("Consumer Started");
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52

    在这里插入图片描述
    二、顺序消息
    1.顺序消息:顺序消息指的是,严格按照消息的发送顺序进行消费的消息(FIFO)

    默认情况下生产者会把消息以Round Robin轮询方式发送到不同的Queue分区队列;而消费消息时会从多个Queue上拉取消息,这种情况下的发送和消费是不能保证顺序的。如果将消息仅发送到同一个Queue中,消费时也只从这个Queue上拉取消息,就严格保证了消息的顺序性。

    2.为什么需要顺序消息
    例如,现在有TOPIC ORDER_STATUS(订单状态),其下有 4 个Queue队列,该Topic中的不同消息用于描述当前订单的不同状态。假设订单有状态:未支付、已支付、发货中、发货成功、发货失败。

    根据以上订单状态,生产者从时序上可以生成如下几个消息:
    `订单T0000001:未支付 --> 订单T0000001:已支付 --> 订单T0000001:发货中 --> 订单T0000001:发货失败
    消息发送到MQ中之后,Queue的选择如果采用轮询策略,消息在MQ的存储可能如下:

    输入图片说明

    这种情况下,我们希望Consumer消费消息的顺序和我们发送是一致的,然而上述MQ的投递和消费方式,我们无法保证顺序是正确的。对于顺序异常的消息,Consumer即使设置有一定的状态容错,也不能完全处理好这么多种随机出现组合情况。

    输入图片说明

    基于上述的情况,可以设计如下方案:对于相同订单号的消息,通过一定的策略,将其放置在一个Queue中,然后消费者再采用一定的策略(例如,一个线程独立处理一个queue,保证处理消息的顺序性),能够保证消费的顺序性。

    3.有序性分类
    根据有序范围的不同,RocketMQ可以严格地保证两种消息的有序性:分区有序全局有序

    全局有序:
    输入图片说明
    当发送和消费参与的Queue只有一个时所保证的有序是整个Topic中消息的顺序, 称为全局有序。

    在创建Topic时指定Queue的数量。有三种指定方式:
    1)在代码中创建Producer时,可以指定其自动创建的Topic的Queue数量
    2)在RocketMQ可视化控制台中手动创建Topic时指定Queue数量
    3 )使用mqadmin命令手动创建Topic时指定Queue数量

    分区有序:
    输入图片说明
    如果有多个Queue参与,其仅可保证在该Queue分区队列上的消息顺序,则称为分区有序。

    如何实现Queue的选择:
    在定义生产者Producer时我们可以指定消息队列选择器,而这个选择器是我们自己实现了MessageQueueSelector接口定义的。在定义选择器的选择算法时,一般需要使用选择key。这个选择key可以是消息key也可以是其它数据。但无论谁做选择key,都不能重复,都是唯一的。

    一般性的选择算法是,让选择key(或其hash值)与该主题Topic所包含的队列Queue的数量取模,其结果即为选择出的队列Queue的QueueId。

    取模算法存在一个问题:不同选择key与Queue数量取模结果可能会是相同的,即不同选择key的消息可能会出现在相同的Queue,即同一个Consuemr可能会消费到不同选择key的消息。这个问题如何解决?一般性的作法是,从消息中获取到选择key,对其进行判断。若是当前Consumer需要消费的消息,则直接消费,否则,什么也不做。这种做法要求选择key要能够随着消息一起被Consumer获取到。此时使用消息key作为选择key是比较好的做法。

    以上做法会不会出现如下新的问题呢?不属于那个Consumer的消息被拉取走了,那么应该消费该消息的Consumer是否还能再消费该消息呢?同一个Queue中的消息不可能被同一个Group中的不同Consumer同时消费。所以,消费现一个Queue的不同选择key的消息的Consumer一定属于不同的Group。而不同的Group中的Consumer间的消费是相互隔离的,互不影响的。

    分区有序 代码实现:

    package cn.myrocketmq.order;
    
    import org.apache.rocketmq.client.exception.MQBrokerException;
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.client.producer.DefaultMQProducer;
    import org.apache.rocketmq.client.producer.MessageQueueSelector;
    import org.apache.rocketmq.client.producer.SendResult;
    import org.apache.rocketmq.common.message.Message;
    import org.apache.rocketmq.common.message.MessageQueue;
    import org.apache.rocketmq.remoting.exception.RemotingException;
    
    import java.util.List;
    
    /**
     * 分区有序
     */
    public class OrderedProducer {
    
        public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
            //创建一个生产者,参数producerGroupAsync为生产者组名称
            DefaultMQProducer producer = new DefaultMQProducer("orderedProducer");
            //指定RocketMQ的nameServer地址
            producer.setNamesrvAddr("127.0.0.1:9876");
            //若为全局有序,则需要设置队列的数量为1
            producer.setDefaultTopicQueueNums(1);
            //开启生产者
            producer.start();
    
            for (int i = 0; i < 100; i++){
                //为了演示简单,使用整型数作为orderId
                Integer orderId = i;
                byte[] body = ("Hi,"+i).getBytes();
                Message message = new Message("TopicA","TagA",body);
    
                //将orderId作为消息key
                message.setKeys(orderId.toString());
    
                //send()方法的第三个参数orderId值会传递给选择器的select()方法的第三参数temp
                //该send()为同步发送
                SendResult sendResult = producer.send(message, new MessageQueueSelector() {
    
                    //具体的选择算法在该方法中定义
                    @Override
                    public MessageQueue select(List<MessageQueue> mqList, Message message, Object temp) {
    
                        //以下是使用消息key作为选择的选择算法
                        String keys = message.getKeys();
                        Integer id = Integer.valueOf(keys);
    
                        //以下是使用temp作为选择key的选择算法
    //                    Integer id = (Integer)temp;
    
                        int index = id % mqList.size(); //mqList表示队列的数量,队列id index = 选择key除队列数量的余数
                        return mqList.get(index);
                    }
                },orderId);
    
                System.out.println(sendResult);
            }
            producer.shutdown();
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63

    三、延时消息
    **1.延时消息:**当消息写入到Broker后,在指定的时长后才可被消费处理的消息,称为延时消息。

    采用RocketMQ的延时消息可以实现定时任务的功能,而无需使用定时器。l例:电商交易中超时未支付关闭订单的场景, 12306 平台订票超时未支付取消订票的场景。

    在电商平台中,订单创建时会发送一条延迟消息。这条消息将会在 30 分钟后投递给后台业务系统(Consumer),后台业务系统收到该消息后会判断对应的订单是否已经完成支付。如果未完成,则取消订单,将商品再次放回到库存;如果完成支付,则忽略。

    在 12306 平台中,车票预订成功后就会发送一条延迟消息。这条消息将会在 45 分钟后投递给后台业务系统(Consumer),后台业务系统收到该消息后会判断对应的订单是否已经完成支付。如果未完成,则取消预订,将车票再次放回到票池;如果完成支付,则忽略。

    2 延时等级
    延时消息的延迟时长不支持随意时长的延迟,是通过特定的延迟等级来指定的。延时等级定义在RocketMQ服务端的MessageStoreConfig类中:
    在这里插入图片描述
    若指定的延时等级为 3 ,则表示延迟时长为10s,因为延迟等级是从 1 开始计数的,不是从0开始的。

    当然,如果需要自定义的延时等级,可以通过在broker加载的配置中新增如下配置(例如下面增加了 1天这个等级1d)。配置文件在RocketMQ安装目录下的conf目录中
    在这里插入图片描述
    在这里插入图片描述
    3.延时消息实现原理
    输入图片说明
    Producer将消息发送到Broker后,Broker会首先将消息写入到commitlog文件,然后需要将其分发到相应的consumequeue。不过,在分发之前,系统会先判断消息中是否带有延时等级。若没有,则直接正常分发;若有则需要经历一个复杂的过程,如下:

    1. 修改消息的Topic为SCHEDULE_TOPIC_XXXX
    2. 根据延时等级,在consumequeue目录中SCHEDULE_TOPIC_XXXX主题下创建出相应的queueId目录与consumequeue文件(如果没有这些目录与文件的话)。
    3. 延迟等级delayLevel与queueId的对应关系为queueId = delayLevel -1
      需要注意,在创建queueId目录时,并不是一次性地将所有延迟等级对应的目录全部创建完毕,而是用到哪个延迟等级创建哪个目录。
    4. 修改消息索引单元内容。索引单元中的Message Tag HashCode部分原本存放的是消息的Tag的Hash值。现修改为消息的投递时间。投递时间是指该消息被重新修改为原Topic后再次被写入到commitlog中的时间。投递时间 = 消息存储时间 + 延时等级时间。消息存储时间指的是消息被发送到Broker时的时间戳。
    5. 将消息索引写入到SCHEDULE_TOPIC_XXXX主题下相应的consumequeue中。

    2.1SCHEDULE_TOPIC_XXXX目录中各个延时等级Queue中的消息排序方式:按照消息投递时间排序。一个Broker中同一等级的所有延时消息会被写入到consumequeue目录中SCHEDULE_TOPIC_XXXX目录下相同Queue中。即一个Queue中消息投递时间的延迟等级时间是相同的。那么投递时间就取决于于消息存储时间了。即按照消息被发送到Broker的时间进行排序的。

    投递延时消息:
    Broker内部有一个延迟消息服务类ScheuleMessageService,其会消费SCHEDULE_TOPIC_XXXX中的消息,即按照每条消息的投递时间,将延时消息投递到目标Topic中。不过,在投递之前会从commitlog中将原来写入的消息再次读出,并将其原来的延时等级设置为 0 ,即原消息变为了一条不延迟的普通消息。然后再次将消息投递到目标Topic中。

    ScheuleMessageService:
    在Broker启动时,ScheuleMessageService会创建并启动一个定时器TImer,用于执行相应的定时任务。系统会根据延时等级的个数,定义相应数量的TimerTask,每个TimerTask负责一个延迟等级消息的消费与投递。每个TimerTask都会检测相应Queue队列的第一条消息是否到期。若第一条消息未到期,则后面的所有消息更不会到期(消息是按照投递时间排序的);若第一条消息到期了,则将该消息投递到目标Topic,即消费该消息。

    将消息重新写入commitlog:
    延迟消息服务类ScheuleMessageService将延迟消息再次发送给了commitlog,并再次形成新的消息索引条目,分发到相应Queue。这(延迟到期后的消息重新发送)其实就是一次普通消息发送。只不过这次的消息Producer是延迟消息服务类ScheuleMessageService。

    4代码举例
    定义DelayProducer类:

    package cn.myrocketmq.delay;
    
    import org.apache.rocketmq.client.producer.DefaultMQProducer;
    import org.apache.rocketmq.client.producer.SendResult;
    import org.apache.rocketmq.common.message.Message;
    
    import java.text.SimpleDateFormat;
    import java.util.Date;
    
    public class DelayProducer {
        public static void main(String[] args) throws Exception {
            DefaultMQProducer producer = new DefaultMQProducer("pg");
            producer.setNamesrvAddr("127.0.0.1:9876");
            producer.start();
            for (int i = 0 ; i < 10 ; i++) {
                byte[] body = ("Hi," + i).getBytes();
                Message msg = new Message("TopicB", "someTag", body);
                // 指定消息延迟等级为 3 级,即延迟10s
                 msg.setDelayTimeLevel(3);
                SendResult sendResult = producer.send(msg);
                // 输出消息被发送的时间
                System.out.print(new SimpleDateFormat("mm:ss").format(new Date()));
                System.out.println(" ," + sendResult);
            }
            producer.shutdown();
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28

    定义OtherConsumer类:

    package cn.myrocketmq.delay;
    
    import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
    import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
    import org.apache.rocketmq.common.message.MessageExt;
    
    import java.text.SimpleDateFormat;
    import java.util.Date;
    import java.util.List;
    
    public class DelayConsumer {
        public static void main(String[] args) throws MQClientException {
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("cg");
            consumer.setNamesrvAddr("127.0.0.1:9876");
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET );
            consumer.subscribe("TopicB", "*");
            consumer.registerMessageListener(new MessageListenerConcurrently() {
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                                ConsumeConcurrentlyContext context) {
                    for (MessageExt msg : msgs) {
                        // 输出消息被消费的时间
                        System.out.print(new SimpleDateFormat("mm:ss").format(new Date()));
                        System.out.println(" ," + msg);
                    }
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });
            consumer.start();
            System.out.println("Consumer Started");
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37

    四、事务消息
    工行用户A向建行用户B转账 1 万元。

    同步消息处理 方法:
    输入图片说明

    1. 工行系统发送一个给B增款 1 万元的同步消息M给Broker
    2. 消息被Broker成功接收后,向工行系统发送成功响应ACK
    3. 工行系统收到成功ACK后从用户A中扣款 1 万元
    4. 建行系统从Broker中获取到消息M
    5. 建行系统消费消息M,即向用户B中增加 1 万元

    这其中是有问题的:若第 3 步中的扣款操作失败,但消息已经成功发送到了Broker。对于MQ来说,只要消息写入成功,那么这个消息就可以被消费。此时建行系统中用户B增加了 1 万元。出现了数据不一致问题。

    2.解决思路
    让第 1 、 2 、 3 步具有原子性,要么全部成功,要么全部失败。即消息发送成功后,必须要保证扣款成功。如果扣款失败,则回滚发送成功的消息。而该思路即使用事务消息。这里要使用分布式事务解决方案。

    在这里插入图片描述
    使用事务消息来处理该需求场景:

    1. 事务管理器TM向事务协调器TC发起指令,开启全局事务
    2. 工行系统发一个给B增款 1 万元的事务消息M给TC
    3. TC会向Broker发送半事务消息prepareHalf,将消息M预提交到Broker。此时的建行系统是看不到Broker中的消息M的
    4. Broker会将预提交执行结果Report给TC。
    5. 如果预提交失败,则TC会向TM上报预提交失败的响应,全局事务结束;如果预提交成功,TC会调用工行系统的回调操作,去完成工行用户A的预扣款1 万元的操作
    6. 工行系统会向TC发送预扣款执行结果,即本地事务的执行状态
    7. TC收到预扣款执行结果后,会将结果上报给TM。

    预扣款执行结果存在三种可能性: // 描述本地事务执行状态 public enum LocalTransactionState {
    COMMIT_MESSAGE, // 本地事务执行成功
    ROLLBACK_MESSAGE, // 本地事务执行失败
    UNKNOW, // 不确定,表示需要进行回查以确定本地事务的执行结果
    }

    1. TM会根据上报结果向TC发出不同的确认指令:

    1.若预扣款成功(本地事务状态为COMMIT_MESSAGE),则TM向TC发送Global Commit指令
    2.若预扣款失败(本地事务状态为ROLLBACK_MESSAGE),则TM向TC发送Global Rollback指令
    3… 若现未知状态(本地事务状态为UNKNOW),则会触发工行系统的本地事务状态回查操作。回查操作会将回查结果,即COMMIT_MESSAGE或ROLLBACK_MESSAGE Report给TC。TC将结果上报给TM,TM会再向TC发送最终确认指令Global Commit或Global Rollback

    1. TC在接收到指令后会向Broker与工行系统发出确认指令
    2. TC接收的若是Global Commit指令,则向Broker与工行系统发送Branch
      Commit指令。此时Broker中的消息M才可被建行系统看到;此时的工行用户A中的扣款操作才真正被确认
    3. TC接收到的若是Global Rollback指令,则向Broker与工行系统发送Branch Rollback指令。此时Broker中的消息M将被撤销;工行用户A中的扣款操作将被回滚

    以上方案就是为了确保消息投递与扣款操作能够在一个事务中,要成功都成功,有一个失败,则全部回滚。

    注意:这不是一个典型的XA模式。因为XA模式中的分支事务是异步的,而事务消息方案中的消息预提交与预扣款操作间是同步的。

    3.基础
    分布式事务:
    对于分布式事务,通俗地说就是,一次操作由若干分支操作组成,这些分支操作分属不同应用,分布在不同服务器上。分布式事务需要保证这些分支操作要么全部成功,要么全部失败。分布式事务与普通事务一样,就是为了保证操作结果的一致性。

    事务消息:
    RocketMQ提供了类似X/Open XA的分布式事务功能,通过事务消息能达到分布式事务的最终一致。XA是一种分布式事务解决方案,一种分布式事务处理模式。

    半事务消息:
    暂不能投递的消息,发送方已经成功地将消息发送到了Broker,但Broker未收到最终确认指令,此时该消息被标记成“暂不能投递”状态,即不能被消费者看到。处于该种状态下的消息即半事务消息。

    本地事务状态:
    Producer回调操作执行的结果为本地事务状态,其会发送给TC,而TC会再发送给TM。TM会根据TC发送来的本地事务状态来决定全局事务确认令。

    // 描述本地事务执行状态 public enum LocalTransactionState {
    COMMIT_MESSAGE, // 本地事务执行成功
    ROLLBACK_MESSAGE, // 本地事务执行失败
    UNKNOW, // 不确定,表示需要进行回查以确定本地事务的执行结果
    }

    消息回查:
    输入图片说明
    消息回查,即重新查询本地事务的执行状态。本例子就是重新到DB中查看预扣款操作是否执行成功。

    注意,消息回查不是重新执行回调操作。回调操作是进行预扣款操作,而消息回查则是查看预扣款操作执行的结果。

    引发消息回查的原因最常见的有两个:
    1)回调操作返回UNKNWON
    2)TC没有接收到TM的最终全局事务确认指令

    RocketMQ中的消息回查设置:
    关于消息回查,有三个常见的属性设置。它们都在broker加载的配置文件中设置,例如:

    1. transactionTimeout=20,指定TM在 20 秒内应将最终确认状态发送给TC,否则引发消息回查。默认为 60 秒
    2. transactionCheckMax=5,指定最多回查 5 次,超过后将丢弃消息并记录错误日志。默认 15 次。
    3. transactionCheckInterval=10,指定设置的多次消息回查的时间间隔为 10 秒。默认为 60 秒。

    4.XA模式三剑客:
    XA协议:
    XA(Unix Transaction)是一种分布式事务解决方案,一种分布式事务处理模式,是基于XA协议的。XA协议由Tuxedo(Transaction for Unix has been Extended for Distributed Operation,分布式操作扩展之后的Unix事务系统)首先提出的,并交给X/Open组织,作为资源管理器与事务管理器的接口标准。

    XA模式中有三个重要组件:TC、TM、RM。

    TC:
    Transaction Coordinator,事务协调者。维护全局和分支事务的状态,驱动全局事务提交或回滚。

    RocketMQ中Broker充当着TC。

    TM:
    Transaction Manager,事务管理器。定义全局事务的范围:开始全局事务、提交或回滚全局事务。它实际是全局事务的发起者。

    RocketMQ中事务消息的Producer充当着TM。

    RM:
    Resource Manager,资源管理器。管理分支事务处理的资源,与TC交谈以注册分支事务和报告分支事务的状态,并驱动分支事务提交或回滚。

    RocketMQ中事务消息的Producer及Broker均是RM。

    5.XA模式架构:
    输入图片说明
    XA模式是一个典型的2PC,其执行原理如下:

    1. TM向TC发起指令,开启一个全局事务。
    2. 根据业务要求,各个RM会逐个向TC注册分支事务,然后TC会逐个向RM发出预执行指令。
    3. 各个RM在接收到指令后会在进行本地事务预执行。
    4. RM将预执行结果Report给TC。当然,这个结果可能是成功,也可能是失败。
    5. TC在接收到各个RM的Report后会将汇总结果上报给TM,根据汇总结果TM会向TC发出确认指令。
    6. 若所有结果都是成功响应,则向TC发送Global Commit指令。
    7. 只要有结果是失败响应,则向TC发送Global Rollback指令。
    8. TC在接收到指令后再次向RM发送确认指令。

    事务消息方案并不是一个典型的XA模式。因为XA模式中的分支事务是异步的,而事务消息方案中的消息预提交与预扣款操作间是同步的。

    6.注意

    1. 事务消息不支持延时消息
    2. 对于事务消息要做好幂等性检查,因为事务消息可能不止一次被消费(因为存在回滚后再提交的情况)

    7.代码举例
    定义工行事务监听器:

    package cn.myrocketmq.transaction;
    
    import org.apache.commons.lang3.StringUtils;
    import org.apache.rocketmq.client.producer.LocalTransactionState;
    import org.apache.rocketmq.client.producer.TransactionListener;
    import org.apache.rocketmq.common.message.Message;
    import org.apache.rocketmq.common.message.MessageExt;
    
    public class ICBCTransactionListener implements TransactionListener {
        // 回调操作方法
        // 消息预提交成功就会触发该方法的执行,用于完成本地事务
        @Override
        public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
            System.out.println("预提交消息成功:" + msg);
            // 假设接收到TAGA的消息就表示扣款操作成功,TAGB的消息表示扣款失败,
            // TAGC表示扣款结果不清楚,需要执行消息回查
            if (StringUtils.equals("TAGA", msg.getTags())) {
                return LocalTransactionState.COMMIT_MESSAGE;
            } else if (StringUtils.equals("TAGB", msg.getTags())) {
                return LocalTransactionState.ROLLBACK_MESSAGE;
            } else if (StringUtils.equals("TAGC", msg.getTags())) {
                return LocalTransactionState.UNKNOW;
            }
            return LocalTransactionState.UNKNOW;
        }
    
        // 消息回查方法
        // 引发消息回查的原因最常见的有两个:
        // 1)回调操作返回UNKNWON
        // 2)TC没有接收到TM的最终全局事务确认指令
        @Override
        public LocalTransactionState checkLocalTransaction(MessageExt msg) {
            System.out.println("执行消息回查" + msg.getTags());
            return LocalTransactionState.COMMIT_MESSAGE;
        }
    }
    
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38

    定义事物消息生产者:

    package cn.myrocketmq.transaction;
    
    import org.apache.rocketmq.client.producer.SendResult;
    import org.apache.rocketmq.client.producer.TransactionMQProducer;
    import org.apache.rocketmq.common.message.Message;
    
    import java.util.concurrent.*;
    
    public class TransactionProducer {
        public static void main(String[] args) throws Exception {
            TransactionMQProducer producer =
                    new TransactionMQProducer("tpg");
            producer.setNamesrvAddr("127.0.0.1:9876");
    
            /**
             * 定义一个线程池
             * @param corePoolSize 线程池中核心线程数量
             * @param maximumPoolSize 线程池中最多线程数
             * @param keepAliveTime 这是一个时间。当线程池中线程数量大于核心线程数量是,多余空闲线程的存活时长
             * @param unit 时间单位
             * @param workQueue 临时存放任务的队列,其参数就是队列的长度
             * @param threadFactory 线程工厂
             */
            ExecutorService executorService = new ThreadPoolExecutor( 2 , 5 ,100 ,
                    TimeUnit.SECONDS,new ArrayBlockingQueue<Runnable>( 2000 ), new ThreadFactory() {
                @Override
                public Thread newThread(Runnable r) {
                    Thread thread = new Thread(r);
                    thread.setName("client-transaction-msg-check-thread");
                    return thread;
                }
            });
            // 为生产者指定一个线程池
            producer.setExecutorService(executorService);
            // 为生产者添加事务监听器
            producer.setTransactionListener(new ICBCTransactionListener());
            producer.start();
            String[] tags = {"TAGA","TAGB","TAGC"};
            for (int i = 0 ; i < 3 ; i++) {
                byte[] body = ("Hi," + i).getBytes();
                Message msg = new Message("TTopic", tags[i], body);
                // 发送事务消息
                // 第二个参数用于指定在执行本地事务时要使用的业务参数
                SendResult sendResult =producer.sendMessageInTransaction(msg,null);
                System.out.println("发送结果为:" +sendResult.getSendStatus());
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48

    定义消费者:直接使用普通消息的SomeConsumer作为消费者即可。

    package cn.myrocketmq.transaction;
    
    import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
    import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
    import org.apache.rocketmq.common.message.MessageExt;
    
    import java.util.List;
    
    public class SomeConsumer {
        public static void main(String[] args) throws MQClientException {
            // DefaultLitePullConsumer consumer = new DefaultLitePullConsumer("cg");
            // 定义一个push消费者
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("cg");
            // 指定nameServer
            consumer.setNamesrvAddr("127.0.0.1:9876");
            // 指定从第一条消息开始消费
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
            // 指定消费topic与tag
            consumer.subscribe("TTopic", "*");
            // 指定采用“广播模式”进行消费,默认为“集群模式”
            // consumer.setMessageModel(MessageModel.BROADCASTING);
    
            // 注册消息监听器
            consumer.registerMessageListener(new MessageListenerConcurrently() {
                // 一旦broker中有了其订阅的消息就会触发该方法的执行,
                // 其返回值为当前consumer消费的状态
                @Override
                public ConsumeConcurrentlyStatus
                consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                    // 逐条消费消息
                    for (MessageExt msg : msgs) {
                        System.out.println(msg);
                    }
                    // 返回消费状态:消费成功
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });
            // 开启消费者消费
            consumer.start();
            System.out.println("Consumer Started");
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
  • 相关阅读:
    QT中文乱码解决方案与乱码的原因
    C++之智能指针
    25.leetcode---只出现一次的数字(Java版)
    Flink实时仓库-DWD层(流量域)模板代码
    python-爬虫(可直接使用)
    java笔试题含答案总结四
    Python 数据分析入门教程:Numpy、Pandas、Matplotlib和Scikit-Learn详解
    Python面试高频问题:浅拷贝和深拷贝
    【机器学习笔记13】softmax多分类模型【上篇】完整流程与详细公式推导
    YOLO目标检测——打电话数据集【含对应voc、coco和yolo三种格式标签】
  • 原文地址:https://blog.csdn.net/weixin_45703155/article/details/126554554