• 【RabbitMQ】——发布订阅模式(Fanout)


    一、简介

    1. 发布订阅模式

    在前边的文章中已经介绍了rabbitmq的简单模式和工作模式,本文将介绍 发布订阅模式。
    将所有接收到的消息广播到它知道的所有队列中。

    实现方式采用Fanout类型的交换机,然后设置相同的routingKey 或者将routingKey设置为空字符串 “” 。

    生产者要点代码:

     /**
      * 声明一个交换机
      */
     channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
    
    • 1
    • 2
    • 3
    • 4
     /**
     * 发送消息
     * param1 交换机类型采用BuiltinExchangeType.FANOUT类型
     * param2 routingKey
     * param3 其他参数信息
     * param4 发送的消息体
     */
     hannel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    消费者要点代码:

     /**
     * 绑定交换机和队列
     * param1 队列名称
     * param2 交换机名称
     * param3 routingkey
     */
     channel.queueBind(queue,EXCHANGE_NAME,"");
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    2. 系统默认的发布订阅交换机

    在这里插入图片描述

    二、实现

    在这里插入图片描述

    1. 生产者

    生产者首先声明一个BuiltinExchangeType.FANOUT类型的交换机,然后再设置routingKey为空。

    package com.rabbitmqDemo.rabbitmq.five;
    
    import com.rabbitmq.client.BuiltinExchangeType;
    import com.rabbitmq.client.Channel;
    import com.rabbitmqDemo.rabbitmq.utils.RabbitMqUtils;
    
    import java.io.IOException;
    import java.util.Scanner;
    import java.util.concurrent.TimeoutException;
    
    /**
     * 将消息发送到交换机
     */
    public class LogProducer {
    
        private static final String EXCHANGE_NAME = "logs_exchange";
    
        public static void main(String[] args) throws IOException, TimeoutException {
            Channel channel = RabbitMqUtils.getChannel();
    
            /**
             * 声明一个交换机
             */
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
    
            Scanner scanner = new Scanner(System.in);
            while (scanner.hasNext()) {
                String message = scanner.next();
                /**
                 * 发送消息
                 * param1 发送到哪个交换机
                 * param2 routingKey
                 * param3 其他参数信息
                 * param4 发送的消息体
                 */
                channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));
                System.out.println("message send end : " + message);
    
            }
    
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42

    2. 消费者

    消费者首先声明一个临时队列,然后再将临时队列和发布订阅类型的交换机进行绑定。

    package com.rabbitmqDemo.rabbitmq.five;
    
    import com.rabbitmq.client.*;
    import com.rabbitmqDemo.rabbitmq.utils.RabbitMqUtils;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    /**
     * 将接收到的消息打印到控制台
     */
    public class Logworker01 {
    
       private static final String EXCHANGE_NAME = "logs_exchange";
    
        public static void main(String[] args) throws IOException, TimeoutException {
            Channel channel = RabbitMqUtils.getChannel();
    
            /**
             * 声明一个交换机
             */
             channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
    
            /**
             * 声明一个临时队列
             * 队列名称随机
             * 消费者断开与队列的连接时,队列就删除了
             */
            String queue = channel.queueDeclare().getQueue();
    
            /**
             * 绑定交换机和队列
             * param1 队列名称
             * param2 交换机名称
             * param3 routingkey
             */
            channel.queueBind(queue,EXCHANGE_NAME,"");
    
            System.out.println("wait receive message ,print message to console... ");
    
            //声明 消费者成功消费的回调
            DeliverCallback deliverCallback = (consumerTag, message) -> {
                System.out.println("Logworker01-message:" + new String(message.getBody(),"UTF-8"));
            };
            //声明 取消消息时的回调
            CancelCallback cancelCallback = (consumerTag) -> {
                System.out.println("Logworker01-消息消费被中断-" + consumerTag);
            };
    
            /**
             * 消费者消费消息
             * param1 队列名称
             * param2 消费成功之后是否自动应答,true 代表自动应答,false表示不自动应答
             * param3 消费者成功消费的回调
             * param4 消费者取消消费回调
             */
            System.out.println("worker02等待接收消息......");
            channel.basicConsume(queue, true, deliverCallback, cancelCallback);
    
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61

    生产者发布消息,所有消费者都会接收到相同的消息

  • 相关阅读:
    喹啉羧酸类 DHODH 抑制剂用于治疗急性髓系白血病
    OpenFeign
    【天衍系列 04】深入理解Flink的ElasticsearchSink组件:实时数据流如何无缝地流向Elasticsearch
    让学指针变得更简单(一)
    集群所有进程查看脚本xcall.sh
    阿里云云效 Maven
    【lua】tolua(Luajit) require 重复加载了的问题(路径混用斜杠/和点.造成的问题)
    如何全面升级spring-boot-2.x及Spring-security-oauth2
    C++项目——云备份-⑤-数据管理模块的设计与实现
    Java操作k8s api示例:使用kubeconfig文件认证;获取所有pod;获取pod内应用容器的启动日志
  • 原文地址:https://blog.csdn.net/qq_42000631/article/details/126342675