• 15.federation


    federation和shovel

    federation-exchange

    问题的由来:

    城市A有rabbitmqA,城市B有rabbitmqB,当城市B的应用要发消息到exchangeA的时候,会因为网络原因,导致发送时间延时。

    federation-exchange的作用:

    federation提供了一个能力,让城市B的mq去接收exchangeA的消息,然后再把消息转发到城市A的exchangeA

    在这里插入图片描述

    案例演示

    • 准备两台rabbitmq服务,保证每台节点单独运行

      • rabbit@Jan
      • rabbit@Feb
    • 在每台机器上开启federation相关插件

      rabbitmq-plugins enable rabbitmq_federation --offline
      rabbitmq-plugins enable rabbitmq_federation_managemen --offline
      
      • 1
      • 2
    • 开启后在管理台页面发现新增选项卡

      在这里插入图片描述

    • 运行ConsumerFeb的代码

      • 通过ConsumerFeb代码创建了fed-queuefed-exchange
      /**
       * Feb 消息消费者
       */
      public class ConsumerFeb {
      
          private static final String  QUEUE_NAME="fed-queue";
          private static final String  EXCHANGE_NAME="fed-exchange";
      
          public static void main(String[] args) throws Exception {
              ConnectionFactory connectionFactory = new ConnectionFactory();
              connectionFactory.setHost("172.16.140.131");
              connectionFactory.setUsername("admin");
              connectionFactory.setPassword("123456");
      
              Connection connection = connectionFactory.newConnection();
              Channel channel = connection.createChannel();
      
              channel.exchangeDeclare(EXCHANGE_NAME,BuiltinExchangeType.DIRECT);
      
              channel.queueDeclare(QUEUE_NAME,false,false,false,null);
      
              channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"aaa");
              
              System.out.println("等待接收消息");
      
              //推送的消息如何进行消费的接口回调
              DeliverCallback deliverCallback = (consumerTag, message) -> {
                String result = new String(message.getBody());
                  System.out.println("消费者接收到消息,消息内容为:"+result);
              };
      
              //取消消费的一个回调接口
              CancelCallback cancelCallback = consumerTag -> {
                  System.out.println("消息消费被中断");
              };
      
              channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
          }
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26
      • 27
      • 28
      • 29
      • 30
      • 31
      • 32
      • 33
      • 34
      • 35
      • 36
      • 37
      • 38
      • 39
    • Feb添加upstream

    在这里插入图片描述

    • 添加policy

      在这里插入图片描述

    • 查看federation status

      在这里插入图片描述

    • 添加ConsumerJan消费者代码

      /**
       *  Jan 消息消费者
       */
      public class ConsumerJan {
      
          private static final String  QUEUE_NAME="federation-queue";
      
          public static void main(String[] args) throws Exception {
              ConnectionFactory connectionFactory = new ConnectionFactory();
              connectionFactory.setHost("172.16.140.130");
              connectionFactory.setUsername("admin");
              connectionFactory.setPassword("123456");
      
              Connection connection = connectionFactory.newConnection();
              Channel channel = connection.createChannel();
              channel.queueDeclare(QUEUE_NAME,false,false,false,null);
      
      
              System.out.println("等待接收消息");
      
              //推送的消息如何进行消费的接口回调
              DeliverCallback deliverCallback = (consumerTag, message) -> {
                String result = new String(message.getBody());
                  System.out.println("消费者接收到消息,消息内容为:"+result);
              };
      
              //取消消费的一个回调接口
              CancelCallback cancelCallback = consumerTag -> {
                  System.out.println("消息消费被中断");
              };
      
              channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
          }
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26
      • 27
      • 28
      • 29
      • 30
      • 31
      • 32
      • 33
      • 34
    • 添加生产者代码

      /**
       * federation-exchange-消息生产者
       */
      public class Producer {
      
          private static final String  EXCHANGE_NAME="fed-exchange";
      
          public static void main(String[] args) throws Exception {
              //创建一个连接工厂
              ConnectionFactory connectionFactory = new ConnectionFactory();
              connectionFactory.setHost("172.16.140.130");
              connectionFactory.setUsername("admin");
              connectionFactory.setPassword("123456");
              //获取连接
              Connection connection = connectionFactory.newConnection();
              Channel channel = connection.createChannel();
              String message = "hello world";
              channel.basicPublish(EXCHANGE_NAME, "aaa", null, message.getBytes(StandardCharsets.UTF_8));
      
              System.out.println("消息发送完毕");
      
          }
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
    • 启动消费者Feb,消费者Jan,再启动生产者

      • 发现生产者发送消息到Jan,通过federation将消息转发到Feb,在消费者Feb中得到输出

    federation-queue

    在这里插入图片描述

    shovel

    • 开启插件

      rabbitmq-plugins enable rabbitmq_shovel --offline
      rabbitmq-plugins enable rabbitmq_shovel_management --offline
      
      • 1
      • 2

    在这里插入图片描述

  • 相关阅读:
    递归与lamdba与高阶函数
    【IOC,AOP】spring的基础概念
    Arm Neoverse 路线图再添新品,基于四项关键原则打造
    【FreeRtos任务通知详解】
    C++ STL
    前端html实现带行号的文本编辑器
    想知道哪些软件可以做到免费去视频水印吗?这篇文章告诉你答案
    K折交叉验证——cross_val_score函数使用说明
    新手必会的静态站点生成器——Gridsome
    如何在macOS上使用最新版的Bison来构建项目,而不是Xcode工具链内嵌的2.3版本
  • 原文地址:https://blog.csdn.net/qq_34932086/article/details/125995470