• SpringBoot整合RabbitMQ实现消息的发送与接收,确认消息,延时消息


    安装消息中间件

    Windows安装ErLang

    https://github.com/erlang/otp/releases/tag/OTP-25.0
    在这里插入图片描述

    在这里插入图片描述

    Windows安装RabbitMq

    https://www.rabbitmq.com/install-windows.html
    在这里插入图片描述
    在这里插入图片描述

    安装RabbitMq UI界面

    在这里插入图片描述
    打开RabbitMQ Command Prompt 进入命令行

    # 查看mq服务状态
    rabbitmqctl.bat status
    
    • 1
    • 2

    在这里插入图片描述

    # 安装ui界面
    rabbitmq-plugins enable rabbitmq_management
    
    • 1
    • 2

    在这里插入图片描述

    访问http://localhost:15672/
    默认账号密码guest/guest
    在这里插入图片描述

    安装延时消息插件

    https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases
    在这里插入图片描述
    将 ez文件拷贝到安装目录rabbitmq_server-3.10.2\plugins下

    # 安装插件
    rabbitmq-plugins enable rabbitmq_delayed_message_exchange
    
    • 1
    • 2

    在这里插入图片描述

    SpringBoot整合

    这里我直接用我先前建好的微服务
    order-service作为消息发送者,storage-service作为消息接收者
    在这里插入图片描述
    在这里插入图片描述

    消息发送端order-service

    添加依赖

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    
    • 1
    • 2
    • 3
    • 4

    application.yml

    spring:
      rabbitmq:
        username: guest
        password: guest
        host: 127.0.0.1
        port: 5672
        # 消息确认(ACK)
        publisher-confirm-type: correlated #确认消息已发送到交换机(Exchange)
        publisher-returns: true #确认消息已发送到队列(Queue)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    RabbitMqConfig

    package top.fate.config;
    
    import org.apache.logging.log4j.LogManager;
    import org.apache.logging.log4j.Logger;
    import org.springframework.amqp.core.*;
    import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
    import org.springframework.amqp.rabbit.connection.ConnectionFactory;
    import org.springframework.amqp.rabbit.connection.CorrelationData;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    import java.util.HashMap;
    import java.util.Map;
    
    /**
     * @auther:Wangxl
     * @Emile:18335844494@163.com
     * @Time:2022/5/26 11:41
     */
    @Configuration
    public class RabbitMqConfig {
    
        private static final Logger LOG = LogManager.getLogger();
    
        public static final String DIRECT_QUEUE = "direct_queue"; //Direct队列名称
        public static final String DIRECT_EXCHANGE = "direct_exchange"; //交换器名称
        public static final String DIRECT_ROUTING_KEY = "direct_routing_key"; //路由键
    
        public static final String DELAY_QUEUE = "delay_queue"; //延时队列名称
        public static final String DELAY_EXCHANGE = "delay_exchange"; //交换器名称
        public static final String DELAY_ROUTING_KEY = "delay_routing_key"; //路由键
    
        @Autowired
        private CachingConnectionFactory connectionFactory;
    
        @Bean
        public RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory)
        {
            RabbitTemplate rabbitTemplate = new RabbitTemplate();
            rabbitTemplate.setConnectionFactory(connectionFactory);
    
            //设置Json转换器
            rabbitTemplate.setMessageConverter(jsonMessageConverter());
    
            //设置开启Mandatory,才能触发回调函数,无论消息推送结果怎么样都强制调用回调函数
            rabbitTemplate.setMandatory(true);
    
            //确认消息送到交换机(Exchange)回调
            rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback()
            {
                @Override
                public void confirm(CorrelationData correlationData, boolean ack, String cause)
                {
                    LOG.info("\n确认消息送到交换机(Exchange)结果:");
                    LOG.info("相关数据:" + correlationData);
                    LOG.info("是否成功:" + ack);
                    LOG.info("错误原因:" + cause);
                }
            });
    
            //确认消息送到队列(Queue)回调
            rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback()
            {
                @Override
                public void returnedMessage(ReturnedMessage returnedMessage)
                {
                    LOG.info("\n确认消息送到队列(Queue)结果:");
                    LOG.info("发生消息:" + returnedMessage.getMessage());
                    LOG.info("回应码:" + returnedMessage.getReplyCode());
                    LOG.info("回应信息:" + returnedMessage.getReplyText());
                    LOG.info("交换机:" + returnedMessage.getExchange());
                    LOG.info("路由键:" + returnedMessage.getRoutingKey());
                }
            });
            return rabbitTemplate;
        }
    
        /**
         * Json转换器
         */
        @Bean
        public Jackson2JsonMessageConverter jsonMessageConverter()
        {
            return new Jackson2JsonMessageConverter();
        }
    
        /**
         * Direct交换器
         */
        @Bean
        public DirectExchange directExchange()
        {
            /**
             * 创建交换器,参数说明:
             * String name:交换器名称
             * boolean durable:设置是否持久化,默认是 false。durable 设置为 true 表示持久化,反之是非持久化。
             * 持久化可以将交换器存盘,在服务器重启的时候不会丢失相关信息。
             * boolean autoDelete:设置是否自动删除,为 true 则设置队列为自动删除,
             */
            return new DirectExchange(DIRECT_EXCHANGE, true, false);
        }
    
        /**
         * 队列
         */
        @Bean
        public Queue directQueue()
        {
            /**
             * 创建队列,参数说明:
             * String name:队列名称。
             * boolean durable:设置是否持久化,默认是 false。durable 设置为 true 表示持久化,反之是非持久化。
             * 持久化的队列会存盘,在服务器重启的时候不会丢失相关信息。
             * boolean exclusive:设置是否排他,默认也是 false。为 true 则设置队列为排他。
             * boolean autoDelete:设置是否自动删除,为 true 则设置队列为自动删除,
             * 当没有生产者或者消费者使用此队列,该队列会自动删除。
             * Map<String, Object> arguments:设置队列的其他一些参数。
             */
            return new Queue(DIRECT_QUEUE, true, false, false, null);
        }
    
        /**
         * 绑定
         */
        @Bean
        Binding directBinding(DirectExchange directExchange, Queue directQueue)
        {
            //将队列和交换机绑定, 并设置用于匹配键:routingKey路由键
            return BindingBuilder.bind(directQueue).to(directExchange).with(DIRECT_ROUTING_KEY);
        }
    
        /******************************延时队列******************************/
    
        @Bean
        public CustomExchange delayExchange()
        {
            Map<String, Object> args = new HashMap<>();
            args.put("x-delayed-type", "direct");
            return new CustomExchange(DELAY_EXCHANGE, "x-delayed-message", true, false, args);
        }
    
        @Bean
        public Queue delayQueue()
        {
            Queue queue = new Queue(DELAY_QUEUE, true);
            return queue;
        }
    
        @Bean
        public Binding delaybinding(Queue delayQueue, CustomExchange delayExchange)
        {
            return BindingBuilder.bind(delayQueue).to(delayExchange).with(DELAY_ROUTING_KEY).noargs();
        }
    
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133
    • 134
    • 135
    • 136
    • 137
    • 138
    • 139
    • 140
    • 141
    • 142
    • 143
    • 144
    • 145
    • 146
    • 147
    • 148
    • 149
    • 150
    • 151
    • 152
    • 153
    • 154
    • 155
    • 156
    • 157
    • 158
    • 159

    实体对象

    package top.fate.entity;
    
    import lombok.Data;
    
    /**
     * @auther:Wangxl
     * @Emile:18335844494@163.com
     * @Time:2022/5/26 14:26
     */
    @Data
    public class TestEntity {
    
        private String username;
        private String password;
    
        public TestEntity(String username, String password) {
            this.username = username;
            this.password = password;
        }
    
        public TestEntity() {
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23

    生产者服务接口

    package top.fate.service;
    
    import java.util.Map;
    
    /**
     * @auther:Wangxl
     * @Emile:18335844494@163.com
     * @Time:2022/5/26 14:29
     */
    public interface ProducerService {
    
        /**
         * 发送json格式数据
         *
         * @param o
         */
        void sendTestJson(Object o);
    
        /**
         * 延时发送map格式数据
         *
         * @param map
         */
        void sendDelayTestMap(Map map);
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26

    生产者服务实现类

    package top.fate.service.impl;
    
    import org.apache.logging.log4j.LogManager;
    import org.apache.logging.log4j.Logger;
    import org.springframework.amqp.AmqpException;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.core.MessagePostProcessor;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Service;
    import top.fate.config.RabbitMqConfig;
    import top.fate.service.ProducerService;
    
    import java.text.SimpleDateFormat;
    import java.util.Date;
    import java.util.Map;
    
    /**
     * @auther:Wangxl
     * @Emile:18335844494@163.com
     * @Time:2022/5/26 14:30
     */
    @Service
    public class ProducerServiceImpl implements ProducerService {
    
        private static final Logger LOG = LogManager.getLogger();
    
        SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        /**
         * 发送json格式数据
         *
         * @param o
         */
        @Override
        public void sendTestJson(Object o) {
    
            rabbitTemplate.convertAndSend(RabbitMqConfig.DIRECT_EXCHANGE, RabbitMqConfig.DIRECT_ROUTING_KEY, o);
            LOG.info("json格式的数据发送成功 发送时间为" + formatter.format(new Date()));
        }
    
        /**
         * 延时发送map格式数据
         *
         * @param map
         */
        @Override
        public void sendDelayTestMap(Map map) {
            rabbitTemplate.convertAndSend(RabbitMqConfig.DELAY_EXCHANGE, RabbitMqConfig.DELAY_ROUTING_KEY, map, new MessagePostProcessor() {
                @Override
                public Message postProcessMessage(Message message) throws AmqpException {
    
                    message.getMessageProperties().setHeader("x-delay", 5000);
    
                    return message;
                }
            });
            LOG.info("map格式的数据发送成功 发送时间为" + formatter.format(new Date()));
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64

    测试Controller

    package top.fate.controller;
    
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RestController;
    import top.fate.entity.TestEntity;
    import top.fate.service.ProducerService;
    
    import java.util.HashMap;
    import java.util.Map;
    
    /**
     * @auther:Wangxl
     * @Emile:18335844494@163.com
     * @Time:2022/5/26 14:43
     */
    @RestController
    @RequestMapping(value = "producer")
    public class ProducerController {
    
        @Autowired
        private ProducerService producerService;
    
        @GetMapping("sendObject")
        public void sendObject(){
            producerService.sendTestJson(new TestEntity("user","123456"));
        }
    
        @GetMapping("sendMap")
        public void sendMap(){
            Map map = new HashMap();
            map.put("user1",new TestEntity("user1","123"));
            map.put("user2",new TestEntity("user2","123"));
            map.put("user3",new TestEntity("user3","123"));
            producerService.sendDelayTestMap(map);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38

    消息接收端storage-service

    添加依赖

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    
    • 1
    • 2
    • 3
    • 4

    RabbitMqConfig

    package top.fate.config;
    
    import org.springframework.amqp.core.AcknowledgeMode;
    import org.springframework.amqp.core.Queue;
    import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
    import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import top.fate.service.impl.AckReceiver;
    
    /**
     * @auther:Wangxl
     * @Emile:18335844494@163.com
     * @Time:2022/5/26 14:46
     */
    @Configuration
    public class RabbitMqConfig {
    
        public static final String DIRECT_QUEUE = "direct_queue"; //Direct队列名称
        public static final String DELAY_QUEUE = "delay_queue"; //延时队列名称
    
        /**
         * 消息接收确认处理类
         */
        @Autowired
        private AckReceiver ackReceiver;
    
        @Autowired
        private CachingConnectionFactory connectionFactory;
    
        /**
         * 客户端配置
         * 配置手动确认消息、消息接收确认
         */
        @Bean
        public SimpleMessageListenerContainer simpleMessageListenerContainer()
        {
            //消费者数量,默认10
            int DEFAULT_CONCURRENT = 10;
    
            //每个消费者获取最大投递数量 默认50
            int DEFAULT_PREFETCH_COUNT = 50;
    
            SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
            container.setConcurrentConsumers(DEFAULT_CONCURRENT);
            container.setMaxConcurrentConsumers(DEFAULT_PREFETCH_COUNT);
    
            // RabbitMQ默认是自动确认,这里改为手动确认消息
            container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
    
            //添加队列,可添加多个队列
            container.addQueues(new Queue(DIRECT_QUEUE,true));
            container.addQueues(new Queue(DELAY_QUEUE,true));
    
            //设置消息处理类
            container.setMessageListener(ackReceiver);
    
            return container;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61

    消息接收接口

    package top.fate.service;
    
    import com.rabbitmq.client.Channel;
    import org.springframework.amqp.core.Message;
    
    import java.io.IOException;
    
    /**
     * @auther:Wangxl
     * @Emile:18335844494@163.com
     * @Time:2022/5/26 14:52
     */
    public interface ConsumerReceiver {
    
        void receiverJson(Message message, Channel channel) throws IOException;
    
        void receiverMap(Message message, Channel channel) throws IOException;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    消息接收实现类

    package top.fate.service.impl;
    
    import com.alibaba.fastjson.JSON;
    import com.rabbitmq.client.Channel;
    import org.apache.logging.log4j.LogManager;
    import org.apache.logging.log4j.Logger;
    import org.springframework.amqp.core.Message;
    import org.springframework.stereotype.Service;
    import top.fate.entity.TestEntity;
    import top.fate.service.ConsumerReceiver;
    
    import java.io.IOException;
    import java.util.Map;
    import java.util.Set;
    
    /**
     * @auther:Wangxl
     * @Emile:18335844494@163.com
     * @Time:2022/5/26 14:54
     */
    @Service
    public class ConsumerReceiverImpl implements ConsumerReceiver {
    
    
        private static final Logger LOG = LogManager.getLogger();
    
        @Override
        public void receiverJson(Message message, Channel channel) throws IOException {
            long deliveryTag = message.getMessageProperties().getDeliveryTag();
            try
            {
                //将JSON格式数据转换为实体对象
                TestEntity testEntity = JSON.parseObject(message.getBody(), TestEntity.class);
    
                LOG.info("接收者收到JSON格式消息:");
                System.out.println("账号:" + testEntity.getUsername());
                System.out.println("密码:" + testEntity.getPassword());
    
                /**
                 * 确认消息,参数说明:
                 * long deliveryTag:唯一标识 ID。
                 * boolean multiple:是否批处理,当该参数为 true 时,
                 * 则可以一次性确认 deliveryTag 小于等于传入值的所有消息。
                 */
                channel.basicAck(deliveryTag, true);
    
                /**
                 * 否定消息,参数说明:
                 * long deliveryTag:唯一标识 ID。
                 * boolean multiple:是否批处理,当该参数为 true 时,
                 * 则可以一次性确认 deliveryTag 小于等于传入值的所有消息。
                 * boolean requeue:如果 requeue 参数设置为 true,
                 * 则 RabbitMQ 会重新将这条消息存入队列,以便发送给下一个订阅的消费者;
                 * 如果 requeue 参数设置为 false,则 RabbitMQ 立即会还把消息从队列中移除,
                 * 而不会把它发送给新的消费者。
                 */
                //channel.basicNack(deliveryTag, true, false);
            }
            catch (Exception e)
            {
                /**
                 * 拒绝消息,参数说明:
                 * long deliveryTag:唯一标识 ID。
                 * boolean requeue:如果 requeue 参数设置为 true,
                 * 则 RabbitMQ 会重新将这条消息存入队列,以便发送给下一个订阅的消费者;
                 * 如果 requeue 参数设置为 false,则 RabbitMQ 立即会还把消息从队列中移除,
                 * 而不会把它发送给新的消费者。
                 */
                channel.basicReject(deliveryTag, false);
    
                e.printStackTrace();
            }
        }
    
        @Override
        public void receiverMap(Message message, Channel channel) throws IOException {
            long deliveryTag = message.getMessageProperties().getDeliveryTag();
            try
            {
                //将JSON格式数据转换为Map对象
                Map map = JSON.parseObject(message.getBody(), Map.class);
    
                LOG.info("接收者收到Map格式消息:");
    
                LOG.info(map.get("user1"));
                LOG.info(map.get("user2"));
                LOG.info(map.get("user3"));
    
                //确认消息
                channel.basicAck(deliveryTag, true);
    
                //否定消息
                //channel.basicNack(deliveryTag, true, false);
            }
            catch (Exception e)
            {
                //拒绝消息
                channel.basicReject(deliveryTag, false);
    
                e.printStackTrace();
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103

    消息分发处理类

    package top.fate.service.impl;
    
    import com.rabbitmq.client.Channel;
    import org.apache.logging.log4j.LogManager;
    import org.apache.logging.log4j.Logger;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Service;
    import top.fate.config.RabbitMqConfig;
    import top.fate.service.ConsumerReceiver;
    
    import java.text.SimpleDateFormat;
    import java.util.Date;
    
    /**
     * @auther:Wangxl
     * @Emile:18335844494@163.com
     * @Time:2022/5/26 14:50
     */
    @Service
    public class AckReceiver implements ChannelAwareMessageListener {
    
        private static final Logger LOG = LogManager.getLogger();
    
        /**
         * 用户消息接收类
         */
        @Autowired
        private ConsumerReceiver consumerReceiver;
    
        @Override
        public void onMessage(Message message, Channel channel) throws Exception
        {
            //时间格式
            SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            LOG.info("消息接收成功,接收时间:" + dateFormat.format(new Date()) + "\n");
    
            //获取队列名称
            String queueName = message.getMessageProperties().getConsumerQueue();
    
            //接收用户信息Json格式数据
            if (queueName.equals(RabbitMqConfig.DIRECT_QUEUE))
            {
                consumerReceiver.receiverJson(message, channel);
            }
    
            //延时接收用户信息Map格式数据
            if (queueName.equals(RabbitMqConfig.DELAY_QUEUE))
            {
                consumerReceiver.receiverMap(message, channel);
            }
            //多个队列的处理,则如上述代码,继续添加方法....
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55

    启动测试

    在这里插入图片描述
    项目启动的时候创建交换机绑定路由key,及创建队列
    在这里插入图片描述
    在这里插入图片描述

    在这里插入图片描述
    在这里插入图片描述

  • 相关阅读:
    服务器配置openvpn,ssh连接断开
    Vue中组件化编码 完成任务的添加、删除、统计、勾选需求(实战练习三完结)
    微信小程序调起微信支付
    Elasticsearch:使用 Low Level Java 客户端来创建连接 - Elastic Stack 8.x
    PhiData 一款开发AI搜索、agents智能体和工作流应用的AI框架
    RFID技术助力汽车座椅生产
    [Unity独立/合作开发]实现背包系统中物品的拾取拖拽掉落还有换位置
    订水商城H5实战教程-02系统登录
    Kotlin实现微信界面切换(Fragment练习)
    C中的运算和数据类型
  • 原文地址:https://blog.csdn.net/weixin_43627706/article/details/124981001