问题的由来:
城市A有rabbitmqA,城市B有rabbitmqB,当城市B的应用要发消息到exchangeA的时候,会因为网络原因,导致发送时间延时。
federation-exchange的作用:
federation提供了一个能力,让城市B的mq去接收exchangeA的消息,然后再把消息转发到城市A的exchangeA
准备两台rabbitmq服务,保证每台节点单独运行
在每台机器上开启federation相关插件
rabbitmq-plugins enable rabbitmq_federation --offline
rabbitmq-plugins enable rabbitmq_federation_managemen --offline
开启后在管理台页面发现新增选项卡
运行ConsumerFeb
的代码
ConsumerFeb
代码创建了fed-queue
和fed-exchange
/**
* Feb 消息消费者
*/
public class ConsumerFeb {
private static final String QUEUE_NAME="fed-queue";
private static final String EXCHANGE_NAME="fed-exchange";
public static void main(String[] args) throws Exception {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("172.16.140.131");
connectionFactory.setUsername("admin");
connectionFactory.setPassword("123456");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME,BuiltinExchangeType.DIRECT);
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"aaa");
System.out.println("等待接收消息");
//推送的消息如何进行消费的接口回调
DeliverCallback deliverCallback = (consumerTag, message) -> {
String result = new String(message.getBody());
System.out.println("消费者接收到消息,消息内容为:"+result);
};
//取消消费的一个回调接口
CancelCallback cancelCallback = consumerTag -> {
System.out.println("消息消费被中断");
};
channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
}
}
Feb添加upstream
添加policy
查看federation status
添加ConsumerJan消费者代码
/**
* Jan 消息消费者
*/
public class ConsumerJan {
private static final String QUEUE_NAME="federation-queue";
public static void main(String[] args) throws Exception {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("172.16.140.130");
connectionFactory.setUsername("admin");
connectionFactory.setPassword("123456");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
System.out.println("等待接收消息");
//推送的消息如何进行消费的接口回调
DeliverCallback deliverCallback = (consumerTag, message) -> {
String result = new String(message.getBody());
System.out.println("消费者接收到消息,消息内容为:"+result);
};
//取消消费的一个回调接口
CancelCallback cancelCallback = consumerTag -> {
System.out.println("消息消费被中断");
};
channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
}
}
添加生产者代码
/**
* federation-exchange-消息生产者
*/
public class Producer {
private static final String EXCHANGE_NAME="fed-exchange";
public static void main(String[] args) throws Exception {
//创建一个连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("172.16.140.130");
connectionFactory.setUsername("admin");
connectionFactory.setPassword("123456");
//获取连接
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
String message = "hello world";
channel.basicPublish(EXCHANGE_NAME, "aaa", null, message.getBytes(StandardCharsets.UTF_8));
System.out.println("消息发送完毕");
}
}
启动消费者Feb,消费者Jan,再启动生产者
开启插件
rabbitmq-plugins enable rabbitmq_shovel --offline
rabbitmq-plugins enable rabbitmq_shovel_management --offline