• Rocketmq--消息发送和接收演示


    使用Java代码来演示消息的发送和接收

    1. <dependency>
    2.   <groupId>org.apache.rocketmq</groupId>
    3.   <artifactId>rocketmq-spring-boot-starter</artifactId>
    4.   <version>2.0.2</version>
    5. </dependency>

    1 发送消息

            消息发送步骤:

    • 创建消息生产者, 指定生产者所属的组名
    • 指定Nameserver地址
    • 启动生产者
    • 创建消息对象,指定主题、标签和消息体
    • 发送消息
    • 关闭生产者
    1. //发送消息
    2. public class RocketMQSendTest {
    3.   public static void main(String[] args) throws Exception {
    4.     //1. 创建消息生产者, 指定生产者所属的组名
    5.     DefaultMQProducer producer = new DefaultMQProducer("myproducer-group");
    6.     //2. 指定Nameserver地址
    7.     producer.setNamesrvAddr("192.168.109.131:9876");
    8.     //3. 启动生产者
    9.     producer.start();
    10.     //4. 创建消息对象,指定主题、标签和消息体
    11.     Message msg = new Message("myTopic", "myTag",
    12.                 ("RocketMQ Message").getBytes());
    13. //5. 发送消息
    14.     SendResult sendResult = producer.send(msg,10000);
    15.     System.out.println(sendResult);
    16.     //6. 关闭生产者
    17.     producer.shutdown();
    18.  }
    19. }

    2 接收消息

            消息接收步骤:

    • 创建消息消费者, 指定消费者所属的组名
    • 指定Nameserver地址
    • 指定消费者订阅的主题和标签
    • 设置回调函数,编写处理消息的方法
    • 启动消息消费者
    1. //接收消息
    2. public class RocketMQReceiveTest {
    3.   public static void main(String[] args) throws MQClientException {
    4.     //1. 创建消息消费者, 指定消费者所属的组名
    5.     DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("myconsumer-
    6. group");
    7.     //2. 指定Nameserver地址
    8.     consumer.setNamesrvAddr("192.168.109.131:9876");
    9.     //3. 指定消费者订阅的主题和标签
    10.     consumer.subscribe("myTopic", "*");
    11.     //4. 设置回调函数,编写处理消息的方法
    12.     consumer.registerMessageListener(new MessageListenerConcurrently() {
    13.       @Override
    14.       public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt>
    15. msgs,
    16.                         ConsumeConcurrentlyContext
    17. context) {
    18.         System.out.println("Receive New Messages: " + msgs);
    19.         //返回消费状态
    20.         return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    21.      }
    22.    });
    23.     //5. 启动消息消费者
    24.     consumer.start();
    25.     System.out.println("Consumer Started.");
    26.  }
    27. }

    3 案例

    接下来我们模拟一种场景: 下单成功之后,向下单用户发送短信。设计图如下:

    3.1  订单微服务发送消息

    1 在 shop-order 中添加rocketmq的依赖

    1. <!--rocketmq-->
    2. <dependency>
    3.   <groupId>org.apache.rocketmq</groupId>
    4.   <artifactId>rocketmq-spring-boot-starter</artifactId>
    5.   <version>2.0.2</version>
    6. </dependency>
    7. <dependency>
    8.   <groupId>org.apache.rocketmq</groupId>
    9.   <artifactId>rocketmq-client</artifactId>
    10.   <version>4.4.0</version>
    11. </dependency>

    2 添加配置

    1. rocketmq:
    2. name-server: 192.168.109.131:9876  #rocketMQ服务的地址
    3. producer:
    4.  group: shop-order # 生产者组

    3 编写测试代码

    1. @RestController
    2. @Slf4j
    3. public class OrderController2 {
    4.   @Autowired
    5.   private OrderService orderService;
    6.   @Autowired
    7.   private ProductService productService;
    8.   @Autowired
    9.   private RocketMQTemplate rocketMQTemplate;
    10.   //准备买1件商品
    11.   @GetMapping("/order/prod/{pid}")
    12.   public Order order(@PathVariable("pid") Integer pid) {
    13.     log.info(">>客户下单,这时候要调用商品微服务查询商品信息");
    14.     //通过fegin调用商品微服务
    15.     Product product = productService.findByPid(pid);
    16.     if (product == null){
    17. Order order = new Order();
    18.       order.setPname("下单失败");
    19.       return order;
    20.    }
    21.     log.info(">>商品信息,查询结果:" + JSON.toJSONString(product));
    22.     Order order = new Order();
    23.     order.setUid(1);
    24.     order.setUsername("测试用户");
    25.     order.setPid(product.getPid());
    26.     order.setPname(product.getPname());
    27.     order.setPprice(product.getPprice());
    28.     order.setNumber(1);
    29.     orderService.save(order);
    30.     //下单成功之后,将消息放到mq中
    31.     rocketMQTemplate.convertAndSend("order-topic", order);
    32.     return order;
    33.  }
    34. }

  • 相关阅读:
    前端css实现水平居中、垂直居中、水平垂直居中【木鱼精简】
    什么是.NET的极限优化数值库?
    Codeforces Round 929 (Div. 3 ABCDEFG题) 视频讲解
    ElasticSearch学习(二): Mapping的数据类型和参数
    ClickHouse 数据插入、更新与删除操作 SQL
    unreal windows SDK version 关联错误
    zsh: command not found: bun (已解决)
    数据结构与算法(Java)-前后缀分解题单
    3.7 static关键字
    08.OpenWrt-连接wifi网络
  • 原文地址:https://blog.csdn.net/Lj_chuxuezhe/article/details/132991407