• [Java Framework] [MQ] SpringBoot 集成RabbitMQ


    方法 / 步骤

    前置环境

    🧲RabbitMQ 安装和配置述

    • rabbitMQ安装成功默认channels
      在这里插入图片描述

    ⛳项目集成

    • pom 依赖
    <dependency>
        <groupId>org.springframework.bootgroupId>
        <artifactId>spring-boot-starter-amqpartifactId>
    dependency>
    
    • 1
    • 2
    • 3
    • 4
    • yml配置
    spring:
      rabbitmq:
        host: mws.com
        port: 5672
        virtual-host: /
        password: admin
        username: admin
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 启动项目 已经连接
      在这里插入图片描述

    二: 配置调试

    2.1 普通队列

    2.1.1 初始化绑定配置

    /**
     * Description:
     * 该种方式不用手动创建,在项目启动搜索不到交换机和queue会自动创建
     * @author: YangGC
     */
    @Configuration
    public class RabbitMQBinderConfig {
    
    
        public static final String TEST_SPRING_EXCHANGE = "test.spring.exchange";
        public static final String TEST_SPRING_QUEUE = "test.spring.queue";
        public static final String TEST_SPRING_QUEUE_ROUTING_KEY = "test.spring.routingKey";
        @Bean
        public CustomExchange testSpringExchange() {return new CustomExchange(TEST_SPRING_EXCHANGE, "direct", true, false, new HashMap<>(1));}
        @Bean
        public Queue testSpringQueue() { return new Queue(TEST_SPRING_QUEUE, false);}
        @Bean
        public Binding bindingNotify1(@Qualifier("testSpringQueue") Queue queue, @Qualifier("testSpringExchange") CustomExchange customExchange){
            return BindingBuilder.bind(queue).to(customExchange).with(TEST_SPRING_QUEUE_ROUTING_KEY).noargs();}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20

    2.1.2 服务发送

    public class ShoppingMallConsumerApplication implements CommandLineRunner {
        public static void main(String[] args) {
            SpringApplication.run(ShoppingMallConsumerApplication.class);
        }
        
        @Resource
        private RabbitTemplate rabbitTemplate;
        
        @Override
        public void run(String... args) throws Exception {
    
            /**
             * rabbitTemplate.convertAndSend 就是根据交换机和queue的不同类型实现不同的路由功能
             * 发送fanout(广播) / routing / topics 消息
             */
            // 只发队列消息 默认交换机类型为 direct 交换机的名称为空/路由键与队列同名
            rabbitTemplate.convertAndSend("test.spring.queue", "只发队列spring_queue的消息。");
            //rabbitTemplate.convertAndSend("test.spring.fanout.exchange", "", "spring fanout...");
           //rabbitTemplate.convertAndSend("test.spring.routing.exchange", "", "spring fanout...");
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    2.1.2 服务监听消费

    @Component
    public class RMQConsumer {
    
        @RabbitListener(queues = RabbitMQBinderConfig.TEST_SPRING_QUEUE)
        public void receiveD2(Message message, Channel channel) throws IOException {
            try {
                String format = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss").format(LocalDateTime.now());
                String msg = new String(message.getBody(), "utf-8");
                System.out.printf(format + " 接收路由名称为:%s,路由键为:%s,队列名为:%s的消息:%s \n",
                        message.getMessageProperties().getReceivedExchange(),
                        message.getMessageProperties().getReceivedRoutingKey(),
                        message.getMessageProperties().getConsumerQueue(),
                        msg);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 收到消息
      在这里插入图片描述

    2.2 延时队列

    2.2.1 初始化绑定配置

    /**
     * Description:
     * 该种方式不用手动创建,在项目启动搜索不到交换机和queue会自动创建
     * @author: YangGC
     */
    @Configuration
    public class RabbitMQBinderConfig {
        /**
         * 延迟队列绑定器
         */
        public static final String TEST_SPING_DELAYED_EXCHANGE = "test.spring.delayed.exchange";
        public static final String TEST_SPING_DELAYED_QUEUE = "test.spring.delayed.queue";
        public static final String TEST_SPING_DELAYED_ROUTING_KEY = "test.spring.delayed.queue";
    
        @Bean
        public CustomExchange testSpringDelayedExchange() {
            // 延迟插件新增的 延迟交换机-类型参数
            Map<String, Object> args = new HashMap<>(1);
            args.put("x-delayed-type", "direct");
            return new CustomExchange(TEST_SPING_DELAYED_EXCHANGE, "x-delayed-message", true, false, args);
        }
        @Bean
        public Queue testSpringDelayedQueue() { return new Queue(TEST_SPING_DELAYED_QUEUE, true);}
        @Bean
        public Binding bindingDelayedNotify1(@Qualifier("testSpringDelayedQueue") Queue queue, @Qualifier("testSpringDelayedExchange") CustomExchange customExchange){
            return BindingBuilder.bind(queue).to(customExchange).with(TEST_SPING_DELAYED_ROUTING_KEY).noargs();}
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28

    2.2.2 发送延迟消息

    public class ShoppingMallConsumerApplication implements CommandLineRunner {
        public static void main(String[] args) {
            SpringApplication.run(ShoppingMallConsumerApplication.class);
        }
    
    
        @Resource
        private RabbitTemplate rabbitTemplate;
    
        @Override
        public void run(String... args) throws Exception {
            /**
             * 发送延迟消息
             */
            rabbitTemplate.convertAndSend(RabbitMQBinderConfig.TEST_SPING_DELAYED_EXCHANGE, RabbitMQBinderConfig.TEST_SPING_DELAYED_ROUTING_KEY, String.valueOf("发送10s延迟消息"), val -> {
                val.getMessageProperties().setDelay(10 * 1000);
                System.out.println("............");
                return val;
            });
    
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22

    2.2.3 服务监听消费

    @Component
    public class RMQConsumer {
        @RabbitListener(queues = RabbitMQBinderConfig.TEST_SPING_DELAYED_QUEUE)
        public void receiveSpringDelayedQueue(Message message, Channel channel) throws IOException {
            try {
                String format = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss").format(LocalDateTime.now());
                String msg = new String(message.getBody(), "utf-8");
                System.out.printf(format + " 接收路由名称为:%s,路由键为:%s,队列名为:%s的消息:%s \n",
                        message.getMessageProperties().getReceivedExchange(),
                        message.getMessageProperties().getReceivedRoutingKey(),
                        message.getMessageProperties().getConsumerQueue(),
                        msg);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    在这里插入图片描述

    三: 业务使用

    3.1 开启消息确认

    如果消费一条消息如果不确认,将会是unacked状态,然后过期后将会又重新入队

    • default-requeue-rejected: false 不使用中间件的自动入列 (对应新手业务使用不当容易造成死循环),手动处理自动入列
    • 配置
        listener:
          type: simple
          simple:
            # 监听器抛出异常而拒绝的消息是否被重新放回队列。默认值为true(会重新放回队列) false(不会放回队列)
            default-requeue-rejected: false
            # manual意味着监听者必须通过调用Channel.basicAck()来告知所有的消息。 none意味着没有任何的应答会被发送。 auto意味着容器会自动应答,除非MessageListener抛出异常,这是默认配置方式。
            acknowledge-mode: manual
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    3.1.1 服务监听消费

    • basicAck 方法

    参数说明:

    • long deliveryTag:唯一标识 ID,当一个消费者向 RabbitMQ 注册后,会建立起一个 Channel ,RabbitMQ 会用 basic.deliver 方法向消费者推送消息,这个方法携带了一个 delivery tag, 它代表了 RabbitMQ 向该 Channel 投递的这条消息的唯一标识 ID,是一个单调递增的正整数,delivery tag 的范围仅限于 Channel。
    • boolean multiple:是否批处理,当该参数为 true 时,则可以一次性确认 delivery_tag 小于等于传入值的所有消息。
      注意业务书写规范,
        @RabbitListener(queues = RabbitMQBinderConfig.TEST_SPING_DELAYED_QUEUE)
        public void receiveSpringDelayedQueue(Message message, Channel channel) throws IOException {
            try {
    			int i = 12/0;
            } catch (Exception e) {
                //todo 放入死信队列 或者添加告警业务
                e.printStackTrace();
            }
            //deliveryTag :每次消费或者重新投递requeue后,delivery_tag都会增加
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    在这里插入图片描述

    3.2 开启消息Nack (非确认处理)

    • basicNack 方法
      basicNack 方法用于否定当前消息。 由于 basicReject 方法一次只能拒绝一条消息,如果想批量拒绝消息,则可以使用 basicNack 方法。消费者客户端可以使用 channel.basicNack 方法来实现,方法定义如下:
    void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException;
    
    • 1

    参数说明:

    • long deliveryTag:唯一标识 ID。
    • boolean multiple:上面已经解释。
    • boolean requeue:如果 requeue 参数设置为 true,则 RabbitMQ 会重新将这条消息存入队列,以便发送给下一个订阅的消费者; 如果 requeue 参数设置为 false,则 RabbitMQ 立即会还把消息从队列中移除,而不会把它发送给新的消费者。

    参考资料 & 致谢

  • 相关阅读:
    Spring AOP(JavaEE进阶系列5)
    iOS 16.1新功能尝鲜:如何在iPhone上启用实时活动?
    抖音小店无货源,新手玩家应该怎么去操作?建议收藏
    javascript之for循环介绍
    使用Docker搭建ELK,并与SpringBoot集成
    .com和.cn有什么区别?
    4.1提出问题&4.2拉格朗日插值
    适用于初学者的 .NET MAUI
    闲聊最近招聘面试
    模糊神经网络算法matlab,模糊神经网络算法原理
  • 原文地址:https://blog.csdn.net/YangCheney/article/details/126143487