• SpringBoot整合RabbitMQ


    在这里插入图片描述

    使用RabbitMQ

    • 1、引入amqp场景;RabbitAutoConfiguration就会自动生效
    • 2、给容器中自动配置了RabbitTemplate、AmqpAdmin、CachingConnectionFactory、RabbitNessagingTemplate
    • 3、@EnabLeRabbit: @EnableXxXxx、cachingconnectionFactory

    1.导入依赖

            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-amqp</artifactId>
            </dependency>
    
    • 1
    • 2
    • 3
    • 4

    2.启动MQ(启动类添加@EnableRabbit)

    /**
     *使用RabbitMQ
     *1、引入amqp场景;RabbitAutoConfiguration就会自动生效*
     *2、给容器中自动配置了*
     RabbitTemplate、AmqpAdmin、CachingConnectionFactory、RabbitMessagingTemplate;
     *
     所有的属性都是spring.rabbitmq
     *
     @configurationProperties(prefix = "spring.rabbitmq")
      *
     pubLic class RabbitProperties
      *
      *3、给配置文件中配置spring.rabbitmq信息*4J @EnableRabbit : @EnableXxxXx;开启功能
     */
    @EnableRabbit
    @EnableDiscoveryClient // 注册
    @SpringBootApplication
    public class GulimallOrderApplication {
        public static void main(String[] args) {
            SpringApplication.run(GulimallOrderApplication.class, args);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22

    3.配置application.properties

    spring.rabbitmq.host=192.168.56.10
    spring.rabbitmq.port=5672
    spring.rabbitmq.virtual-host=/
    
    • 1
    • 2
    • 3

    4.测试类,创建交换机、队列、绑定

    /**
         *1、如何创建Exchange、Queue、 Binding
         1)、使用AmqpAdmin进行创建
         *2、如何收发消息
         */
        @Autowired
        AmqpAdmin amqpAdmin;
    
        @Test
        void contextLoads() {
            // 创建交换机
            DirectExchange exchange = new DirectExchange("hello-java-exchange",true,false);
            amqpAdmin.declareExchange(exchange);
            log.info("Exchange[{}]创建成功","hello-java-exchange");
        }
    
        @Test
        void createQueue() {
            // 创建队列
            Queue queue = new Queue("hello-java-queue",true,false,false);
            amqpAdmin.declareQueue(queue);
            log.info("Queue[{}]创建成功","hello-java-queue");
        }
    
        @Test
        void createBinding() {
            // 交换机绑定队列
            // (String destination【目的地-队列】,
            //DestinationType destinationType【目的地类型】,/l /String exchange【交换机】,
            //String routingKey【路由键】,
            // Map<String, object> arguments【自定义参数】)
            //将exchange指定的交换机和destination目的地进行绑定,使用routingKey作为指定的路由键
            Binding binding = new Binding(
                    "hello-java-queue",
                    Binding.DestinationType.QUEUE,
                    "hello-java-exchange",
                    "hello.java",
                    null);
            amqpAdmin.declareBinding(binding);
            log.info("[{}]绑定成功","hello.java");
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43

    5.测试如何向mq中发消息

    5.1 发送字符串

    @Autowired
    RabbitTemplate rabbitTemplate;
    
    @Test
    void sendMessageTest() {
        // 发消息给交换机
        String msg = "hello rabbitmq";
        rabbitTemplate.convertAndSend("hello-java-exchange","hello.java",msg);
        log.info("消息发送完成{}",msg);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    在这里插入图片描述

    5.2 发送对象

    1、发送消息,如果发送的消息是个对象,我们会使用序列化机制,将对象写出去。对象必须实现serializable

    @Autowired
    RabbitTemplate rabbitTemplate;
    
    @Test
    void sendMessageTest() {
        // 发消息给交换机
        OrderEntity orderEntity = new OrderEntity();
        orderEntity.setCommentTime(new Date());
        orderEntity.setDeliveryCompany("阿里巴巴");
        orderEntity.setId(1L);
        
        String msg = "hello rabbitmq";
        rabbitTemplate.convertAndSend("hello-java-exchange","hello.java",orderEntity);
        log.info("消息发送完成{}",msg);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    在这里插入图片描述

    2、发送的对象消息,可以是json

    • 自定义消息转换策略
    package com.atguigu.gulimall.order.config;
    
    import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
    import org.springframework.amqp.support.converter.MessageConverter;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    @Configuration
    public class MyRabbitConfig {
        @Bean
        public MessageConverter messageConverter(){
         return new Jackson2JsonMessageConverter();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 发送消息
     @Autowired
        RabbitTemplate rabbitTemplate;
    
        @Test
        void sendMessageTest() {
        // 发消息给交换机
        OrderEntity orderEntity = new OrderEntity();
        orderEntity.setCommentTime(new Date());
        orderEntity.setDeliveryCompany("晚安,玛卡巴卡");
        orderEntity.setId(1L);
    
        String msg = "hello rabbitmq";
        rabbitTemplate.convertAndSend("hello-java-exchange","hello.java",orderEntity);
        log.info("消息发送完成{}",msg);
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    在这里插入图片描述

    6.测试如何向mq中接收消息

    查看端口占用情况:

    # 查看所有端口使用情况
    netstat -ano
    # 查看某个端口使用情况
    netstat -ano|findstr 9000
    # 查看进程
    tasklist|findstr 19796(进程id)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    @RabbitListener(注解):监听指定的队列
    使用监听接口,必须先启动RabbitMQ
    在这里插入图片描述

    收发消息,创建组件,不需要EnableMQ,监听必须 EnabLeRabbit: @EnableXxxxx;开启xxxx功能,监听消息:使用@RabbitListener;必须有@EnableRabbit

    • 假如现在有一个业务(order订单业务)需要监听消息,@RabbitListener可在在类和方法上使用,但是这个业务类必须要在容器中

    在这里插入图片描述

    接收消息版本1
    1. 在业务类中,写接收消息代码
    2. 启动业务类
    
    • 1
    • 2
       /**
         * 声明需要监听的所有队列
         * */
        @RabbitListener(queues = {"hello-java-queue"})
        public void reciveMessage(Object message){
            System.out.println("接收到消息...内容:"+message+"===>类型"+message.getClass());
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    1测试类发送消息Object
     @Autowired
        RabbitTemplate rabbitTemplate;
    
        @Test
        void sendMessageTest() {
            // 发消息给交换机
            OrderEntity orderEntity = new OrderEntity();
            orderEntity.setCommentTime(new Date());
            orderEntity.setDeliveryCompany("晚安,玛卡巴卡");
            orderEntity.setId(1L);
    
            String msg = "hello rabbitmq";
            rabbitTemplate.convertAndSend("hello-java-exchange","hello.java",orderEntity);
            log.info("消息发送完成{}",msg);
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    一个客户端建立一个连接

    在这里插入图片描述

    消息在通道中传输

    在这里插入图片描述

    2业务接收消息
    接收到消息...内容:
    (Body:'{"id":1,"memberId":null,"orderSn":null,"couponId":null,"createTime":null,"memberUsername":null,"totalAmount":null,"payAmount":null,"freightAmount":null,"promotionAmount":null,"integrationAmount":null,"couponAmount":null,"discountAmount":null,"payType":null,"sourceType":null,"status":null,
    "deliveryCompany":"晚安,玛卡巴卡","deliverySn":null,"autoConfirmDay":null,"integration":null,"growth":null,"billType":null,"billHeader":null,"billContent":null,"billReceiverPhone":null,"billReceiverEmail":null,"receiverName":null,"receiverPhone":null,"receiverPostCode":null,"receiverProvince":null,"receiverCity":null,"receiverRegion":null,"receiverDetailAddress":null,"note":null,"confirmStatus":null,"deleteStatus":null,"useIntegration":null,"paymentTime":null,"deliveryTime":null,"receiveTime":null,"commentTime":1641523003050,"modifyTime":null}' 
    MessageProperties [headers={__TypeId__=com.atguigu.gulimall.order.entity.OrderEntity}, 
    contentType=application/json, 
    contentEncoding=UTF-8,
    contentLength=0, receivedDeliveryMode=PERSISTENT, 
    priority=0, 
    redelivered=false,
    receivedExchange=hello-java-exchange, 
    receivedRoutingKey=hello.java, deliveryTag=1, consumerTag=amq.ctag-5DTRiiCKDNSGNQAvgKmPQw, consumerQueue=hello-java-queue])
    ===>类型class org.springframework.amqp.core.Message
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    接收消息版本2,使用原生消息Message

     /**
         * 声明需要监听的所有队列
         * org .springframeworl. amqp.core.MNessage参数可以写一下类型
         * 1、Message message:原生消息详细信息。头+体
         * 2、T<发送的消息的类型>orderReturnReasonEntity content;零3、Channel channei:当前传输数据的通道
         * */
        @RabbitListener(queues = {"hello-java-queue"})
        public void reciveMessage(Message message,
                                  OrderEntity content,
                                  Channel channel){
            byte[] body = message.getBody();
            System.out.println(body);
            // 消息头属性信息
            MessageProperties messageProperties = message.getMessageProperties();
            System.out.println(messageProperties);
            System.out.println("接收到消息...内容:"+message+"===>类型:"+content);
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    [B@31c5c73a
    MessageProperties [headers={__TypeId__=com.atguigu.gulimall.order.entity.OrderEntity}, 
    contentType=application/json, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, 
    receivedExchange=hello-java-exchange, receivedRoutingKey=hello.java, deliveryTag=1, consumerTag=amq.ctag-5ORg0PCf2iEeuR_Rdg2UIA, consumerQueue=hello-java-queue]
    
    接收到消息...内容:(Body:'{"id":1,"memberId":null,"orderSn":null,"couponId":null,"createTime":null,"memberUsername":null,"totalAmount":null,"payAmount":null,"freightAmount":null,"promotionAmount":null,"integrationAmount":null,"couponAmount":null,"discountAmount":null,"payType":null,"sourceType":null,"status":null,"deliveryCompany":"晚安,玛卡巴卡","deliverySn":null,"autoConfirmDay":null,"integration":null,"growth":null,"billType":null,"billHeader":null,"billContent":null,"billReceiverPhone":null,"billReceiverEmail":null,"receiverName":null,"receiverPhone":null,"receiverPostCode":null,"receiverProvince":null,"receiverCity":null,"receiverRegion":null,"receiverDetailAddress":null,"note":null,"confirmStatus":null,"deleteStatus":null,"useIntegration":null,"paymentTime":null,"deliveryTime":null,"receiveTime":null,"commentTime":1641536393492,"modifyTime":null}'
    MessageProperties [headers={__TypeId__=com.atguigu.gulimall.order.entity.OrderEntity}, contentType=application/json, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=hello-java-exchange, receivedRoutingKey=hello.java, deliveryTag=1, consumerTag=amq.ctag-5ORg0PCf2iEeuR_Rdg2UIA, consumerQueue=hello-java-queue])
    ===>类型:OrderEntity(id=1, memberId=null, orderSn=null, couponId=null, createTime=null, memberUsername=null, totalAmount=null, payAmount=null, freightAmount=null, promotionAmount=null, integrationAmount=null, couponAmount=null, discountAmount=null, payType=null, sourceType=null, status=null, deliveryCompany=晚安,玛卡巴卡, deliverySn=null, autoConfirmDay=null, integration=null, growth=null, billType=null, billHeader=null, billContent=null, billReceiverPhone=null, billReceiverEmail=null, receiverName=null, receiverPhone=null, receiverPostCode=null, receiverProvince=null, receiverCity=null, receiverRegion=null, receiverDetailAddress=null, note=null, confirmStatus=null, deleteStatus=null, useIntegration=null, paymentTime=null, deliveryTime=null, receiveTime=null, commentTime=Fri Jan 07 14:19:53 CST 2022, modifyTime=null)
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    Queue:可以很多人都来监听。只要收到消息,队列删除消息,而且只能有一个收到此消息场景:

    • 1)、订单服务客户端启动多个;同一个消息,只能有一个客户端收到
    • 2)、只有一个消息完全处理完,方法运行结束,我们就可以接收到下一个消息
    1. 复制项目配置(同一份代码,启动为两个项目)

    2. 选择项目,编辑配置
      在这里插入图片描述

    3. 选择项目进行复制
      在这里插入图片描述

    4. 对复制项目进行简单配置(–server.port=9091=等号两边不要写空格,不然默认端口号8080
      在这里插入图片描述

    5. 测试发送10条消息,观察两个订单处服务理消息的结果

    @RestController
    public class RabbitController {
    
     @Autowired
     private RabbitTemplate rabbitTemplate;
    
     @GetMapping("/sendMQ1")
     public String sendMQ1(@RequestParam(value = "num", required = false, defaultValue = "10") Integer num){
    
      OrderEntity entity = new OrderEntity();
      entity.setId(1L);
      entity.setCommentTime(new Date());
      entity.setCreateTime(new Date());
      entity.setConfirmStatus(0);
      entity.setAutoConfirmDay(1);
      entity.setGrowth(1);
      entity.setMemberId(12L);
    
      OrderItemEntity orderEntity = new OrderItemEntity();
      orderEntity.setCategoryId(225L);
      orderEntity.setId(1L);
      orderEntity.setOrderSn("mall");
      orderEntity.setSpuName("华为");
      for (int i = 0; i < num; i++) {
        orderEntity.setOrderSn("mall-" + i);
        rabbitTemplate.convertAndSend("hello-java-exchange","hello.java", orderEntity);
      }
      return "ok";
     }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    1. 访问:http://localhost:9011/sendMQ1

    结果:消息被接收后就被销毁了,不会被重复接收

    在这里插入图片描述
    在这里插入图片描述

    订单业务拿到消息,可能需要一点处理时间(比如减库存操作),我们让消息处理延迟3秒(模拟下订单操作),发现,消息是被一个一个的有序被处理完

    在这里插入图片描述

    7.@RabbitListener:标在类+方法上(监听哪些队列即可)@RabbitHandler:标在方法上(重载区分不同的消息)

    使用场景:比如我们接收到多种类型的消息

    消息分类发送

    @GetMapping("/sendMQ")
     public String sendMQ(@RequestParam(value = "num", required = false, defaultValue = "10") Integer num){
    
      OrderEntity entity = new OrderEntity();
      entity.setId(1L);
      entity.setCommentTime(new Date());
      entity.setCreateTime(new Date());
      entity.setConfirmStatus(0);
      entity.setAutoConfirmDay(1);
      entity.setGrowth(1);
      entity.setMemberId(12L);
    
      OrderItemEntity orderEntity = new OrderItemEntity();
      orderEntity.setCategoryId(225L);
      orderEntity.setId(1L);
      orderEntity.setOrderSn("mall");
      orderEntity.setSpuName("华为");
      for (int i = 0; i < num; i++) {
       if(i % 2 == 0){
        entity.setReceiverName("FIRE-" + i);
        rabbitTemplate.convertAndSend("hello-java-exchange", "hello.java", entity, new CorrelationData(UUID.randomUUID().toString().replace("-","")));
       }else {
        orderEntity.setOrderSn("mall-" + i);
        rabbitTemplate.convertAndSend("hello-java-exchange", "hello.java", orderEntity, new CorrelationData(UUID.randomUUID().toString().replace("-","")));
        // 测试消息发送失败
    //				rabbitTemplate.convertAndSend(this.exchange, this.routeKey + "test", orderEntity);
       }
      }
      return "ok";
     }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30

    消息分类接收

    在这里插入图片描述

    @RabbitListener(queues = {"hello-java-queue"})
    @Service("orderItemService")
    public class OrderItemServiceImpl extends ServiceImpl<OrderItemDao, OrderItemEntity> implements OrderItemService {
    
        @Override
        public PageUtils queryPage(Map<String, Object> params) {
            IPage<OrderItemEntity> page = this.page(
                    new Query<OrderItemEntity>().getPage(params),
                    new QueryWrapper<OrderItemEntity>()
            );
    
            return new PageUtils(page);
        }
        /**
         * 声明需要监听的所有队列
         * org .springframeworl. amqp.core.MNessage参数可以写一下类型
         * 1、Message message:原生消息详细信息。头+体
         * 2、T<发送的消息的类型>orderReturnReasonEntity content;零3、Channel channei:当前传输数据的通道
         * */
        @RabbitHandler
        public void reciveMessage(Message message,
                                  OrderEntity content,
                                  Channel channel){
            // 消息头属性信息
            System.out.println("接收到消息1...内容:"+message+"===>类型:"+content);
        }
    
        @RabbitHandler
        public void reciveMessage2(Message message,
                                  OrderItemEntity content,
                                  Channel channel){
            // 消息头属性信息
            System.out.println("接收到消息2...内容:"+message+"===>类型:"+content);
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34

    结果

    在这里插入图片描述

  • 相关阅读:
    Flume 整合 Kafka
    在CentOS上安装Docker引擎
    {} >= {} 返回 true
    图解网络(三)——TCP篇01
    应用程序通过 Envoy 代理和 Jaeger 进行分布式追踪 —— Ingress Controller + Http服务 + Grpc服务(三)
    风控——利用决策树挖掘策略规则
    基于PHP+MySQL高校毕业设计管理系统的设计与实现
    Rasa-笔记
    递增删除图象
    HHDBCS监控功能
  • 原文地址:https://blog.csdn.net/shujuku____/article/details/125455340