• rabbitmq


    用途

    • 流量削峰

    最大处理量如果是一秒一万条订单,一秒钟来了两万条,可以先存在消息队列里面,按照能力去消费处理

    • 应用解耦

    下单后,需要去调用很多其他系统,使用我们的发布订阅,让需要接受这条消息的服务监听这个queue

    • 异步处理

    在我们一些需要异步调用的场景中,回调

    核心概念

    生产者
    交换机(需要重点理解)接受生产者的消息,并按照规则推到队列里面,这些规则的配置可以实现不同场景的需求
    队列
    消费者

    安装

    docker

    docker run -d -p 15672:15672  -p  5672:5672  -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin --name rabbitmq --hostname=rabbitmqhostone  rabbitmq:management
    

    3.8.8 https://github.com/rabbitmq/rabbitmq-server/releases/tag/v3.8.8
    22.3 https://www.erlang-solutions.com/downloads/

    # 安装erlang
    rpm -ivh esl-erlang_22.3.1-1_centos_7_amd64.rpm
    
    warning: esl-erlang_22.3.1-1_centos_7_amd64.rpm: Header V4 RSA/SHA1 Signature, key ID a14f4fca: NOKEY
    error: Failed dependencies:
    执行以下命令:
    
    yum install epel-release
    yum install unixODBC unixODBC-devel wxBase wxGTK SDL wxGTK-gl
    
    yum install socat -y
    
    #安装RabbitMQ
    rpm -ivh rabbitmq-server-3.8.8-1.el7.noarch.rpm
    
    #添加开机启动 RabbitMQ 服务
    chkconfig rabbitmq-server on
    
    #启动服务
    /sbin/service rabbitmq-server start
    
    #查看服务状态
    /sbin/service rabbitmq-server status
    
    #停止服务(选择执行)
    /sbin/service rabbitmq-server stop
    
    #开启 web 管理插件,rabbitmq 默认不开启
    rabbitmq-plugins enable rabbitmq_management
    
    # 现在登录如果使用ip是无法登录的
    # 添加配置文件,去掉 ip 限制
    cd /etc/rabbitmq
    
    vim rabbitmq-env.conf
    #  Specifies new style config file location
    CONFIG_FILE=/etc/rabbitmq/rabbitmq.conf
    
    vim rabbitmq.conf
    
    loopback_users = none
    
    /sbin/service rabbitmq-server restart
    
    #创建账号
    rabbitmqctl add_user admin 123
    
    #设置用户角色
    rabbitmqctl set_user_tags admin administrator
    
    #设置用户权限
    # set_permissions [-p ]    
    rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*"
    #户 user_admin 具有/vhost1 这个 virtual host 中所有资源的配置、写、读权限
    
    #当前用户和角色
    rabbitmqctl list_users  
    
    # 关闭防火墙
    # 查看防火墙状态: 
    systemctl status firewalld.service
    
    # 关闭防火墙
    systemctl stop firewalld.service
    
    # 开机禁用防火墙
    systemctl disable firewalld.service
    

    hello world

    还是国际惯例,咱们来一个 hello world,实现的功能也很简单,创建一个生产者,发送一条 hello world 的消息,再创建一个 消费者,消费这条消息,并在控制台打印

    我们创建一个 maven 的简单项目,后面再去整合 SpringBoot, 只需要引入两个依赖

     <dependency>
                <groupId>com.rabbitmqgroupId>
                <artifactId>amqp-clientartifactId>
                <version>5.8.0version>
            dependency>
    
            <dependency>
                <groupId>commons-iogroupId>
                <artifactId>commons-ioartifactId>
                <version>2.6version>
            dependency>
    

    创建一个生产者

    /**
     * 生产者:发消息
     */
    public class Producer {
        // 队列名称
        public static final String QUEUE_NAME = "hello";
    
        // 发消息
        public static void main(String[] args) throws IOException, TimeoutException {
            // 创建一个连接工厂
            ConnectionFactory factory = new ConnectionFactory();
            // 工厂 IP 连接 RabbitMQ 的队列
            factory.setHost("172.16.0.28");
            // 用户名
            factory.setUsername("admin");
            // 密码
            factory.setPassword("123");
            // 创建连接
            Connection connection = factory.newConnection();
            // 获取信道
            Channel channel =  connection.createChannel();
            /**
             * 生成一个队列
             * 1.队列名称
             * 2.队列里面的消息是否持久化(默认 false,内存)
             * 3.该队列是否值供一个消费者进行消费,是否进行消费共享, true 可以多个消费者消费
             * 4.是否自动删除  最后一个消费者断开连接后 该队列是否自动删除 false 不自动删除
             * 5.其他参数
             */
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    
            // 发消息
            String message = "hello world";
    
            /**
             * 发送一个消费
             * 1.发送到哪个交换机
             * 2.路由的 key 值是哪个,本次是队列的名称
             * 3.其他参数信息
             * 4.发送消息的消息体
             */
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
    
            System.out.println("消息发送完毕");
        }
    
    }
    

    消费者

    /**
     * 消费者
     */
    public class Consumer {
    
        // 队列名称
        public static final String QUEUE_NAME = "hello";
    
        // 接收消息
        public static void main(String[] args) throws IOException, TimeoutException {
            // 创建一个连接工厂
            ConnectionFactory factory = new ConnectionFactory();
            // 工厂 IP 连接 RabbitMQ 的队列
            factory.setHost("172.16.0.28");
            // 用户名
            factory.setUsername("admin");
            // 密码
            factory.setPassword("123");
            // 创建连接
            Connection connection = factory.newConnection();
            // 获取信道
            Channel channel =  connection.createChannel();
    
            // 声明 接收消息
            DeliverCallback deliverCallback = (consumerTag, message) -> {
                System.out.println(new String(message.getBody()));
            };
    
            // 声明 取消消息的回调
            CancelCallback cancelCallback = (consumerTag) -> {
                System.out.println("消息 消费被中断");
            };
    
            /**
             * 消费者消费消息:
             *  1。 消费哪个队列
             *  2. 消费成功后是否要自动应答,true 代表自动应答, false 代表手动应答
             *  3。消费者未成功消费的回调
             *  4。消费者取消消费的回调
             */
            channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
        }
    }
    

    我们启动下


    我们来简单梳理下,在生产者中我们主要做的是,定义一个 队列,并往这个队列中发送消息,消费者中则是指定监听对应的 queue

    消息应答

    消费者在接收到消息并且处理该消息之后,告诉 rabbitmq 它已经处理了,rabbitmq 可以把该消息删除了

    • 自动应答
    • 手动应答
      • Channel.basicAck(用于肯定确认)  RabbitMQ 已知道该消息并且成功的处理消息,可以将其丢弃了
        • 是否批量应答

    批量应答的理解,不建议使用,可能会应答没有处理完的消息

    自动入队

    发生点在工作线程
    没有 ack 的消息会被重新放回队列被别的消费者消费

    文字说明,我们启动两个消费者,消费者c1,c2分别接收消息 m1, m2, 在c1 ack之前把c1关掉,这时m1会被c2重新消费

    /**
     * 消息在手动应答时是不丢失,放回队列中重新消费
     */
    public class Task02 {
    
        // 队列名称
        public static final  String TASK_QUEUE_NAME = "ack_queue";
    
        public static void main(String[] args) throws  Exception{
            Channel channel = RabbitMqUtils.getChannel();
    
            boolean durable = true;
    
            // 声明队列
            channel.queueDeclare(TASK_QUEUE_NAME, durable, false, false, null);
    
            Scanner scanner = new Scanner(System.in);
            while (scanner.hasNext()) {
                String message = scanner.next();
                channel.basicPublish("", TASK_QUEUE_NAME, null, message.getBytes("UTF-8")); // 解决中文编码
                System.out.println("生产者发送消息: " + message);
            }
        }
    }
    
    public class Work3 {
        // 队列名称
        public static final  String TASK_QUEUE_NAME = "ack_queue";
    
        public static void main(String[] args) throws IOException, TimeoutException {
            Channel channel = RabbitMqUtils.getChannel();
            System.out.println("C1 等待接收消息处理时间较短");
    
    
            DeliverCallback deliverCallback = (consumerTag, message) -> {
                // 沉睡 1 s
                SleepUtils.sleep(1);
                System.out.println("接收到消息: " + new String(message.getBody(), "UTF-8"));
                // 手动应答
                /**
                 * 1.消息的标记 tag
                 * 2.是否批量应答 false 不批量应答信道中的消息 true: 批量
                 */
                channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
            };
    
            // 采用手动应答
            boolean autoAck = false;
            channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, (consumerTag)->{
                System.out.println(consumerTag + "消费者取消消费接口回调逻辑");
            });
    
        }
    }
    
    public class Work4 {
        // 队列名称
        public static final String TASK_QUEUE_NAME = "ack_queue";
    
        public static void main(String[] args) throws IOException, TimeoutException {
            Channel channel = RabbitMqUtils.getChannel();
            System.out.println("C2 等待接收消息处理时间较长");
    
    
            DeliverCallback deliverCallback = (consumerTag, message) -> {
                // 沉睡 30 s
                SleepUtils.sleep(30);
                System.out.println("接收到消息: " + new String(message.getBody(), "UTF-8"));
                // 手动应答
                /**
                 * 1.消息的标记 tag
                 * 2.是否批量应答 false 不批量应答信道中的消息 true: 批量
                 */
                channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
            };
    
            // 采用手动应答
            boolean autoAck = false;
            channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, (consumerTag) -> {
                System.out.println(consumerTag + "消费者取消消费接口回调逻辑");
            });
    
        }
    }
    

    我们先启动 Task02 ,发送两条消息,后面分别启动 Work4, Work3,Work3 和 Work4根据轮训机制,会分别取到一条消息,然后再 ack 之前,我们把 Work4 关掉,会发现两条消息都被 Work3 消费了

    持久化

    这里的处理发生在 ,生产者发送消息的时候
    需要分别设置队列和消息的持久化


    这里存在一种情况,消息在落盘之前 宕机了,消息也会丢失,后面会讲到处理方式(需要发布确认)

    发布确认

    这一小节来处理上一小节提出的问题,确保消息能被发布
    发布确认总共有三种策略,下面我们我们分别说明,代码演示下,重点计算下每种策略所花的时间
    首先我们需要开启发布确认

    main 函数,下面我们分别写三个方法,分别实现 每种发布确认策略

    // 批量发消息的个数
        public static final int MESSAGE_COUNT = 1000;
    
        public static void main(String[] args) throws Exception {
    //        1. 单个确认
            publishMessageIndividually(); // 发布 1000个单独确认消息耗时 398ms
    //        2. 批量确认
    //        publishMessageBatch(); // 发布 1000个批量确认消息耗时 69ms
    //        3. 异步批量确认
    //        publishMessageAsync(); // 发布 1000个异步确认消息耗时 33ms
        }
    
    • 单个发布确认

    串行,一条消息发布确认后才可以开始下一条消息
    没发送一个消息调用一次 channel.waitForConfirms();

    // 单个确认
    public static void publishMessageIndividually() throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
        //  队列的声明
    
        String queueName = UUID.randomUUID().toString();
        channel.queueDeclare(queueName, true, false, false, null);
    
        // 开启发布确认
        channel.confirmSelect();
    
        // 开始时间
        long begin = System.currentTimeMillis();
    
        // 批量发消息
        for (int i = 0; i < MESSAGE_COUNT; i++) {
            String message = i + "";
            channel.basicPublish("", queueName, null, message.getBytes());
            // 单个消息马上进行发布确认
            boolean flag = channel.waitForConfirms();
            if (flag) {
                System.out.println("消息发送成功");
            }
        }
    
        long end = System.currentTimeMillis();
    
        System.out.println("发布 " + MESSAGE_COUNT + "个单独确认消息耗时 " + (end - begin) + "ms");
    }
    
    • 批量发布确认

    计算发送的消息,达到一定量之后调用一次 channel.waitForConfirms();
    本质上还是同步,而且会存在某些消息没有被发布的问题,这个实现其实个人感觉有点鸡肋

    // 批量发送确认
        public static void publishMessageBatch() throws Exception {
            Channel channel = RabbitMqUtils.getChannel();
    
            //  队列的声明
            String queueName = UUID.randomUUID().toString();
            channel.queueDeclare(queueName, true, false, false, null);
    
            // 开启发布确认
            channel.confirmSelect();
    
            // 开始时间
            long begin = System.currentTimeMillis();
    
            // 批量确认大小
            int batchSize = 100;
    
            // 未确认消息个数
    
            // 批量发消息 批量发布确认
            for (int i = 0; i < MESSAGE_COUNT; i++) {
                String message = i + "";
                channel.basicPublish("", queueName, null, message.getBytes());
    
                // 判断达到100条消息的时候,批量确认一次
                if (i % batchSize == 0) {
                    // 发布确认
                    channel.waitForConfirms();
                }
            }
    
            long end = System.currentTimeMillis();
    
            System.out.println("发布 " + MESSAGE_COUNT + "个批量确认消息耗时 " + (end - begin) + "ms");
        }
    
    • 异步发布确认


    这里是通过回调函数来异步确认

    // 异步发布确认
        public static void publishMessageAsync() throws Exception {
            Channel channel = RabbitMqUtils.getChannel();
    
            //  队列的声明
            String queueName = UUID.randomUUID().toString();
            channel.queueDeclare(queueName, true, false, false, null);
    
            // 开启发布确认
            channel.confirmSelect();
    
            /**
             * 线程安全有序的一个哈希表 适用于高并发的情况下
             * 1. 轻松的将序号和消息进行关联
             * 2. 轻松批量删除条目 只要给到序号
             * 3. 支持高并发(多线程)
             */
            ConcurrentSkipListMap<Long, String> outstandingConfirms =
                    new ConcurrentSkipListMap<>();
    
            // 开始时间
            long begin = System.currentTimeMillis();
    
            // 准备消息的监听器 监听哪些消息成功了 哪些消息失败了
    
            // 消息确认成功 回调函数
            ConfirmCallback ackCallback = (deliveryTag, multiple) -> {
                // 2. 删除已经确认的消息,剩下的就是未确认的消息
                if (multiple) {
                    ConcurrentNavigableMap<Long, String> confirmed = outstandingConfirms.headMap(deliveryTag);
                    confirmed.clear();
                } else {
                    outstandingConfirms.remove(deliveryTag);
                }
                System.out.println("确认的消息: " + deliveryTag);
            };
    
            // 消息确认失败 回调函数
            ConfirmCallback nackCallback = (deliveryTag, multiple) -> {
                // 3. 打印一下未确认的消息都有哪些
                String message = outstandingConfirms.get(deliveryTag);
                System.out.println("未确认的消息是 " + message + "未确认的消息: " + deliveryTag);
            };
    
            /**
             * 1. 监听哪些消息成功了
             * 2. 监听哪些消息失败了
             */
            channel.addConfirmListener(ackCallback, nackCallback); // 异步通知
    
    
            // 批量发送消息
            for (int i = 0; i < MESSAGE_COUNT; i++) {
                String message = i + "";
                channel.basicPublish("", queueName, null, message.getBytes());
    
                // 1. 此处记录下所有要发送的消息 消息的总和
                outstandingConfirms.put(channel.getNextPublishSeqNo(), message);
    
            }
    
            // 结束时间
            long end = System.currentTimeMillis();
    
            System.out.println("发布 " + MESSAGE_COUNT + "个异步确认消息耗时 " + (end - begin) + "ms");
        }
    

    有两个点需要说明

    1. channel.addConfirmListener(ackCallback, nackCallback); // 异步通知 这里添加回调函数
    2. ConcurrentSkipListMap 创建一个 并发集合,记录消息状态

    TODO 这里可以补充下哈,但还是感谢尚硅谷老师

    交换机

    这一小节会介绍几种常见交换机绑定队列的方式和几种常见交换机

    前面我们没有手动去指定交换机

    默认会给我们提供一个无名交换机

    类似的,如果我们没有给队列命名,我们采用的也就是临时队列

    绑定关系则是指的,路由与队列之间的映射关系

    下面我们来介绍不同类型的交换机

    fanout

    广播,会把接收到的消息 广播到它知道的所有队列中

    /**
     * 发消息
     */
    public class EmitLog {
    
        // 交换机名称
        private static final String EXCHANGE_NAME = "logs";
    
        public static void main(String[] args) throws Exception {
    
            Channel channel = RabbitMqUtils.getChannel();
    
            channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
    
            Scanner scanner = new Scanner(System.in);
    
            while (scanner.hasNext()) {
                String message = scanner.next();
                channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));
                System.out.println("生产者发出消息: " + message);
            }
        }
    }
    
    /**
     * 消息接收
     */
    public class ReceiveLogs01 {
    
        // 交换机名称
        private static final String EXCHANGE_NAME = "logs";
    
        public static void main(String[] args) throws Exception {
            Channel channel = RabbitMqUtils.getChannel();
    
            // 声明一个交换机
            channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
    
            // 声明一个队列 临时队列
            // 队列的名称是随机的
            // 当消费者断开与队列的连接的时候 队列就自动删除
    
            String queueName = channel.queueDeclare().getQueue();
    
            /**
             * 绑定交换机与队列
             */
            channel.queueBind(queueName, EXCHANGE_NAME, "");
            System.out.println("等待接收消息,把接收到消息打印在屏幕上......");
    
            // 接收消息
    
            // 消费者取消消息时回调接口
            DeliverCallback deliverCallback = (consumerTag, message) -> {
                System.out.println("ReceiveLogs01接收到的消息:" + new String(message.getBody(), "UTF-8"));
            };
    
            channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {
            });
        }
    
    }
    
    /**
     * 消息接收
     */
    public class ReceiveLogs02 {
    
        // 交换机名称
        private static final String EXCHANGE_NAME = "logs";
    
        public static void main(String[] args) throws Exception {
            Channel channel = RabbitMqUtils.getChannel();
    
            // 声明一个交换机
            channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
    
            // 声明一个队列 临时队列
            // 队列的名称是随机的
            // 当消费者断开与队列的连接的时候 队列就自动删除
    
            String queueName = channel.queueDeclare().getQueue();
    
            /**
             * 绑定交换机与队列
             */
            channel.queueBind(queueName, EXCHANGE_NAME, "");
            System.out.println("等待接收消息,把接收到消息打印在屏幕上......");
    
            // 接收消息
    
            // 消费者取消消息时回调接口
            DeliverCallback deliverCallback = (consumerTag, message) -> {
                System.out.println("ReceiveLogs02接收到的消息:" + new String(message.getBody(), "UTF-8"));
            };
    
            channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {
            });
        }
    }
    

    我们可以看到,我们发送的m1和 m2,会被 两个队列全部接收

    direct

    发送的时候必须指定路由规则,exchange需要根据routingkey把消息发送给每一个匹配的queue
    如果多个队列具有相同的 routingkey,和 fanout 的情况就会类似

    public class DirectLog {
        // 交换机名称
        private static final String EXCHANGE_NAME = "direct_logs";
    
        public static void main(String[] args) throws Exception {
    
            Channel channel = RabbitMqUtils.getChannel();
    
            Scanner scanner = new Scanner(System.in);
    
            while (scanner.hasNext()) {
                String message = scanner.next();
                channel.basicPublish(EXCHANGE_NAME, "error", null, message.getBytes("UTF-8"));
                System.out.println("生产者发出消息: " + message);
            }
        }
    }
    
    public class ReceiveLogsDirect01 {
        public static final String EXCHANGE_NAME = "direct_logs";
    
        public static void main(String[] args) throws Exception {
            Channel channel = RabbitMqUtils.getChannel();
            // 声明一个交换机
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
            // 声明一个队列
            channel.queueDeclare("console", false, false, false, null);
    
            channel.queueBind("console", EXCHANGE_NAME, "info");
            channel.queueBind("console", EXCHANGE_NAME, "warning");
            
            DeliverCallback deliverCallback = (consumerTag, message) -> {
                System.out.println("ReceiveLogsDirect01接收到的消息:" + new String(message.getBody(), "UTF-8"));
            };
    
            channel.basicConsume("console", true, deliverCallback, consumerTag -> {
            });
        }
    }
    
    public class ReceiveLogsDirect02 {
    
        public static final String EXCHANGE_NAME = "direct_logs";
    
        public static void main(String[] args) throws Exception {
            Channel channel = RabbitMqUtils.getChannel();
            // 声明一个交换机
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
            // 声明一个队列
            channel.queueDeclare("disk", false, false, false, null);
    
            channel.queueBind("disk", EXCHANGE_NAME, "error");
    
    
            DeliverCallback deliverCallback = (consumerTag, message) -> {
                System.out.println("ReceiveLogsDirect02接收到的消息:" + new String(message.getBody(), "UTF-8"));
            };
    
            channel.basicConsume("disk", true, deliverCallback, consumerTag -> {
            });
        }
    }
    

    这里在测试的时候,我们需要向不同的 routingKey 发消息,对应的消息就会根据  routingKey 进入到不同的队列

    topic

    可以理解为 是在 direct 的基础上加上了模糊匹配的规则,模糊匹配规则有如下两条

    • *可以代替一个单词
    • #可以替代零个或多个单词

    public class EmitLogTopic {
        // 交换机名称
        private static final String EXCHANGE_NAME = "topic_logs";
    
        public static void main(String[] args) throws Exception {
    
            Channel channel = RabbitMqUtils.getChannel();
    
            Scanner scanner = new Scanner(System.in);
    
            /**
             * Q1-->绑定的是
             * 中间带 orange 带 3 个单词的字符串(*.orange.*)
             * Q2-->绑定的是
             * 最后一个单词是 rabbit 的 3 个单词(*.*.rabbit)
             * 第一个单词是 lazy 的多个单词(lazy.#)
             *
             */
            Map<String, String> bindingKeyMap = new HashMap<>();
            bindingKeyMap.put("quick.orange.rabbit", "被队列 Q1Q2 接收到");
            bindingKeyMap.put("lazy.orange.elephant", "被队列 Q1Q2 接收到");
            bindingKeyMap.put("quick.orange.fox", "被队列 Q1 接收到");
            bindingKeyMap.put("lazy.brown.fox", "被队列 Q2 接收到");
    
            bindingKeyMap.put("lazy.pink.rabbit", "虽然满足两个绑定但只被队列 Q2 接收一次");
            bindingKeyMap.put("quick.brown.fox", "不匹配任何绑定不会被任何队列接收到会被丢弃");
            bindingKeyMap.put("quick.orange.male.rabbit", "是四个单词不匹配任何绑定会被丢弃");
            bindingKeyMap.put("lazy.orange.male.rabbit", "是四个单词但匹配 Q2");
            for (Map.Entry<String, String> bindingKeyEntry :
                    bindingKeyMap.entrySet()) {
                String bindingKey =
                        bindingKeyEntry.getKey();
                String message = bindingKeyEntry.getValue();
                channel.basicPublish(EXCHANGE_NAME, bindingKey, null,
                        message.getBytes("UTF-8"));
                System.out.println("生产者发出消息" + message);
            }
    
        }
    }
    
    /**
     * 声明主题交换机 及相关队列
     *
     * 消费者 c1
     */
    public class ReceiveLogsTopic01 {
    
        // 交换机名称
        private static final String EXCHANGE_NAME = "topic_logs";
    
        public static void main(String[] args) throws Exception {
            Channel channel = RabbitMqUtils.getChannel();
            // 声明一个交换机
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
            // 声明一个队列
            String queueName = "Q1";
            channel.queueDeclare(queueName, false, false, false, null);
    
            channel.queueBind(queueName, EXCHANGE_NAME, "*.orange.*");
    
            DeliverCallback deliverCallback = (consumerTag, message) -> {
                System.out.println(new String(message.getBody(), "UTF-8"));
                System.out.println("接收队列: " + queueName + " 绑定键: " + message.getEnvelope().getRoutingKey());
            };
    
            channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {
            });
        }
    }
    
    /**
     * 声明主题交换机 及相关队列
     * 

    * 消费者 c1 */ public class ReceiveLogsTopic02 { // 交换机名称 private static final String EXCHANGE_NAME = "topic_logs"; public static void main(String[] args) throws Exception { Channel channel = RabbitMqUtils.getChannel(); // 声明一个交换机 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC); // 声明一个队列 String queueName = "Q2"; channel.queueDeclare(queueName, false, false, false, null); channel.queueBind(queueName, EXCHANGE_NAME, "*.orange.rabbit"); channel.queueBind(queueName, EXCHANGE_NAME, "lazy.#"); DeliverCallback deliverCallback = (consumerTag, message) -> { System.out.println(new String(message.getBody(), "UTF-8")); System.out.println("接收队列: " + queueName + " 绑定键: " + message.getEnvelope().getRoutingKey()); }; channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { }); } }

    交换机和队列的声明方式

    基于注解和编程

    @RabbitListener(bindings = @QueueBinding(
        value = @Queue(name = "direct.queue1"),
        exchange = @Exchange(name = "hmall.direct", type = ExchangeTypes.DIRECT),
        key = {"red", "blue"}
    ))
    public void listenDirectQueue1(String msg){
        System.out.println("消费者1接收到direct.queue1的消息:【" + msg + "】");
    }
    
    @RabbitListener(bindings = @QueueBinding(
        value = @Queue(name = "direct.queue2"),
        exchange = @Exchange(name = "hmall.direct", type = ExchangeTypes.DIRECT),
        key = {"red", "yellow"}
    ))
    public void listenDirectQueue2(String msg){
        System.out.println("消费者2接收到direct.queue2的消息:【" + msg + "】");
    }
    
    @RabbitListener(bindings = @QueueBinding(
        value = @Queue(name = "topic.queue1"),
        exchange = @Exchange(name = "hmall.topic", type = ExchangeTypes.TOPIC),
        key = "china.#"
    ))
    public void listenTopicQueue1(String msg){
        System.out.println("消费者1接收到topic.queue1的消息:【" + msg + "】");
    }
    
    @RabbitListener(bindings = @QueueBinding(
        value = @Queue(name = "topic.queue2"),
        exchange = @Exchange(name = "hmall.topic", type = ExchangeTypes.TOPIC),
        key = "#.news"
    ))
    public void listenTopicQueue2(String msg){
        System.out.println("消费者2接收到topic.queue2的消息:【" + msg + "】");
    }
    

    消息转换器

    <dependency>
        <groupId>com.fasterxml.jackson.dataformatgroupId>
        <artifactId>jackson-dataformat-xmlartifactId>
        <version>2.9.10version>
    dependency>
    
    @Bean
    public MessageConverter messageConverter(){
        // 1.定义消息转换器
        Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
        // 2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息
        jackson2JsonMessageConverter.setCreateMessageIds(true);
        return jackson2JsonMessageConverter;
    }
    

    如果spring-boot-starter-web则无需重复引入

    可靠性

    生产者的可靠性

    重试机制
    spring:
      rabbitmq:
        connection-timeout: 1s # 设置MQ的连接超时时间
        template:
          retry:
            enabled: true # 开启超时重试机制
            initial-interval: 1000ms # 失败后的初始等待时间
            multiplier: 1 # 失败后下次的等待时长倍数,下次等待时长 = initial-interval * multiplier
            max-attempts: 3 # 最大重试次数
    

    阻塞重试,建议禁用
    验证方式,发送消息的时候把 rabbitmq 停用

    生产者消息确认机制

    publisher confirm->生产者把消息成功发送给了 exchange,ack 和 nack
    publisher return->exchange路由消息失败会触发
    如何开启

    spring:
      rabbitmq:
        publisher-confirm-type: correlated # 开启publisher confirm机制,并设置confirm类型
        publisher-returns: true # 开启publisher return机制
    
    @Test
        void testPublisherConfirm() {
            // 1. 创建 CorrelationData
            CorrelationData cd = new CorrelationData();
            // 2. 给 future 添加 ConfirmCallback
            cd.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>() {
                @Override
                public void onFailure(Throwable ex) {
                    // 2.1 Future 异常 基本不会出现
                    log.error("send message fail", ex);
                }
    
                @Override
                public void onSuccess(CorrelationData.Confirm result) {
                    // 2.2 Future 接收到回执的处理逻辑,参数中的 result 就是回执内容
                    if (result.isAck()) {
                        log.info("发送消息成功,收到 ack");
                    } else {
                        log.error("发送消息失败,收到nack,reason: {}", result.getReason());
                    }
                }
            });
            // 3. 发送消息
            rabbitTemplate.convertAndSend("hmall.direct", "q", "hello", cd);
        }
    
        @PostConstruct
        public void init(){
            rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
                @Override
                public void returnedMessage(ReturnedMessage returned) {
                    log.error("触发return callback,");
                    log.info("exchange: {}", returned.getExchange());
                    log.info("routingKey: {}", returned.getRoutingKey());
                    log.info("message: {}", returned.getMessage());
                    log.info("replyCode: {}", returned.getReplyCode());
                    log.info("replyText: {}", returned.getReplyText());
                }
            });
        }
    

    这个案例routingkey是匹配不到 queue 的,所有会返回 ack,然后触发 returnCallback

    生产建议

    不建议开启 publisher return ,最多仅仅开启 publisher confirm

    mq本身的可靠性

    数据持久化
    • 交换机持久化
    • 队列持久化
    • 消息持久化

    如果在开启持久化的同时开启 ack,会在持久化完成后才ack,但是由于持久化是批量的,所以建议 ack 使用异步

    惰性队列

    直接把消息发到磁盘,而不是先到内存再到磁盘

    消费者的可靠性

    处理模式
    spring:
      rabbitmq:
        listener:
          simple:
            acknowledge-mode: none # 不做处理
    

    消费处理完消息后的三种回执

    • ack 成功处理 rabbitmq删除这条消息
    • nack 消息处理失败 重新投递
    • reject 消息处理失败并拒绝该消息 删除

    三种处理模式

    • none 投递完以后 ack
    • manual 手动模式 手动设置 ack 或者 reject
    • auto spring amqp 帮我们做了增强,正常 ack,业务异常 nack, 消息处理或者校验异常 reject
    @RabbitListener(queues = "simple.queue")
        public void listenSimpleQueueMessage(String msg) throws InterruptedException {
            log.info("spring 消费者接收到消息:【" + msg + "】");
            if (true) {
    //            throw new MessageConversionException("故意的"); // reject
                throw new RuntimeException(""); // 会重试
            }
            log.info("消息处理完成");
        }
    

    测试方式,先测试 none 模式,会发现直接删掉了。再测试 auto ,分别测试 MessageConversionExceptionRuntimeException,前者删掉,后者触发重试

    失败重试机制

    默认是重新在mq中入队出队

    spring:
      rabbitmq:
        listener:
          simple:
            retry:
              enabled: true # 开启消费者失败重试
              initial-interval: 1000ms # 初识的失败等待时长为1秒
              multiplier: 1 # 失败的等待时长倍数,下次等待时长 = multiplier * last-interval
              max-attempts: 3 # 最大重试次数
              stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false
    

    可配置在客户端重试
    重试三次后,返回 reject 删掉了消息

    失败处理策略

    MessageRecovery定义
    默认是丢弃RejectAndDontRequeueRecoverer
    ImmediateRequeueMessageRecoverer重新入队
    RepublishMessageRecoverer

    package com.itheima.consumer.config;
    
    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.BindingBuilder;
    import org.springframework.amqp.core.DirectExchange;
    import org.springframework.amqp.core.Queue;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.amqp.rabbit.retry.MessageRecoverer;
    import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer;
    import org.springframework.context.annotation.Bean;
    
    @Configuration
    @ConditionalOnProperty(name = "spring.rabbitmq.listener.simple.retry.enabled", havingValue = "true")
    public class ErrorMessageConfig {
        @Bean
        public DirectExchange errorMessageExchange(){
            return new DirectExchange("error.direct");
        }
        @Bean
        public Queue errorQueue(){
            return new Queue("error.queue", true);
        }
        @Bean
        public Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){
            return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error");
        }
    
        @Bean
        public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
            return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
        }
    }
    
    业务幂等性
    1. 唯一消息 id,业务处理成功后把id保存到数据库,处理前查询判断这条消息是否处理过
    @Bean
    public MessageConverter messageConverter(){
        // 1.定义消息转换器
        Jackson2JsonMessageConverter jjmc = new Jackson2JsonMessageConverter();
        // 2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息
        jjmc.setCreateMessageIds(true);
        return jjmc;
    }
    
    1. 业务幂等

    死信队列

    存放没有被消费的消息的队列
    概念当中比较重要的是死信的来源,有三个

    • 消息 ttl 过期
    • 队列达到最大长度(队列满了,无法再添加数据到 mq 中)
    • 消息被拒(basic.reject或basic.nack) 并且 requeue=false

    这三种情况后面会分别模拟,值得说一下的是第三种情况,这里可以看一下之前讲到的 消息未应答时可以重新入队,如果这里配置不入队,就可以被添加到死信队列当中

    注意一个点即可,配置的是 普通队列 与 死信交换机之间的关系

    /**
     * 死信队列  生产者
     */
    public class Producer {
    
        // 普通交换机的名称
        public static final String NORMAL_EXCHANGE = "normal_exchange";
    
        public static void main(String[] args) throws Exception {
            Channel channel = RabbitMqUtils.getChannel();
    
            // 死信消息 设置 ttl 单位是 ms
            AMQP.BasicProperties properties = new AMQP.BasicProperties().builder()
                    .expiration("10000")
                    .build();
    
            for (int i = 0; i < 11; i++) {
                String message = "info" + i;
                channel.basicPublish(NORMAL_EXCHANGE, "zhangsan", properties, message.getBytes());
            }
    
        }
    }
    
    **
     * 普通队列消费者
     */
    public class Consumer01 {
    
        // 普通交换机的名称
        public static final String NORMAL_EXCHANGE = "normal_exchange";
    
        // 死信交换机的名称
        public static final String DEAD_EXCHANGE = "dead_exchange";
    
        // 普通队列的名称
        public static final String NORMAL_QUEUE = "normal_queue";
    
        // 死信队列的名称
        public static final String DEAD_QUEUE = "dead_queue";
    
        public static void main(String[] args) throws Exception {
            Channel channel = RabbitMqUtils.getChannel();
    
            // 声明死信和普通交换机, 类型为 direct
            channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
            channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
    
            // 声明普通队列和死信队列
            Map<String, Object> arguments = new HashMap<>();
            // 过期时间 不在这里设置 改为在生产者设置消息的 ttl 
            // arguments.put("x-message-ttl", 10000);
            // 正常队列设置死信交换机
            arguments.put("x-dead-letter-exchange", DEAD_EXCHANGE);
            // 设置死信 routing-key
            arguments.put("x-dead-letter-routing-key", "lisi");
    
            // 设置正常队列长度的限制
    //        arguments.put("x-max-length", 6);
    
            channel.queueDeclare(NORMAL_QUEUE, false, false, false, arguments);
    
            channel.queueDeclare(DEAD_QUEUE, false, false, false, null);
    
            // 绑定普通交换机与队列
            channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE, "zhangsan");
    
            // 绑定死信交换机与队列
            channel.queueBind(DEAD_QUEUE, DEAD_EXCHANGE, "lisi");
    
            System.out.println("等待接收消息....");
    
            DeliverCallback deliverCallback = (consumerTag, message) -> {
                String s = new String(message.getBody(), "UTF-8");
    //            if (s.equals("info5")) {
    //                System.out.println("Consumer01接收的消息是:" + new String(message.getBody(), "UTF-8") + "此消息被拒绝");
    //                channel.basicReject(message.getEnvelope().getDeliveryTag(), false);
    //            } else {
                    System.out.println("Consumer01接收的消息是:" + new String(message.getBody(), "UTF-8"));
                    channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
    //            }
            };
            // 开启手动应答
            channel.basicConsume(NORMAL_QUEUE, false, deliverCallback, consumerTag -> {
            });
        }
    }
    
    /**
     * 死信队列消费者
     */
    public class Consumer02 {
    
        // 死信队列的名称
        public static final String DEAD_QUEUE = "dead_queue";
    
        public static void main(String[] args) throws Exception {
            Channel channel = RabbitMqUtils.getChannel();
    
            System.out.println("等待接收消息....");
    
            DeliverCallback deliverCallback = (consumerTag, message) -> {
                System.out.println("Consumer01接收的消息是:" + new String(message.getBody(), "UTF-8"));
                channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
            };
    
            channel.basicConsume(DEAD_QUEUE, false, deliverCallback, consumerTag -> {
            });
        }
    }
    


    我们可以看到 普通队列与死信交换机之间的关系
    情况一模拟:TTL

     AMQP.BasicProperties properties = new AMQP.BasicProperties().builder()
                    .expiration("10000")
                    .build();
    

    设置发送的消息的 ttl
    模拟方式很简单,先启动 c1 然后关闭,然后启动消费者


    情况2 超出队列大小
    我们运行一次c2 ,把死信队列里面的消息消费掉

    重新开始测试,为避免干扰我们去掉消息的ttl

    设置队列最大长度为6,所以按照推测,如果发送11条消息,会有5条(超出部分)进入到死信队列

    注:我们这里需要删除原来的队列,因为队列的参数被修改了


    管理面板中删除即可
    我们再次启动 c1 然后关闭 c1再开启 p

    结果符合预期
    情况3:
    我们首先还是排除干扰,先开启c2 消费掉死信中的消息,然后删除队列normal,再然后注释掉 队列长度的配置
    模拟方式也很简单,我们把 info5 这条消息 ,basicReject 给拒绝掉,看这条消息会不会进入到我们的死信队列



    延迟队列

    延迟队列的应用场景是很多的,订单十分钟内未付款取消等等
    延迟队列的实现很简单,其实利用前面我们说到的消息的 ttl 属性就可以实现了
    这里说一下 队列设置 ttl 和消息设置 ttl 的区别

    这里的整合我们用 SpringBoot
    版本 2.3.8.RELEASE (大版本尽量一致)

    <dependencies>
            
            <dependency>
                <groupId>org.springframework.bootgroupId>
                <artifactId>spring-boot-starter-amqpartifactId>
            dependency>
    
            <dependency>
                <groupId>org.springframework.bootgroupId>
                <artifactId>spring-boot-starter-webartifactId>
            dependency>
    
            <dependency>
                <groupId>org.springframework.bootgroupId>
                <artifactId>spring-boot-starterartifactId>
            dependency>
    
            <dependency>
                <groupId>org.springframework.bootgroupId>
                <artifactId>spring-boot-starter-testartifactId>
                <scope>testscope>
            dependency>
    
            <dependency>
                <groupId>com.alibabagroupId>
                <artifactId>fastjsonartifactId>
                <version>1.2.47version>
            dependency>
    
            <dependency>
                <groupId>io.springfoxgroupId>
                <artifactId>springfox-swagger2artifactId>
                <version>2.9.2version>
            dependency>
            <dependency>
                <groupId>io.springfoxgroupId>
                <artifactId>springfox-swagger-uiartifactId>
                <version>2.9.2version>
            dependency>
    
            
            <dependency>
                <groupId>org.springframework.amqpgroupId>
                <artifactId>spring-rabbit-testartifactId>
                <scope>testscope>
            dependency>
    
            <dependency>
                <groupId>org.projectlombokgroupId>
                <artifactId>lombokartifactId>
            dependency>
    
        dependencies>
    
    spring.rabbitmq.host=127.0.0.1
    spring.rabbitmq.port=5672
    spring.rabbitmq.username=guest
    spring.rabbitmq.password=guest
    

    Swagger 配置类

    @Configuration
    @EnableSwagger2
    public class SwaggerConfig {
    
        public Docket webApiConfig() {
            return new Docket(DocumentationType.SWAGGER_2)
                    .groupName("webApi")
                    .apiInfo(webApiInfo())
                    .select()
                    .build();
        }
    
        private ApiInfo webApiInfo() {
            return new ApiInfoBuilder()
                    .title("rabbitmq 接口文档")
                    .description("本文档描述了 rabbitmq 微服务接口定义")
                    .version("1.0")
                    .contact(new Contact("enjoy6288", "http://atguigu.com",
                            "1551388580@qq.com"))
                    .build();
        }
    }
    


    配置类

    @Configuration
    public class TtlQueueConfig {
    
        public static final String X_EXCHANGE = "X";
        public static final String QUEUE_A = "QA";
        public static final String QUEUE_B = "QB";
        public static final String Y_DEAD_LETTER_EXCHANGE = "Y";
        public static final String DEAD_LETTER_QUEUE = "QD";
    
        // 声明 xExchange
        @Bean("xExchange")
        public DirectExchange xExchange() {
            return new DirectExchange(X_EXCHANGE);
        }
    
        // 声明 xExchange
        @Bean("yExchange")
        public DirectExchange yExchange() {
            return new DirectExchange(Y_DEAD_LETTER_EXCHANGE);
        }
    
        //声明队列 A ttl 为 10s 并绑定到对应的死信交换机
        @Bean("queueA")
        public Queue queueA() {
            Map<String, Object> args = new HashMap<>(3);
    //声明当前队列绑定的死信交换机
            args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
    //声明当前队列的死信路由 key
            args.put("x-dead-letter-routing-key", "YD");
    //声明队列的 TTL
            args.put("x-message-ttl", 10000);
            return QueueBuilder.durable(QUEUE_A).withArguments(args).build();
        }
    
    
        // 声明队列 A 绑定 X 交换机
        @Bean
        public Binding queueaBindingX(@Qualifier("queueA") Queue queueA,
                                     @Qualifier("xExchange") DirectExchange xExchange) {
            return BindingBuilder.bind(queueA).to(xExchange).with("XA");
        }
    
        //声明队列 B ttl 为 40s 并绑定到对应的死信交换机
        @Bean("queueB")
        public Queue queueB() {
            Map<String, Object> args = new HashMap<>(3);
    //声明当前队列绑定的死信交换机
            args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
    //声明当前队列的死信路由 key
            args.put("x-dead-letter-routing-key", "YD");
    //声明队列的 TTL
            args.put("x-message-ttl", 40000);
            return QueueBuilder.durable(QUEUE_B).withArguments(args).build();
        }
    
        //声明队列 B 绑定 X 交换机
        @Bean
        public Binding queuebBindingX(@Qualifier("queueB") Queue queue1B,
                                      @Qualifier("xExchange") DirectExchange xExchange) {
            return BindingBuilder.bind(queue1B).to(xExchange).with("XB");
        }
    
        //声明死信队列 QD
        @Bean("queueD")
        public Queue queueD() {
            return new Queue(DEAD_LETTER_QUEUE);
        }
    
        //声明死信队列 QD 绑定关系
        @Bean
        public Binding deadLetterBindingQAD(@Qualifier("queueD") Queue queueD,
                                            @Qualifier("yExchange") DirectExchange yExchange) {
            return BindingBuilder.bind(queueD).to(yExchange).with("YD");
        }
    
    }
    

    消费者

    @Slf4j
    @Component
    public class DeadLetterQueueConsumer {
    
        @RabbitListener(queues = "QD")
        public void receiveD(Message message, Channel channel) throws Exception {
            String msg = new String(message.getBody());
            log.info("当前时间:{},收到死信队列信息{}", new Date().toString(), msg);
        }
    }
    

    控制层 生产者

    @Slf4j
    @RequestMapping("ttl")
    @RestController
    public class SendMsgController {
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        @GetMapping("sendMsg/{message}")
        public void sendMsg(@PathVariable String message) {
            log.info("当前时间:{},发送一条信息给两个 TTL 队列:{}", new Date(), message);
            rabbitTemplate.convertAndSend("X", "XA", "消息来自 ttl 为 10S 的队列: " + message);
            rabbitTemplate.convertAndSend("X", "XB", "消息来自 ttl 为 40S 的队列: " + message);
        } 
    }
    
    GET http://localhost:8080/ttl/sendMsg/aaa
    

    优化 队列ttl存在硬编码

    创建一条新的队列QC,不在队列上配置 ttl, 在消息上配置 ttl

    @Component
    public class MsgTtlQueueConfig {
        public static final String Y_DEAD_LETTER_EXCHANGE = "Y";
        public static final String QUEUE_C = "QC";
    
        //声明队列 C 死信交换机
        @Bean("queueC")
        public Queue queueB() {
            Map<String, Object> args = new HashMap<>(3);
            //声明当前队列绑定的死信交换机
            args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
            //声明当前队列的死信路由 key
            args.put("x-dead-letter-routing-key", "YD");
            //没有声明 TTL 属性
            return QueueBuilder.durable(QUEUE_C).withArguments(args).build();
        }
    
        //声明队列 B 绑定 X 交换机
        @Bean
        public Binding queueBindingC(@Qualifier("queueC") Queue queueC,
                                     @Qualifier("xExchange") DirectExchange xExchange) {
            return BindingBuilder.bind(queueC).to(xExchange).with("XC");
        }
    }
    

    生产者

    @GetMapping("sendExpirationMsg/{message}/{ttlTime}")
        public void sendMsg(@PathVariable String message, @PathVariable String ttlTime) {
            rabbitTemplate.convertAndSend("X", "XC", message, correlationData -> {
                correlationData.getMessageProperties().setExpiration(ttlTime);
                return correlationData;
            });
            log.info("当前时间:{},发送一条时长{}毫秒 TTL 信息给队列 C:{}", new Date(), ttlTime, message);
        }
    
    ###
    GET http://localhost:8080/ttl/sendExpirationMsg/你好 1/20000
    
    ###
    GET http://localhost:8080/ttl/sendExpirationMsg/你好 2/2000
    

    存在问题,如果第一个消息的延时时长很长,而第二个消息的延时时长很短,第二个消息并不会优先得到执行
    (Callback相关的不用管哈)

    使用rabbitmq插件 实现延迟队列

    rabbitmq_delayed_message_exchange 解压存放到 plugins 目录

    rabbitmq-plugins enable rabbitmq_delayed_message_exchange
    

    @Configuration
    public class DelayedQueueConfig {
    
        public static final String DELAYED_QUEUE_NAME = "delayed.queue";
        public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange";
        public static final String DELAYED_ROUTING_KEY = "delayed.routingkey";
    
        @Bean("delayedQueue")
        public Queue delayedQueue() {
            return new Queue(DELAYED_QUEUE_NAME);
        }
    
        //自定义交换机 我们在这里定义的是一个延迟交换机
        @Bean("delayedExchange")
        public CustomExchange delayedExchange() {
            Map<String, Object> args = new HashMap<>();
            //自定义交换机的类型
            args.put("x-delayed-type", "direct");
            return new CustomExchange(DELAYED_EXCHANGE_NAME, "x-delayed-message", true, false, args);
        }
    
        @Bean
        public Binding bindingDelayedQueue(@Qualifier("delayedQueue") Queue queue,
                                           @Qualifier("delayedExchange") CustomExchange
                                                   delayedExchange) {
            return BindingBuilder.bind(queue).to(delayedExchange).with(DELAYED_ROUTING_KEY).noargs();
        }
    }
    

    我们指定创建延迟交换机

    @GetMapping("sendDelayMsg/{message}/{delayTime}")
        public void sendMsg(@PathVariable String message, @PathVariable Integer delayTime) {
            rabbitTemplate.convertAndSend(DELAYED_EXCHANGE_NAME, DELAYED_ROUTING_KEY, message,
                    correlationData -> {
                        correlationData.getMessageProperties().setDelay(delayTime);
                        return correlationData;
                    });
            log.info(" 当 前 时 间 : {}, 发 送 一 条 延 迟 {} 毫秒的信息给队列 delayed.queue:{}", new
                    Date(), delayTime, message);
        }
    


    现在正常了

    补充 win

    官网下载
    下载完成后不要勾选启动
    先执行安装插件

    rabbitmq-plugins enable rabbitmq_management
    

  • 相关阅读:
    Vue的生命周期快速入门
    了解ASEMI代理英飞凌TLE6208-6G其功能和应用的综合指南
    LeetCode657.机器人能否返回原点
    C++ 指针
    【面试题】—— 笔试题(4题)
    golang语言_2
    Revit修改:网格角度,体量轮廓,梁随斜板
    实用工具在线网站
    软件测试框架的面试题讲解
    承装修试电力设施许可证怎么办理?承装修试电力设施许可证办理应该注意哪些?
  • 原文地址:https://blog.csdn.net/qq_39007838/article/details/140373155