• RocketMQ的常见应用样例及其实现


    引入依赖

    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-client</artifactId>
        <version>4.9.1</version>
    </dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5

    基本样例

    Producer

    我们经常会使用RocketMQ发送以下三种类型的消息:同步消息、异步消息和单向消息。
    其中前两种消息是可靠的,因为会有发送是否成功的应答。

    1、发送同步消息

    应用场景:这种可靠性同步地发送方式使用的比较广泛,比如:重要的消息通知,短信通知。

    package com.example.demo.example4;
    
    import org.apache.rocketmq.client.producer.DefaultMQProducer;
    import org.apache.rocketmq.client.producer.SendResult;
    import org.apache.rocketmq.common.message.Message;
    import org.apache.rocketmq.remoting.common.RemotingHelper;
    
    /**
     * 1、发送同步消息
     *
     * @author 流星
     */
    public class SyncProducer {
    	public static void main(String[] args) throws Exception {
        	// 实例化消息生产者Producer
            DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
        	// 设置NameServer的地址
        	producer.setNamesrvAddr("127.0.0.1:9876");
        	// 启动Producer实例
            producer.start();
        	for (int i = 0; i < 100; i++) {
        	    // 创建消息,并指定Topic,Tag和消息体
        	    Message msg = new Message("TopicTest",
            	"SyncProducer" ,
            	("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
            	);
            	// 发送消息到一个Broker
                SendResult sendResult = producer.send(msg);
                // 通过sendResult返回消息是否成功送达
                System.out.printf("%s%n", sendResult);
        	}
        	// 如果不再发送消息,关闭Producer实例。
        	producer.shutdown();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35

    2、发送异步消息

    应用场景:异步消息通常用在对响应时间敏感的业务场景,即发送端不能容忍长时间地等待Broker的响应。

    package com.example.demo.example4;
    
    import org.apache.rocketmq.client.producer.DefaultMQProducer;
    import org.apache.rocketmq.client.producer.SendCallback;
    import org.apache.rocketmq.client.producer.SendResult;
    import org.apache.rocketmq.common.CountDownLatch2;
    import org.apache.rocketmq.remoting.common.RemotingHelper;
    import org.apache.rocketmq.common.message.Message;
    import java.util.concurrent.TimeUnit;
    
    /**
     * 2、发送异步消息
     *
     * @author 流星
     */
    public class AsyncProducer {
    	public static void main(String[] args) throws Exception {
        	// 实例化消息生产者Producer
            DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
        	// 设置NameServer的地址
            producer.setNamesrvAddr("127.0.0.1:9876");
        	// 启动Producer实例
            producer.start();
            producer.setRetryTimesWhenSendAsyncFailed(0);
    
    	int messageCount = 100;
            // 根据消息数量实例化倒计时计算器
    	final CountDownLatch2 countDownLatch = new CountDownLatch2(messageCount);
        	for (int i = 0; i < messageCount; i++) {
                    final int index = i;
                	// 创建消息,并指定Topic,Tag和消息体
                    Message msg = new Message("TopicTest",
                        "AsyncProducer",
                        "OrderID188",
                        "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                    // SendCallback接收异步返回结果的回调
                    producer.send(msg, new SendCallback() {
                        @Override
                        public void onSuccess(SendResult sendResult) {
                            countDownLatch.countDown();
                            System.out.printf("%-10d OK %s %n", index,
                                sendResult.getMsgId());
                        }
                        @Override
                        public void onException(Throwable e) {
                            countDownLatch.countDown();
          	                System.out.printf("%-10d Exception %s %n", index, e);
          	                e.printStackTrace();
                        }
                	});
        	}
    	// 等待5s
    	countDownLatch.await(5, TimeUnit.SECONDS);
        	// 如果不再发送消息,关闭Producer实例。
        	producer.shutdown();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57

    3、单向发送消息

    应用场景:这种方式主要用在不特别关心发送结果的场景,例如日志发送。

    package com.example.demo.example4;
    
    import org.apache.rocketmq.client.producer.DefaultMQProducer;
    import org.apache.rocketmq.remoting.common.RemotingHelper;
    import org.apache.rocketmq.common.message.Message;
    
    /**
     * 3、单向发送消息
     *
     * @author 流星
     */
    public class OnewayProducer {
    	public static void main(String[] args) throws Exception{
        	// 实例化消息生产者Producer
            DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
        	// 设置NameServer的地址
            producer.setNamesrvAddr("127.0.0.1:9876");
        	// 启动Producer实例
            producer.start();
        	for (int i = 0; i < 100; i++) {
            	// 创建消息,并指定Topic,Tag和消息体
            	Message msg = new Message("TopicTest" ,
                    "OnewayProducer" ,
                    ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
            	);
            	// 发送单向消息,没有任何返回结果
            	producer.sendOneway(msg);
    
        	}
        	// 如果不再发送消息,关闭Producer实例。
        	producer.shutdown();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33

    Consumer

    package com.example.demo.example4;
    
    import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
    import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.common.message.MessageExt;
    
    import java.util.List;
    
    public class Consumer {
    
    	public static void main(String[] args) throws InterruptedException, MQClientException {
    
        	// 实例化消费者
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");
    
        	// 设置NameServer的地址
            consumer.setNamesrvAddr("127.0.0.1:9876");
    
        	// 订阅一个或者多个Topic,以及Tag来过滤需要消费的消息
            consumer.subscribe("TopicTest", "*");
        	// 注册回调实现类来处理从broker拉取回来的消息
            consumer.registerMessageListener(new MessageListenerConcurrently() {
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                    System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                    // 标记该消息已经被成功消费
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });
            // 启动消费者实例
            consumer.start();
            System.out.printf("Consumer Started.%n");
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37

    顺序消息样例

    消息有序指的是可以按照消息的发送顺序来消费(FIFO)。RocketMQ可以严格的保证消息有序,可以分为分区有序或者全局有序。

    顺序消费的原理

    1、在默认的情况下消息发送会采取Round Robin轮询方式把消息发送到不同的queue(分区队列),而消费消息的时候从多个queue上拉取消息,这种情况发送和消费是不能保证顺序。
    2、如果控制发送的顺序消息只依次发送到同一个queue中,消费的时候只从这个queue上依次拉取,则就保证了顺序。

    • 当发送和消费参与的queue只有一个,则是全局有序;
    • 如果多个queue参与,则为分区有序,即相对每个queue,消息都是有序的。

    以订单消费为例

    一个订单的顺序流程是:创建、付款、推送、完成。
    订单号相同的消息会被先后发送到同一个队列中,消费时,同一个OrderId获取到的肯定是同一个队列。

    订单实体

    package com.example.demo.example5;
    
    /**
     * @author 流星
     */
    public class OrderStep {
           private long orderId;
           private String desc;
    
           public long getOrderId() {
               return orderId;
           }
    
           public void setOrderId(long orderId) {
               this.orderId = orderId;
           }
    
           public String getDesc() {
               return desc;
           }
    
           public void setDesc(String desc) {
               this.desc = desc;
           }
    
           @Override
           public String toString() {
               return "OrderStep{" +
                   "orderId=" + orderId +
                   ", desc='" + desc + '\'' +
                   '}';
           }
       }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34

    Producer

    package com.example.demo.example5;
    
    import org.apache.rocketmq.client.producer.DefaultMQProducer;
    import org.apache.rocketmq.client.producer.MessageQueueSelector;
    import org.apache.rocketmq.client.producer.SendResult;
    import org.apache.rocketmq.common.message.Message;
    import org.apache.rocketmq.common.message.MessageQueue;
    
    import java.text.SimpleDateFormat;
    import java.util.ArrayList;
    import java.util.Date;
    import java.util.List;
    
    /**
    * Producer,发送顺序消息
     * @author 流星
     */
    public class Producer {
    
       public static void main(String[] args) throws Exception {
           DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
    
           producer.setNamesrvAddr("127.0.0.1:9876");
    
           producer.start();
    
           String[] tags = new String[]{"TagA", "TagC", "TagD"};
    
           // 订单列表
           List<OrderStep> orderList = new Producer().buildOrders();
    
           Date date = new Date();
           SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
           String dateStr = sdf.format(date);
           for (int i = 0; i < 10; i++) {
               // 加个时间前缀
               String body = dateStr + " Hello RocketMQ " + orderList.get(i);
               Message msg = new Message("Test", tags[i % tags.length], "KEY" + i, body.getBytes());
    
               SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
                   @Override
                   public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                       //根据订单id选择发送queue
                       Long id = (Long) arg;
                       long index = id % mqs.size();
                       return mqs.get((int) index);
                   }
               }, orderList.get(i).getOrderId());
    
               System.out.println(String.format("SendResult status:%s, queueId:%d, body:%s",
                   sendResult.getSendStatus(),
                   sendResult.getMessageQueue().getQueueId(),
                   body));
           }
    
           producer.shutdown();
       }
    
       /**
        * 生成模拟订单数据
        */
       private List<OrderStep> buildOrders() {
           List<OrderStep> orderList = new ArrayList<>();
    
           OrderStep orderDemo = new OrderStep();
           orderDemo.setOrderId(15103111039L);
           orderDemo.setDesc("创建");
           orderList.add(orderDemo);
    
           orderDemo = new OrderStep();
           orderDemo.setOrderId(15103111065L);
           orderDemo.setDesc("创建");
           orderList.add(orderDemo);
    
           orderDemo = new OrderStep();
           orderDemo.setOrderId(15103111039L);
           orderDemo.setDesc("付款");
           orderList.add(orderDemo);
    
           orderDemo = new OrderStep();
           orderDemo.setOrderId(15103117235L);
           orderDemo.setDesc("创建");
           orderList.add(orderDemo);
    
           orderDemo = new OrderStep();
           orderDemo.setOrderId(15103111065L);
           orderDemo.setDesc("付款");
           orderList.add(orderDemo);
    
           orderDemo = new OrderStep();
           orderDemo.setOrderId(15103117235L);
           orderDemo.setDesc("付款");
           orderList.add(orderDemo);
    
           orderDemo = new OrderStep();
           orderDemo.setOrderId(15103111065L);
           orderDemo.setDesc("完成");
           orderList.add(orderDemo);
    
           orderDemo = new OrderStep();
           orderDemo.setOrderId(15103111039L);
           orderDemo.setDesc("推送");
           orderList.add(orderDemo);
    
           orderDemo = new OrderStep();
           orderDemo.setOrderId(15103117235L);
           orderDemo.setDesc("完成");
           orderList.add(orderDemo);
    
           orderDemo = new OrderStep();
           orderDemo.setOrderId(15103111039L);
           orderDemo.setDesc("完成");
           orderList.add(orderDemo);
    
           return orderList;
       }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118

    Consumer

    package com.example.demo.example5;
    
    import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
    import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
    import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
    import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
    import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
    import org.apache.rocketmq.common.message.MessageExt;
    
    import java.util.List;
    import java.util.Random;
    import java.util.concurrent.TimeUnit;
    
    /**
    * 顺序消息消费,带事务方式(应用可控制Offset什么时候提交)
     * @author 流星
     */
    public class ConsumerInOrder {
    
       public static void main(String[] args) throws Exception {
           DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_3");
           consumer.setNamesrvAddr("127.0.0.1:9876");
           /**
            * 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费
    * 如果非第一次启动,那么按照上次消费的位置继续消费 */
    consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); consumer.subscribe("Test", "TagA || TagC || TagD"); consumer.registerMessageListener(new MessageListenerOrderly() { Random random = new Random(); @Override public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) { context.setAutoCommit(true); for (MessageExt msg : msgs) { // 可以看到每个queue有唯一的consume线程来消费, 订单对每个queue(分区)有序 System.out.println("consumeThread=" + Thread.currentThread().getName() + "queueId=" + msg.getQueueId() + ", content:" + new String(msg.getBody())); } try { //模拟业务逻辑处理中... TimeUnit.SECONDS.sleep(random.nextInt(10)); } catch (Exception e) { e.printStackTrace(); } return ConsumeOrderlyStatus.SUCCESS; } }); consumer.start(); System.out.println("Consumer Started."); } }
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57

    延时消息样例

    应用场景: 比如电商里,提交了一个订单就可以发送一个延时消息,1h后去检查这个订单的状态,如果还是未付款就取消订单释放库存。

    Consumer

    package com.example.demo.example6;
    
    import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
    import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
    import org.apache.rocketmq.common.message.MessageExt;
    import java.util.List;
    
    /**
     * @author 流星
     */
    public class ScheduledMessageConsumer {
       public static void main(String[] args) throws Exception {
          // 实例化消费者
          DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ExampleConsumer");
          // 设置NameServer的地址
          consumer.setNamesrvAddr("127.0.0.1:9876");
          // 订阅Topics
          consumer.subscribe("Topic", "*");
          // 注册消息监听者
          consumer.registerMessageListener(new MessageListenerConcurrently() {
              @Override
              public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages, ConsumeConcurrentlyContext context) {
                  for (MessageExt message : messages) {
                      // Print approximate delay time period
                      System.out.println("Receive message[msgId=" + message.getMsgId() + "] " + (System.currentTimeMillis() - message.getBornTimestamp()) + "ms later");
                  }
                  return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
              }
          });
          // 启动消费者
          consumer.start();
      }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36

    Producer

    package com.example.demo.example6;
    
    import org.apache.rocketmq.client.producer.DefaultMQProducer;
    import org.apache.rocketmq.common.message.Message;
    
    /**
     * @author 流星
     */
    public class ScheduledMessageProducer {
       public static void main(String[] args) throws Exception {
          // 实例化一个生产者来产生延时消息
          DefaultMQProducer producer = new DefaultMQProducer("ExampleProducerGroup");
          // 设置NameServer的地址
          producer.setNamesrvAddr("127.0.0.1:9876");
          // 启动生产者
          producer.start();
          int totalMessagesToSend = 100;
          for (int i = 0; i < totalMessagesToSend; i++) {
              Message message = new Message("Topic", ("Hello scheduled message " + i).getBytes());
              // 设置延时等级3,这个消息将在10s之后发送(现在只支持固定的几个时间,详看delayTimeLevel)
              message.setDelayTimeLevel(3);
              // 发送消息
              producer.send(message);
          }
           // 关闭生产者
          producer.shutdown();
      }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28

    附:参考链接

    rocketmq官网

  • 相关阅读:
    Spring框架(十):Spring注解开发配置MyBatis框架等第三方框架
    【无标题】
    pyhanlp安装教程
    markdown math语法
    matlab 求不定积分
    条码扫描器:打开一个全新的数字世界
    内存管理的概念
    qml制作简单的播放器--MediaPlayer
    C语言中各种运算符用法
    千亿IT运维市场,产品要凭实力说话
  • 原文地址:https://blog.csdn.net/weixin_45974176/article/details/126380653