• rabbit的扇出模式(fanout发布订阅)的生产者与消费者使用案例


    扇出模式 fanout 发布订阅模式
    生产者

    生产者发送消息到交换机(logs),控制台输入消息作为生产者的消息发送

    package com.esint.rabbitmq.work03;
    
    import com.esint.rabbitmq.RabbitMQUtils;
    import com.rabbitmq.client.Channel;
    
    import java.util.Scanner;
    
    public class EmitLog {
    
    
        public static final String EXCHANGE_NAME = "logs";
    
        public static void main(String[] args) throws Exception {
    
            Channel channel = RabbitMQUtils.getChannel();
    
            //声明交换机
            channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
    
            Scanner scanner = new Scanner(System.in);
    
            while(scanner.hasNext()){
                String mes = scanner.next();
                channel.basicPublish(EXCHANGE_NAME,"",null,mes.getBytes("UTF-8"));
                System.out.println("生产者发出消息:"+mes.toString());
            }
        }
    }
    
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30

    交换机如下:
    在这里插入图片描述
    发布者如下:
    在这里插入图片描述

    消费者

    创建两个消费来同时消费生产者的发布消息。声明两个临时队列,与交换机做绑定,消费消息

    消费者01
    package com.esint.rabbitmq.work03;
    
    
    import com.esint.rabbitmq.RabbitMQUtils;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.DeliverCallback;
    
    /**
     * 消费者01的消息接受
     */
    public class ReceiveLog01 {
    
        public static final String EXCHANGE_NAME = "logs";
    
        public static void main(String[] args) throws Exception {
    
            //获取信道
            Channel channel = RabbitMQUtils.getChannel();
            //声明一个交换机
            channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
    
            /**
             *声明一个队列  队列名字随机
             * 消费者断开与队列的链接后 队列自动删除
             */
            String queue = channel.queueDeclare().getQueue();
    
            /**
             * 绑定交换机与队列
             * 1.队列名字
             */
            channel.queueBind(queue,EXCHANGE_NAME,"");
            System.out.println("wait print message...");
    
            DeliverCallback deliverCallback = (var,var2)->{
                System.out.println("01:"+new String(var2.getBody()));
            };
            channel.basicConsume(queue,true,deliverCallback, var->{});
    
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    消费者02
    package com.esint.rabbitmq.work03;
    
    
    import com.esint.rabbitmq.RabbitMQUtils;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.DeliverCallback;
    
    /**
     * 消费者02的消息接受
     */
    public class ReceiveLog02 {
    
        public static final String EXCHANGE_NAME = "logs";
    
        public static void main(String[] args) throws Exception {
    
            //获取信道
            Channel channel = RabbitMQUtils.getChannel();
            //声明一个交换机
            channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
    
            /**
             *声明一个队列  队列名字随机
             * 消费者断开与队列的链接后 队列自动删除
             */
            String queue = channel.queueDeclare().getQueue();
    
            /**
             * 绑定交换机与队列
             * 1.队列名字
             */
            channel.queueBind(queue,EXCHANGE_NAME,"");
            System.out.println("wait print message...");
    
            DeliverCallback deliverCallback = (var,var2)->{
                System.out.println("02:"+new String(var2.getBody(),"UTF-8"));
            };
            channel.basicConsume(queue,true,deliverCallback, var->{});
    
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42

    两个临时队列:
    在这里插入图片描述

    运行测试生产者:
    在这里插入图片描述

    运行测试消费者:
    在这里插入图片描述
    在这里插入图片描述

  • 相关阅读:
    由浅入深,从掌握Promise的基本使用到手写Promise
    Python+大数据-知行教育(三)-访问咨询主题看板_增量流程
    Springboot+基于微信小程序的商城 毕业设计-附源码191145
    Vue项目实战——【基于 Vue3.x + Vant UI】实现一个多功能记账本(项目演示、涉及知识点、源码分享)
    代码随想录Day24 LeetCode T491 递增子序列 LeetCode T46 全排列 LrrtCode T47 全排列II
    网络分析笔记08:解析TCP/IP的TCP协议
    TimeLine的使用
    使用html+css实现一个静态页面(含源码)
    查理复用-小笔记
    【Vue3】Vue3中监视watch和watchEffect使用(图文+代码)
  • 原文地址:https://blog.csdn.net/qq_17040587/article/details/134435505