• Springboot整合RocketMq


    介绍

    MQ,Message Queue,是一种提供消息队列服务的中间件,也称为消息中间件,是一套提供了消息生产、存储、消费全过程API的软件系统。消息即数据。RocketMQ是⼀款阿⾥巴巴开源的消息中间件,主要用于限流,异步解耦操作,如付款之后短信通知,订单发货通知等等,都是异步进行执行。

    rocketmq搭建

    查找rockermq镜像
    docker search rocketmq
    
    • 1

    rocketmq

    拉取镜像
    docker pull rocketmqinc/rocketmq
    
    • 1

    下载成功

    创建目录
     mkdir -p  /docker/rocketmq/data/namesrv/logs   /docker/rocketmq/data/namesrv/store
    
    • 1
    构建namesvr容器
    #构建namesrv容器
    docker run -d --restart=always --name rmqnamesrv --privileged=true -p 9876:9876  -v /docker/rocketmq/nameserver/logs:/root/logs -v /docker/rocketmq/nameserver/store:/root/store -e "MAX_POSSIBLE_HEAP=100000000" rocketmqinc/rocketmq sh mqnamesrv
    
    • 1
    • 2
    启动broker
    docker run -d --restart=always --name rmqbroker --link rmqnamesrv:namesrv -p 10911:10911 -p 10909:10909 --privileged=true -v /docker/rocketmq/data/broker/logs:/root/logs -v /docker/rocketmq/data/broker/store:/root/store -v /docker/rocketmq/conf/broker.conf:/opt/docker/rocketmq/broker.conf -e "NAMESRV_ADDR=namesrv:9876" -e "MAX_POSSIBLE_HEAP=200000000" rocketmqinc/rocketmq sh mqbroker -c /opt/docker/rocketmq/broker.conf
    
    • 1
    安装控制台
    docker pull pangliang/rocketmq-console-ng
    
    • 1
    控制台
    docker run -d --restart=always --name rmqadmin -e "JAVA_OPTS=-Drocketmq.namesrv.addr=192.168.5.130:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=false" -p 8080:8080 pangliang/rocketmq-console-ng
    
    • 1

    核心的概念

    • 消息(Message)
      消息是指,消息系统所传输信息的物理载体,生产和消费数据的最小单位,每条消息必须属于一个主题。
    • 队列(Queue)
      存储消息的物理实体。一个Topic中可以包含多个Queue,每个Queue中存放的就是该Topic的消息。一个Topic的Queue也被称为一个Topic中消息的分区(Partition)。
    • 主题(Topic)
      Topic表示一类消息的集合,每个主题包含若干条消息,每条消息只能属于一个主题,是RocketMQ进行消息订阅的基本单位
    • 标签(Tag)
      为消息设置的标签,用于同一主题下区分不同类型的消息。
    • Producer
      消息生产者,负责一般由业务系统。
    • Consumer
      消息消费者,负责消费消息。一个消息消费者会从Broker服务器中获取到消息,并对消息进行相关业务
      处理。

    项目构建

    目录结构

    目录结构

    依赖引入
      <!--rocketmq-->
          
            org.apache.rocketmq
            rocketmq-spring-boot-starter
            2.2.2
          
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    消息提供者
    yml配置
    server:
      port: 8022
    #应用名字
    spring:
      application:
        name: RocketMqOrderProvider
    #rocketmq配置
    rocketmq:
      name-server: 192.168.5.130:9876
      producer:
        group: order-producer-group
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    模板发送消息
    @RestController
    public class MqOrderController {
    
        @Autowired
        private RocketMQTemplate rocketMQTemplate;
    
        @RequestMapping("/sendOrderMessage")
        public String sendOrderMessage() {
            Order order = new Order();
            order.setOrderId(1);
            order.setOrderNo(1);
            order.setProductId(1);
            order.setUserId(1);
            order.setOrderNum(5);
            order.setOrderAmt(new BigDecimal("100.0"));
            order.setOrderStatus("下单");
            order.setPayStatus("未支付");
            order.setCreateUser("elite");
            order.setCreateTime(LocalDateTime.now());
            rocketMQTemplate.syncSend("order-topic",order.toString(),6000);
            return order.toString();
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    消息接收者
    yml配置
    server:
      port: 8023
    #应用名字
    spring:
      application:
        name: RocketMqOrderConsumer
    rocketmq:
      name-server: 192.168.5.130:9876
      producer:
        group: order-consumer-group
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    消息监听
    @Component
    @RocketMQMessageListener(topic = "order-topic",consumerGroup = "my-consumer-group")
    @Slf4j
    public class ConsumerListener implements RocketMQListener<String> {
        @Override
        public void onMessage(String orderInfo) {
            System.out.println(orderInfo);
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    测试连接不上服务问题

    See http://rocketmq.apache.org/docs/faq/ for further details.; nested exception is org.apache.rocketmq.client.exception.MQClientException: Send [3] times, still failed, cost [6288]ms, Topic: order-topic, BrokersSent: [2e116b166e4d, 2e116b166e4d, 2e116b166e4d]
    See http://rocketmq.apache.org/docs/faq/ for further details.] with root cause
    
    org.apache.rocketmq.remoting.exception.RemotingConnectException: connect to 172.17.0.4:10911 failed
    	at org.apache.rocketmq.remoting.netty.NettyRemotingClient.invokeSync(NettyRemotingClient.java:407) ~[rocketmq-remoting-4.9.3.jar:4.9.3]
    
    • 1
    • 2
    • 3
    • 4
    • 5

    问题排查中。。。

  • 相关阅读:
    同花顺_代码解析_技术指标_Z_1
    tcpdump抓包
    Oracle修改数据之后提交事务如何回滚?
    【解题报告】CF练一下题 | 难度CF2500左右
    SpringCloudGateway--谓词(断言)
    阿里邮箱/网易邮箱个人版设置POP3使用
    洛谷题单 Part 2.5 贪心
    一、nginx自定义路径和模块安装[源码目录结构、自定义configure配置路径和模块]
    神舟电脑怎么清理缓存文件?介绍几种简单有效方法
    try catch finally代码块的作用
  • 原文地址:https://blog.csdn.net/qq_37400096/article/details/126679607