• RabbitMQ--交换机


    目录

    绑定 bindings

    Fanout exchange

    #Fanout 介绍

    Fanout 实战

    Direct exchange(直连模式)

    Topics exchange

    #Topic 的介绍

    Topic 匹配案例


    生产者生产的消息从不会直接发送到队列,生产者只能将消息发送到交换机(exchange)

    Exchanges 的类型:

    ​ 直接(direct), 主题(topic) ,标题(headers) , 扇出(fanout)

     

    无名exchange:

    第一个参数是交换机的名称。空字符串表示默认或无名称交换机:消息能路由发送到队列中其实是由 routingKey(bindingkey)绑定 key 指定的,如果它存在的话

     

    绑定 bindings

    binding 其实是 exchange 和 queue 之间的桥梁,它告诉我们 exchange 和那个队列进行了绑定关系

     

    Fanout exchange

    #Fanout 介绍

    Fanout 这种类型非常简单。正如从名称中猜到的那样,它是将接收到的所有消息广播到它知道的 所有队列中。系统中默认有些 exchange 类型

    Fanout 实战

    ①ReceiveLogs01 将接收到的消息打印在控制台

    1. public class ReceiveLogs01 {
    2. //定义交换机的名字
    3. private static final String EXCHANGE_NAME = "logs";
    4. public static void main(String[] args) throws Exception {
    5. Channel channel= RabbitMQUtil.getChannel();
    6. //创建一个交换机
    7. channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
    8. /**
    9. * 生成一个临时的队列 队列的名称是随机的
    10. * 当消费者断开和该队列的连接时 队列自动删除
    11. */
    12. String queueName = channel.queueDeclare().getQueue();
    13. //把该临时队列绑定我们的 exchange 其中 routingkey(也称之为 binding key)为空字符串
    14. channel.queueBind(queueName, EXCHANGE_NAME, "");
    15. System.out.println("等待接收消息,把接收到的消息打印在屏幕........... ");
    16. //发送回调
    17. DeliverCallback deliverCallback = (consumerTag, delivery) -> {
    18. String message = new String(delivery.getBody(), "UTF-8");
    19. System.out.println("控制台打印接收到的消息" + message);
    20. };
    21. channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {});
    22. }
    23. }

     

     

     ②ReceiveLogs02 把消息写出到文件

    1. public class ReceiveLogs02 {
    2. private static final String EXCHANGE_NAME = "logs";
    3. public static void main(String[] args) throws Exception {
    4. Channel channel= RabbitMQUtil.getChannel();
    5. channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
    6. /**
    7. * 生成一个临时的队列 队列的名称是随机的
    8. * 当消费者断开和该队列的连接时 队列自动删除
    9. */
    10. String queueName = channel.queueDeclare().getQueue();
    11. //把该临时队列绑定我们的 exchange 其中 routingkey(也称之为 binding key)为空字符串
    12. channel.queueBind(queueName, EXCHANGE_NAME, "");
    13. System.out.println("等待接收消息,把接收到的消息写到文件........... ");
    14. //发送回调
    15. DeliverCallback deliverCallback = (consumerTag, delivery) -> {
    16. String message = new String(delivery.getBody(), "UTF-8");
    17. File file = new File("D:\\test\\rabbitmq_info.txt");
    18. FileUtils.writeStringToFile(file,message,"UTF-8");
    19. System.out.println("数据写入文件成功");
    20. };
    21. channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {});
    22. }
    23. }

    ③EmitLog 发送消息给两个消费者接收:

    1. public class EmitLog {
    2. private static final String EXCHANGE_NAME = "logs";
    3. public static void main(String[] args) throws Exception {
    4. Channel channel= RabbitMQUtil.getChannel();
    5. /**
    6. * 声明一个 exchange
    7. * 1.exchange 的名称
    8. * 2.exchange 的类型
    9. */
    10. channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
    11. Scanner sc = new Scanner(System.in);
    12. System.out.println("请输入信息");
    13. while (sc.hasNext()) {
    14. String message = sc.nextLine();
    15. //将消息发送给交换机,再有交换机绑定队列,在传递给队列
    16. channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));
    17. System.out.println("生产者发出消息" + message);
    18. }
    19. }
    20. }

     

     

    ④启动运行

     

     ReceiveLogs01接收笑死

     ReceiveLogs02写入文件

     进入磁盘查看

     

    Direct exchange(直连模式)

    简单理解:需要满足条件才可接收,不满足的直接丢弃变成死信消息

     

    #Direct 实战

    ①创建ReceiveLogsDirect01消费者

     

    1. public class ReceiveLogsDirect01 {
    2. private static final String EXCHANGE_NAME = "direct_logs";
    3. public static void main(String[] args) throws Exception {
    4. Channel channel= RabbitMQUtil.getChannel();
    5. channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
    6. String queueName = "disk";
    7. //队列声明
    8. channel.queueDeclare(queueName, false, false, false, null);
    9. //队列绑定
    10. channel.queueBind(queueName, EXCHANGE_NAME, "error");
    11. System.out.println("等待接收消息...");
    12. //发送回调
    13. DeliverCallback deliverCallback = (consumerTag, delivery) -> {
    14. String message = new String(delivery.getBody(), "UTF-8");
    15. message = "接收绑定键:" + delivery.getEnvelope().getRoutingKey() + ",消息:" + message;
    16. System.out.println("error 消息已经接收:\n" + message);
    17. };
    18. channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {
    19. });
    20. }
    21. }

     

    ② 创建ReceiveLogsDirect02消费者

    1. public class ReceiveLogsDirect02 {
    2. private static final String EXCHANGE_NAME = "direct_logs";
    3. public static void main(String[] args) throws Exception {
    4. Channel channel= RabbitMQUtil.getChannel();
    5. channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
    6. String queueName = "console";
    7. //队列声明
    8. channel.queueDeclare(queueName, false, false, false, null);
    9. //队列绑定
    10. channel.queueBind(queueName, EXCHANGE_NAME, "info");
    11. channel.queueBind(queueName, EXCHANGE_NAME, "warning");
    12. System.out.println("等待接收消息...");
    13. //发送回调
    14. DeliverCallback deliverCallback = (consumerTag, delivery) -> {
    15. String message = new String(delivery.getBody(), "UTF-8");
    16. message = "接收绑定键:" + delivery.getEnvelope().getRoutingKey() + ",消息:" + message;
    17. System.out.println("info和warning 消息已经接收:\n" + message);
    18. };
    19. channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {
    20. });
    21. }
    22. }

     ③EmitLogDirect生产者

    1. public class EmitLogDirect {
    2. private static final String EXCHANGE_NAME = "direct_logs";
    3. public static void main(String[] args) throws Exception {
    4. Channel channel= RabbitMQUtil.getChannel();
    5. channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
    6. //创建多个 bindingKey
    7. Map<String, String> bindingKeyMap = new HashMap<>();
    8. bindingKeyMap.put("info", "普通 info 信息");
    9. bindingKeyMap.put("warning", "警告 warning 信息");
    10. bindingKeyMap.put("error", "错误 error 信息");
    11. //debug 没有消费这接收这个消息 所有就丢失了
    12. bindingKeyMap.put("debug", "调试 debug 信息");
    13. for (Map.Entry<String, String> bindingKeyEntry : bindingKeyMap.entrySet()) {
    14. //获取 key value
    15. String bindingKey = bindingKeyEntry.getKey();
    16. String message = bindingKeyEntry.getValue();
    17. channel.basicPublish(EXCHANGE_NAME, bindingKey, null, message.getBytes("UTF-8"));
    18. System.out.println("生产者发出消息:" + message);
    19. }
    20. }
    21. }

     

    ④启动运行

     

     ReceiveLogsDirect01接收消息

     

     ReceiveLogsDirect02接收消息

     

    Topics exchange

    #Topic 的介绍

    发送到类型是 topic 交换机的消息的 routing_key 不能随意写,必须满足一定的要求,它必须是一个单词列表以点号分隔开。这些单词可以是任意单词

    在这个规则列表中,其中有两个替换符是大家需要注意的:

    • *(星号)可以代替一个单词
    • #(井号)可以替代零个或多个单词

    Topic 匹配案例

    ①ReceiveLogsTopic01消费者

    1. public class ReceiveLogsTopic01 {
    2. private static final String EXCHANGE_NAME = "topic_logs";
    3. public static void main(String[] args) throws Exception {
    4. Channel channel= RabbitMQUtil.getChannel();
    5. channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
    6. //声明 Q1 队列与绑定关系
    7. String queueName = "Q1";
    8. //声明
    9. channel.queueDeclare(queueName, false, false, false, null);
    10. //绑定
    11. channel.queueBind(queueName, EXCHANGE_NAME, "*.orange.*");
    12. System.out.println("等待接收消息........... ");
    13. DeliverCallback deliverCallback = (consumerTag, delivery) -> {
    14. String message = new String(delivery.getBody(), "UTF-8");
    15. System.out.println(" 接收队列:" + queueName + " 绑定键:" + delivery.getEnvelope().getRoutingKey() + ",消息:" + message);
    16. };
    17. channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {
    18. });
    19. }
    20. }

     

    ②ReceiveLogsTopic02消费者

    1. public class ReceiveLogsTopic02 {
    2. private static final String EXCHANGE_NAME = "topic_logs";
    3. public static void main(String[] args) throws Exception {
    4. Channel channel= RabbitMQUtil.getChannel();
    5. channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
    6. //声明 Q2 队列与绑定关系
    7. String queueName = "Q2";
    8. //声明
    9. channel.queueDeclare(queueName, false, false, false, null);
    10. //绑定
    11. channel.queueBind(queueName, EXCHANGE_NAME, "*.*.rabbit");
    12. channel.queueBind(queueName, EXCHANGE_NAME, "lazy.#");
    13. System.out.println("等待接收消息........... ");
    14. DeliverCallback deliverCallback = (consumerTag, delivery) -> {
    15. String message = new String(delivery.getBody(), "UTF-8");
    16. System.out.println(" 接收队列:" + queueName + " 绑定键:" + delivery.getEnvelope().getRoutingKey() + ",消息:" + message);
    17. };
    18. channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {
    19. });
    20. }
    21. }

     

    ③EmitLogTopic生产者

    1. public class EmitLogTopic {
    2. private static final String EXCHANGE_NAME = "topic_logs";
    3. public static void main(String[] args) throws Exception {
    4. Channel channel= RabbitMQUtil.getChannel();
    5. channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
    6. /**
    7. * Q1-->绑定的是
    8. * 中间带 orange 带 3 个单词的字符串(*.orange.*)
    9. * Q2-->绑定的是
    10. * 最后一个单词是 rabbit 的 3 个单词(*.*.rabbit)
    11. * 第一个单词是 lazy 的多个单词(lazy.#)
    12. *
    13. */
    14. Map<String, String> bindingKeyMap = new HashMap<>();
    15. bindingKeyMap.put("quick.orange.rabbit", "被队列 Q1Q2 接收到");
    16. bindingKeyMap.put("lazy.orange.elephant", "被队列 Q1Q2 接收到");
    17. bindingKeyMap.put("quick.orange.fox", "被队列 Q1 接收到");
    18. bindingKeyMap.put("lazy.brown.fox", "被队列 Q2 接收到");
    19. bindingKeyMap.put("lazy.pink.rabbit", "虽然满足两个绑定但只被队列 Q2 接收一次");
    20. bindingKeyMap.put("quick.brown.fox", "不匹配任何绑定不会被任何队列接收到会被丢弃");
    21. bindingKeyMap.put("quick.orange.male.rabbit", "是四个单词不匹配任何绑定会被丢弃");
    22. bindingKeyMap.put("lazy.orange.male.rabbit", "是四个单词但匹配 Q2");
    23. for (Map.Entry<String, String> bindingKeyEntry : bindingKeyMap.entrySet()) {
    24. String bindingKey = bindingKeyEntry.getKey();
    25. String message = bindingKeyEntry.getValue();
    26. channel.basicPublish(EXCHANGE_NAME, bindingKey, null, message.getBytes("UTF-8"));
    27. System.out.println("生产者发出消息:" + message);
    28. }
    29. }
    30. }

    ④启动运行

     ReceiveLogsTopic01接收消息

    ReceiveLogsTopic0接收消息

     

  • 相关阅读:
    来实现一个DataStore的封装吧
    基于微信小程序的语言课学习系统设计与实现(源码+lw+部署文档+讲解等)
    Rpc-实现Client对ZooKeeper的服务监听
    扫描线及其应用
    【Linux】进程基础
    【目标检测】SSD损失函数详解
    JVM(Java虚拟机模型、Java运行时数据区模型)
    力扣labuladong——一刷day05
    2022 年值得尝试的 7 个 MQTT 客户端工具
    HCIA网络课程第九周作业
  • 原文地址:https://blog.csdn.net/lu__lala/article/details/125456926