RabbitMQ消息传递模型的核心思想:生产者生产的消息不会直接发送到队列上,生产者只能将消息发送到交换机(exchange),交换机的工作内容一方面接收来自生产者的消息,另一方面,将接收到的消息推入队列。另一方面将消息推入队列需要知道消息的处理策略,这就是由交换机的类型来决定了。
总共有以下几种类型
直接类型(direct),主题(topic),标题(headers),扇出(fanout)
Exchanges类型 | 别名 | routingKey | 策略 |
---|---|---|---|
Fanout(扇出) | 广播模式 | 空字符串 | 将接收到的消息广播到它知道的所有队列中 |
Direct(直接) | 直接模式 | black… | 消息支取到它绑定的routingKey队列中,当绑定的多个队列的Key都相同,那表现就与fanout类型相似了 |
Topic(主题) | 主题模式 | 单词组合,以点分开 *(星号)可以代替一个单词,#(井号)可以替代零个或多个单词 | 可以通过通配符的方式来发送消息 当一个队列绑定键是#,那么这个队列将接收所有数据,就有点像 fanout 了 如果队列绑定键当中没有#和*出现,那么该队列绑定类型就是 direct 了 |
我们之前在发送消息的时候,之所以能实现的原因是使用了默认交换机,通过空字符串""
进行标识。
/**
* 1. 交换机
* 2. 队列名
* 3. 参数 消息持久化(保存在磁盘上)
* 4. 消息
*/
channel.basicPublish("",QueueName.Ack_Queue.getName(), MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes(StandardCharsets.UTF_8));
第一个参数就是交换机的名称,空字符串则表示默认或无名交换机。
UI管理界面中可以看到
我们之前去使用一个队列,都会有一个队列的名称,那如果我们连接rabbitMQ的时候都需要一个随机名称的空队列,那么我们可以创建一个具有随机名称的队列,但当我们一点断开了消费者的连接,队列就会被自动删除。
创建临时队列的方式如下
/**
* 声明一个临时队列
* 生成一个零时队列,队列的名称是随机的
* 当消费者断开与队列的连接时,队列就自动删除
*/
String queueName = channel.queueDeclare().getQueue();
创建出来 在UI中可以看到
bingding是exchange和queue之间的桥梁,它告诉我们exchange和哪个队列进行了绑定关系,它是一个绑定的关系值。
fanout这种类型非常的简单,它是将接收到的所有消息广播到它知道的所有的队列中。
Logs和临时队列的绑定关系如下图
消费者1 ReceiveLogs01
/**
* 消费者负责消费消息
*/
public class ReceiveLogs01 {
public static void main(String[] args) throws Exception{
Channel channel = RabbitMqUtil.getChannel();
//声明一个交换机
channel.exchangeDeclare(ExchangeName.Log_Exchange.getName(), "fanout");
/**
* 声明一个临时队列
* 生成一个零时队列,队列的名称是随机的
* 当消费者断开与队列的连接时,队列就自动删除
*/
String queueName = channel.queueDeclare().getQueue();
//绑定交换机与队列
channel.queueBind(queueName,ExchangeName.Log_Exchange.getName(), "");
//等待接收消息
DeliverCallback deliverCallback=(consumerTag,message)->{
System.out.println("打印接收到的消息:"+new String(message.getBody(),"UTF-8"));
};
System.out.println("消费者A-正在接收消息");
channel.basicConsume(queueName,true,deliverCallback,(consumerTag,message)->{});
}
}
消费者2 ReceiveLogs02
/**
* 消费者负责消费消息
*/
public class ReceiveLogs02 {
public static void main(String[] args) throws Exception{
Channel channel = RabbitMqUtil.getChannel();
//声明一个交换机
channel.exchangeDeclare(ExchangeName.Log_Exchange.getName(), "fanout");
/**
* 声明一个临时队列
* 生成一个零时队列,队列的名称是随机的
* 当消费者断开与队列的连接时,队列就自动删除
*/
String queueName = channel.queueDeclare().getQueue();
//绑定交换机与队列
channel.queueBind(queueName,ExchangeName.Log_Exchange.getName(), "");
//等待接收消息
DeliverCallback deliverCallback=(consumerTag,message)->{
System.out.println("打印接收到的消息:"+new String(message.getBody(),"UTF-8"));
};
System.out.println("消费者B-正在接收消息");
channel.basicConsume(queueName,true,deliverCallback,(consumerTag,message)->{});
}
}
生产者 EmitLog
/**
* 发消息给交换机
*/
public class EmitLog {
public static void main(String[] args)throws Exception {
Channel channel = RabbitMqUtil.getChannel();
channel.exchangeDeclare(ExchangeName.Log_Exchange.getName(), "fanout");
Scanner scanner = new Scanner(System.in);
while (scanner.hasNext()){
String message = scanner.next();
channel.basicPublish(ExchangeName.Log_Exchange.getName(), "",null,message.getBytes("UTF-8"));
System.out.println("生产者发出消息: "+message);
}
}
}
生产者
消费者A
消费者B
在上一个部分我们将信息分享给所有的消费者,但如果我们希望将消息纷发给不同的消费者的话,fanout这种交换类型就不能给我们带来很大的灵活性,这里我们就要使用Direct类型进行替换,这种工作方式的策略是:消息只会去到它绑定的routingKey队列中去。
Direct_Logs绑定队列关系如下
消费者1 ReceiveLogsDirect01
public class ReceiveLogsDirect01 {
public static void main(String[] args) throws Exception{
Channel channel = RabbitMqUtil.getChannel();
//声明交换机
channel.exchangeDeclare(ExchangeName.Direct_Logs.getName(), BuiltinExchangeType.DIRECT);
//声明一个队列
channel.queueDeclare(QueueName.Console_Queue.getName(), false,false,false,null);
channel.queueBind(QueueName.Console_Queue.getName(), ExchangeName.Direct_Logs.getName(),"info");
channel.queueBind(QueueName.Console_Queue.getName(), ExchangeName.Direct_Logs.getName(),"warning");
//接收消息
//等待接收消息
DeliverCallback deliverCallback=(consumerTag, message)->{
System.out.println("打印接收到的消息:"+new String(message.getBody(),"UTF-8"));
};
System.out.println("消费者A-正在接收消息 (接收info,warning)");
//消费者接收消息
channel.basicConsume(QueueName.Console_Queue.getName(), true,deliverCallback,consumerTag->{});
}
}
消费者2 ReceiveLogsDirect02
public class ReceiveLogsDirect02 {
public static void main(String[] args) throws Exception{
Channel channel = RabbitMqUtil.getChannel();
//声明交换机
channel.exchangeDeclare(ExchangeName.Direct_Logs.getName(), BuiltinExchangeType.DIRECT);
//声明一个队列
channel.queueDeclare(QueueName.Disk_Queue.getName(), false,false,false,null);
channel.queueBind(QueueName.Disk_Queue.getName(), ExchangeName.Direct_Logs.getName(),"error");
// channel.queueBind(QueueName.Disk_Queue.getName(), ExchangeName.Direct_Logs.getName(),"warning");
//接收消息
//等待接收消息
DeliverCallback deliverCallback=(consumerTag, message)->{
System.out.println("打印接收到的消息:"+new String(message.getBody(),"UTF-8"));
};
System.out.println("消费者B-正在接收消息 (接收error)");
//消费者接收消息
channel.basicConsume(QueueName.Disk_Queue.getName(), true,deliverCallback,consumerTag->{});
}
}
生产者 EmitLogTwo
public class EmitLogTwo {
public static void main(String[] args)throws Exception {
Channel channel = RabbitMqUtil.getChannel();
channel.exchangeDeclare(ExchangeName.Direct_Logs.getName(), BuiltinExchangeType.DIRECT);
Scanner scanner = new Scanner(System.in);
while (true){
System.out.print("请输入你需要发送的日志类型:");
String logType = scanner.next();
if("warning".equals(logType)||"info".equals(logType)||"error".equals(logType)){
System.out.print("请输入你需要发送的消息:");
String message = scanner.next();
channel.basicPublish(ExchangeName.Direct_Logs.getName(), logType,null,message.getBytes("UTF-8"));
System.out.println("生产者发出消息: "+message);
}else {
System.out.println("类型输入错误!请重新输入");
continue;
}
}
}
}
生产者
消费者A
消费者B
当exchange的类型为direct,但是它绑定的多个队列的key都相同,这种情况下虽然绑定的类型是direct,但是它的表现就和fanout有点类似了,和广播模式差不多,发送的消息会被相同绑定key的接收到。
当exchange向一个没有绑定queue(队列)的routingKey发送消息时,由于没有绑定关系,所以消息最终会被丢弃。
尽管direct交换机解决了部分纷发的局限性,topic类型在direct的基础上,可以在绑定routingkey的基础上可以使用通配符,更加的灵活。
Topic的要求
topic交换机的绑定key不可以随意填写,必须满足一定的要求
必须是一个单词列表,以点号隔开
可以使用通配符
*(星号)可以代替一个单词
#(井号)可以替代零个或多个单词
Topic_Logs绑定队列关系如下
消费者1 ReceiveLogsTopic01
/**
* 主题模式消费者
*/
public class ReceiveLogsTopic01 {
public static void main(String[] args)throws Exception{
Channel channel = RabbitMqUtil.getChannel();
//声明交换机
channel.exchangeDeclare(ExchangeName.Topic_Logs.getName(), BuiltinExchangeType.TOPIC);
//声明队列
channel.queueDeclare(QueueName.Q1_Queue.getName(), false,false,false,null);
//绑定队列
channel.queueBind(QueueName.Q1_Queue.getName(), ExchangeName.Topic_Logs.getName(), "*.orange.*");
System.out.println("主题模式消费者T1正在等待消息... (*.orange.*)");
//接收消息
channel.basicConsume(QueueName.Q1_Queue.getName(),true,(consumerTag,message)->{
System.out.println("打印接收到的消息:"+new String(message.getBody(),"UTF-8"));
},consumerTag->{});
}
}
消费者2 ReceiveLogsTopic02
/**
* 主题模式消费者
* 通配符 *表示一个单词,#代表一个或多个单
*/
public class ReceiveLogsTopic02 {
public static void main(String[] args)throws Exception{
Channel channel = RabbitMqUtil.getChannel();
//声明交换机
channel.exchangeDeclare(ExchangeName.Topic_Logs.getName(), BuiltinExchangeType.TOPIC);
//声明队列
channel.queueDeclare(QueueName.Q2_Queue.getName(), false,false,false,null);
//绑定队列
channel.queueBind(QueueName.Q2_Queue.getName(), ExchangeName.Topic_Logs.getName(), "*.*.rabbit");
channel.queueBind(QueueName.Q2_Queue.getName(), ExchangeName.Topic_Logs.getName(), "lazy.#");
System.out.println("主题模式消费者T2正在等待消息... (*.*.rabbit)(lazy.#)");
//接收消息
channel.basicConsume(QueueName.Q2_Queue.getName(),true,(consumerTag,message)->{
System.out.println("打印接收到的消息:"+new String(message.getBody(),"UTF-8"));
},consumerTag->{});
}
}
生产者 EmitLogThree
public class EmitLogThree {
public static void main(String[] args)throws Exception {
Channel channel = RabbitMqUtil.getChannel();
channel.exchangeDeclare(ExchangeName.Topic_Logs.getName(), BuiltinExchangeType.TOPIC);
Scanner scanner = new Scanner(System.in);
while (true){
System.out.print("请输入你需要发送的routingKey:");
String logType = scanner.next();
System.out.print("请输入你需要发送的消息:");
String message = scanner.next();
channel.basicPublish(ExchangeName.Topic_Logs.getName(), logType,null,message.getBytes("UTF-8"));
System.out.println("生产者发出消息: "+message);
}
}
}
生产者
消费者A
消费者B
注意:当一个队列绑定键是#,那么这个队列将接收所有数据,就有点像 fanout 了
如果队列绑定键当中没有#和*出现,那么该队列绑定类型就像是 direct 了