• rocketmq


    🍓代码仓库

    https://gitee.com/xuhx615/rocket-mqdemo.git

    🍓基本概念

    • ⭐生产者(Producer):消息发布者
    • ⭐主题(Topic):topic用于标识同一类业务类型的消息
    • ⭐消息队列(MessageQueue):传输和存储消息的容器,是消息的最小存储单元
    • ⭐消费者(Consumer):消息订阅者
    • ⭐消费者组(ConsumerGroup):消息订阅者组,多个消费者之间进行负载均衡消费消息
    • nameServer:注册中心
    • Broker:消息中转站,用于接收生产者的消息并持久化,然后发送给对应的topic

    🍓下载安装rocketmq

    1. ⭐前往官网https://rocketmq.apache.org下载rocketmq安装包和rocketmq图形化界面rocketmq Dashboard
    2. ⭐解压rocketmq安装包
      [root@Centos101 rocketmq]# unzip rocketmq-all-5.1.3-bin-release.zip
      
      • 1
    3. ⭐修改namserver启动脚本runserverJVM内存参数(根据实际服务器资源设置,以下参数为学习时设置的参数)
       [root@Centos101 bin]# vi runserver.sh
       修改前:JAVA_OPT="${JAVA_OPT} -server -Xms4g -Xmx4g -Xmn2g -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
       修改后:JAVA_OPT="${JAVA_OPT} -server -Xms512m -Xmx512m -Xmn256m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
      
      • 1
      • 2
      • 3
    4. ⭐启动nameserver
       [root@Centos101 bin]# ./mqnamesrv &
      
      • 1
    5. ⭐查看nameserver启动日志
      [root@Centos101 bin]# tail -100f nohup.out
      
      • 1
    6. ⭐修改broker启动脚本runbokerJVM内存参数
      [root@Centos101 bin]# vi runbroker.sh
      修改前:JAVA_OPT="${JAVA_OPT} -server -Xms8g -Xmx8g"
      修改后:JAVA_OPT="${JAVA_OPT} -server -Xms512m -Xmx512m"
      
      • 1
      • 2
      • 3
    7. ⭐修改broker.conf配置文件
      默认配置
      #集群名称
      brokerClusterName = DefaultCluster
      #broker名称
      brokerName = broker-a
      #当前节点为主节点(主节点为0)
      brokerId = 0
      deleteWhen = 04
      fileReservedTime = 48
      brokerRole = ASYNC_MASTER
      flushDiskType = ASYNC_FLUSH
      
      新增以下配置
      #自动创建topic
      autoCreateTopicEnable = true
      #namesrvAddr地址
      namesrvAddr = 192.168.113.101:9876
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
    8. ⭐启动broker
      [root@Centos101 bin]# ./mqbroker -c ../conf/broker.conf &
      
      • 1
    9. ⭐验证
      生产者:
         [root@Centos101 bin]# export NAMESRV_ADDR='192.168.113.101:9876'
         [root@Centos101 bin]# ./tools.sh org.apache.rocketmq.example.quickstart.Producer
      消费者:
         [root@Centos101 bin]# export NAMESRV_ADDR='192.168.113.101:9876'
         [root@Centos101 bin]# ./tools.sh org.apache.rocketmq.example.quickstart.Consumer
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
    10. ⭐关闭broker
      [root@Centos101 bin]# sh ./mqshutdown broker
      
      • 1
    11. ⭐关闭nameserver
      [root@Centos101 bin]# sh ./mqshutdown namesrv
      
      • 1

    🍓rocketmq集群安装

    1. ⭐主机名配置

      192.168.113.101 Centos101
      192.168.113.102 Centos102
      192.168.113.103 Centos103
      
      • 1
      • 2
      • 3
    2. ⭐免密登录

    3. 关闭防火墙

    4. 配置文件配置:

      • 📌2m-2s-async:2主2从异步刷盘(吞吐量较大,但消息可能会丢失)当生产者发送消息到主节点,主节点会直接给生产返回收到消息,然后异步同步给从节点
      • 📌2m-2s-sync:2主2从同步刷盘(吞吐量会下降,但消息会更安全)当生产者发送消息到主节点,主节点会同步同步给从节点,然后才给生产者返回收到消息
      • 📌2m-noslave:2主无从(单点故障),然后还可以直接配置broker.conf,进行单点环境配置
      • 📌集群搭建架构
           Centos101:部署nameserver
           Centos102:部署nameserver  broker-a,broker-b-s
           Centos103:部署nameserver  broker-b,broker-a-s
        
        • 1
        • 2
        • 3
    5. ⭐集群启动

      • 📌nameserver服务启动
        分别在三个机器上启动nameserver
        [root@Centos101 bin]# ./mqnamesrv &
        [root@Centos102 bin]# ./mqnamesrv &
        [root@Centos103 bin]# ./mqnamesrv &
        
        • 1
        • 2
        • 3
      • 📌broker服务启动
        在Centos102机器上启动broker(broker-a主节点和broker-b-s从节点)
        [root@Centos102 bin]# ./mqbroker -c ../conf/2m-2s-async/broker-a.properties &
        [root@Centos102 bin]# ./mqbroker -c ../conf/2m-2s-async/broker-b-s.properties &
        在Centos103机器上启动broker(broker-b主节点和broker-a-s从节点)
        [root@Centos103 bin]# ./mqbroker -c ../conf/2m-2s-async/broker-b.properties &
        [root@Centos103 bin]# ./mqbroker -c ../conf/2m-2s-async/broker-a-s.properties &
        
        • 1
        • 2
        • 3
        • 4
        • 5
        • 6
      • 📌验证
        在Centos102上模拟生产者
        [root@Centos102 bin]# export NAMESRV_ADDR='Centos101:9876;Centos102:9876;Centos103:9876'
        [root@Centos102 bin]# ./tools.sh org.apache.rocketmq.example.quickstart.Producer
        在Centos103上模拟消费者
        [root@Centos103 bin]# export NAMESRV_ADDR='Centos101:9876;Centos102:9876;Centos103:9876'
        [root@Centos103 bin]# ./tools.sh org.apache.rocketmq.example.quickstart.Consumer
        
        • 1
        • 2
        • 3
        • 4
        • 5
        • 6

    🍓安装rocketmq图形化管理界面:dashboard

    修改application.properties
    rocketmq.config.namesrvAddr=Centos101:9876;Centos102:9876;Centos103:9876
    修改logback.xml日志路径
    
    • 1
    • 2
    • 3

    🍓rocketmq的local模式启动,新增proxy模块(5.0后支持的模块)

    引入 Proxy 模块后,Proxy 承担了协议适配、权限管理、消息管理等计算功能,Broker 则更加专注于存储。这样存储和计算相分离,在云原生环境下可以更好地进行资源调度。

    [root@Centos102 bin]# ./mqbroker -c ../conf/2m-2s-async/broker-a.properties --enable-proxy &
    [root@Centos103 bin]# ./mqbroker -c ../conf/2m-2s-async/broker-b.properties --enable-proxy &
    
    • 1
    • 2

    🍓部署模型

    在这里插入图片描述

    🍓消息发送过程

    在这里插入图片描述
    在这里插入图片描述

    🍓消息存储过程

    在这里插入图片描述

    🍓生产者

    生产者分为同步生产者和异步生产者以及单项生产者

    • ⭐同步生产者:生产者将消息推送Broker,等待Broker返回推送确认,再推送下一个
      1、可靠性要求高
      2、数据量级少
      3、实时响应
      import org.apache.rocketmq.client.producer.DefaultMQProducer;
      import org.apache.rocketmq.client.producer.SendResult;
      import org.apache.rocketmq.client.producer.SendStatus;
      import org.apache.rocketmq.common.message.Message;
      
      	DefaultMQProducer producer = null;
      	try {
      	    producer = new DefaultMQProducer("syncProducer");
      	    producer.setNamesrvAddr("192.168.113.101:9876");
      	    producer.start();
      	    for (int i = 0; i < 2; i++) {
      	        String body = "Hello zhang " + i;
      	        //参数一:主题、参数二:过滤、参数三:消息内容
      	        Message message = new Message("rocketmq_syncDemo","tag", body.getBytes("UTF-8"));
      	        //同步发送
      	        SendResult result = producer.send(message);
      	        String msgId = result.getMsgId();
      	        SendStatus sendStatus = result.getSendStatus();
      	        logger.info("{}消息发送状态为{}", msgId, sendStatus);
      	    }
      	} catch (Exception e) {
      	    logger.error("生产者发送消息失败!" ,e);
      	} finally {
      	    if (producer != null) {
      	        producer.shutdown();
      	    }
      	}
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26
      • 27
    • ⭐异步生产者:生产者将消息推送Broker,不会等待Broker返回推送确认,直接推送下一个,但是会回调方法告诉生产者消息是否发送成功。
      	try {
              //异步发送
              producer.send(message, new SendCallback() {
                  @Override
                  public void onSuccess(SendResult sendResult) {
                      String msgId = sendResult.getMsgId();
                      SendStatus sendStatus = sendResult.getSendStatus();
                      logger.info("{}消息发送状态为{}", msgId, sendStatus);
                  }
                  @Override
                  public void onException(Throwable throwable) {
                      logger.error("消息发送失败", throwable);
                  }
              });
      	} catch (Exception e) {
      	    logger.error("生产者发送消息失败!" ,e);
      	} finally {
      	    //异步发送不应该关闭,关闭了便无法回调方法
      	}
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
    • ⭐单项生产者:生产者将消息推送Broker,不会等待Broker返回推送确认,直接推送下一个。
      	//单向发送
      	producer.sendOneway(message);
      
      • 1
      • 2

    🍓消费者

    消费者分为推模式和拉模式

    • ⭐推模式:消费者等待Broker把消息推送过来(被动消费)

      import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
      import org.apache.rocketmq.client.consumer.listener.*;
      import org.apache.rocketmq.client.exception.MQClientException;
      import org.apache.rocketmq.common.message.MessageExt;
      
      	DefaultMQPushConsumer consumer = null;
      	try {
      	    consumer = new DefaultMQPushConsumer("group_rocketmq_syncDemo");
      	    consumer.setNamesrvAddr("192.168.113.101:9876");
      	    //参数一:topic、参数二:过滤(*表示不过滤)
      	    consumer.subscribe("rocketmq_syncDemo", "*");
      	    //设置消息监听
      	    //MessageListenerConcurrently 并发消费监听
      	    consumer.setMessageListener(new MessageListenerConcurrently() {
      	        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
      	            list.forEach(item -> {
      	                try {
      	                    logger.info("消息消费成功!消息ID={},消息内容:{}", item.getMsgId(), new String(item.getBody(), "UTF-8"));
      	                } catch (Exception e) {
      	                    logger.error("消息消费失败!", e);
      	                }
      	            });
      	            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
      	        }
      	    });
      	    //消费者启动
      	    consumer.start();
      	} catch (MQClientException e) {
      	    logger.error("消费者消费异常!",e);
      	}
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26
      • 27
      • 28
      • 29
      • 30
    • ⭐拉模式:消费者主动去Broker上拉取消息(主动消费)

      import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
      import org.apache.rocketmq.client.consumer.PullResult;
      import org.apache.rocketmq.client.consumer.store.ReadOffsetType;
      import org.apache.rocketmq.client.exception.MQClientException;
      import org.apache.rocketmq.common.message.MessageQueue;
      
      	try {
      	    DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("group_rocketmq_asyncDemo");
      	    consumer.setNamesrvAddr("192.168.113.101:9876");
      	    Set<String> topicSet = new HashSet<>();
      	    topicSet.add("rocketmq_asyncDemo");
      	    consumer.setRegisterTopics(topicSet);
      	    consumer.start();
      	    //主题遍历
      	    while (true) {
      	        consumer.getRegisterTopics().forEach(item -> {
      	            try {
      	                Set<MessageQueue> messageQueues = consumer.fetchSubscribeMessageQueues(item);
      	                //消息队列
      	                messageQueues.forEach(item2 -> {
      	                    try {
      	                        long offset = consumer.getOffsetStore().readOffset(item2, ReadOffsetType.READ_FROM_MEMORY);
      	                        if (offset < 0) {
      	                            offset = consumer.getOffsetStore().readOffset(item2, ReadOffsetType.READ_FROM_STORE);
      	                        }
      	                        if (offset < 0) {
      	                            offset = consumer.maxOffset(item2);
      	                        }
      	                        if (offset < 0) {
      	                            offset = 0;
      	                        }
      	                        PullResult result = consumer.pull(item2, "*", offset, 32);
      	                        if (result != null) {
      	                            switch (result.getPullStatus()) {
      	                                case FOUND:{
      	                                    result.getMsgFoundList().forEach(item3 -> {
      	                                        try {
      	                                            logger.info("消息消费成功!消息ID={},消息内容:{}", item3.getMsgId(), new String(item3.getBody(), "UTF-8"));
      	                                            consumer.updateConsumeOffset(item2, result.getNextBeginOffset());
      	                                        } catch (Exception e) {
      	                                            logger.error("遍历消息信息失败!" , e);
      	                                        }
      	                                    });
      	                                    break;
      	                                }
      	                                case NO_NEW_MSG:{
      	                                    logger.info("没有最新消息!");
      	                                    break;
      	                                }
      	                                case NO_MATCHED_MSG: {
      	                                    logger.info("没有匹配的消息!");
      	                                    break;
      	                                }
      	                                case OFFSET_ILLEGAL: {
      	                                    logger.error("偏移量非法,当前偏移量为{}", offset);
      	                                    break;
      	                                }
      	                            }
      	                        }
      	                    } catch (Exception e) {
      	                        logger.error("遍历消息队列失败!", e);
      	                    }
      	                });
      	            } catch (MQClientException e) {
      	                logger.error("遍历主题失败!", e);
      	            }
      	        });
      	    }
      	} catch (MQClientException e) {
      	    logger.error("消息拉取失败!", e);
      	}
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26
      • 27
      • 28
      • 29
      • 30
      • 31
      • 32
      • 33
      • 34
      • 35
      • 36
      • 37
      • 38
      • 39
      • 40
      • 41
      • 42
      • 43
      • 44
      • 45
      • 46
      • 47
      • 48
      • 49
      • 50
      • 51
      • 52
      • 53
      • 54
      • 55
      • 56
      • 57
      • 58
      • 59
      • 60
      • 61
      • 62
      • 63
      • 64
      • 65
      • 66
      • 67
      • 68
      • 69
      • 70
      • 71
      • 📌随机获取消息队列消息
        DefaultLitePullConsumer consumer = null;
        try {
            consumer = new DefaultLitePullConsumer("group_rocketmq_asyncDemo");
            consumer.setNamesrvAddr("192.168.113.101:9876");
            consumer.subscribe("rocketmq_asyncDemo", "*");
            consumer.start();
            while (true) {
                List<MessageExt> messageExtList = consumer.poll();
                messageExtList.forEach(item -> {
                    try {
                        logger.info("获取消息成功!消息队列ID={},消息ID={},消息内容{}", item.getQueueId(),item.getMsgId(), new String(item.getBody(), "UTF-8"));
                    } catch (Exception e) {
                        logger.error("获取消息异常!",e);
                    }
                });
            }
        } catch (MQClientException e) {
            logger.error("获取消息异常!",e);
        } finally {
            if (consumer != null) {
                consumer.shutdown();
            }
        }
        
        • 1
        • 2
        • 3
        • 4
        • 5
        • 6
        • 7
        • 8
        • 9
        • 10
        • 11
        • 12
        • 13
        • 14
        • 15
        • 16
        • 17
        • 18
        • 19
        • 20
        • 21
        • 22
        • 23
      • 📌指定消息队列获取消息
        	//指定第一个消息队列消费
            consumer.seek(messageQueueList.get(0), 10);
        
        • 1
        • 2

    🍓顺序消息

    • ⭐生产者需要将有序消息发送到同一个队列

    • ⭐消费者push模式,通过加锁的方式,使得一个队列同时只有一个消费者,每隔一段时间就会延长锁的时间(有超时机制),直到整个队列的消息全部消费

    • ⭐消费者pull模式,只要消费者自己能保证消息顺序消费就行

    • ⭐消费线程数需设置为1

    • ⭐生产者代码

      //i 队列序号
      for (int i = 0; i < 5; i++) {
          //j 消息序号
          for (int j = 0; j < 100; j++) {
              Message message = new Message("rocketmq_orderDemo", "tag", ("Hello world!" + j).getBytes("UTF-8"));
              producer.send(message, new MessageQueueSelector() {
                  /**
                   * 
                   * @param list 队列集合
                   * @param message 消息 (send函数第一个参数)
                   * @param o 队列序号 (send函数第三个参数)
                   * @return 消息队列
                   */
                  @Override
                  public MessageQueue select(List<MessageQueue> list, Message message, Object o) {
                      return list.get(Integer.parseInt(o.toString()));
                  }
              }, i);
          }
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
    • ⭐消费者代码

      //MessageListenerOrderly有序消息监听(不要使用并发消费监听)
      consumer.setMessageListener(new MessageListenerOrderly() {
          @Override
          public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) {
              list.forEach(item -> {
                  try {
                      logger.info("消息接收成功!消息队列={},消息ID={},消息内容={}", item.getQueueId(), item.getMsgId(), new String(item.getBody(), "UTF-8"));
                  } catch (Exception e) {
                      logger.error("消息接收异常!", e);
                  }
              });
              return ConsumeOrderlyStatus.SUCCESS;
          }
      });
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14

    🍓广播消息

    生产者:

     //设置为广播模式
     consumer.setMessageModel(MessageModel.BROADCASTING);
    
    • 1
    • 2

    🍓延时消息

    生产者:

    //1-18 对应 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h (消费者会跟生产者设置的时间延迟接收消息)
    //message.setDelayTimeLevel(3);
    
    //设置自定义时间,单位毫秒
    message.setDelayTimeMs(10000L);
    
    • 1
    • 2
    • 3
    • 4
    • 5

    🍓批量消息

    • ⭐优点:减少网络OA,提高吞吐量

    • 限制:

      • 消息大小不能超过4M
      • 相同的topic
      • 相同的waitStoreMsgOk
      • 不能是延迟消息、事务消息等
    • ⭐切割消息工具

      /**
       * 消息集合切割
       * 消息大小 = 消息长度 + 主题长度 + 消息自定义属性key长度 + 消息自定义属性val长度 + 20(日志空余)
       * @author xuhaixiang
       * @date 2023-09-10
       */
      public class ListSplitter implements Iterator<List<Message>> {
      
          /**
           * 消息大小限制 1MB
           */
          private static final int SIZE_LIMIT = 10 * 1000;
      
          /**
           * 消息集合
           */
          private final List<Message> messageList;
      
          /**
           * 当前索引
           */
          private int currentIndex;
      
          public ListSplitter(List<Message> messageList) {
              this.messageList = messageList;
          }
      
      
          @Override
          public boolean hasNext() {
              return currentIndex < messageList.size();
          }
      
          @Override
          public List<Message> next() {
              int nextIndex = currentIndex;
              int totalSize = 0;
              for (; nextIndex < messageList.size(); nextIndex++) {
                  Message message = messageList.get(nextIndex);
                  int messageSize = message.getBody().length + message.getTopic().length();
                  Map<String, String> properties = message.getProperties();
                  for (String key : properties.keySet()) {
                      String val = properties.get(key);
                      messageSize += key.length() + val.length();
                  }
                  messageSize += 20;
                  totalSize += messageSize;
                  if (totalSize > SIZE_LIMIT) {
                      nextIndex = nextIndex - 1;
                      break;
                  }
              }
              List<Message> result = messageList.subList(currentIndex, nextIndex);
              currentIndex = nextIndex;
              return result;
          }
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26
      • 27
      • 28
      • 29
      • 30
      • 31
      • 32
      • 33
      • 34
      • 35
      • 36
      • 37
      • 38
      • 39
      • 40
      • 41
      • 42
      • 43
      • 44
      • 45
      • 46
      • 47
      • 48
      • 49
      • 50
      • 51
      • 52
      • 53
      • 54
      • 55
      • 56
      • 57
    • ⭐消息批量发送

      List<Message> messages = new ArrayList<>();
      for (int i = 0; i < 2000; i++) {
          String body = i + "Lorem ipsum dolor sit amet consectetur adipisicing elit. Quasi exercitationem laudantium repellendus quisquam aspernatur est neque quidem vitae nostrum! Quia voluptatibus vitae tempore! Repellendus quam aspernatur, nam neque hic esse!";
          //参数一:主题、参数二:过滤、参数三:消息内容
          Message message = new Message("rocketmq_syncDemo","tag", body.getBytes("UTF-8"));
          messages.add(message);
      }
      ListSplitter listSplitter = new ListSplitter(messages);
      while (listSplitter.hasNext()) {
          List<Message> messageList = listSplitter.next();
          SendResult result = producer.send(messageList);
          String msgId = result.getMsgId();
          SendStatus sendStatus = result.getSendStatus();
          logger.info("{}消息发送状态为{}", msgId, sendStatus);
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15

    🍓过滤消息

    • tag过滤

      • 📌生产者
        String[] tagArr = {"tagA", "tagB", "tagC"};
        for (int i = 0; i < 2; i++) {
            for (String tag : tagArr) {
                String body = tag + ", Hello zhang " + i;
                //参数一:主题、参数二:过滤、参数三:消息内容
                Message message = new Message("rocketmq_syncDemo",tag, body.getBytes("UTF-8"));
                SendResult result = producer.send(message);
                String msgId = result.getMsgId();
                SendStatus sendStatus = result.getSendStatus();
                logger.info("{}消息发送状态为{}", msgId, sendStatus);
            }
        }
        
        • 1
        • 2
        • 3
        • 4
        • 5
        • 6
        • 7
        • 8
        • 9
        • 10
        • 11
        • 12
      • 📌消费者·
        //参数一:topic、参数二:过滤(*表示不过滤),多个tag可以使用||
        consumer.subscribe("rocketmq_syncDemo", "tagA || tagC");
        
        • 1
        • 2
    • SQL过滤

      • 📌生产者
        message.putUserProperty("type", "elg_" + i);
        
        • 1
      • 📌消费者(必须推模式)
         //过滤方式二(注意该sql里面字段是区分大小写的)
         //sql过滤方式,borker配置文件必须设置属性enablePropertyFilter=true,并且消费者必须是推模式
         //另外消息过滤行为是在broker端进行的,可以提升网络传输性能,但是会增加服务器的压力(将过滤sql推送给broker)
         consumer.subscribe("rocketmq_syncDemo", MessageSelector.bySql("TAGS is not null and TAGS in ('tagA','tagC') and type = 'elg_0'"));
        
        • 1
        • 2
        • 3
        • 4

    🍓事务消息

    • ⭐事务消息是分布式系统中保证最终一致性的两阶段提交的消息实现。他可以保证本地事务执行与消息发送两个操作的原子性,也就是两个操作一起成功或者一起失败。

    • ⭐事务消息机制的关键是在发送消息时会将消息转为一个half消息,并存入rocketmq内部的一个Topic(RMQ_SYS_TRANS_HALF_TOPIC),这个topic对消费者是不可见的。再经过一系列事务检查通过后,再将消息转存到目标topic,这样消费者就可见了。

    • ⭐事务消息实现原理主要通过两个发送阶段和一个确认阶段来实现

    • ⭐本地事务消息执行器(本地事务执行和本地事务回查,用于向rocketmq发送提交、回滚、无状态三种结果)

      import org.apache.rocketmq.client.producer.LocalTransactionState;
      import org.apache.rocketmq.client.producer.TransactionListener;
      import org.apache.rocketmq.common.message.Message;
      import org.apache.rocketmq.common.message.MessageExt;
      import org.apache.rocketmq.logging.org.slf4j.Logger;
      import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
      
      /**
       * 本地事务实现类
       * @author xuhaixiang
       * @date 2023-09-11
       */
      public class TransactionListenerImpl implements TransactionListener {
      
          /**
           * 日志对象
           */
          private static  final Logger logger = LoggerFactory.getLogger(TransactionListenerImpl.class);
      
          /**
           * 本地事务执行
           * @param message
           * @param o
           * @return
           */
          @Override
          public LocalTransactionState executeLocalTransaction(Message message, Object o) {
              String tags = message.getTags();
              logger.info("{}本地事务执行", tags);
              if ("tagA".equals(tags)) {
                  //tagA允许发送
                  return LocalTransactionState.COMMIT_MESSAGE;
              }
              if ("tagB".equals(tags)) {
                  //tagB消息回滚
                  return LocalTransactionState.ROLLBACK_MESSAGE;
              }
              //其他消息无状态,无状态消息会进行本地事务回查
              return LocalTransactionState.UNKNOW;
          }
      
          /**
           * 本地事务回查
           * @param messageExt
           * @return
           */
          @Override
          public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
              String tags = messageExt.getTags();
              logger.info("{}本地事务回查", tags);
              if ("tagC".equals(tags)) {
                  //tagC本地事务回查允许发送
                  return LocalTransactionState.COMMIT_MESSAGE;
              }
              return LocalTransactionState.UNKNOW;
          }
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26
      • 27
      • 28
      • 29
      • 30
      • 31
      • 32
      • 33
      • 34
      • 35
      • 36
      • 37
      • 38
      • 39
      • 40
      • 41
      • 42
      • 43
      • 44
      • 45
      • 46
      • 47
      • 48
      • 49
      • 50
      • 51
      • 52
      • 53
      • 54
      • 55
      • 56
      • 57
    • ⭐生产者

      TransactionMQProducer producer = null;
      try {
          producer = new TransactionMQProducer("transactionProductor");
          producer.setNamesrvAddr("192.168.113.101:9876");
          //开启异步线程,用于异步执行本地事务执行和回查两个动作
          ExecutorService service = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS ,new ArrayBlockingQueue<>(20000), new ThreadFactory(){
              @Override
              public Thread newThread(Runnable r) {
                  Thread thread = new Thread(r);
                  thread.setName("transaction");
                  return thread;
              }
          });
          producer.setExecutorService(service);
      
          //设置本地事务执行器
          producer.setTransactionListener(new TransactionListenerImpl());
          producer.start();
          String[] tags = {"tagA", "tagB", "tagC", "tagD", "tagE"};
          for (int i = 0; i < 10; i++) {
              for (String tag : tags) {
                  Message message = new Message("rocketmq_transactionDemo", tag, (tag + " Hello world!" + i).getBytes("UTF-8"));
                  TransactionSendResult result = producer.sendMessageInTransaction(message, null);
                  logger.info("消息发送成功!消息ID={}" + result.getMsgId());
              }
          }
          //让生产者存活一段时间可以回调本地事务执行和本地事务回查
          Thread.sleep(60000);
      } catch (Exception e) {
          logger.error("消息发送异常!", e);
      } finally {
          if (producer != null) {
              producer.shutdown();
          }
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26
      • 27
      • 28
      • 29
      • 30
      • 31
      • 32
      • 33
      • 34
      • 35
    • ⭐消费者。事务与消费没有任何关系,消费者正常消费消息就行。

    🍓如何保证消息不丢失

    消息丢失的几种情况:

    1. 生产者将消息发送给broker,当网络发生异常,消息可能会丢失
      解决:消息发送后会有ack返回,当我们发现消息发送失败,可以做一个重试机制
    2. 消费者拿到消息,会立即发送ack告诉broker收到,但是在接下来处理消息时发生了异常,可能会导致消息丢失,消息无法重新消费
      解决:先处理完消息之后,再返回ackbroker
    3. broker存储消息阶段,异步刷盘可能会出现问题导致消息丢失
      解决:使用同步刷盘机制;集群模式采用同步复制

    🍓消息持久化机制

    rocketmq的消息持久化机制是指将消息存储在磁盘上,以确保消息能够可靠存储和检索
    rocketmq消息持久化涉及以下三个角色

    • CommitLog消息存储文件
      • 📌存储方式:
        • 🍁同步刷盘:消息存储到内存,再从内存存储到commitLog,然后返回生产者ack
        • 🍁异步刷盘:消息存储到内存,然后返回生产者ack,再异步存储到commitLog
      • 📌文件固定大小1G,超过则新开辟一个文件
    • ConsumeQueue
      存储commitLog当前读取的偏移量、消息大小、tags
    • IndexFile
      存储消息自定义的属性、与之对应的消息偏移量、时间参数、下一个Index偏移量

    🍓rocketmq保证消息有序

    • ⭐生产者需要将有序消息发送到同一个队列
    • ⭐消费者push模式,通过加锁的方式,使得一个队列同时只有一个消费者,每隔一段时间就会延长锁的时间(有超时机制),直到整个队列的消息全部消费
    • ⭐消费者pull模式,只要消费者自己能保证消息顺序消费就行
    • ⭐消费线程数需设置为1
  • 相关阅读:
    无叶风扇32位MCU单片机MM32SPIN0230
    期末Java题库--判断题之错误篇2
    第十二届蓝桥杯物联网试题(国赛)
    Python计算两个时间的时间差(工作笔记需要自取)
    (三十三)geoserver源码&添加新的数据存储
    MongoDB与Pymongo深度实践:从基础概念到无限级评论应用示例
    隐私计算从AI到BI:隐语SCQL数据分析引擎上线
    记ZooKeeper3.7在win下的单机部署
    3.0 设计模式汇总
    企业开发中名词解析— —加密与脱敏及部分开发规范
  • 原文地址:https://blog.csdn.net/qq_34191426/article/details/132864954