目录
在工作模式中,我们创建了一个工作队列。我们假设的是工作队列背后,每个任务都恰好交付给一个消费者(工作进程)。在这一部分中,我们将做一些完全不同的事情-我们将消息传达给多个消费者。这种模式称为 ”发布/订阅模式”.

RabbitMQ 消息传递模型的核心思想是: 生产者生产的消息从不会直接发送到队列。实际上,通常生产者甚至都不知道这些消息传递传递到了哪些队列中。相反,生产者只能将消息发送到交换机(exchange),交换机工作的内容非常简单,一方面它接收来自生产者的消息,另一方面将它们推入队列。交换机必须确切知道如何处理收到的消息。是应该把这些消息放到特定队列还是说把他们到许多队列中还是说应该丢弃它们。这就的由交换机的类型来决定。

有下面几大类型:
直接(direct), 主题(topic) ,标题(headers) , 扇出(fanout)
在工作模式中,我们没有使用到交换机(名字指定为 " "),但消息也能正常发送。这是因为我们使用了默认交换。消息从路由发送到队列中其实是由 routingKey(bindingkey)绑定 key 指定的(之前设置为了null)。
之前我们一直使用的都是特定名称的队列,如"hello"。其实我们可以创建一个具有随机名称 的队列,或者说让服务器为我们选择一个随机队列名称。一旦我们断开了消费者的连接,队列将被自动删除。
创建临时队列的方式如下:
String queueName = channel.queueDeclare().getQueue();
binding 其实是 exchange 和 queue 之间的桥梁,它告诉我们 exchange 和哪个队列进行了绑定关系。比如说下面这张图告诉我们的就是 X 与 Q1 和 Q2 进行了绑定:

channel.queueBind(queueName, EXCHANGE_NAME, "routingKey");
Fanout 这种类型非常简单,它是将接收到的所有消息广播到它知道的所有队列中。
接下来就以下面的图为例进行演示:

ReceiveLogs01:
- class ReceiveLogs01 implements Callable {
- private static final String EXCHANGE_NAME ="logs";
- @Override
- public Object call() throws Exception {
- Channel channel = rabbitMQUtils.getChannel();
- //1.声明一个交换机,参数:(交换机名,模式)
- channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
- //2.声明一个临时的队列
- String queueName = channel.queueDeclare().getQueue();
- //3.将临时队列与交换机绑定,参数:(队列名,交换机名,绑定key)
- channel.queueBind(queueName,EXCHANGE_NAME,"");
- System.out.println("ReceiveLogs01等待接收消息...");
-
- //1).成功的回调消息
- DeliverCallback deliverCallback=(consumerTag, delivery)->{
- String message= new String(delivery.getBody());
- System.out.println("ReceiveLogs01收到了消息:"+message);
- };
- //2).取消的回调信息
- CancelCallback cancelCallback=(consumerTag)->{
- System.out.println("消息消费被中断");
- };
- //4.等待接受消息
- channel.basicConsume(queueName,true,deliverCallback,cancelCallback);
- return null;
- }
- }
ReceiveLogs02:
- class ReceiveLogs02 implements Callable {
- private static final String EXCHANGE_NAME ="logs";
- @Override
- public Object call() throws Exception {
- Channel channel = rabbitMQUtils.getChannel();
- //1.声明一个交换机,参数:(交换机名,模式)
- channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
- //2.声明一个临时的队列
- String queueName = channel.queueDeclare().getQueue();
- //3.将临时队列与交换机绑定,参数:(队列名,交换机名,绑定key)
- channel.queueBind(queueName,EXCHANGE_NAME,"");
- System.out.println("ReceiveLogs02等待接收消息...");
-
- //1).成功的回调消息
- DeliverCallback deliverCallback=(consumerTag, delivery)->{
- String message= new String(delivery.getBody());
- System.out.println("ReceiveLogs02收到了消息:"+message);
- };
- //2).取消的回调信息
- CancelCallback cancelCallback=(consumerTag)->{
- System.out.println("消息消费被中断");
- };
- //4.等待接受消息
- channel.basicConsume(queueName,true,deliverCallback,cancelCallback);
- return null;
- }
- }
EmitLog:
- class EmitLog implements Callable{
- private static final String EXCHANGE_NAME = "logs";
-
- @Override
- public Object call() throws Exception {
- Channel channel = rabbitMQUtils.getChannel();
- //1.声明一个交换机,参数:(交换机名,模式)
- channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
- String msg="我在测试交换机的功能!";
- //2.发布消息
- channel.basicPublish(EXCHANGE_NAME,"",null,msg.getBytes(StandardCharsets.UTF_8));
- return null;
- }
- }
测试:
- public static void main(String[] args) throws InterruptedException {
- ExecutorService service= Executors.newFixedThreadPool(10);
- service.submit(new ReceiveLogs01());
- service.submit(new ReceiveLogs02());
- Thread.sleep(1000);
- service.submit(new EmitLog());
- }
-
-
- //结果:
- ReceiveLogs01等待接收消息...
- ReceiveLogs02等待接收消息...
-
- ReceiveLogs02收到了消息:我在测试交换机的功能!
- ReceiveLogs01收到了消息:我在测试交换机的功能!
上一节中的我们的日志系统将所有消息广播给所有消费者,对此我们想做一些改变,例如我们希望将日志消息写入磁盘的程序仅接收严重错误(errros),而不存储警告(warning)或信息(info)。Fanout 这种交换类型并不能给我们带来很大的灵活性-它只能进行无意识的广播,在这里我们将使用 direct 这种类型来进行替换,这种类型的工作方式是消息只推送到它绑定的 routingKey 队列中去。

在这种绑定情况下,生产者发布消息到 exchange 上,绑定键为 orange 的消息会被发布到队列 Q1。绑定键为 black、green 和的消息会被发布到队列 Q2,其他消息类型的消息将被丢弃。
演示:
C1:
- class C1 implements Callable {
- private static final String EXCHANGE_NAME ="direct_logs";
- @Override
- public Object call() throws Exception {
- Channel channel = rabbitMQUtils.getChannel();
- //1.声明一个交换机,参数:(交换机名,模式)
- channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
- //2.声明一个队列
- channel.queueDeclare("Q1",false,false,false,null);
- //3.将队列与交换机绑定,参数:(队列名,交换机名,绑定key)
- channel.queueBind("Q1",EXCHANGE_NAME,"orange");
- System.out.println("C1等待接收消息...");
-
- //1).成功的回调消息
- DeliverCallback deliverCallback=(consumerTag, delivery)->{
- String message= new String(delivery.getBody());
- System.out.println("C1收到了消息:"+message);
- };
- //2).取消的回调信息
- CancelCallback cancelCallback=(consumerTag)->{
- System.out.println("消息消费被中断");
- };
- //4.等待接受消息
- channel.basicConsume("Q1",true,deliverCallback,cancelCallback);
- return null;
- }
- }
C2:
- class C2 implements Callable {
- private static final String EXCHANGE_NAME ="direct_logs";
- @Override
- public Object call() throws Exception {
- Channel channel = rabbitMQUtils.getChannel();
- //1.声明一个交换机,参数:(交换机名,模式)
- channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
- //2.声明一个队列
- channel.queueDeclare("Q2",false,false,false,null);
- //3.将队列与交换机绑定,参数:(队列名,交换机名,绑定key)
- channel.queueBind("Q2",EXCHANGE_NAME,"black");
- channel.queueBind("Q2",EXCHANGE_NAME,"green");
- System.out.println("C2等待接收消息...");
-
- //1).成功的回调消息
- DeliverCallback deliverCallback=(consumerTag, delivery)->{
- String message= new String(delivery.getBody());
- System.out.println("C2收到了消息:"+message);
- };
- //2).取消的回调信息
- CancelCallback cancelCallback=(consumerTag)->{
- System.out.println("消息消费被中断");
- };
- //4.等待接受消息
- channel.basicConsume("Q2",true,deliverCallback,cancelCallback);
- return null;
- }
- }
EmitLogDirect:
- class EmitLogDirect implements Callable {
- private static final String EXCHANGE_NAME = "direct_logs";
-
- @Override
- public Object call() throws Exception {
- Channel channel = rabbitMQUtils.getChannel();
- //1.声明一个交换机,参数:(交换机名,模式)
- channel.exchangeDeclare(EXCHANGE_NAME,BuiltinExchangeType.DIRECT);
- //2.发布消息
- //用map封装三种类型的消息,对应三个bingKey的情况.发送三次
- Map
msgs=new HashMap<>(); - msgs.put("orange","orange信息");
- msgs.put("black","black信息");
- msgs.put("green","green信息");
- for (Map.Entry
bindingKeyEntry: msgs.entrySet()) { - String bindingKey = bindingKeyEntry.getKey();
- String message = bindingKeyEntry.getValue();
- channel.basicPublish(EXCHANGE_NAME, bindingKey, null, message.getBytes(StandardCharsets.UTF_8));
- Thread.sleep(1000);
- }
- return null;
- }
- }
测试:
- public static void main(String[] args) throws InterruptedException {
- ExecutorService service= Executors.newFixedThreadPool(10);
- service.submit(new C1());
- service.submit(new C2());
- Thread.sleep(1000);
- service.submit(new EmitLogDirect());
- }
-
-
- //结果:
- C1等待接收消息...
- C2等待接收消息...
-
- C1收到了消息:orange信息
- C2收到了消息:green信息
- C2收到了消息:black信息
尽管使用 direct 交换机改进了我们的系统,但是它仍然存在局限性-比方说我们想接收的消息类型有 info.base 和 info.advantage,某个队列只想 info.base 的消息(要求更严苛了),那这个时候 direct 就办不到了。这个时候就只能使用 topic 类型
发送到类型是 topic 交换机的消息的 routing_key(bingingKey) 不能随意写,必须满足一定的要求,它必须是一个单词列表,以点号分隔开。这些单词可以是任意单词。
比如:"stock.usd.nyse", "nyse.vmw", "quick.orange.rabbit"。当然这个单词列表最多不能超过 255 个字节。
其中,
*(星号)可以代替一个单词
#(井号)可以替代零个或多个单词
(当一个队列绑定键是#,那么这个队列将接收所有数据,就有点像 fanout 了;
如果队列绑定键当中没有#和*出现,那么该队列绑定类型就是 direct 了)

按照上面的图,若发送下面的消息,则消息的接收结果应该如下:
quick.orange.rabbit 被队列 Q1Q2 接收到
lazy.orange.elephant 被队列 Q1Q2 接到
quick.orange.fox 被队列 Q1 接收到
lazy.brown.fox 被队列 Q2 接收到
lazy.pink.rabbit 虽然满足两个绑定但只被队列 Q2 接收一次
quick.brown.fox 不匹配任何绑定不会被任何队列接收到会被丢弃
quick.orange.male.rabbit 是四个单词不匹配任何绑定会被丢弃
lazy.orange.male.rabbit 是四个单词但匹配 Q2
C1:
- class C1 implements Callable {
- private static final String EXCHANGE_NAME ="topic_logs";
- @Override
- public Object call() throws Exception {
- Channel channel = rabbitMQUtils.getChannel();
- //1.声明一个交换机,参数:(交换机名,模式)
- channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
- //2.声明一个队列
- channel.queueDeclare("Q1",false,false,false,null);
- //3.将队列与交换机绑定,参数:(队列名,交换机名,绑定key)
- channel.queueBind("Q1",EXCHANGE_NAME,"*.orange.*");
- System.out.println("C1等待接收消息...");
-
- //1).成功的回调消息
- DeliverCallback deliverCallback=(consumerTag, delivery)->{
- String message= new String(delivery.getBody());
- System.out.println("C1收到了消息:"+message);
- };
- //2).取消的回调信息
- CancelCallback cancelCallback=(consumerTag)->{
- System.out.println("消息消费被中断");
- };
- //4.等待接受消息
- channel.basicConsume("Q1",true,deliverCallback,cancelCallback);
- return null;
- }
- }
C2:
- class C2 implements Callable {
- private static final String EXCHANGE_NAME ="topic_logs";
- @Override
- public Object call() throws Exception {
- Channel channel = rabbitMQUtils.getChannel();
- //1.声明一个交换机,参数:(交换机名,模式)
- channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
- //2.声明一个队列
- channel.queueDeclare("Q2",false,false,false,null);
- //3.将队列与交换机绑定,参数:(队列名,交换机名,绑定key)
- channel.queueBind("Q2",EXCHANGE_NAME,"*.*.rabbit");
- channel.queueBind("Q2",EXCHANGE_NAME,"lazy.#");
- System.out.println("C2等待接收消息...");
-
- //1).成功的回调消息
- DeliverCallback deliverCallback=(consumerTag, delivery)->{
- String message= new String(delivery.getBody());
- System.out.println("C2收到了消息:"+message);
- };
- //2).取消的回调信息
- CancelCallback cancelCallback=(consumerTag)->{
- System.out.println("消息消费被中断");
- };
- //4.等待接受消息
- channel.basicConsume("Q2",true,deliverCallback,cancelCallback);
- return null;
- }
- }
EmitLogTopic:
- class EmitLogTopic implements Callable {
- private static final String EXCHANGE_NAME = "topic_logs";
-
- @Override
- public Object call() throws Exception {
- Channel channel = rabbitMQUtils.getChannel();
- //1.声明一个交换机,参数:(交换机名,模式)
- channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
- //2.发布消息
- //用map封装不同类型的消息,对应不同bingKey的情况.发送若干次
- Map
msgs=new HashMap<>(); - msgs.put("quick.orange.rabbit","被队列 Q1Q2 接收到");
- msgs.put("lazy.orange.elephant","被队列 Q1Q2 接收到");
- msgs.put("quick.orange.fox","被队列 Q1 接收到");
- msgs.put("lazy.brown.fox","被队列 Q2 接收到");
- msgs.put("lazy.pink.rabbit","虽然满足两个绑定但只被队列 Q2 接收一次");
- msgs.put("quick.brown.fox","不匹配任何绑定不会被任何队列接收到会被丢弃");
- msgs.put("quick.orange.male.rabbit","是四个单词不匹配任何绑定会被丢弃");
- msgs.put("lazy.orange.male.rabbit","是四个单词但匹配 Q2");
- for (Map.Entry
bindingKeyEntry: msgs.entrySet()) { - String bindingKey = bindingKeyEntry.getKey();
- String message = bindingKeyEntry.getValue();
- channel.basicPublish(EXCHANGE_NAME, bindingKey, null, message.getBytes(StandardCharsets.UTF_8));
- Thread.sleep(1000);
- }
- return null;
- }
- }
测试:
- public static void main(String[] args) throws InterruptedException {
- ExecutorService service= Executors.newFixedThreadPool(10);
- service.submit(new C11());
- service.submit(new C22());
- Thread.sleep(1000);
- service.submit(new EmitLogTopic());
- }
-
- //结果:
- C1等待接收消息...
- C2等待接收消息...
-
- C2收到了消息:被队列 Q1Q2 接收到
- C1收到了消息:被队列 Q1Q2 接收到
- C2收到了消息:被队列 Q2 接收到
- C2收到了消息:被队列 Q1Q2 接收到
- C1收到了消息:被队列 Q1Q2 接收到
- C1收到了消息:被队列 Q1 接收到
- C2收到了消息:虽然满足两个绑定但只被队列 Q2 接收一次
- C2收到了消息:是四个单词但匹配 Q2