• Springcloud之Rocketmq发送消息


    简介

    前面已经介绍了rocketmq的简单使用,本篇我们介绍使用订单服务下单成功之后发送短信消息。上篇docker下使用rocketmq,估计是版本问题,本篇使用一个低版本的rocketmq。

    rocketmq下载

    官网下载一个4.4的安装包,

    解压安装

    下载
    解压

    unzip rocketmq-all-4.4.0-bin-release.zip
    
    • 1

    修改名称

    MV rocketmq-all-4.4.0-bin-release rocketmq
    
    • 1

    目录

    启动namesvr

    启动

    nohup ./bin/mqnamesrv &
    
    • 1

    查看日志

    tail -f /root/logs/rocketmqlogs/namesrv.log
    
    • 1
    启动broker

    broker.conf需要修改增加

    namesrvAddr=127.0.0.1:9876
    brokerIP1=192.168.5.130 
    
    • 1
    • 2

    修改固定的jvm参数 JAVA_OPT=“${JAVA_OPT} -server -Xms8g -Xmx8g -Xmn4g” 根据实际情况改小点
    -Xms256m -Xmx256m -Xmn128m

    [root@elite rocketmq]#  vim bin/runbroker.sh 
    [root@elite rocketmq]# vim bin/runserver.sh 
    
    • 1
    • 2

    启动

    nohup sh mqbroker -n localhost:9876 autoCreateTopicEnable=true -c ../conf/broker.conf  &
    
    • 1

    查看日志

    tail -f /root/logs/rocketmqlogs/broker.log
    
    • 1
    关闭服务
    bin/mqshutdown namesrv
    bin/mqshutdown broker
    
    • 1
    • 2
    测试

    发送消息

     bin/tools.sh  org.apache.rocketmq.example.quickstart.Producer
    
    • 1

    发送消息
    消费消息

    bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
    
    • 1

    消费消息

    手动创建topic
    ./mqadmin updateTopic -n localhost:9876  -b localhost:10911  -t order-topic
    
    • 1

    项目搭建

    依赖

    用户与订单模块中都需要添加

     <dependency>
                <groupId>org.apache.rocketmqgroupId>
                <artifactId>rocketmq-spring-boot-starterartifactId>
                <version>2.0.2version>
            dependency>
            <dependency>
                <groupId>org.apache.rocketmqgroupId>
                <artifactId>rocketmq-clientartifactId>
                <version>4.4.0version>
            dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    yml配置地址
    #rocketmq配置
    rocketmq:
      name-server: 192.168.5.130:9876
      producer:
        group: springcloud-order
    
    • 1
    • 2
    • 3
    • 4
    • 5
    订单模块
    package com.elite.springcloud.controller;
    
    import com.elite.springcloud.entity.Order;
    import com.elite.springcloud.entity.Product;
    import com.elite.springcloud.entity.User;
    import com.elite.springcloud.interfaces.ProductService;
    import com.elite.springcloud.service.IOrderService;
    import org.apache.rocketmq.spring.core.RocketMQTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.PathVariable;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RestController;
    
    import java.math.BigDecimal;
    
    /**
     * 

    * 订单表 前端控制器 * 下单控制 *

    * * @author elite * @since 2022-09-10 */
    @RestController @RequestMapping("/springcloud/order/rocketmq") public class OrderRocketMqController { //订单服务 @Autowired IOrderService orderService; @Autowired ProductService productService; @Autowired RocketMQTemplate rocketMQTemplate; /** * 模拟下单 传入商品id,用户随机 * @param product_id * @return */ @GetMapping("/saveOrder/{product_id}") public String saveOrder(@PathVariable("product_id")Integer product_id ){ //获取商品 Product product = productService.getProductById(product_id); if (product == null){ return "商品信息不存在"; } //用户信息 User user = new User(); user.setUserId(1); //订单信息 Order order = new Order(); order.setOrderNo(10); order.setProductId(product_id); order.setUserId(user.getUserId()); order.setOrderNum(1); order.setOrderAmt(product.getProductPrice()); order.setOrderStatus("下单"); order.setPayStatus("支付成功"); order.setCreateBy("牛奶糖"); order.setUpdateBy("牛奶糖"); orderService.save(order); //下单成功之后,将消息放到mq中 rocketMQTemplate.syncSend("order-topic", order,6000); //发布下单消息 return "下单成功"+order.toString(); } }
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    用户模块添加监听
    //监听消息
    @Slf4j
    @Service
    @RocketMQMessageListener(consumerGroup = "springcloud-user", topic = "order-topic")
    public class MessageLsner implements RocketMQListener<Order> {
        @Override
        public void onMessage(Order order) {
            log.info("订单信息:", JSON.toJSONString(order));
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    测试模块

    发送下订单请求:
    发送订单请求
    存数据库订单
    保存订单信息
    监听消息
    消费者消息
    由于Sms开通有问题,这里就不讲解了。

    重点问题:

    1. 启动broker前修改内存配置

    修改固定的jvm参数 JAVA_OPT=“${JAVA_OPT} -server -Xms8g -Xmx8g -Xmn4g” 根据实际情况改小点
    ##-Xms256m -Xmx256m -Xmn128m

    [root@elite rocketmq]#  vim bin/runbroker.sh 
    [root@elite rocketmq]# vim bin/runserver.sh 
    
    • 1
    • 2
    1. 无法自动创建主题
    [root@elite conf]# cat broker.conf 
    brokerClusterName = DefaultCluster
    brokerName = broker-a
    brokerId = 0
    deleteWhen = 04
    fileReservedTime = 48
    brokerRole = ASYNC_MASTER
    flushDiskType = ASYNC_FLUSH
    namesrvAddr=localhost:9876 ## 添加namesrv
    brokerIP1=192.168.5.130 ##添加ip
    autoCreateTopicEnable=true ###开启自动创建topic
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
     启动broker加参数指向配置文件,否则配置文件不生效。
    
    • 1
    bin/mqbroker -n localhost:9876 -c conf/broker.conf
    
    • 1
    1. 发送消息提示内存不足
    org.apache.rocketmq.client.exception.MQBrokerException: CODE: 14  DESC: service not available now, maybe disk full, CL:  0.97 CQ:  0.97 INDEX:  0.97, maybe your broker machine memory too small.
    For more information, please visit the url, http://rocketmq.apache.org/docs/faq/
    
    • 1
    • 2
    ###切换到到 rocketmq 配置文件所在路径
    vim /usr/rocketmq/conf/2m-2s-async/broker-a.properties
    ###最后一行增加 diskMaxUsedSpaceRatio=99,表示剩余磁盘比例不足99%才报错
    diskMaxUsedSpaceRatio=99
    ###wq 保存退出
    
    • 1
    • 2
    • 3
    • 4
    • 5

    改了,重启namesrv,broker.

  • 相关阅读:
    十六、Vert.x、Actix-web、Warp、Axum 性能对比
    计算机网络-Traffic-Filter流量过滤策略
    微服务化的基石——持续集成
    关于ES5内置函数Object的新方法--Object.create()
    【Java进阶】包装类
    Spring Boot自动装配原理
    红黑树与AVL树
    计算机组成原理_浮点数的表示与运算
    windows无法使用hadoop报错:系统找不到路径
    嵌入式IDE(1):IAR中ICF链接文件详解和实例分析
  • 原文地址:https://blog.csdn.net/qq_37400096/article/details/126793331