• 了解MQ和安装使用RabbitMQ


    什么是消息队列

    本质是一个队列,队列中出存放的是跨进程的通信机制,用于上下游传递消息.

    MQ是常见的上下游"逻辑解耦 + 物理解耦"的消息通信服务,在使用MQ之后,消息发送上只需要依赖MQ,不用依赖其他服务.

    功能

    1 流量削峰

    👱‍♀举个例子

    系统最多处理一万订单,在正常时段是没问题的,我们下单一秒就能返回结果.

    但是在高峰期,如果有两万的下单操作,我们的系统无法处理,只能限制订单超过一万后不允许用户下单.

    使用MQ做缓冲,我们可以取消这个限制,把一秒内下的订单分散成一段时间来处理,这时有些用户可能在下单十几秒后才能收到下单成功的消息

    2 应用解耦

    以电商应用为例,应用中有订单系统、库存系统、支付系统。用户创建订单后如果耦合调用库存系统和支付系统这些系统,任何一个子系统出了故障,都会造成下单操作异常。
    请添加图片描述

    使用MQ,将订单系统和其余系统完成解耦,不必担心其它系统出现故障,当转变成基于消息队列的方式后,系统间调用的问题会减少很多,比如库存系统发生故障,需要几分钟修复,在这几分钟,库存系统要处理的内存被缓存在消息队列中,用户的下单操作可以正常完成,当库存系统恢复后,继续处理订单信息即可,提高系统的可用性。
    请添加图片描述

    3 异步处理

    有些服务间的调用并不是同步的,而是异步执行,例如,A调用B,B需要花费很长时间执行,此时A需要知道B什么时间可以执行完成,在未使用MQ时,一般会有两种方法实现,1.A不断地轮询查看B是否完成。2、就是A提供一个调用接口,当B执行完成之后,调用A的回调接口,以此实现。

    MQ很好的解决这个问题

    A调用B后,只需要监听B处理完成消息,当B处理完后,会发送一条消息给MQ,MQ会将此消息转发给A服务。这样就省去了A的轮询或者B对A的回调。A也能够即使得到异步处理消息。
    在这里插入图片描述

    MQ分类

    AcitveMQ

    优点: 单机吞吐量万级,时效性ms级,可用性高,基于主从架构实现高可用性,消息可靠性较低的概率丢失数据

    缺点: 维护少,高吞吐量的场景较少使用

    Kafka

    使用场景

    他的特点是基于pull的模式来处理消息消费,追求高吞吐量,一开始的目的就是用于日志收集和传输,适合产生大量数据的互联网服务的数据收集业务。

    💛 优点: 性能卓越,单机写入TPS约在百万条/秒,吞吐量高,时效性高,Kafka是分布式的,一个数据多个副本,少数机器宕机,不会丢失数据,不会导致不可用,消费者采用pull方式获取消息,消息有序,通过控制能够保证所有消息被消费且仅被消费一次。

    功能比较简单,主要支持简单的MQ功能,在大数据领域的实时计算以及日志采集被大规模使。

    ❌缺点: Kafka单机超过64个队列,Load会发生明显的标高现象,队列越多,load越高,发生消息响应时间变长,使用短轮询的方式,实时性取决于轮询间隔时间,消息失败不支持重试,支持消息顺序,但是一台代理宕机,会产生消息乱序

    RocketMQ

    使用场景:

    为金融互联网领域而生,对于可靠性要求很高的场景,尤其是电商里面的订单扣款,以及业务削峰,在大量交易涌入后,后端可能无法及时处理的情况。

    💛优点: 单机吞吐量十万级,而可用性高,分布式架构,消息可以做到0丢失。

    MQ功能完善,还是分布式,支持10亿级别的消息堆积,不会因为堆积导致性能下降

    ❌缺点: 支持客户端语言不多,目前是java,没有在MQ黑犀牛中实现JMS等接口,有些系统要迁移需要修改大量代码。

    RabbitMQ

    是一个在AMQP(高级消息队列协议)基础上完成的,可复用的企业消息系统,是当前最主流的消息中间件之一。

    适应场景

    结合erlang语言本身的并发优势,性能好时效性微秒级,社区活跃度也比较高,管理界面用起来十分方便

    💛 优点: 由于erlang语言的高并发特性,性能较好;吞吐量到万级,MQ功能比较完备,健壮、稳定、易用、跨平台、支持多种语言,支持AJAX文档齐全;开源提供的管理界面;更新频率相当高

    RabbitMQ

    如何使用

    1 安装Erlang语言搭建运行环境

    1. Erlang下载(下载地址https://www.erlang.org/downloads)并安装
    2. 配置环境变量: 变量名:ERLANG_HOME 变量值:(自己的安装路径)
      然后在系Path变量名选中点击编辑,新建内容 %ERLANG_HOME%\bin
    3. Win+R,输入erl查看版本

    2 安装RabbitMQ

    1. RabbitMQ去官网下载https://www.rabbitmq.com/download.html 进入页面点击右侧菜单列表中Install: Windows选项,在下载页面找到Direct Downloads下载项选择下载
    2. RabbitMQ安装,安装步骤和Erlang一样一直下一步就可以,安装完成后RabbitMQ会在系统开始菜单中添加服务快捷键

    3 启动服务

    找到开始菜单中的RabbitMQ Service - start 如果没有点击展开就可以看到,如果提示没有此服务需要安装服务点击RabbitMQ Service - (re)install安装服务

    4 开启web管理界面

    1. Win+R 输入cmd打开命令行 cd到RabbitMQ安装目录sbin目录下输入下面指令
      rabbitmq-plugins.bat enable rabbitmq_management
    2. 重启RabbitMQ服务 先停止服务 点击开始菜单中的 RabbitMQ Service - stop 停止完成后 再次启动 RabbitMQ Service - start
    3. 重启服务后 在浏览器中输入http://127.0.0.1:15672 进入web管理界面 默认账号密码 guest/guest

    5 测试代码

    导入依赖(SpringBoot的版本我使用的是2.6.6)

    <dependency>
        <groupId>org.springframework.bootgroupId>
        <artifactId>spring-boot-starter-amqpartifactId>
    dependency>
    

    在application.yml中添加RabbitMQ的地址:

    spring:
      rabbitmq:
        host: 192.168.18.130
        username: admin
        password: admin
    

    编写Send消息生产者

    public static void main(String[] args) {
        //定义连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        //设置服务地址
        connectionFactory.setHost("localhost");
        try {
            //通过工厂获取连接
            Connection connection = connectionFactory.newConnection();
            //建立通道
            Channel channel = connection.createChannel();
            //声明队列
            channel.queueDeclare("hello", false, false, false, null);
            //消息
            String msg = "Hello World";
            channel.basicPublish("", "hello", null, msg.getBytes(StandardCharsets.UTF_8));
            System.out.println("生产者已发送" + msg);
        } catch (IOException | TimeoutException e) {
            e.printStackTrace();
        }
    }
    

    编写Recv消费者

    public class Recv {
    
        private final static String QUEUE_NAME = "hello";
    
        public static void main(String[] args) throws Exception {
            //定义连接工厂
            ConnectionFactory connectionFactory = new ConnectionFactory();
            //设置服务地址
            connectionFactory.setHost("localhost");
    
            //通过工厂获取连接
            Connection connection = connectionFactory.newConnection();
            //建立通道
            Channel channel = connection.createChannel();
            //声明队列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
            /*DefaultConsumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String msg = new String(body);
                    System.out.println(" 消费者 接收到消息'" + msg + "'");
                }
            };
            channel.basicConsume(QUEUE_NAME,true,consumer);*/
            DeliverCallback deliverCallback = (consumerTag, deliver) -> {
                String message = new String(deliver.getBody(), "UTF-8");
                System.out.println(" 消费者1 接收到消息'" + message + "'");
            };
            channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {
            });
        }
    }
    

    先启动消费者再启动生产者,web端会有变化,且消费者会受到生产者发来的消息

    在这里插入图片描述
    在这里插入图片描述

    消息模型

    创建一个连接工具类

    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    public class MQConnectionUtil {
    
        public static Connection getConnection() throws IOException, TimeoutException {
            //定义连接工厂
            ConnectionFactory factory = new ConnectionFactory();
            //设置服务地址
            factory.setHost("127.0.0.1");
            //端口
            factory.setPort(5672);
            //设置账户信息
            factory.setUsername("guest");
            factory.setPassword("guest");
            //通过工厂获取连接
            Connection connection = factory.newConnection();
            return connection;
        }
    }
    

    基本消息模型

    在这里插入图片描述
    生产者发送消息

    public class send {
    
        private final static String QUEUE_NAME = "testQueue";
    
        public static void main(String[] args) {
            try {
                //通过工厂获取连接
                Connection connection = MQConnectionUtil.getConnection();
                //建立通道
                Channel channel = connection.createChannel();
                //声明队列
                channel.queueDeclare(QUEUE_NAME, false, false, false, null);
                //消息
                String msg = "Hello World";
                channel.basicPublish("", QUEUE_NAME, null, msg.getBytes(StandardCharsets.UTF_8));
                System.out.println("生产者已发送" + msg);
            } catch (IOException | TimeoutException e) {
                e.printStackTrace();
            }
        }
    }
    

    消费者消费消息

    public class Recv {
    
        private final static String QUEUE_NAME = "testQueue";
    
        public static void main(String[] args) throws Exception {
            //与mq服务建立连接
            Connection connection = MQConnectionUtil.getConnection();
            //建立通道
            Channel channel = connection.createChannel();
            //声明队列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
            DefaultConsumer consumer = new DefaultConsumer(channel) {
                //获取消息,并且处理,有消息时会自动调用
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    //body就是消息体
                    String msg = new String(body);
                    System.out.println(" 消费者 接收到消息'" + msg + "'");
                }
            };
            channel.basicConsume(QUEUE_NAME, true, consumer);
        }
    }
    

    消息的接收与消费使用都需要在一个匿名内部类DefaultConsumer中完成.

    注意: 队列需要提前声明,如果未声明就是用队列,则会报错.

    生产者和消费者都声明队列,队列的创建会保证幂等性,两个都声明同一个队列,就只会创建一个队列.

    WorkQueues 工作队列模型

    在基本消息模型中,一个生产者对应一个消费者,而实际生产过程中,往往消息生产会发送很多条消息,如果消费者只有一个的话效率就会很低,因此rabbitmq有另外一种消息模型,这种模型下,一个生产发送消息到队列,允许有多个消费者接收消息,但是一条消息只会被一个消费者获取。
    在这里插入图片描述

    生产者生产20条消息

    public class send {
    
        private final static String QUEUE_NAME = "testQueue";
    
        public static void main(String[] args) {
            try {
                //通过工厂获取连接
                Connection connection = MQConnectionUtil.getConnection();
                //建立通道
                Channel channel = connection.createChannel();
                //声明队列
                channel.queueDeclare(QUEUE_NAME, false, false, false, null);
                for (int i = 0; i < 20; i++) {
                    //消息
                    String msg = "task.." + i;
                    channel.basicPublish("", QUEUE_NAME, null, msg.getBytes(StandardCharsets.UTF_8));
                    System.out.println("生产者已发送" + msg);
                    Thread.sleep(500);
                }
                channel.close();
                connection.close();
            } catch (IOException | TimeoutException | InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    

    消费者1和消费者2

    public class Recv1 {
    
        private final static String QUEUE_NAME = "testQueue";
    
        public static void main(String[] args) throws Exception {
            //与mq服务建立连接
            Connection connection = MQConnectionUtil.getConnection();
            //建立通道
            Channel channel = connection.createChannel();
            //声明队列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
            DefaultConsumer consumer = new DefaultConsumer(channel) {
                //获取消息,并且处理,有消息时会自动调用
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    //body就是消息体
                    String msg = new String(body);
                    System.out.println(" 消费者1 接收到消息'" + msg + "'");
                    try {
                        Thread.sleep(50);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            };
            channel.basicConsume(QUEUE_NAME, true, consumer);
        }
    }
    
    public class Recv2 {
    
        private final static String QUEUE_NAME = "testQueue";
    
        public static void main(String[] args) throws Exception {
            //与mq服务建立连接
            Connection connection = MQConnectionUtil.getConnection();
            //建立通道
            Channel channel = connection.createChannel();
            //声明队列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
            DefaultConsumer consumer = new DefaultConsumer(channel) {
                //获取消息,并且处理,有消息时会自动调用
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    //body就是消息体
                    String msg = new String(body);
                    System.out.println(" 消费者2 接收到消息'" + msg + "'");
                    try {
                        Thread.sleep(50);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            };
            channel.basicConsume(QUEUE_NAME, true, consumer);
        }
    }
    

    订阅模式

    订阅模式中,可以实现一条消息被多个消费者获取。在这种模型下,消息传递过程中比之前多了一个exchange交换机,生产者不是直接发送消息到队列,而是先发送给交换机,经由交换机分配到不同的队列,而每个消费者都有自己的队列:

    在这里插入图片描述

    交换机的类型

    1. Fanout:广播模式,交换机将消息发送到所有与之绑定的队列中去
    2. Direct:定向,交换机按照指定的Routing Key发送到匹配的队列中去
    3. Topics: 通配符,与定向大致相同,不同在于Routing Key可以根据通配符进行匹配
    广播模式Fanout
    1. 多个消费者,独立队列
    2. 需要与exchange绑定
    3. 生产者发消息给exchange
    4. exchange将消息发送到所有绑定的队列中
    5. 消费者从各自的队列获取消息

    生产者

    public class Send {
    
        private static final String EXCHANGE_NAME = "fanout_exchange";
    
        public static void main(String[] args) {
            try {
                Connection connection = MQConnectionUtil.getConnection();
                Channel channel = connection.createChannel();
                channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
                String msg = "广播模式1";
                channel.basicPublish(EXCHANGE_NAME, "", null, msg.getBytes());
                System.out.println("广播发送消息:" + msg);
                channel.close();
                connection.close();
            } catch (IOException | TimeoutException e) {
                e.printStackTrace();
            }
        }
    }
    

    消费者

    public class Consumer1 {
    
        //独立队列
        private static final String QUEUE_NAME = "fanout_queue_1";
        private static final String EXCHANGE_NAME = "fanout_exchange";
    
        public static void main(String[] args) {
            try {
                Connection connection = MQConnectionUtil.getConnection();
                Channel channel = connection.createChannel();
                //消费者1声明自己的队列
                channel.queueDeclare(QUEUE_NAME, false, false, false, null);
                //声明Exchange,类型为fanout
                channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
                //消费者将队列与交换机绑定
                channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
                channel.basicConsume(QUEUE_NAME,true, new DefaultConsumer(channel){
                    @Override
                    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                        String msg = new String(body);
                        System.out.println("消费者1获得消息:" + msg);
                    }
                });
    
            } catch (IOException | TimeoutException e) {
                e.printStackTrace();
            }
        }
    }
    

    其他消费者只需修改QUEUE_NAME即可

    注意:exchange与队列一样都需要提前声明,如果未声明就使用交换机,则会报错。如果不清楚生产者和消费者谁先声明,为了保证不报错,生产者和消费者都声明交换机,同样的,交换机的创建也会保证幂等性。

    定向模式Direct

    路由模式,可以实现不同的消息被不同队列消费,在direct中,交换机不在将消息发送给所有绑定的队列,而是根据Routing Key将消息发送给指定的队列,队列在与交换机绑定时会设定一个Routing Key,而生产者发送的消息也需要携带一个Routing Key.
    在这里插入图片描述

    消费者C1的队列与交换机绑定时设置的Routing Key是“error”, 而C2的队列与交换机绑定时设置的Routing Key包括三个:“info”,“error”,“warning”,假如生产者发送一条消息到交换机,并设置消息的Routing Key为“info”,那么交换机只会将消息发送给C2的队列。

    生产者

    public class Send {
    
        private static final String EXCHANGE_NAME = "direct_exchange";
    
        public static void main(String[] args) {
            try {
                Connection connection = MQConnectionUtil.getConnection();
                Channel channel = connection.createChannel();
                channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
    
    
                String msg = "新增订单";
                //生产者发送消息时,设置消息的Routing Key : insert
                channel.basicPublish(EXCHANGE_NAME, "insert", null, msg.getBytes());
                System.out.println("生产者发送消息:" + msg);
    
                String msg1 = "异常订单";
                //生产者发送消息时,设置消息的Routing Key : Error
                channel.basicPublish(EXCHANGE_NAME, "error", null, msg1.getBytes());
                System.out.println("生产者发送消息:" + msg1);
    
                String msg2 = "订单信息";
                //生产者发送消息时,设置消息的Routing Key : info
                channel.basicPublish(EXCHANGE_NAME, "info", null, msg2.getBytes());
                System.out.println("生产者发送消息:" + msg2);
                channel.close();
                connection.close();
            } catch (IOException | TimeoutException e) {
                e.printStackTrace();
            }
        }
    }
    

    消费者

    public class InfoConsumer {
    
        //独立队列
        private static final String QUEUE_NAME = "direct_queue_3";
        private static final String EXCHANGE_NAME = "direct_exchange";
    
        public static void main(String[] args) {
            try {
                Connection connection = MQConnectionUtil.getConnection();
                Channel channel = connection.createChannel();
                //消费者1声明自己的队列
                channel.queueDeclare(QUEUE_NAME, false, false, false, null);
                //声明Exchange,类型为direct
                channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
                //消费者将队列与交换机绑定,设置Routing Key为insert
                channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "info");
                channel.basicConsume(QUEUE_NAME,true, new DefaultConsumer(channel){
                    @Override
                    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                        String msg = new String(body);
                        System.out.println("insert消费者获得消息:" + msg);
                    }
                });
    
            } catch (IOException | TimeoutException e) {
                e.printStackTrace();
            }
        }
    }
    
    
    public class InsertConsumer {
    
        //独立队列
        private static final String QUEUE_NAME = "direct_queue_1";
        private static final String EXCHANGE_NAME = "direct_exchange";
    
        public static void main(String[] args) {
            try {
                Connection connection = MQConnectionUtil.getConnection();
                Channel channel = connection.createChannel();
                //消费者1声明自己的队列
                channel.queueDeclare(QUEUE_NAME, false, false, false, null);
                //声明Exchange,类型为direct
                channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
                //消费者将队列与交换机绑定,设置Routing Key为insert
                channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "insert");
                channel.basicConsume(QUEUE_NAME,true, new DefaultConsumer(channel){
                    @Override
                    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                        String msg = new String(body);
                        System.out.println("insert消费者获得消息:" + msg);
                    }
                });
    
            } catch (IOException | TimeoutException e) {
                e.printStackTrace();
            }
        }
    }
    
    public class ErrorConsumer {
    
        //独立队列
        private static final String QUEUE_NAME = "direct_queue_2";
        private static final String EXCHANGE_NAME = "direct_exchange";
    
        public static void main(String[] args) {
            try {
                Connection connection = MQConnectionUtil.getConnection();
                Channel channel = connection.createChannel();
                //消费者1声明自己的队列
                channel.queueDeclare(QUEUE_NAME, false, false, false, null);
                //声明Exchange,类型为fanout
                channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
                //消费者将队列与交换机绑定,Routing Key: Error
                channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "error");
                channel.basicConsume(QUEUE_NAME,true, new DefaultConsumer(channel){
                    @Override
                    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                        String msg = new String(body);
                        System.out.println("Error消费者获得消息:" + msg);
                    }
                });
    
            } catch (IOException | TimeoutException e) {
                e.printStackTrace();
            }
        }
    }
    
    发布订阅Topics

    Topic类型的Exchange与Direct相比,都是根据Routing Key把消息路由到不同的队列,只不过Topic类型Exchange可以让队列在绑定Routing Key的时候使用通配符

    Routing Key由一个或者多个单词组成,多个单词用"."分割

    通配符规则:

         #:匹配一个或多个词
    
         *:匹配不多不少恰好1个词
    

    举例:

         audit.#:能够匹配audit.irs.corporate 或者 audit.irs
    
         audit.*:只能匹配audit.irs
    

    声明交换机时,将类型设为BuiltinExchangeType.TOPIC(topic)

    如何防止消息丢失

    消息确认机制(ACK)

    RabbitMQ有一个ACK机制,消费者在接收到消息后会向mq服务发送回执ACK,告知消息已被接收。这种ACK分为两种情况:

    • 自动ACK:消息一旦被接收,消费者会自动发送ACK
    • 手动ACK:消息接收后,不会自动发送ACK,而是需要手动发送ACK

    如果消费者没有发送ACK,则消息会一直保留在队列中,等待下次接收。但这里存在一个问题,就是一旦消费者发送了ACK,如果消费者后面宕机,则消息会丢失。因此自动ACK不能保证消费者在接收到消息之后能够正常完成业务功能,因此需要在消息被充分利用之后,手动ACK确认

    自动ACK,basicConsume方法中将autoAck参数设为true即可:

    channel.basicConsume(QUEUE_NAME,true, consumer);
    

    手动ACK

    channel.basicConsume(QUEUE_NAME,fasle, new DefaultConsumer(channel){
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
            String msg = new String(body);
            System.out.println("insert消费者获得消息:" + msg);
            channel.basicAck(envelope.getDeliveryTag(),false);
        }
    });
    

    如果设置了手动ack,但又不手动发送ACK确认,消息会一直停留在队列中,可能造成消息的重复获取

    持久化

    消息确认机制(ACK)能够保证消费者不丢失消息,但假如消费者在获取消息之前mq服务宕机,则消息也会丢失,因此要保证消息在服务端不丢失,则需要将消息进行持久化。队列、交换机、消息都要持久化。

    队列持久化

    将第二个参数设置为true

    channel.queueDeclare(QUEUE_NAME, true, false, false, null);
    
    exchange持久化

    在最后添加参数 true

    channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT,true);
    
    消息持久化

    MessageProperties.PERSISTENT_TEXT_PLAIN,将字符串类型的消息进行持久化

     channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes(StandardCharsets.UTF_8));   
    

    生产者确认

    生成者在发送消息过程中也可能出现错误或者网络延迟灯故障,导致消息未成功发送到交换机或者队列,或重复发送消息,为了解决这个问题,rabbitmq中有多个解决办法:

    事务
     try {
            channel.txSelect();
            channel.basicPublish("", QUEUE_NAME, null, msg.getBytes(StandardCharsets.UTF_8));
            channel.txCommit();
     } catch (Exception e) {
         channel.txRollback();
     }
    
    Confirm模式

    在发送代码前执行channel.confirmSelect(),如果消息未正常发送,就会进入if代码块,可以进行重发也可以对失败消息进行记录

    channel.confirmSelect();
    channel.basicPublish("", QUEUE_NAME, null, msg.getBytes(StandardCharsets.UTF_8));
    if (!channel.waitForConfirms()){
        System.out.println("message failed");
    }
    
    异步confirm

    生产者发送消息后不用等待服务端回馈发送状态,可以继续执行后面的代码,对于失败消息重发进行异步处理:

    channel.confirmSelect();
    channel.basicPublish("", QUEUE_NAME, null, msg.getBytes(StandardCharsets.UTF_8));
    channel.addConfirmListener(new ConfirmListener() {
        @Override
        public void handleAck(long l, boolean b) throws IOException {
            //消息成功处理
        }
    
        @Override
        public void handleNack(long l, boolean b) throws IOException {
            //消息丢失
        }
    });
    
    Spring AMQP中添加配置

    生产者确认机制,确保消息正确发送,如果发送失败会有错误回执,从而触发重试

    spring:
      rabbitmq:
        publisher-confirms: true
    

    参考文章: 如何使用RabbitMQ: https://www.cnblogs.com/corwyn/p/13628012.html
    RabbitMQ使用详解: https://www.cnblogs.com/ithushuai/p/12443460.html
    消息队列(MQ)简介: https://blog.csdn.net/weixin_56289362/article/details/125125615

  • 相关阅读:
    【数据结构】B树、B+树的知识点学习总结
    Mybatis总结--传参二
    自注意力机制(Self-attention)【第四章】
    13-security其他.md
    Nacos源码系列—关于服务注册的那些事
    纯内网环境中k8s下onlyOffice启用https
    高性能 Java 计算服务的性能调优实战
    中国石油大学(北京)-《钻井液工艺原理》第二阶段在线作业
    「前端+鸿蒙」鸿蒙应用开发-ArkTS语法说明-组件声明
    网络与信息安全基础知识--网络的协议与标准
  • 原文地址:https://blog.csdn.net/XiaoFanMi/article/details/127041214