• RocketMq最强总结 带你rocket从入门到入土为安


    docker下安装单例RocketMq和集群

    暂时之后再进行更新

    rocket 各个角色分工

    1. Producer:消息的发送者;举例:发信者

    2. Consumer:消息接收者;举例:收信者

    3. Broker:暂存和传输消息;举例:邮局

    4. NameServer:管理Broker;举例:各个邮局的管理机构

    5. Topic:区分消息的种类;一个发送者可以发送消息给一个或者多个Topic;一个消息的接收者可以订阅一个或者多个Topic消息
      类似于 rabbitmq 的 路由键 ( topic交换机 )

    6. Message Queue:相当于是Topic的分区;用于并行发送和接收消息

    在这里插入图片描述

    消息发送和接收

    主题包含多个标签,例如主题春节,标签可以是放鞭炮,团年饭
    同一个消费组,给不同的消费者设置不同的tag时,后启动的消费者会覆盖先启动的消费者设置的tag。
    
    生产消息
    1.创建消息生产者producer,并制定生产者组名
    2.指定Nameserver地址
    3.启动producer
    4.创建消息对象,指定主题Topic、Tag和消息体
    5.发送消息 (这里可以使用发送异步消息 )
    6.关闭生产者producer
    
    
    消费消息 
    1.创建消费者Consumer,制定消费者组名
    2.指定Nameserver地址
    3.订阅主题Topic和Tag
    4.设置回调函数,处理消息
    5.启动消费者consumer
    ps: 这里的 消费者和rabbitmq 有很大的不同的, rabbitmq 是从队列中取出消费消费 , 这里是从订阅主题消费消息 ,和redis 消费消息的方式很像
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20

    下面的 案例 分组可以不同 “g1”组的消费者(DefaultMQPushConsumer) 可以接收 “g2”组的生产者发送的 消息 , tag和topic 必须相同才能接收消息 指定唯一消费路径

    发送同步消息

    //发送同步消息
    public class SyncProducer {
        public static void main(String[] args) throws Exception {
            // 实例化消息生产者Producer
            DefaultMQProducer producer = new DefaultMQProducer("g1");
            // 设置NameServer的地址
            producer.setNamesrvAddr("116.205.161.47:9876");
    
            //增大超时时间,防止超时
            producer.setSendMsgTimeout(1000000);
            // 启动Producer实例
            producer.start();
    
            for (int i = 0; i < 10; i++) {
                // 创建消息,并指定Topic,Tag和消息体
                Message msg = new Message("TopicTest" /* Topic */,
                        "TagA" /* Tag */,
                        ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
                );
                // 发送消息到一个Broker
                SendResult sendResult = producer.send(msg);
                // 通过sendResult返回消息是否成功送达
                System.out.println("%s%n"+ sendResult);
                System.out.println("发送状态:"+sendResult.getSendStatus());
                System.out.println("发送消息的id:"+sendResult.getMsgId());
                System.out.println("消息队列的id: "+sendResult.getMessageQueue().getQueueId());
    
                Thread.sleep(1000L);
            }
            // 如果不再发送消息,关闭Producer实例。
            producer.shutdown();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33

    在这里插入图片描述

    发送异步消息

    这里要注意 ,需要防止主线程先关闭 导致Topic 被删除,消息找不到topic 发送失败咯~~

        public static void main(String[] args) throws Exception {
            // 实例化消息生产者Producer
            DefaultMQProducer producer = new DefaultMQProducer("g2");
            // 设置NameServer的地址
            producer.setNamesrvAddr("116.205.161.47:9876");
    
            //增大超时时间,防止超时
            producer.setSendMsgTimeout(1000000);
            // 启动Producer实例
            producer.start();
            producer.setRetryTimesWhenSendAsyncFailed(0);
            for (int i = 0; i < 10; i++) {
                final int index = i;
                // 创建消息,并指定Topic,Tag和消息体
                Message msg = new Message("TopicTest",
                        "TagA",
                        "OrderID188",
                        "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                // SendCallback接收异步返回结果的回调
                producer.send(msg, new SendCallback() {
                    @Override
                    public void onSuccess(SendResult sendResult) {
                        System.out.printf("%-10d OK %s %n", index,
                                sendResult.getMsgId());
                    }
                    @Override
                    public void onException(Throwable e) {
                        System.out.printf("%-10d Exception %s %n", index, e);
                        e.printStackTrace();
                    }
                });
            }
    
            // 防止主线程先关闭!!!
            Thread.sleep(60000);
            // 如果不再发送消息,关闭Producer实例。
            producer.shutdown();
    
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39

    在这里插入图片描述

    发送单项消息

    单向消息 是不需要关心结果的消息 ,例如日志收集

    
    public class OnewayProducer {
    	public static void main(String[] args) throws Exception{
            // 实例化生产者
    	    DefaultMQProducer producer = new DefaultMQProducer("g3");
    
            // 设置NameServer的地址
            producer.setNamesrvAddr("116.205.161.47:9876");
    
            //增大超时时间,防止超时
            producer.setSendMsgTimeout(1000000);
    
            producer.start();
    
            for (int i = 0; i < 10; i++) {
                Message message = new Message("topic", "hi~~ one way message".getBytes(StandardCharsets.UTF_8));
                producer.sendOneway(message);
            }
    
            producer.shutdown();
    
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23

    轮询接收消息

    这里的

         --------   消费者1 
        /*
        * 1.创建消费者Consumer,制定消费者组名
           2.指定Nameserver地址
           3.订阅主题Topic和Tag
           4.设置回调函数,处理消息
           5.启动消费者consumer
        * */
    
        public static void main(String[] args) throws MQClientException {
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("g1");
            // 设置NameServer的地址
            consumer.setNamesrvAddr("116.205.161.47:9876");
    
            //增大超时时间,防止超时
            consumer.setConsumeTimeout(1000000);
    
            // 消费 tag
            consumer.subscribe("topic","tag");
    
            // 设置回调函数 用来接收消息
            consumer.registerMessageListener(new MessageListenerConcurrently() {
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                    System.out.println(consumeConcurrentlyContext);
                    //返回消费 成功
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });
    
    
           consumer.start();  
        }
    
    
    ------------  消费者2
    
     public static void main(String[] args) throws MQClientException {
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("g1");
            // 设置NameServer的地址
            consumer.setNamesrvAddr("116.205.161.47:9876");
    
            //增大超时时间,防止超时
            consumer.setConsumeTimeout(1000000);
    
            // 消费 tag
            consumer.subscribe("TopicTest","TagA");
    
            // 设置回调函数 用来接收消息
            consumer.registerMessageListener(new MessageListenerConcurrently() {
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                    for (MessageExt messageExt : list) {
                        log.info("我是消费者2 ,我消费了消息:{}",messageExt.getMsgId());
                        byte[] body = messageExt.getBody();
                        System.out.println(new String(body));
                    }
    
    
                    System.out.println(consumeConcurrentlyContext);
    
                    //返回消费 成功
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });
    
    
           consumer.start();
           log.info(" ");
    
    
        }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73

    topic 和tag 相同 这里默认是轮询的接收消息 消费者1 接收1 3 5 ,消费者2 就接收2 4 6 …
    在这里插入图片描述
    在这里插入图片描述

    广播模式接收消息

    
    @Slf4j
    public class Consumer02 {
    
        /*
        * 1.创建消费者Consumer,制定消费者组名
           2.指定Nameserver地址
           3.订阅主题Topic和Tag
           4.设置回调函数,处理消息
           5.启动消费者consumer
        * */
    
        public static void main(String[] args) throws MQClientException {
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("g1");
            // 设置NameServer的地址
            consumer.setNamesrvAddr("116.205.161.47:9876");
    
            //增大超时时间,防止超时
            consumer.setConsumeTimeout(1000000);
    
    
            //设置广播模式
            consumer.setMessageModel(MessageModel.BROADCASTING);
    
            // 消费 tag
            consumer.subscribe("TopicTest","TagA");
    
            // 设置回调函数 用来接收消息
            consumer.registerMessageListener(new MessageListenerConcurrently() {
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                    for (MessageExt messageExt : list) {
                        log.info("我是消费者2 ,我消费了消息:{}",messageExt.getMsgId());
                        byte[] body = messageExt.getBody();
                        System.out.println(new String(body));
                    }
    
    
                    System.out.println(consumeConcurrentlyContext);
    
                    //返回消费 成功
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });
    
    
           consumer.start();
           log.info(" ");
    
    
        }
    
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54

    这里十条消息 有两个消费者 消费者1 (默认消息接收方式(默认轮询)). 消费者2(广播接收消息模式)
    消费者1:接收到了 8 4 6 5 9 0 7
    消费者2 :接收了 1–10 全部消息

    继续实验
    这里有3个消费者
    消费者1 消费者3 是默认轮询模式
    消费者2 是广播模式
    消费者1 3 平分了消息
    消费者2 接收了所有的消息
    在这里插入图片描述
    这里出现了个小bug,,,, 消费者1 和消费者2 加起来一共才消费了8 条消息 , 消费者3接收了10条消息
    第二次实验 消息9 不见了…

    实验3 ,关闭广播模式 的消费者 ,两个消费者不仅能正常监听到消息,还能把之前未监听到的消息 重新监听到…

    总结:当 轮询消费者和 广播消费者 混一起监听同一个topic 下的 tag的时候 ,广播模式的消费者可以监听全部的消息 ,
    所有 轮询模式消费者 加起来还是会有几条消息没有监听到 ,,,所以 rabbitmq 中的手动ack加消息回退 是多么的重要!!!(题外话)

    局部消息顺序

    在这里插入图片描述

    原始 , 消息的发送者会把消息 发送到各个不同的队列 ,消费者也是 同时监听所有队列
    局部消息顺序 ,将 具有顺序性的消息发送给一个队列 , 消费者专门开辟一条线程监听那个队列 ,这样就能实现消息的顺序性
    消息有序指的是可以按照消息的发送顺序来消费(FIFO)。RocketMQ可以严格的保证消息有序,可以分为分区有序或者全局有序。

    顺序消费的原理解析,在默认的情况下消息发送会采取Round Robin轮询方式把消息发送到不同的queue(分区队列);而消费消息的时候从多个queue上拉取消息,这种情况发送和消费是不能保证顺序。但是如果控制发送的顺序消息只依次发送到同一个queue中,消费的时候只从这个queue上依次拉取,则就保证了顺序。当发送和消费参与的queue只有一个,则是全局有序;如果多个queue参与,则为分区有序,即相对每个queue,消息都是有序的。

    下面用订单进行分区有序的示例。一个订单的顺序流程是:创建、付款、推送、完成。订单号相同的消息会被先后发送到同一个队列中,消费时,同一个OrderId获取到的肯定是同一个队列。

    // 消息的生产者
    // 按照订单号 的不同 订单消息进入不同的 消息队列 (类比一下 哈希取余 )
     public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException,
                InterruptedException {
            // 实例化消息生产者Producer
            DefaultMQProducer producer = new DefaultMQProducer("g2");
            // 设置NameServer的地址
            producer.setNamesrvAddr("116.205.161.47:9876");
    
            //增大超时时间,防止超时
            producer.setSendMsgTimeout(1000000);
            // 启动Producrde
            producer.start();
    
            //发送消息
            List<OrderStep> orderSteps = OrderStep.buildOrders();
    
            //构建消息集合  根据订单id 有选择的去发
            for (int i=0;i<orderSteps.size();i++) {
                byte[] bytes = JSON.toJSONString(orderSteps.get(i)).getBytes(StandardCharsets.UTF_8);
                Message message = new Message("orderTopic","order","i:"+i,bytes);
    
                //new MessageQueueSelector() :消息队列的选择器 ,根据消息的业务标识 实现方法,这里只要订单id 一样,进入的队列就是一样的
                // orderId: 业务标识
                producer.send(message, new MessageQueueSelector() {
                    /*
                    * list:队列集合
                    * message:消息对象
                    * arg :业务标识参数 === orderSteps.get(i).getOrderId()
                    * */
                  // 选择队列的方法
                    @Override
                    public MessageQueue select(List<MessageQueue> list, Message message, Object arg) {
                        long orderId = (long) arg; // 订单id
                        long index = orderId % list.size();// 消息被发送的队列
                        return list.get((int) index);
                    }
                },orderSteps.get(i).getOrderId());
            }
                  producer.shutdown();
        }
    
    // 消费者 
    public class Consumer {
        public static void main(String[] args) throws MQClientException {
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("g1");
            // 设置NameServer的地址
            consumer.setNamesrvAddr("116.205.161.47:9876");
    
            //增大超时时间,防止超时
            consumer.setConsumeTimeout(1000000);
    
            // 消费 tag
            consumer.subscribe("orderTopic","*");
    
          //注册监听器
            consumer.registerMessageListener(new MessageListenerOrderly() {
                @Override
                public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                    for (MessageExt msg : msgs) {
                        System.out.println(" ThreadName:"+Thread.currentThread().getName()+" QueueID:"+msg.getQueueId()
                                +",获得的消息是:"+new String(msg.getBody()));
                    }
                    return ConsumeOrderlyStatus.SUCCESS;
                }
            });
    
    
            consumer.start();
            System.out.println("消费者启动");
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73

    打印的结果 (我 区分了一下)

     ThreadName:ConsumeMessageThread_1 QueueID:3,获得的消息是:{"desc":"创建","orderId":1039}
     ThreadName:ConsumeMessageThread_1 QueueID:3,获得的消息是:{"desc":"付款","orderId":1039}
     ThreadName:ConsumeMessageThread_1 QueueID:3,获得的消息是:{"desc":"推送","orderId":1039}
     ThreadName:ConsumeMessageThread_1 QueueID:3,获得的消息是:{"desc":"完成","orderId":1039}
    
    ThreadName:ConsumeMessageThread_1 QueueID:3,获得的消息是:{"desc":"创建","orderId":7235}
     ThreadName:ConsumeMessageThread_1 QueueID:3,获得的消息是:{"desc":"付款","orderId":7235}
     ThreadName:ConsumeMessageThread_1 QueueID:3,获得的消息是:{"desc":"完成","orderId":7235}
    
    ThreadName:ConsumeMessageThread_2 QueueID:1,获得的消息是:{"desc":"创建","orderId":1065}
     ThreadName:ConsumeMessageThread_2 QueueID:1,获得的消息是:{"desc":"付款","orderId":1065}
     ThreadName:ConsumeMessageThread_2 QueueID:1,获得的消息是:{"desc":"完成","orderId":1065}
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    可以很清楚的看到 线程id 和消息队列id 是一一对应的… 不会出现 一个消息队列里的消息同时被两条线程消费 ,这样打印出来的消息就是顺序的了

    延迟消息

    message.setDelayTimeLevel(2); 消息延迟两秒
    消息会在消息队列延迟2秒
    这里可以做成定时任务 ,
    实验了一下 ,,这里不能做消息队列
    我 模拟了一百条消息 , 每条消息都设置了随机 的延迟时间,并没有发现 延迟时间短的先出来 ,接收消息都是随机的,乱序的
    总之一句话 和rabbitmq 差别很大…
    在这里插入图片描述

    批量消息发送

    String topic = "BatchTest";
    List<Message> messages = new ArrayList<>();
    messages.add(new Message(topic, "TagA", "OrderID001", "Hello world 0".getBytes()));
    messages.add(new Message(topic, "TagA", "OrderID002", "Hello world 1".getBytes()));
    messages.add(new Message(topic, "TagA", "OrderID003", "Hello world 2".getBytes()));
    try {
       producer.send(messages);
    } catch (Exception e) {
       e.printStackTrace();
       //处理error
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    如果消息的总长度可能大于4MB时,这时候最好把消息进行分割
    解决方案

    public class ListSplitter implements Iterator<List<Message>> {
       private final int SIZE_LIMIT = 1024 * 1024 * 4;
       private final List<Message> messages;
       private int currIndex;
       public ListSplitter(List<Message> messages) {
               this.messages = messages;
       }
        @Override 
        public boolean hasNext() {
           return currIndex < messages.size();
       }
       	@Override 
        public List<Message> next() {
           int nextIndex = currIndex;
           int totalSize = 0;
           for (; nextIndex < messages.size(); nextIndex++) {
               Message message = messages.get(nextIndex);
               int tmpSize = message.getTopic().length() + message.getBody().length;
               Map<String, String> properties = message.getProperties();
               for (Map.Entry<String, String> entry : properties.entrySet()) {
                   tmpSize += entry.getKey().length() + entry.getValue().length();
               }
               tmpSize = tmpSize + 20; // 增加日志的开销20字节
               if (tmpSize > SIZE_LIMIT) {
                   //单个消息超过了最大的限制
                   //忽略,否则会阻塞分裂的进程
                   if (nextIndex - currIndex == 0) {
                      //假如下一个子列表没有元素,则添加这个子列表然后退出循环,否则只是退出循环
                      nextIndex++;
                   }
                   break;
               }
               if (tmpSize + totalSize > SIZE_LIMIT) {
                   break;
               } else {
                   totalSize += tmpSize;
               }
    
           }
           List<Message> subList = messages.subList(currIndex, nextIndex);
           currIndex = nextIndex;
           return subList;
       }
    }
    //把大的消息分裂成若干个小的消息
    ListSplitter splitter = new ListSplitter(messages);
    while (splitter.hasNext()) {
      try {
          List<Message>  listItem = splitter.next();
          producer.send(listItem);
      } catch (Exception e) {
          e.printStackTrace();
          //处理error
      }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54

    消息过滤

    利用tag实施过滤

    // 消息发送者
    	public static void main(String[] args) throws Exception{
            // 实例化生产者
    	    DefaultMQProducer producer = new DefaultMQProducer("g3");
    
            // 设置NameServer的地址
            producer.setNamesrvAddr("116.205.161.47:9876");
    
            //增大超时时间,防止超时
            producer.setSendMsgTimeout(1000000);
    
            producer.start();
            String topic = "BatchTest";
    
            List<Message> messages = new ArrayList<>();
            messages.add(new Message(topic, "TagA", "OrderID001", "Hello world 0".getBytes()));
            messages.add(new Message(topic, "TagB", "OrderID002", "Hello world 1".getBytes()));
            messages.add(new Message(topic, "TagC", "OrderID003", "Hello world 2".getBytes()));
             producer.send(messages);
    
            producer.shutdown();
    
        }
    
    
    
    
    
    // 消息接收者 
        public static void main(String[] args) throws MQClientException {
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("g1");
            // 设置NameServer的地址
            consumer.setNamesrvAddr("116.205.161.47:9876");
    
            //增大超时时间,防止超时
            consumer.setConsumeTimeout(1000000);
    
            // 消费 tag  TagA||TagB :只消费 这两种tag , * 消费BatchTest下全部的tag
            consumer.subscribe("BatchTest","TagA||TagB");
    
            // 设置回调函数 用来接收消息
            consumer.registerMessageListener(new MessageListenerConcurrently() {
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                    for (MessageExt messageExt : list) {
                        log.info("我消费了消息:{},消息延迟了:{},消息队列是:{},消息TAG是:{}",messageExt.getMsgId(),
                                System.currentTimeMillis()-messageExt.getStoreTimestamp(),messageExt.getQueueId()
                        ,messageExt.getTags().toString()
                        );
    
                        byte[] body = messageExt.getBody();
                        System.out.println(new String(body));
                    }
    
    
                    System.out.println(consumeConcurrentlyContext);
    
                    //返回消费 成功
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });
    
    
           consumer.start();
           log.info(" ");
        }
    
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68

    利用SQL 进行过滤

    RocketMQ只定义了一些基本语法来支持这个特性。你也可以很容易地扩展它。

    • 数值比较,比如:>,>=,<,<=,BETWEEN,=;
    • 字符比较,比如:=,<>,IN;
    • IS NULL 或者 IS NOT NULL;
    • 逻辑符号 AND,OR,NOT;

    常量支持类型为:

    • 数值,比如:123,3.1415;
    • 字符,比如:‘abc’,必须用单引号包裹起来;
    • NULL,特殊的常量
    • 布尔值,TRUEFALSE

    只有使用push模式的消费者才能用使用SQL92标准的sql语句,接口如下:

    Exception in thread “main” org.apache.rocketmq.client.exception.MQClientException: CODE: 1 DESC: The broker does not support consumer to filter message by SQL92
    需要更改 enablePropertyFilter 属性
    sql 过滤失效
    >RocketMQ通过sql过滤接收不到消息,的话 考虑下 你的 消费者和 生产者 是不是在同一个分组之下!!
    在这里插入图片描述
    这样就好了
    在这里插入图片描述
    消费者 和 生产者

    public static void main(String[] args) throws Exception{
            // 实例化生产者
    	    DefaultMQProducer producer = new DefaultMQProducer("g1");
    
            // 设置NameServer的地址
            producer.setNamesrvAddr("116.205.161.47:9876");
    
            //增大超时时间,防止超时
            producer.setSendMsgTimeout(1000000);
    
            producer.start();
    
            for (int i = 0; i < 20; i++) {
                Random random = new Random();
                Message message = new Message("topic1", "TagA","hi~~ one way message".getBytes(StandardCharsets.UTF_8));
    
                // putUserProperty : 便于 消费者使用sql 语句进行 过滤 ~~
                message.putUserProperty("i", String.valueOf(i));
                producer.send(message);
            }
    
            producer.shutdown();
    
        }
    
    
    
     public static void main(String[] args) throws MQClientException {
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("g1");
            // 设置NameServer的地址
            consumer.setNamesrvAddr("116.205.161.47:9876");
    
            //增大超时时间,防止超时
            consumer.setConsumeTimeout(1000000);
    
            //
            MessageSelector messageSelector = MessageSelector.bySql("i > 5");
            consumer.subscribe("topic1",messageSelector);
    
            // 设置回调函数 用来接收消息
            consumer.registerMessageListener(new MessageListenerConcurrently() {
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                    for (MessageExt messageExt : list) {
                        log.info("我消费了消息:{},消息队列是:{},消息TAG是:{},消息的properties是:{}",messageExt.getMsgId(),
                                messageExt.getTags().toString(),messageExt.getProperties().toString()
                        );
    
                        byte[] body = messageExt.getBody();
                        System.out.println(new String(body));
                    }
    
    
                    System.out.println(consumeConcurrentlyContext);
    
                    //返回消费 成功
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });
    
    
           consumer.start();
           log.info(" ");
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64

    执行结果

    6:44:52.956 [main] INFO com.example.rocketmq.Consumer -  
    16:50:06.561 [ConsumeMessageThread_1] INFO com.example.rocketmq.Consumer - 我消费了消息:C0A82BF2316818B4AAC26FC24DF50006,消息队列是:TagA,消息TAG是:{MIN_OFFSET=0, MAX_OFFSET=32, i=6, CONSUME_START_TIME=1661158204994, UNIQ_KEY=C0A82BF2316818B4AAC26FC24DF50006, WAIT=true, TAGS=TagA},消息的properties是:{}
    16:50:06.561 [ConsumeMessageThread_11] INFO com.example.rocketmq.Consumer - 我消费了消息:C0A82BF2316818B4AAC26FC250560010,消息队列是:TagA,消息TAG是:{MIN_OFFSET=0, MAX_OFFSET=35, i=16, CONSUME_START_TIME=1661158205592, UNIQ_KEY=C0A82BF2316818B4AAC26FC250560010, WAIT=true, TAGS=TagA},消息的properties是:{}
    16:50:06.561 [ConsumeMessageThread_9] INFO com.example.rocketmq.Consumer - 我消费了消息:C0A82BF2316818B4AAC26FC24FEC000E,消息队列是:TagA,消息TAG是:{MIN_OFFSET=0, MAX_OFFSET=34, i=14, CONSUME_START_TIME=1661158205468, UNIQ_KEY=C0A82BF2316818B4AAC26FC24FEC000E, WAIT=true, TAGS=TagA},消息的properties是:{}
    16:50:06.562 [ConsumeMessageThread_8] INFO com.example.rocketmq.Consumer - 我消费了消息:C0A82BF2316818B4AAC26FC24FBB000D,消息队列是:TagA,消息TAG是:{MIN_OFFSET=0, MAX_OFFSET=34, i=13, CONSUME_START_TIME=1661158205421, UNIQ_KEY=C0A82BF2316818B4AAC26FC24FBB000D, WAIT=true, TAGS=TagA},消息的properties是:{}
    hi~~ one way message
    org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext@17042905
    hi~~ one way message
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    事务消息

    (1) 发送消息(half消息)。
    (2) 服务端响应消息写入结果。
    (3) 根据发送结果执行本地事务(如果写入失败,此时half消息对业务不可见,本地逻辑不执行)。
    (4) 根据本地事务状态执行Commit或者Rollback(Commit操作生成消息索引,消息对消费者可见)

    在这里插入图片描述

    当消息发送之后如果没有被 提交 消息是不能 被 消费者消费的!!
    executeLocalTransaction() 给消息设置是否能提交
    checkLocalTransaction() 回查
    在这里插入图片描述

    * TransactionStatus.CommitTransaction: 提交事务,它允许消费者消费此消息。
    * TransactionStatus.RollbackTransaction: 回滚事务,它代表该消息将被删除,不允许被消费。
    * TransactionStatus.Unknown: 中间状态,它代表需要检查消息队列来确定状态。
    
    • 1
    • 2
    • 3

    生产者

    import org.apache.rocketmq.client.producer.*;
    import org.apache.rocketmq.common.message.Message;
    import org.apache.rocketmq.common.message.MessageExt;
    import org.springframework.util.StringUtils;
    
    import java.nio.charset.StandardCharsets;
    import java.util.Random;
    
    public class OnewayProducer {
    	public static void main(String[] args) throws Exception{
            // 实例化生产者
            TransactionMQProducer producer = new TransactionMQProducer("g1");
    
            // 设置NameServer的地址
            producer.setNamesrvAddr("116.205.161.47:9876");
    
    
            //设置监听者
            producer.setTransactionListener(new TransactionListener() {
    
                // 当消息被 mq接收到了 会触发该方法 ,对消息进行处理  COMMIT_MESSAGE(提交) ROLLBACK_MESSAGE(丢丢)UNKNOW(被checkLocalTransaction()处理)
                //该方法中执行本地事务 ROLLBACK_MESSAGE:消息会被丢掉
                @Override
                public LocalTransactionState executeLocalTransaction(Message message, Object o) {
                   if (message.getTags().equals("A")){
                       return LocalTransactionState.COMMIT_MESSAGE;
                   }else if (message.getTags().equals("B")){
                        return LocalTransactionState.ROLLBACK_MESSAGE;
                   }else if (message.getTags().equals("C")){
                       return LocalTransactionState.UNKNOW ;
                   }else {
                       return LocalTransactionState.UNKNOW ;
                   }
    
                 }
    
                //该方法 对事务状态的回查
                // unknow 的消息会被 checkLocalTransaction() 处理
                @Override
                public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
                    System.out.println("消息Tag:"+messageExt.getTags());
                    return LocalTransactionState.COMMIT_MESSAGE;
                }
            });
    
            //增大超时时间,防止超时
            producer.setSendMsgTimeout(1000000);
    
            producer.start();
    
            String[] tag = {"A","B","C"} ;
    
            for (int i = 0; i < 3; i++) {
    
                Message message = new Message("topic1", tag[i],"hi~~ one way message".getBytes(StandardCharsets.UTF_8));
    
                TransactionSendResult result = producer.sendMessageInTransaction(message, null);
                System.out.println("发送结果是: "+result.getSendStatus());
    
            }
    
           // producer.shutdown();
    
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    1. 事务消息不支持延时消息和批量消息。
    2. 为了避免单个消息被检查太多次而导致半队列消息累积,我们默认将单个消息的检查次数限制为 15 次,但是用户可以通过 Broker 配置文件的 transactionCheckMax参数来修改此限制。如果已经检查某条消息超过 N 次的话( N = transactionCheckMax ) 则 Broker 将丢弃此消息,并在默认情况下同时打印错误日志。用户可以通过重写 AbstractTransactionCheckListener 类来修改这个行为。
    3. 事务消息将在 Broker 配置文件中的参数 transactionMsgTimeout 这样的特定时间长度之后被检查。当发送事务消息时,用户还可以通过设置用户属性 CHECK_IMMUNITY_TIME_IN_SECONDS 来改变这个限制,该参数优先于 transactionMsgTimeout 参数。
    4. 事务性消息可能不止一次被检查或消费。
    5. 提交给用户的目标主题消息可能会失败,目前这依日志的记录而定。它的高可用性通过 RocketMQ 本身的高可用性机制来保证,如果希望确保事务消息不丢失、并且事务完整性得到保证,建议使用同步的双重写入机制。
    6. 事务消息的生产者 ID 不能与其他类型消息的生产者 ID 共享。与其他类型的消息不同,事务消息允许反向查询、MQ服务器能通过它们的生产者 ID 查询到消费者。

    rocketmq-spring-boot-starter使用

    导入

       <dependency>
                <groupId>org.apache.rocketmq</groupId>
                <artifactId>rocketmq-spring-boot-starter</artifactId>
                <version>2.0.1</version>
            </dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    
    @Test
        void contextLoads() {
            rocketMQTemplate.convertAndSend("springtest","hello Rocket");
        }
    
    
    @Component
    @Slf4j
    @RocketMQMessageListener(consumerGroup = "my-group", topic = "springtest")
    public class Mqlistener implements RocketMQListener<String> {
    
        @Override
        public void onMessage(String s) {
         log.info("接收到的消息是:{}",s);
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    消息的存储和发送

    实际应用中要结合业务场景,合理设置刷盘方式和主从复制方式, 尤其是SYNC_FLUSH方式,由于频繁地触发磁盘写动作,会明显降低 性能。通常情况下,应该把Master和Save配置成ASYNC_FLUSH的刷盘 方式,主从之间配置成SYNC_MASTER的复制方式,这样即使有一台 机器出故障,仍然能保证数据不丢,是个不错的选择。
    ```![在这里插入图片描述](https://img-blog.csdnimg.cn/7b0e0978ab8346eeb0ba5d450b8fadbd.png)
    
    # 消息重试
    >对于顺序消息,当消费者消费消息失败后,消息队列 RocketMQ 会自动不断进行消息重试(每次间隔时间为 1 秒),这时,应用会出现消息消费被阻塞的情况。因此,在使用顺序消息时,务必保证应用能够及时监控并处理消费失败的情况,避免阻塞现象的发生。
     
    
    ```c
    # 异常重试
    public class MessageListenerImpl implements MessageListener {
        @Override
        public Action consume(Message message, ConsumeContext context) {
            //处理消息
            doConsumeMessage(message);
            //方式1:返回 Action.ReconsumeLater,消息将重试
            return Action.ReconsumeLater;
            //方式2:返回 null,消息将重试
            return null;
            //方式3:直接抛出异常, 消息将重试
            throw new RuntimeException("Consumer Message exceotion");
        }
    }
    
    # 异常不重试
    public class MessageListenerImpl implements MessageListener {
        @Override
        public Action consume(Message message, ConsumeContext context) {
            try {
                doConsumeMessage(message);
            } catch (Throwable e) {
                //捕获消费逻辑中的所有异常,并返回 Action.CommitMessage;
                return Action.CommitMessage;
            }
            //消息处理正常,直接返回 Action.CommitMessage;
            return Action.CommitMessage;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37

    自定义消息重试次数

    • 消息最大重试次数的设置对相同 Group ID 下的所有 Consumer 实例有效。
    • 如果只对相同 Group ID 下两个 Consumer 实例中的其中一个设置了 MaxReconsumeTimes,那么该配置对两个 Consumer 实例均生效。
    • 配置采用覆盖的方式生效,即最后启动的 Consumer 实例会覆盖之前的启动实例的配置
    Properties properties = new Properties();
    //配置对应 Group ID 的最大消息重试次数为 20 次
    properties.put(PropertyKeyConst.MaxReconsumeTimes,"20");
    Consumer consumer =ONSFactory.createConsumer(properties);
    
    • 1
    • 2
    • 3
    • 4

    消费幂

    因为 Message ID 有可能出现冲突(重复)的情况,所以真正安全的幂等处理,不建议以 Message ID 作为处理依据。 最好的方式是以业务唯一标识作为幂等处理的关键依据,而业务的唯一标识可以通过消息 Key 进行设置, key存入 数据库 来搞幂等性:

    Message message = new Message();
    message.setKey("ORDERID_100");
    SendResult sendResult = producer.send(message);
    
    • 1
    • 2
    • 3
    consumer.subscribe("ons_test", "*", new MessageListener() {
        public Action consume(Message message, ConsumeContext context) {
            String key = message.getKey()
            // 根据业务唯一标识的 key 做幂等处理
        }
    });
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
  • 相关阅读:
    javaScript中的函数防抖和节流
    敏捷Scrum实施落地中的3大典型问题及解法
    [Linux]进程间通信--共享内存
    jenkins插件Publish Over SSH因安全问题下架
    ICRA2022 SLAM进展---激光SLAM
    Redis
    OWASP发布十大开源软件安全风险清单
    小程序微信支付API?小程序获取手机号?
    处理机器学习数据集中字符串列(pandas.get_dummies)
    ES6 的 class 类和Typescript 的 class 类的区别
  • 原文地址:https://blog.csdn.net/weixin_45699541/article/details/126453224