• RabbitMQ学习笔记(消息发布确认,死信队列,集群,交换机,持久化,生产者、消费者)


    一、MQ基本介绍

    MQ(message queue):本质上是个队列,遵循FIFO原则,队列中存放的是message,是一种跨进程的通信机制,用于上下游传递消息。MQ提供“逻辑解耦+物理解耦”的消息通信服务。使用了MQ之后消息发送上游只需要依赖MQ,不需要依赖其它服务。

    功能1:流量消峰

    功能2:应用解耦

    功能3:异步处理

    MQ的分类:

    1.Kafka

    2.RabbitMQ

    RabbitMQ概念:

    四大核心概念:

    交换机:

    队列: 

     六大核心模式:

    1.简单模式。2.工作模式。3.发布订阅模式。4.路由模式。5.主题模式。6.发布确认模式。

    RabbitMQ工作原理:

    Channer:信道,发消息的通道。

    二、MQ下载安装

    下载:

    1. 官网地址:https://www.rabbitmq.com/download.html。参考的下载地址如下:Linux下安装RabbitMQ_rabbitmq下载_零碎de記憶的博客-CSDN博客

    2.安装Erlang环境

    yum install build-essential openssl openssl-devel unixODBC unixODBC-devel make gcc gcc-c++ kernel-devel m4 ncurses-devel tk tc xz tcp_wrappers
    

    3.下载Erlang,方式1:找到下面网址,在网址中下载rpm文件:

    el/7/erlang-22.3.4.12-1.el7.x86_64.rpm - rabbitmq/erlang · packagecloud

    或者直接输入下面指令下载rpm文件: 

    wget --content-disposition https://packagecloud.io/rabbitmq/erlang/packages/el/7/erlang-22.3.4.12-1.el7.x86_64.rpm/download.rpm
    

    然后输入下面的命令安装已下载的安装包:

    yum localinstall erlang-22.3.4.12-1.el7.x86_64.rpm
    

    4.下载RabbitMQ,输入下面的下载

    wget --content-disposition https://packagecloud.io/rabbitmq/rabbitmq-server/packages/el/7/rabbitmq-server-3.8.13-1.el7.noarch.rpm/download.rpm
    

     输入下面的命令进行本地安装:

    yum localinstall rabbitmq-server-3.8.13-1.el7.noarch.rpm

    5. 下载socat,检查是否已下载:

    yum install socat -y

    注意以下的操作都要在 /usr/local/software目录下查看: 

    6.添加开机启动RabbitMQ服务:chkconfig rabbitmq-server on。启动rabbitmq /sbin/service rabbitmq-server start。

    7.查看服务状态 /sbin/service rabbitmq-server status

    8.停止服务 /sbin/service rabbitmq-server stop。重新查看服务状态。

    10.开启web管理界面 sudo rabbitmq-plugins enable rabbitmq_management

    11.查看防火墙状态:systemctl status firewalld。关闭防火墙:systemctl stop firewalld。关闭rabbit服务器输入:sudo rabbitmqctl stop。开启rabbit服务器输入:sudo rabbitmq-server -detached。

    12.在浏览器中输入地址:Linux服务器ip地址:15672,可访问web管理界面。

    13.用户名guest,密码默认,但无法登陆,无权限。

    14.rabbitmqctl list_users查看用户。创建账号 rabbitmqctl add_user admin 123。设置用户角色为管理员 rabbitmqctl set_user_tags admin administrator。设置用户权限 rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*"。

    15.再经尝试可以重新登录:

    三、创建Java开发环境

    1.创建1个新项目,命名atguigu-rabbitmq,然后创建模块Module。GroupId可以填写:com.atguigu.rabbitmq,ArtifactId可以填rabbitmq-hello,选择quickstart:

    导入依赖如下:

    1. <dependencies>
    2. <!--rabbitmq依赖客户端-->
    3. <dependency>
    4. <groupId>com.rabbitmq</groupId>
    5. <artifactId>amqp-client</artifactId>
    6. <version>5.8.0</version>
    7. </dependency>
    8. <!--操作文件流的依赖-->
    9. <dependency>
    10. <groupId>commons-io</groupId>
    11. <artifactId>commons-io</artifactId>
    12. <version>2.6</version>
    13. </dependency>
    14. <dependency>
    15. <groupId>org.apache.maven.plugins</groupId>
    16. <artifactId>maven-compiler-plugin</artifactId>
    17. <version>3.8.1</version> <!-- 根据你的需求指定版本号 -->
    18. </dependency>
    19. </dependencies>
    20. <build>
    21. <plugins>
    22. <plugin>
    23. <groupId>org.apache.maven.plugins</groupId>
    24. <artifactId>maven-compiler-plugin</artifactId>
    25. <configuration>
    26. <source>8</source>
    27. <target>8</target>
    28. </configuration>
    29. </plugin>
    30. </plugins>
    31. </build>

    在下图中,P是生产者,C是消费者。中间的框是一个队列-RabbitMQ代表使用者保留的消息缓存区。

    生产者代码

    1. public class producer {
    2. //队列名称
    3. public static final String QUEUE_NAME = "hello";
    4. //发消息
    5. public static void main( String[] args ) throws IOException, TimeoutException {
    6. //1步:创建一个连接工程
    7. ConnectionFactory factory = new ConnectionFactory();
    8. //2步:输入工厂IP,用户名和密码——连接RabbitMQd队列
    9. factory.setHost("192.168.182.136");
    10. factory.setUsername("admin");
    11. factory.setPassword("123");
    12. //3步:创建连接
    13. Connection connection = factory.newConnection();
    14. //4步:获取信道
    15. Channel channel = connection.createChannel();
    16. //5步:生成一个队列(队列名称,是否持久化,是否排他,自动删除,队列参数)
    17. //持久化:是否存储入磁盘,默认是将消息存储在内存中
    18. //排他:队列是否只供一个消费者消费,是否进行消息共享,true可以供多个消费者消费
    19. //自动删除:最后一个消费者断开连接后,该队列是否自动删除
    20. channel.queueDeclare(QUEUE_NAME,false,false,false,null);
    21. //6步:发消息,(交换机,路由key本次是队列名,参数,发送的消息)
    22. String message = "hello world";
    23. channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
    24. System.out.println("消息发送成功!!!");
    25. }
    26. }

    消费者代码

    1. public class consumer {
    2. public static final String QUEUE_NAME = "hello";
    3. public static void main(String [] args) throws IOException, TimeoutException {
    4. //1步:创建一个连接工程
    5. ConnectionFactory factory = new ConnectionFactory();
    6. //2步:输入工厂IP,用户名和密码——连接RabbitMQd队列
    7. factory.setHost("192.168.182.136");
    8. factory.setUsername("admin");
    9. factory.setPassword("123");
    10. //3步:创建连接
    11. Connection connection = factory.newConnection();
    12. //4步:获取信道
    13. Channel channel = connection.createChannel();
    14. //5步:声明,接收消息
    15. DeliverCallback deliverCallback = (consumerTag,message)->{
    16. System.out.println(new String(message.getBody()));
    17. };
    18. //6步:取消消息时的回调
    19. CancelCallback cancelCallback = consumerTag->{
    20. System.out.println("消息消费被中断");
    21. };
    22. //7步:接收,(队列名,自动or手动,接收消息,回调)
    23. //1.消费哪个队列;2.消费成功后是否要自动应答true代表自动应答,false表示手动应答
    24. //3.消费者未成功消费的回调
    25. channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
    26. }
    27. }

    注意几点:1.确保rabbitmq处于开启的状态(开启方式见前面)2.最好让防火墙处于关闭的状态 3.最好通过方法左侧的开关按钮进行启动,确保启动是选择Current File。

     

    四、工作队列

    工作队列:又称任务队列,主要思想是避免立即执行资源密集型任务,而不得不等待它完成。相反我们安排任务在之后执行。我们把任务封装为消息并将其发送到队列。在后台运行的工作进程将弹出任务并最终执行作业。当有多个工作线程时,这些工作线程将一起处理这些任务。

    情况:生产者大量分发消息给队列,工作线程接收队列的消息,工作线程不止一个,三者关系时竞争关系,你一条我一条他一条,但要注意一个消息只能被处理一次,不能被处理多次。

    重复性的代码可以被抽取成为工具类。

    在java — com — atguigu — rabbitmq下创建utils包,工具类起名RabbitMqUtils,放入如下代码:

    1. public class RabbitMqUtils {
    2. public static Channel getChannel() throws Exception{
    3. //1步:创建一个连接工程
    4. ConnectionFactory factory = new ConnectionFactory();
    5. //2步:输入工厂IP,用户名和密码——连接RabbitMQd队列
    6. factory.setHost("192.168.182.137");
    7. factory.setUsername("admin");
    8. factory.setPassword("123");
    9. //3步:创建连接
    10. Connection connection = factory.newConnection();
    11. //4步:获取信道
    12. Channel channel = connection.createChannel();
    13. return channel;
    14. }
    15. }

    工作线程的更新后,worker01的代码如下:

    1. public static final String QUEUE_NAME = "hello";
    2. public static void main(String [] args) throws Exception {
    3. Channel channel = RabbitMqUtils.getChannel();
    4. channel.queueDeclare(QUEUE_NAME, false, false, false, null);//声明队列,没有会报错
    5. //消息接收
    6. DeliverCallback deliverCallback = (consumerTag,message)->{
    7. System.out.println("接收到的消息:" + new String(message.getBody()));
    8. };
    9. CancelCallback cancelCallback = (consumerTag)->{
    10. System.out.println(consumerTag + "消息被取消消费接口回调逻辑");
    11. };
    12. System.out.println("c1等待接收消息......");
    13. channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
    14. }

    重复利用one包下的consumer类,将其更改为c2工作线程:

    Task01作为生产者用于生产数据,与前面不同的是,Task01支持从IDEA控制台输入数据:

    1. public class Task01 {
    2. public static final String QUEUE_NAME="hello";
    3. public static void main(String[] args) throws Exception {
    4. Channel channel = RabbitMqUtils.getChannel();
    5. channel.queueDeclare(QUEUE_NAME,false,false,false,null);
    6. //从控制台当中接收信息
    7. Scanner scanner = new Scanner(System.in); //扫描控制台输入内容
    8. while(scanner.hasNext()){
    9. String message = scanner.next();
    10. channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
    11. System.out.println("发送消息完成..");
    12. }
    13. }
    14. }

      

    五、消息应答

    概念:

    自动应答:

    手动应答:

    手动应答好处,建议不批量应答,选择false:

     消息自动重新入队:

    原本正常传输,C1突然失去连接,检测到C1断开连接,于是会让消息重新入队,原本的消息交由C2进行处理。

    实验思路:写1个生产者,2个消费者,当关闭掉其中1个工作线程,消息不丢失,还被另一个工作线程接收。消费在手动应答时不丢失、放回队列中重新消费。

    消息手动应答(生产者):

    1. public class Task2 {
    2. public static final String task_queue_name = "ack_queue";
    3. public static void main(String[] args) throws Exception {
    4. Channel channel = RabbitMqUtils.getChannel();
    5. channel.queueDeclare(task_queue_name,false,false,false,null);
    6. Scanner scanner = new Scanner(System.in);
    7. while(scanner.hasNext()){
    8. String message = scanner.next();
    9. channel.basicPublish("",task_queue_name,null,message.getBytes("UTF-8"));
    10. System.out.println("生产者发出消息:"+message);
    11. }
    12. }
    13. }

    消息手动应答(消费者):

    1. public class Work03 {
    2. public static final String task_queue_name = "ack_queue";
    3. public static void main(String[] args) throws Exception {
    4. Channel channel = RabbitMqUtils.getChannel();
    5. System.out.println("C1等待接收消息处理时间较短");
    6. DeliverCallback deliverCallback = (consumerTag,message)->{
    7. SleepUtils.sleep(1);
    8. System.out.println("接收到的消息:"+new String(message.getBody(),"UTF-8"));
    9. //1.消息的标记tag 2.是否批量应答
    10. channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
    11. };
    12. //采用手动应答
    13. boolean autoAck = false;
    14. channel.basicConsume(task_queue_name,autoAck,deliverCallback,(consumerTag->{
    15. System.out.println(consumerTag + "消费者取消消费接口回调逻辑");
    16. }));
    17. }
    18. }
    1. public class Work04 {
    2. public static final String task_queue_name = "ack_queue";
    3. public static void main(String[] args) throws Exception {
    4. Channel channel = RabbitMqUtils.getChannel();
    5. System.out.println("C2等待接收消息处理时间较短");
    6. DeliverCallback deliverCallback = (consumerTag,message)->{
    7. SleepUtils.sleep(30);
    8. System.out.println("接收到的消息:"+new String(message.getBody(),"UTF-8"));
    9. //1.消息的标记tag 2.是否批量应答
    10. channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
    11. };
    12. //采用手动应答
    13. boolean autoAck = false;
    14. channel.basicConsume(task_queue_name,autoAck,deliverCallback,(consumerTag->{
    15. System.out.println(consumerTag + "消费者取消消费接口回调逻辑");
    16. }));
    17. }
    18. }

    实现效果:在生产者输入AA BB CC DD EE等消息,消费者1接收速度快,会立即打印AA CC EE等消息,消费者2接收速度慢,会在一段时间后接收到BB,此时如果关闭消费者2,则消费者1输出DD,表明消费在手动应答时不丢失、放回队列中重新消费。

    六、持久化与分发

    如果报错,说明原本的队列就是不持久化,此时无法设定持久化,只能先将队列删除然后再重新设定。

    控制队列持久化,需要修改生产者声明函数的第2个参数:

    消息持久化:

    队列持久化和消息持久化不同,队列是MQ里的一个组件,消息是生产者发送的消息。

    如果要让消息持久化,在发消息的时候就要通知队列。

    更改的是生产者的信道的basicPublish的第3个参数,添加MessageProperties.PERSISTENT_TEXT_PLAIN

    不公平分发: 

    消费者处理任务的速度不一致,为了不让速度快的消费者长时间处于空闲状态,因此采用不公平分发。

    1. int prefetchCount = 1;
    2. channel.basicQos(prefetchCount);

    预取值:

    前面N条数据分别交给谁处理,如下图就是前7条数据中,2条给C1,5条给C2

    七、发布确认

    原理:

    1.设置要求队列必须持久化:就算服务器宕机,队列也不至于消失。

    2.设置要求队列中的消息也必须持久化。

    3. 发布确认,消息保存到磁盘上之后,队列要告知生产者。

    1. Channel channel = connection.createChannel();
    2. channel.confirmSelect();

    1. public static void main(String[] args){
    2. }

    单个发布确认:

    是一种同步确认发布的方式,发布消息-确认消息-发布消息...必须要确认后才能继续发布,类似于一手交钱一手交货,缺点是发布速度很慢。

    1. 创建com/atguigu/rabbitmq/four文件夹下的ConfirmMessage

    1. public static void publishMessageIndividually() throws Exception{
    2. Channel channel = RabbitMqUtils.getChannel(); //获取信道
    3. String queueName = UUID.randomUUID().toString();
    4. channel.queueDeclare(queueName,false,false,false,null);
    5. channel.confirmSelect();//开启发布确认
    6. long begin = System.currentTimeMillis();
    7. for(int i=0;i<MESSAGE_COUNT;i++){
    8. String message = i +"";
    9. channel.basicPublish("",queueName,null,message.getBytes());
    10. boolean flag = channel.waitForConfirms();
    11. if(flag){
    12. System.out.println("消息发送成功");
    13. }
    14. }
    15. long end = System.currentTimeMillis();
    16. System.out.println("发布"+MESSAGE_COUNT+"个单独确认消息,耗时:"+(end-begin)+"ms");
    17. }

    批量发布确认:

    1. public static void publishMessageBatch() throws Exception{
    2. Channel channel = RabbitMqUtils.getChannel(); //获取信道
    3. String queueName = UUID.randomUUID().toString();
    4. channel.queueDeclare(queueName,false,false,false,null);
    5. channel.confirmSelect();//开启发布确认
    6. long begin = System.currentTimeMillis();
    7. int batchSize = 100; //批量确认消息的大小
    8. //批量发送消息,批量发布确认
    9. for(int i=0;i<MESSAGE_COUNT;i++){
    10. String message = i+"";
    11. channel.basicPublish("",queueName,null,message.getBytes());
    12. //判断达到100条消息的时候,批量确认一次
    13. if(i%batchSize==0) channel.waitForConfirms();
    14. }
    15. long end = System.currentTimeMillis();
    16. System.out.println("发布"+MESSAGE_COUNT+"个批量确认消息,耗时:"+(end-begin)+"ms");
    17. }

    异步发布确认:

    map序列,key是消息序号(deliveryTag是消息的标识,multiple是是否为批量),value是消息内容,将消息每一条都编号,broker会对消息进行应答,分为两种一种是确认应答,另一种是未确认应答。消息生产者不需要等待接收方的消息,只需要负责发送消息即可,消息是否应答最终会以异步的形式回传,也就是说确认的时间可以是稍后的。

    addConfirmListener单参数的是只能监听成功的,多参数的是可以监听成功也可以监听失败的,都是接口需要自己来写。

    1. public static void publishMessageAsync() throws Exception{
    2. Channel channel = RabbitMqUtils.getChannel(); //获取信道
    3. String queueName = UUID.randomUUID().toString();
    4. channel.queueDeclare(queueName,false,false,false,null);
    5. channel.confirmSelect();//开启发布确认
    6. long begin = System.currentTimeMillis();
    7. //消息确认成功,回调函数
    8. ConfirmCallback ackCallback = (deliveryTag, multiple)->{
    9. System.out.println("确认的消息:"+deliveryTag);
    10. };
    11. //消息确认失败回调函数
    12. ConfirmCallback nackCallback = (deliveryTag, multiple)->{
    13. System.out.println("未确认的消息:"+deliveryTag);
    14. };
    15. //准备消息的监听器,监听哪些消息成功了,哪些消息失败了
    16. channel.addConfirmListener(ackCallback,nackCallback);
    17. //批量发送消息
    18. for(int i=0;i<MESSAGE_COUNT;i++){
    19. String message="消息"+i;
    20. channel.basicPublish("",queueName,null,message.getBytes());
    21. //发布确认
    22. }
    23. long end = System.currentTimeMillis();
    24. System.out.println("发布"+MESSAGE_COUNT+"个异步确认消息,耗时:"+(end-begin)+"ms");
    25. }

     处理异步未确认消息:

    最好的解决方案就是把未确认的消息放到一个基于内存的能被发布线程访问的队列,比如说用ConcurrentLinkedQueue这个队列在confirm callbacks与发布线程之间进行消息的传递。

    1. public static void publishMessageAsync() throws Exception{
    2. Channel channel = RabbitMqUtils.getChannel(); //获取信道
    3. String queueName = UUID.randomUUID().toString();
    4. channel.queueDeclare(queueName,false,false,false,null);
    5. channel.confirmSelect();//开启发布确认
    6. /*线程安全有序的一个哈希表,适用于高并发的情况下
    7. 1.轻松地将序号与消息进行关联
    8. 2.轻松地批量删除条目只要给到序号
    9. 3.支持高并发(多线程)*/
    10. ConcurrentSkipListMap<Long,String> outstandingConfirms = new ConcurrentSkipListMap<>();
    11. //消息确认成功,回调函数
    12. ConfirmCallback ackCallback = (deliveryTag, multiple)->{
    13. if(multiple){
    14. //2.删除掉已经确认的消息,剩下的就是未确认的消息
    15. ConcurrentNavigableMap<Long, String> confirmd =
    16. outstandingConfirms.headMap(deliveryTag);
    17. }else{
    18. outstandingConfirms.remove(deliveryTag);
    19. }
    20. System.out.println("确认的消息:"+deliveryTag);
    21. };
    22. //消息确认失败回调函数
    23. ConfirmCallback nackCallback = (deliveryTag, multiple)->{
    24. //3.打印一下未确认的消息都有哪些
    25. String message = outstandingConfirms.get(deliveryTag);
    26. System.out.println("未确认的消息是:"+message+"未确认的消息:"+deliveryTag);
    27. };
    28. //准备消息的监听器,监听哪些消息成功了,哪些消息失败了
    29. channel.addConfirmListener(ackCallback,nackCallback);
    30. long begin = System.currentTimeMillis();
    31. //批量发送消息
    32. for(int i=0;i<MESSAGE_COUNT;i++){
    33. String message="消息"+i;
    34. channel.basicPublish("",queueName,null,message.getBytes());
    35. //1.此处记录下所有发送的消息,消息的总和
    36. outstandingConfirms.put(channel.getNextPublishSeqNo(),message);
    37. }
    38. long end = System.currentTimeMillis();
    39. System.out.println("发布"+MESSAGE_COUNT+"个异步确认消息,耗时:"+(end-begin)+"ms");
    40. }
    41. }

    三种方式对比:

    八、交换机

    一个消息可以被消费多次,需要通过交换机,仍旧遵循队列中的消息只能被消费一次。

     生产者生产的消息从不会直接发送到队列。生产者将消息发送到交换机。交换机负责接收来自生产者的消息,将消息推入队列。

    Exchanges的类型:直接(direct),主题(topic),标题(headers),扇出(fanout)

    消息能路由发送到队列中其实是由routingKey(bindingkey)绑定key指定的。

    创建临时队列:

    String queueName = channel.queueDedare().getQueue();

    绑定:

    根据Routing key来确定消息要发给哪个队列,如果Routing Key相同消息就可以发送给多个队列。

    先添加一个队列queue1,再添加一个交换机exchange1,最后点击exchange1交换机,进入绑定菜单,然后输入绑定的队列是queue1,然后Routing key随便设置为123。

    Fanout交换机(广播)

    Fanout(扇出)是将接收到的所有消息广播到它知道的所有队列中。如果Routing Key相同则发送给队列以相同消息。

    生产者:

    1. public class EmitLog {
    2. public static final String EXCHANGE_NAME="logs";
    3. public static void main(String[] args) throws Exception {
    4. Channel channel = RabbitMqUtils.getChannel();
    5. channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
    6. Scanner scanner = new Scanner(System.in);
    7. while(scanner.hasNext()){
    8. String message = scanner.next();
    9. channel.basicPublish(EXCHANGE_NAME,"",null,message.getBytes("UTF-8"));
    10. System.out.println("生产者发出消息"+message);
    11. }
    12. }
    13. }

    消费者:

    1. public class ReceiveLogs01 {
    2. public static final String EXCHANGE_NAME="logs";
    3. public static void main(String[] args) throws Exception {
    4. Channel channel = RabbitMqUtils.getChannel();
    5. channel.exchangeDeclare(EXCHANGE_NAME,"fanout");//声明一个交换机
    6. //声明一个队列临时队列,队列的名称是随机的,当消费者断开与队列的连接时候,队列就删除了
    7. String queueName = channel.queueDeclare().getQueue();
    8. //绑定交换机与队列
    9. channel.queueBind(queueName,EXCHANGE_NAME,"");
    10. System.out.println("等待接收消息,把接收到消息打印在屏幕上......");
    11. DeliverCallback deliverCallback = (consumerTag,message)->{
    12. System.out.println("ReceiveLogs01控制台打印接收到的消息:"+new String(message.getBody(),"UTF-8"));
    13. };
    14. channel.basicConsume(queueName,true,deliverCallback,consumerTag->{});
    15. }
    16. }

    效果:实现广播的功能

    Direct路由交换机

    消费者1:

    1. public class ReceiveLogsDirect01 {
    2. public static final String EXCHANGE_NAME="direct_logs";
    3. public static void main(String[] args) throws Exception {
    4. Channel channel = RabbitMqUtils.getChannel();
    5. channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
    6. channel.queueDeclare("console",false,false,false,null);
    7. channel.queueBind("console",EXCHANGE_NAME,"info"); //队列名称,交换机名称,Routingkey
    8. DeliverCallback deliverCallback =(consumerTag,message)->{
    9. System.out.println("ReceiveLogsDirect01控制台打印接收到的消息:"+new String(message.getBody(),"UTF-8"));
    10. };
    11. channel.basicConsume("console",true,deliverCallback,consumerTag->{});
    12. }
    13. }

    消费者2:

    1. public class ReceiveLogsDirect02 {
    2. public static final String EXCHANGE_NAME="direct_logs";
    3. public static void main(String[] args) throws Exception {
    4. Channel channel = RabbitMqUtils.getChannel();
    5. channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
    6. channel.queueDeclare("disk",false,false,false,null);
    7. channel.queueBind("disk",EXCHANGE_NAME,"error"); //队列名称,交换机名称,Routingkey
    8. DeliverCallback deliverCallback =(consumerTag,message)->{
    9. System.out.println("ReceiveLogsDirect02控制台打印接收到的消息:"+new String(message.getBody(),"UTF-8"));
    10. };
    11. channel.basicConsume("disk",true,deliverCallback,consumerTag->{});
    12. }
    13. }

    生产者: 

    1. public class DirectLogs {
    2. public static final String EXCHANGE_NAME="direct_logs";
    3. public static void main(String[] args) throws Exception {
    4. Channel channel = RabbitMqUtils.getChannel();
    5. Scanner scanner = new Scanner(System.in);
    6. while(scanner.hasNext()){
    7. String message = scanner.next();
    8. channel.basicPublish(EXCHANGE_NAME,"info",null,message.getBytes("UTF-8"));
    9. System.out.println("生产者发出消息"+message);
    10. }
    11. }
    12. }

    效果:

    如果【channel.basicPublish(EXCHANGE_NAME,"info",null,message.getBytes("UTF-8"));】的第2个参数填info就只会发送消息给消费者1,填写error就只会发送消息给消费者2。

    Topics主题交换机

    发布(生产者)订阅(消费者)模式:

    消费者1:

    1. public class ReceiveLogsTopic01 {
    2. public static final String EXCHANGE_NAME="topic_logs";//交换机名称
    3. public static void main(String[] args) throws Exception {
    4. Channel channel = RabbitMqUtils.getChannel();
    5. channel.exchangeDeclare(EXCHANGE_NAME,"topic");
    6. String queueName="Q1";
    7. channel.queueDeclare(queueName,false,false,false,null);
    8. channel.queueBind(queueName,EXCHANGE_NAME,"*.orange.*");
    9. System.out.println("等待接收消息.....");
    10. DeliverCallback deliverCallback = (consumerTag,message)->{
    11. System.out.println(new String(message.getBody(),"UTF-8"));
    12. System.out.println("接收队列:"+queueName+" 绑定键:"+message.getEnvelope().getRoutingKey());
    13. };
    14. channel.basicConsume(queueName,true,deliverCallback,consumerTag->{});
    15. }
    16. }

    消费者2:

    1. public class ReceiveLogsTopic02 {
    2. public static final String EXCHANGE_NAME="topic_logs";//交换机名称
    3. public static void main(String[] args) throws Exception {
    4. Channel channel = RabbitMqUtils.getChannel();
    5. channel.exchangeDeclare(EXCHANGE_NAME,"topic");
    6. String queueName="Q2";
    7. channel.queueDeclare(queueName,false,false,false,null);
    8. channel.queueBind(queueName,EXCHANGE_NAME,"*.*.rabbit");
    9. channel.queueBind(queueName,EXCHANGE_NAME,"lazy.#");
    10. System.out.println("等待接收消息.....");
    11. DeliverCallback deliverCallback = (consumerTag,message)->{
    12. System.out.println(new String(message.getBody(),"UTF-8"));
    13. System.out.println("接收队列:"+queueName+" 绑定键:"+message.getEnvelope().getRoutingKey());
    14. };
    15. channel.basicConsume(queueName,true,deliverCallback,consumerTag->{});
    16. }
    17. }

     生产者1:

    1. public class EmitLogTopic {
    2. public static final String EXCHANGE_NAME="topic_logs";
    3. public static void main(String[] args) throws Exception {
    4. Channel channel = RabbitMqUtils.getChannel();
    5. Map<String,String> bindingKeyMap = new HashMap<>();
    6. bindingKeyMap.put("quick.orange.rabbit","被队列Q1Q2接收到");
    7. bindingKeyMap.put("lazy.orange.elephant","被队列Q1Q2接收到");
    8. bindingKeyMap.put("quick.orange.fox","被队列Q1接收到");
    9. bindingKeyMap.put("lazy.brown.fox","被队列Q2接收到");
    10. bindingKeyMap.put("lazy.pink.rabbit","虽然满足两个绑定但只被队列Q2接收一次");
    11. bindingKeyMap.put("quick.brown.fox","不匹配任何绑定不会被任何队列接收到会被丢弃");
    12. bindingKeyMap.put("quick.orange.male.rabbit","是四个单词不匹配任何绑定会被丢弃");
    13. bindingKeyMap.put("lazy.orange.male.rabbit","是四个单词但匹配Q2");
    14. for (Map.Entry<String, String> bindingKeyEntry : bindingKeyMap.entrySet()) {
    15. String routingKey = bindingKeyEntry.getKey();
    16. String message = bindingKeyEntry.getValue();
    17. channel.basicPublish(EXCHANGE_NAME,routingKey,null,message.getBytes("UTF-8"));
    18. System.out.println("生产者发出消息:"+message);
    19. }
    20. }
    21. }

    结果:

    九、死信

    无法被消费消息被称为死信

    死信的来源:

    死信实战架构图:

    1个生产者2个消费者。生产者原本走正常交换机,消息走正常队列,被C1消费。当满足消息被拒绝,消息TTL过期,队列达到最大长度三者其一时,消息成为死信,会进入dead_exchange交换机,进入dead_queue死信队列,死信队列的信息由C2消费。

    消费者1:

    1. public class Consumer01 {
    2. //普通交换机的名称
    3. public static final String NORMAL_EXCHANGE ="normal_exchange";
    4. //死信交换机的名称
    5. public static final String DEAD_EXCHANGE = "dead_exchange";
    6. //普通队列的名称
    7. public static final String NORMAL_QUEUE = "normal_queue";
    8. //死信队列的名称
    9. public static final String DEAD_QUEUE = "dead_queue";
    10. public static void main(String[] args) throws Exception {
    11. Channel channel1 = RabbitMqUtils.getChannel();
    12. //声明死信和普通交换机,类型为direct
    13. channel1.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
    14. channel1.exchangeDeclare(DEAD_EXCHANGE,BuiltinExchangeType.DIRECT);
    15. Map<String,Object> arguments = new HashMap<>(); //设置参数
    16. //正常队列设置死信交换机
    17. arguments.put("x-dead-letter-exchange", DEAD_EXCHANGE); //****相当于正常的C1不能消费掉就通过这个交换机进行转发
    18. //设置死信RoutingKey
    19. arguments.put("x-dead-letter-routing-key", "lisi");
    20. //声明普通队列
    21. channel1.queueDeclare(NORMAL_QUEUE,false,false,false,arguments); //正常交换机不正常,需要将死信转发给死信队列
    22. //声明死信队列
    23. channel1.queueDeclare(DEAD_QUEUE,false,false,false,null);
    24. //绑定普通的交换机与队列
    25. channel1.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,"zhangsan");
    26. //绑定死信的交换机与死信的队列
    27. channel1.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,"lisi");
    28. System.out.println("等待接收消息.....");
    29. DeliverCallback deliverCallback = (consumerTag,message)->{
    30. System.out.println("Consumer01接收的消息是:" + new String(message.getBody(),"UTF-8"));
    31. };
    32. channel1.basicConsume(NORMAL_QUEUE,true,deliverCallback,consumerTag->{});
    33. }
    34. }

    消费者2:

    1. public class Consumer02 {
    2. //死信队列的名称
    3. public static final String DEAD_QUEUE = "dead_queue";
    4. public static void main(String[] args) throws Exception {
    5. Channel channel1 = RabbitMqUtils.getChannel();
    6. System.out.println("等待接收消息.....");
    7. DeliverCallback deliverCallback = (consumerTag,message)->{
    8. System.out.println("Consumer02接收的消息是:" + new String(message.getBody(),"UTF-8"));
    9. };
    10. channel1.basicConsume(DEAD_QUEUE,true,deliverCallback,consumerTag->{});
    11. }
    12. }

    生产者:

    1. public class Producer {
    2. public static final String NORMAL_EXCHANGE = "normal_exchange";
    3. public static void main(String[] args) throws Exception {
    4. Channel channel = RabbitMqUtils.getChannel();
    5. //死信消息,设置TTL时间,time to live
    6. AMQP.BasicProperties properties = new AMQP.BasicProperties()
    7. .builder().expiration("10000").build();
    8. for (int i = 1; i < 11; i++) {
    9. String message = "info"+i;
    10. channel.basicPublish(NORMAL_EXCHANGE,"zhangsan",properties,message.getBytes());
    11. }
    12. }
    13. }

    实验步骤:首先确保rabbitMq网页中normal_queue和dead_queue队列都被删除干净,然后先启动Consumer01将交换机和队列声明,然后关闭Consumer01模拟无法接收的场景,然后启动Producer会发送10条消息,此时启动Consumer02,最终发送的消息会在10秒后变成死信,Consumer02会接收到这些信息。

    **易错点:假如队列已存在,会报错,我们可以点进队列,

    然后点击Delete Queue即可: 

    队列达到最大长度:

    更改Producer代码:

    更改Consumer01代码:

    实验步骤:首先确保rabbitMq网页中normal_queue和dead_queue队列都被删除干净,然后先启动Consumer01将交换机和队列声明,然后关闭Consumer01模拟假死,此时启动Consumer02和Producer,因为队列的最大容量为6,所以有4条消息会被发送到死信队列。

    拒绝应答:

    在Producer中确保设置TTL时间注释调,然后basicProperties改为null,

    在Consumer01中把长度限制注释掉,更改DeliverCallback后的代码,如下:

    1. DeliverCallback deliverCallback = (consumerTag,message)->{ //接收消息
    2. String msg = new String(message.getBody(), "UTF-8");
    3. if(msg.equals("info5")){
    4. System.out.println("此消息是被Consumer01拒绝的消息是:" +msg);
    5. channel1.basicReject(message.getEnvelope().getDeliveryTag(),false);
    6. } else {
    7. System.out.println("Consumer01接收的消息是:" +msg);
    8. channel1.basicAck(message.getEnvelope().getDeliveryTag(),false);
    9. }
    10. };
    11. //开启手动应答,如果自动应答了根本不存在拒绝的问题
    12. channel1.basicConsume(NORMAL_QUEUE,false,deliverCallback,consumerTag->{});

    注意一定要开启手动应答,手动拒绝消息info5,效果如下:

  • 相关阅读:
    学无止境,资深架构师—到底是如何阅读JDK源码的?
    Python类中的property的用法
    网络编程06-服务器编程非阻塞IO、多路复用
    2.1配置(AutoMapper官方文档翻译)
    罗克韦尔AB PLC 通过KEPServer实现与西门子1200PLC的以太网通信
    blender hardOps插件
    分布式事务最经典的八种解决方案
    基于Springboot+Vue的网上蛋糕销售系统(含源码数据库)
    删除字符串字符,使输出结果不包含回文串
    【MySQL入门实战1】-数据库三大范式
  • 原文地址:https://blog.csdn.net/RuanFun/article/details/133493479