在上一节中,我们创建了一个工作队列。我们假设的是工作队列背后,每个任务都恰好交付给一个消费者(工作进程)。在这一部分中,我们将做一些完全不同的事情-我们将消息传达给多个消费者。这种模式称为“发布/订阅"。
为了说明这种模式,我们将构建一个简单的日志系统。它将由两个程序组成:第一个程序将发出日志消息,第二个程序是消费者。其中我们会启动两个消费者,其中一个消费者接收到消息后把日志存储在磁盘,另外一个消费者接收到消息后把消息打印在屏幕上,事实上第一个程序发出的日志消息将广播给所有消费者。
RabbitMQ消息传递模型的核心思想是:生产者生产的消息从不会直接发送到队列。实际上,通常生产者甚至都不知道这些消息传递传递到了哪些队列中。
相反,生产者只能将消息发送到交换机(exchange),交换机工作的内容非常简单,一方面它接收来自生产者的消息,另一方面将它们推入队列。交换机必须确切知道如何处理收到的消息。是应该把这些消息放到特定队列还是说把他们到许多队列中还是说应该丢弃它们。这就的由交换机的类型来决定。
总共有以下四种类型:
我们之前对交换机一无所知,但仍然可以将消息发送到队列,之前可以实现的原因就是因为我们使用的是默认交换,我们通过空字符串的形式(“”)进行标识。
channel.basicPubish("","hello",null,message.getBytes());
第一个参数就是交互机的名称。空字符串表示默认或者无名的交换机;消息能由路由发送到队列。是由routingKey(bindingkey)绑定key指定的,如果他存在的话
之前的章节我们使用的是具有特定名称的队列(还记得hello和ack_queue吗? )。队列的名称我们来说至关重要-我们需要指定我们的消费者去消费哪个队列的消息。
每当我们连接到Rabbit时,我们都需要一个全新的空队列,为此我们可以创建一个具有随机名称的队列,或者能让服务器为我们选择一个随机队列名称那就更好了。其次一旦我们断开了消费者的连接,队列将被自动删除。
String queueName = channel.queueDedare().getQueue();
什么是绑定呢?绑定(binding)其实是交换机(exchange)和队列(queue)之间的桥梁,他告诉我们exchange和哪个队列进行了绑定。比如下面的图告诉我们X与Q1和Q2进行了绑定
意思就是:
队列1和队列2可以看做是两个你的好友,生产者是你自己。你想把消息发给好友A(队列1),先通过交换机接收消息,因为好友A和好友B对交换机来说都是消费者,他需要明确的知道他要把消息发给谁,所以交换机和队列1通过RoutingKey
进行绑定,这时的RoutingKey
相当于路由,通过不同的路由,那么我就可以把消息发送给指定的人
Fanout(扇出)这种类型非常简单。正如从名称中猜到的那样,他是将接收到的所有消息广播到它知道的所有队列中。系统默认有些exchange类型
场景:生产者—发消息–>交换机–>消息队列(1\2)–>消费者(1\2)
Logs和临时队列的绑定关系:通过交换机进行绑定,routingKey为空串
package com.example.demo05;
import com.example.utils.RabbitMqUtils;
import com.rabbitmq.client.Channel;
import java.io.IOException;
import java.util.Scanner;
import java.util.concurrent.TimeoutException;
/**
* @author 我见青山多妩媚
* @date Create on 2022/8/19 9:45
* 发送方,发消息给交换机
*/
public class EmitLog {
private static final String EXCHANGE_NAME = "logs";
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitMqUtils.getChannel();
channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
Scanner scanner = new Scanner(System.in);
while (scanner.hasNext()){
String message = scanner.next();
channel.basicPublish(EXCHANGE_NAME,"",null,message.getBytes("UTF-8"));
System.out.println("生产者成功发出消息:"+message);
}
}
}
package com.example.demo05;
import com.example.utils.RabbitMqUtils;
import com.rabbitmq.client.Channel;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author 我见青山多妩媚
* @date Create on 2022/8/19 9:45
* 接收方1
*/
public class ReceiveLogs01 {
//交换机名称
private static final String EXCHANGE_NAME = "logs";
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitMqUtils.getChannel();
//声明一个交换机,扇出类型
channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
//声明一个临时队列
//生成一个一个临时的队列,队列名称随机,当消费者断开连接时,自动删除该临时队列
String queue = channel.queueDeclare().getQueue();
//绑定交换机与队列 ,routingKey为空串
channel.queueBind(queue,EXCHANGE_NAME,"");
System.out.println("队列1接收到的消息打印在屏幕上......");
//接收消息
channel.basicConsume(queue,true,RabbitMqUtils.getDeliverCallback(),
RabbitMqUtils.getCancelCallback());
}
}
package com.example.demo05;
import com.example.utils.RabbitMqUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import org.apache.commons.io.FileUtils;
import java.io.File;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author 我见青山多妩媚
* @date Create on 2022/8/19 9:45
* 接收方2
*/
public class ReceiveLogs02 {
//交换机名称
private static final String EXCHANGE_NAME = "logs";
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitMqUtils.getChannel();
//声明一个交换机,扇出类型
channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
//声明一个临时队列
//生成一个一个临时的队列,队列名称随机,当消费者断开连接时,自动删除该临时队列
String queue = channel.queueDeclare().getQueue();
//绑定交换机与队列 ,routingKey为空串
channel.queueBind(queue,EXCHANGE_NAME,"");
System.out.println("队列2接收到的消息打印在屏幕上......");
//接收消息
DeliverCallback deliverCallback = (consumeTag,delivery)->{
// 保存在磁盘
// String message = new String(delivery.getBody(), "UTF-8");
// File file = new File("E:\\rabbitmq_info.txt");
// FileUtils.writeStringToFile(file,message,"UTF-8");
// System.out.println("写入数据文件成功");
System.out.println("队列2接收到的消息:"+new String(delivery.getBody(), "UTF-8"));
};
channel.basicConsume(queue,true,deliverCallback,RabbitMqUtils.getCancelCallback());
}
}
//EmitLog
11
生产者成功发出消息:11
22
生产者成功发出消息:22
3
生产者成功发出消息:3
4444
生产者成功发出消息:4444
5555
生产者成功发出消息:5555
//消费者1接收的消息
队列1接收到的消息打印在屏幕上......
接收到的消息:11
接收到的消息:22
接收到的消息:3
接收到的消息:4444
接收到的消息:5555
//消费者2接收的消息
队列2接收到的消息打印在屏幕上......
队列2接收到的消息:11
队列2接收到的消息:22
队列2接收到的消息:3
队列2接收到的消息:4444
队列2接收到的消息:5555
这就是广播(fanout/扇出)模式
在上一节中,我们构建了一个简单的日志记录系统。我们能够向许多接收者广播日志消息。在本节我们将向其中添加一些特别的功能-比方说我们只让某个消费者订阅发布的部分消息。例如我们只把严重错误消息定向存储到日志文件(以节省磁盘空间),同时仍然能够在控制台上打印所有日志消息。
我们再次来回顾一下什么是bindings,绑定是交换机和队列之间的桥梁关系。也可以这么理解:队列只对它绑定的交换机的消息感兴趣。绑定用参数: routingKey.来表示也可称该参数为binding key,创建绑定代码:channel.queueBind(queueName,EXCHANGE_NAME, "routingKey");
绑定之后的意义由其交换类型决定。
上一节中的我们的日志系统将所有消息广播给所有消费者,对此我们想做一些改变,例如我们希望将日志消息写入磁盘的程序仅接收严重错误(errros),而不存储哪些警告(warning)或信息(info)日志消息避免浪费磁盘空间。Fanout这种交换类型并不能给我们带来很大的灵活性,它只能进行无意识的广播,在这里我们将使用direct这种类型来进行替换,这种类型的工作方式是,消息只去到它绑定routingKey队列中去(指定routingKey传送)。
在上面这张图中,我们可以看到X绑定了两个队列,绑定类型是direct。队列Q1绑定键为orange,队列Q2绑定键有两个:一个绑定键为black,另一个绑定键为green.
在这种绑定情况下,生产者发布消息到exchange上,绑定键为orange的消息会被发布到队列Q1。绑定键为blackgreen.和的消息会被发布到队列Q2,其他消息类型的消息将被丢弃。
当然如果exchange的绑定类型是direct,但是他绑定的多个队列的key如果都相同,在这种情况下虽然绑定的是direct,但是他表现的就和fanout有点类似了,就跟广播差不多,如上图。
两个消费者,其中一个进行多重绑定(指定多个routingKey),另一个指定为其他的,具体参看5.2.2的介绍
package com.example.demo06;
import com.example.utils.RabbitMqUtils;
import com.rabbitmq.client.Channel;
import java.io.IOException;
import java.util.Scanner;
import java.util.concurrent.TimeoutException;
/**
* @author 我见青山多妩媚
* @date Create on 2022/8/19 9:45
* 发送方,发消息给交换机
*/
public class DirectLogs {
private static final String EXCHANGE_NAME = "direct_logs";
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitMqUtils.getChannel();
Scanner scanner = new Scanner(System.in);
while (scanner.hasNext()){
String message = scanner.next();
//指定routingKey绑定发送 error、info、warning
channel.basicPublish(EXCHANGE_NAME,"error",null,message.getBytes("UTF-8"));
System.out.println("生产者成功发出消息:"+message);
}
}
}
package com.example.demo06;
import com.example.utils.RabbitMqUtils;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author 我见青山多妩媚
* @date Create on 2022/8/19 15:07
*/
public class ReceiveLogsDirect01 {
private static final String EXCHANGE_NAME = "direct_logs";
private static final String QUEUE_NAME = "console";
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitMqUtils.getChannel();
//声明一个交换机,直接交换机类型
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
//多重绑定,绑定为info、warning的routingKey
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"info");
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"warning");
System.out.println("队列1接收到的消息打印在屏幕上......");
//接收消息
channel.basicConsume(QUEUE_NAME,true,RabbitMqUtils.getDeliverCallback("ReceiveLogsDirect01"),RabbitMqUtils.getCancelCallback());
}
}
package com.example.demo06;
import com.example.utils.RabbitMqUtils;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author 我见青山多妩媚
* @date Create on 2022/8/19 15:07
*/
public class ReceiveLogsDirect02 {
private static final String EXCHANGE_NAME = "direct_logs";
private static final String QUEUE_NAME = "disk";
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitMqUtils.getChannel();
//声明一个交换机,直接交换机类型
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
//绑定为info、warning的routingKey
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"error");
System.out.println("队列2接收到的消息打印在屏幕上......");
//接收消息
channel.basicConsume(QUEUE_NAME,true,RabbitMqUtils.getDeliverCallback("ReceiveLogsDirect02"),RabbitMqUtils.getCancelCallback());
}
}
消费者1多重绑定了info、warning,消费者2仅仅绑定了一个error。
启动三个线程:
//DirectLogs发送
1
生产者成功发出消息:1
2
生产者成功发出消息:2
3
生产者成功发出消息:3
4
生产者成功发出消息:4
5
生产者成功发出消息:5
//ReceiveLogsDirect01接收,此时并未接收到消息
//ReceiveLogsDirect02接收
ReceiveLogsDirect02接收到的消息:1
ReceiveLogsDirect02接收到的消息:2
ReceiveLogsDirect02接收到的消息:3
ReceiveLogsDirect02接收到的消息:4
ReceiveLogsDirect02接收到的消息:5
//DirectLogs发送
1
生产者成功发出消息:1
2
生产者成功发出消息:2
3
生产者成功发出消息:3
4
生产者成功发出消息:4
5
生产者成功发出消息:5
//ReceiveLogsDirect01接收
ReceiveLogsDirect02接收到的消息:1
ReceiveLogsDirect02接收到的消息:2
ReceiveLogsDirect02接收到的消息:3
ReceiveLogsDirect02接收到的消息:4
ReceiveLogsDirect02接收到的消息:5
//ReceiveLogsDirect02接收
ReceiveLogsDirect02接收到的消息:1
ReceiveLogsDirect02接收到的消息:2
ReceiveLogsDirect02接收到的消息:3
ReceiveLogsDirect02接收到的消息:4
ReceiveLogsDirect02接收到的消息:5
//DirectLogs发送
1
生产者成功发出消息:1
2
生产者成功发出消息:2
3
生产者成功发出消息:3
4
生产者成功发出消息:4
5
生产者成功发出消息:5
//ReceiveLogsDirect01接收,此时并未接收到消息
ReceiveLogsDirect02接收到的消息:1
ReceiveLogsDirect02接收到的消息:2
ReceiveLogsDirect02接收到的消息:3
ReceiveLogsDirect02接收到的消息:4
ReceiveLogsDirect02接收到的消息:5
//ReceiveLogsDirect02接收,此时并未接收到消息
在上一个小节中,我们改进了日志记录系统。我们没有使用只能进行随意广播的fanout,交换机,而是使用了 direct交换机,从而有能实现有选择性地接收日志。
尽管使用direct交换机改进了我们的系统,但是它仍然存在局限性-比方说我们想接收的日志类型有info.base和info.advantage,某个队列只想info.base的消息,那这个时候direct 就办不到了。这个时候就只能使用topic类型
发送到类型是topic交换机的消息的 routing_key不能随意写,必须满足一定的要求,它必须是一个单词列表,以点号分隔开。这些单词可以是任意单词,比如说: “stock.usd.nyse”, “nyse.vmw”、“quick.orange.rabbit”.这种类型的。当然这个单词列表最多不能超过255个字节。
在这个规则列表中,其中有两个替换符是大家需要注意的:
我们看看这张图的绑定关系:
routingKey | 接收关系 |
---|---|
quick.orange.rabbit | 被队列Q1Q2接收到 |
lazy.orange.elephant | 被队列Q1Q2接收到 |
quick.orange.fox | 被队列Q1接收到 |
lazy.brown.fox | 被队列Q2接收到 |
lazy.pink.rabbit | 虽然满足两个绑定但只被队列Q2接收一次 |
quick.brown.fox | 不匹配任何绑定不会被任何队列接收到会被丢弃 |
quick.orange.male.rabbit | 是四个单词不匹配任何绑定会被丢弃 |
lazy.orange.male.rabbit | 是四个单词但匹配Q2,Q2接收 |
当绑定关系是以下情况需要注意:
根据上方的表格,生产者发送对应的消息,routingKey对应上方表格的routingKey
package com.example.demo07;
import com.example.utils.RabbitMqUtils;
import com.rabbitmq.client.Channel;
import javax.swing.plaf.synth.SynthOptionPaneUI;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;
/**
* @author 我见青山多妩媚
* @date Create on 2022/8/19 21:01
*/
public class EmitLogTopic {
private static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitMqUtils.getChannel();
//发送内容
Map<String,String> map = new HashMap<>();
map.put("quick.orange.rabbit","被队列Q1Q2接收到");
map.put("lazy.orange.elephant","被队列Q1Q2接收到");
map.put("quick.orange.fox","被队列Q1接收到");
map.put("lazy.brown.fox","被队列Q2接收到");
map.put("lazy.pink.rabbit","虽然满足两个绑定但只被队列Q2接收一次");
map.put("quick.brown.fox","不匹配任何绑定不会被任何队列接收到会被丢弃");
map.put("quick.orange.male.rabbit","是四个单词不匹配任何绑定会被丢弃");
map.put("lazy.orange.male.rabbit","是四个单词但匹配Q2,Q2接收");
map.forEach((k,v)->{
try {
channel.basicPublish(EXCHANGE_NAME,k,null,v.getBytes("UTF-8"));
System.out.println("生产者发送消息:"+v);
} catch (IOException e) {
e.printStackTrace();
}
});
}
}
package com.example.demo07;
import com.example.utils.RabbitMqUtils;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author 我见青山多妩媚
* @date Create on 2022/8/19 21:05
*/
public class ReceiveLogsTopic01 {
//交换机名称
private static final String EXCHANGE_NAME = "topic_logs";
private static final String QUEUE_NAME = "Q1";
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitMqUtils.getChannel();
//声明交换机
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
//声明队列
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"*.orange.*");
System.out.println("Q1等待接收消息");
channel.basicConsume(QUEUE_NAME,true,
RabbitMqUtils.getDeliverCallback("Q1",BuiltinExchangeType.TOPIC),RabbitMqUtils.getCancelCallback());
}
}
package com.example.demo07;
import com.example.utils.RabbitMqUtils;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author 我见青山多妩媚
* @date Create on 2022/8/19 21:05
*/
public class ReceiveLogsTopic02 {
//交换机名称
private static final String EXCHANGE_NAME = "topic_logs";
private static final String QUEUE_NAME = "Q2";
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitMqUtils.getChannel();
//声明交换机
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
//声明队列
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
//多重绑定
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"*.*.rabbit");
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"lazy.#");
System.out.println("Q2等待接收消息");
channel.basicConsume(QUEUE_NAME,true,
RabbitMqUtils.getDeliverCallback("Q2",BuiltinExchangeType.TOPIC),RabbitMqUtils.getCancelCallback());
}
}
package com.example.utils;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author 我见青山多妩媚
* @date Create on 2022/8/14 16:29
* 连接信道的工具类
*/
public class RabbitMqUtils {
public static Channel getChannel() throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("");
factory.setUsername("");
factory.setPassword("");
return factory.newConnection().createChannel();
}
/**
* 消息正常处理回调函数
* @return DeliverCallback
*/
public static DeliverCallback getDeliverCallback(){
return (consumerTag, message) -> {
//打印消息内容
System.out.println("接收到的消息:"+new String(message.getBody()));
};
}
public static DeliverCallback getDeliverCallback(String tips){
return (consumerTag, message) -> {
//打印消息内容
System.out.println(tips+"接收到的消息:"+new String(message.getBody()));
};
}
public static DeliverCallback getDeliverCallback(String tips,BuiltinExchangeType type){
return (consumerTag, message) -> {
//打印消息内容
System.out.println(tips+"接收到的消息:"+new String(message.getBody()));
System.out.println("绑定键:"+message.getEnvelope().getRoutingKey());
};
}
/**
* 消息异常处理回调函数
* @return CancelCallback
*/
public static CancelCallback getCancelCallback(){
return consumerTag ->{
System.out.println("消费消息中断:"+consumerTag);
};
}
}