视频:尚硅谷RabbitMQ教程丨快速掌握MQ消息中间件_哔哩哔哩_bilibili
涉及代码:
https://github.com/user0819/rabbitMQ-hello
https://github.com/user0819/rabbitmq-springboot
MQ(message queue),消息队列。
是一种跨进程的通信机制,用于上下游传递消息。
流量削峰
在流量高峰期使用消息队列做缓冲,避免应用被激增的请求打挂,或者丢失了某部分的请求。
应用解耦
系统之间减少耦合关系。上游系统只需将消息丢进队列,无需关心下游系统何时处理及处理结果。

异步处理
有些服务间调用是异步的,例如 A 调用 B, A 需要知道 B 什么时候可以执行完。
普通方式:
可以使用MQ, A 调用 B 服务后,只需要监听 B 处理完成的消息,能及时的得到异步处理成功的消息。

| 特性 | ActiveMQ | RabbitMQ | RocketMQ | Kafka |
| 吞吐量 | 万级 | 万级 | 十万级 | 百万级 |
| 时效性 | ms 级 | us级 | ms 级 | ms级 |
| 可用性 | 高 主从架构 | 高 主从架构 | 非常高 分布式架构 | 非常高 分布式架构 |
| 消息可靠性 | 较低概率丢失数据 | 基本不丢 | 0 丢失 | 0 丢失 |
| 功能支持 | MQ 领域功能极其完备 | 基于 erlang 开发 并发能力很强 性能极好 延时很低 | MQ 功能较为完善 分布式的 扩展性好 | 功能较为简单 主要支持简单的 MQ 功能 在大数据领域被大规模使用 |
| 缺点 | 维护变少 高吞吐量场景使用较少 | 商业版需收费 学习成本高 | 支持的客户端语言有限:主Java 没有实现JMS等接口 | 响应时间随队列/分区变长 消费失败不支持重试 代理宕机会导致消息乱序 |
1.Kafka
基于Pull 的模式来处理消息消费,追求高吞吐量,一开始的目的就是用于日志收集和传输。
如果有日志采集功能, 首选 kafka。
2.RocketMQ
天生为金融互联网领域而生,对于可靠性要求很高的场景,尤其是电商里面的订单扣款,以及业务削峰。
RoketMQ 在稳定性上可能更值得信赖,已经经历了阿里双十一多次考验
3.RabbitMQ
结合 erlang 语言本身的并发优势,性能好时效性微秒级,社区活跃度也比较高,管理界面用起来十分方便。
如果数据量没有那么大,中小型公司优先选择功能比较完备的 RabbitMQ。
AMQP 是什么
AMQP,即Advanced Message Queuing Protocol。
准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。
RabbitMQ 就是 AMQP 协议的 Erlang 的实现 (当然 RabbitMQ 还支持 STOMP2、 MQTT3 等协议 )
RabbitMQ 中的交换器、交换器类型、队列、绑定、路由键等都是遵循的 AMQP 协议中相 应的概念。
AMQP 协议 3 层
AMQP 模型的组件


RabbitMQ使用Erlang语言开发等,所以必须先提供Erlang环境。
官方安装说明:Downloading and Installing RabbitMQ — RabbitMQ
Linux安装:
Installing on RPM-based Linux (RedHat Enterprise Linux, CentOS, Fedora, openSUSE) — RabbitMQ
Linux安装RabbitMQ详细教程_m0_67392273的博客-CSDN博客_linux安装rabbitmq
- [root@VM-4-16-centos rabbitmq]# rpm -ivh erlang-25.0.3-1.el8.x86_64.rpm
-
- [root@VM-4-16-centos rabbitmq]# rpm -ivh socat-1.7.4.1-1.el8.x86_64.rpm
-
- [root@VM-4-16-centos rabbitmq]# rpm -ivh rabbitmq-server-3.10.7-1.el8.noarch.rpm
rabbitmq-plugins enable rabbitmq_management - cd /etc/rabbitmq/
- vim rabbitmq.config
-
- [{rabbit,[{loopback_users,[]}]}].
- # 启动rabbitmq命令:
- systemctl start rabbitmq-server
-
- # 查看状态命令:
- systemctl status rabbitmq-server
-
- # 停止命令:
- systemctl stop rabbitmq-server
也可通过yum安装
- yum install erlang.x86_64
- yum install rabbitmq-server.noarch
MAC安装:
The Homebrew RabbitMQ Formula — RabbitMQ
使用homebrew安装
brew install rabbitmq
启动
brew services start rabbitmq
前台启动:
CONF_ENV_FILE="/opt/homebrew/etc/rabbitmq/rabbitmq-env.conf" /opt/homebrew/opt/rabbitmq/sbin/rabbitmq-server
安装后路径: /opt/homebrew/opt/rabbitmq
Windows安装
Installing on Windows — RabbitMQ
进入后下载erlang和rabbitmq的exe安装包即可
开启管理页面
安装完成后,需要开启管理页面方便后期查看管理
- //开启管理页面(http://127.0.0.1:15672)
- rabbitmq-plugins enable rabbitmq_management
- //创建用户
- rabbitmqctl add_user admin admin
- //设置角色
- rabbitmqctl set_user_tags admin administrator
- //设置用户权限
- rabbitmqctl set_permissions -p '/' admin '.*' '.*' '.*'
- //查看用户角色
- rabbitmqctl list_users
maven依赖:
-
-
com.rabbitmq -
amqp-client -
5.8.0 -
生产者:
- public static void main(String[] args) throws IOException, TimeoutException {
- //连接工厂
- ConnectionFactory factory = new ConnectionFactory();
- factory.setHost("127.0.0.1");
- factory.setPort(5672);
- factory.setUsername("admin");
- factory.setPassword("admin");
-
- //连接
- Connection connection = factory.newConnection();
-
- //信道
- Channel channel = connection.createChannel();
-
- //声明队列
- channel.queueDeclare(QUEUE_NAME, true, false, false, null);
-
- //发送消息
- channel.basicPublish("", QUEUE_NAME, null, "hello, world".getBytes(StandardCharsets.UTF_8));
-
- System.out.println("生成消息结束");
- }
消费者:
- public static void main(String[] args) throws IOException, TimeoutException {
- Channel channel = RabbitMQUtil.getChannel();
-
- //分发回调
- DeliverCallback deliverCallback = (consumerTag, message) -> {
- System.out.println(consumerTag);
- System.out.println(new String(message.getBody()));
- };
-
- //取消回调
- CancelCallback cancelCallback = consumerTag -> System.out.println("消息取消消费");
-
- //接收消息
- channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
- }
工作队列(又称任务队列)的主要思想是避免立即执行资源密集型任务,而不得不等待它完成。
相反我们安排任务在之后执行。我们把任务封装为消息并将其发送到队列。在后台运行的工作进程将弹出任务并最终执行作业。
当有多个工作线程时,这些工作线程将一起处理这些任务。
1、启动两个工作线程
- public static void main(String[] args) throws IOException, TimeoutException {
- Channel channel = RabbitMQUtil.getChannel();
-
- DeliverCallback deliverCallback = (consumerTag, message) -> {
- System.out.println("接收到消息:" + consumerTag + " " + new String(message.getBody()));
- };
-
- CancelCallback cancelCallback = consumerTag -> System.out.println("消息取消消费");
-
- //发送消息
- System.out.println("worker3等待消费");
- channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
- }
2、启动一个发送线程
- public static void main(String[] args) throws IOException, TimeoutException {
- Channel channel = RabbitMQUtil.getChannel();
-
- //声明队列
- channel.queueDeclare(QUEUE_NAME, true, false, false, null);
-
- //发送消息
- Scanner scanner = new Scanner(System.in);
- while(scanner.hasNext()){
- String next = scanner.next();
- channel.basicPublish("", QUEUE_NAME, null, next.getBytes(StandardCharsets.UTF_8));
- System.out.println("消息发送成功");
- }
- }
3、结果展示
发送线程生成AA、BB、CC、DD,woker1处理AA、CC,worker2处理BB、DD。
即消息会轮循的分发给多个工作线程处理。
消费者完成一个任务可能需要一段时间,如果消费者处理一个长的任务时只完成了部分突然挂掉,会发生什么情况?
为了保证消息在发送过程中不丢失,rabbitmq 引入消息应答机制:
消费者在接收到消息并且处理该消息之后,告诉 rabbitmq 它已经处理,rabbitmq 可以把该消息删除了。
消息发送后立即被认为已经传送成功。
这种模式如果消息在接收到之前,消费者那边出现连接或者 channel 关闭,那么消息就丢失了。
另一方面这种模式消费者没有对传递的消息数量进行限制,使得消费者接收太多来不及处理的消息。这些消息的积压最终使得内存耗尽,最终崩溃。
所以这种模式,仅适用在消费者高效处理消息的情况下使用。
手动应答的好处是可以批量应答并且减少网络拥堵。
channel.basicAck(deliveryTag, false);
multiple解释:

如果消费者由于某些原因失去连接(其通道已关闭,连接已关闭或 TCP 连接丢失),导致消息未发送 ACK 确认。
RabbitMQ 将了解到消息未完全处理,并将对其重新排队。
如果此时其他消费者可以处理,它将很快将其重新分发给另一个消费者。
这样,即使某个消费者偶尔死亡,也可以确保不会丢失任何消息。

默认消息采用的是自动应答,所以我们要想实现消息消费过程中不丢失,需要把自动应答改为手动应答。
消费者代码:
- public static void main(String[] args) throws IOException, TimeoutException {
- Channel channel = RabbitMQUtil.getChannel();
-
- DeliverCallback deliverCallback = (consumerTag, message) -> {
- System.out.println("接收到消息:" + consumerTag + " " + new String(message.getBody()));
- SleepUtil.sleep(3);
- //手动应答消息
- channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
- System.out.println("完成消费:" + new String(message.getBody()));
- };
-
- CancelCallback cancelCallback = consumerTag -> System.out.println("消息取消消费");
-
- System.out.println("Consumer1等待消费");
- channel.basicConsume(QUEUE_NAME, false, deliverCallback, cancelCallback);
- }
消息的发送是异步发送的,所以在任何时候channel 上肯定不止只有一个消息。
因此这里就存在一个未确认的消息缓冲区,开发人员能限制此缓冲区的大小,以避免缓冲区里面无限制的未确认消息问题。

通过使用 basicQos() 方法设置预取计数值。
预取值为 1 是最保守的。当然这将使吞吐量变得很低。
channel.basicQos(1);
设置了预取值后,实际上也实现了不公平分发,而不再采用轮训分发。
默认情况下 RabbitMQ 退出或由于某种原因崩溃时,它忽视队列和消息。
确保消息不会丢失需要做两件事:将队列和消息都标记为持久化。
之前创建的队列都是非持久化的,rabbitmq 重启的话,该队列就会被删除掉。
如果要队列实现持久化,需要在声明队列的时候把 durable 参数设置为持久化。
但是需要注意的就是如果之前声明的队列不是持久化的,需要把原先队列先删除,或者重新创建一个持久化的队列,不然就会出现错误。
- //队列持久化
-
- //Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,Map
arguments) throws IOException; - channel.queueDeclare( QUEUE_NAME, true, false, false, null );
要想让消息实现持久化,需要在消息生产者修改代码, 添加MessageProperties.PERSISTENT_TEXT_PLAIN这个属性。
- //消息持久化
- channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes(StandardCharsets.UTF_8));
将消息标记为持久化并不能完全保证不会丢失消息:
尽管它告诉 RabbitMQ 将消息保存到磁盘,但是这里依然存在当消息刚准备存储在磁盘的时候但是还没有存储完,消息还在缓存的一个间隔点。此时并没有真正写入磁盘。
如果需要更强有力的持久化策略,则需要发布确认
生产者将信道设置成 confirm 模式,在该信道上面发布的消息会被指派唯一的 ID(从 1 开始)。
消息被投递到所有匹配的队列后,broker 会发送一个确认给生产者(包含消息唯一 ID),使得生产者知道消息到达队列。
如果消息和队列是可持久化的,那么确认消息在消息写入磁盘后发出。
confirm 模式是异步的,生产者可以在等信道返回确认的同时继续发送下一条消息。
当消息得到确认之后,生产者可以通过回调方法来处理该确认消息。
如果 RabbitMQ 因为自身内部错误导致消息丢失,就会发送一条 nack 消息, 生产者同样可以在回调方法中处理该 nack 消息。
发布确认默认是没有开启的,需要调用 channel 上的confirmSelect()方法。
channel.confirmSelect();
同步确认发布的方式,发布一个消息之后只有它被确认发布,后续的消息才能继续发布。
waitForConfirmsOrDie(long)方法只有在消息被确认的时候才返回,如果在指定时间范围内这个消息没被确认,那么方法将抛出异常。
最大的缺点是: 发布速度特别的慢。
- /**
- * 单个确认
- * 每发送一条,就等待确认结果
- *
- * @throws Exception 异常
- */
- public static void sendMessageSingleConfirm() throws Exception {
- Connection connection = RabbitMQUtil.getConnection();
- Channel channel = RabbitMQUtil.getChannel(connection);
-
- String queueName = "SINGLE_CONFIRM_QUEUE";
- channel.queueDeclare(queueName, true, false, false, null);
- channel.confirmSelect();
-
- long start = System.currentTimeMillis();
- for (int i = 1; i <= MESSAGE_COUNT; i++) {
- String message = "message" + i;
- channel.basicPublish("", queueName, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes(StandardCharsets.UTF_8));
- boolean flag = channel.waitForConfirms();
- System.out.println("消息【" + message + "】发送确认" + (flag ? "成功" : "失败"));
- }
- long end = System.currentTimeMillis();
- System.out.println("单个确认发送" + MESSAGE_COUNT + "条消息执行时间:" + (end - start) + "ms");
-
- RabbitMQUtil.closeChannel(channel);
- RabbitMQUtil.closeConnection(connection);
- }
与单个等待确认消息相比,先发布一批消息然后一起确认可以极大地提高吞吐量。
缺点是:当发生故障导致发布出现问题时,不知道是哪个消息出现问题了,我们必须将整个批处理保存在内存中,以记录重要的信息而后重新发布消息。
这种方案仍然是同步的,也一样阻塞消息的发布。
- /**
- * 批量确认
- * 每发送n条,就等待确认n条结果
- *
- * @throws Exception 异常
- */
- public static void sendMessageBatchConfirm() throws Exception {
- Connection connection = RabbitMQUtil.getConnection();
- Channel channel = RabbitMQUtil.getChannel(connection);
-
- String queueName = "BATCH_CONFIRM_QUEUE";
- channel.queueDeclare(queueName, true, false, false, null);
- channel.confirmSelect();
-
-
- long start = System.currentTimeMillis();
- for (int i = 1; i <= MESSAGE_COUNT; i++) {
- String message = "message" + i;
- channel.basicPublish("", queueName, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes(StandardCharsets.UTF_8));
- if (i % 100 == 0) {
- boolean flag = channel.waitForConfirms();
- System.out.println("消息【" + (i - 100) + "~" + i + "】发送确认" + (flag ? "成功" : "失败"));
- }
- }
- long end = System.currentTimeMillis();
- System.out.println("批量确认发送" + MESSAGE_COUNT + "条消息执行时间:" + (end - start) + "ms");
-
- RabbitMQUtil.closeChannel(channel);
- RabbitMQUtil.closeConnection(connection);
- }
生产者发送完消息后,不会阻塞等待确认消息是否发布成功。而是提供回调方法,在回调方法中处理成功或失败的消息。

- /**
- * 异步确认
- * 提供一个回调方法,客户端不主动等待结果,而是等服务端主动回调
- *
- * @throws Exception 异常
- */
- public static void sendMessageSyncConfirm() throws Exception {
- Connection connection = RabbitMQUtil.getConnection();
- Channel channel = RabbitMQUtil.getChannel(connection);
-
- String queueName = "SYNC_CONFIRM_QUEUE";
- channel.queueDeclare(queueName, true, false, false, null);
- channel.confirmSelect();
-
- ConcurrentSkipListMap
messageMap = new ConcurrentSkipListMap<>(); - ConfirmCallback ackCallback = (sequenceNumber, multiple) -> {
- if (multiple) {
- //返回的是小于等于当前序列号的未确认消息
- ConcurrentNavigableMap
confirmed = messageMap.headMap(sequenceNumber, true); - //清除该部分未确认消息
- confirmed.clear();
- } else {
- //只清除当前序列号的消息
- messageMap.remove(sequenceNumber);
- }
- };
- ConfirmCallback nackCallback = (sequenceNumber, multiple) -> {
- String message = messageMap.get(sequenceNumber);
- System.out.println("发布的消息" + message + "未被确认,序列号" + sequenceNumber);
- };
- channel.addConfirmListener(ackCallback, nackCallback);
-
- long start = System.currentTimeMillis();
- for (int i = 1; i <= MESSAGE_COUNT; i++) {
- String message = "message" + i;
- messageMap.put(channel.getNextPublishSeqNo(), message);
- channel.basicPublish("", queueName, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes(StandardCharsets.UTF_8));
- }
- System.out.println(messageMap.size());
- long end = System.currentTimeMillis();
- System.out.println("异步确认发送" + MESSAGE_COUNT + "条消息执行时间:" + (end - start) + "ms");
-
- SleepUtil.sleep(10);
- //todo 这会儿还没发出去的消息即发送失败的消息,可以重试发送
- System.out.println(messageMap.size());
- RabbitMQUtil.closeChannel(channel);
- RabbitMQUtil.closeConnection(connection);
- }

RabbitMQ 消息传递模型的核心思想是: 生产者生产的消息从不会直接发送到队列,生产者只能将消息发送到交换机(exchange)。
交换机工作的内容非常简单,一方面它接收来自生产者的消息,另一方面将它们推入队列。
交换机必须确切知道如何处理收到的消息。是应该把这些消息放到特定队列,还是把它们路由到许多队列中,还是说应该丢弃它们。

总共有以下类型:
在本教程的前面部分我们对 exchange 一无所知,但仍然能够将消息发送到队列。
之前能实现的原因是因为我们使用的是默认交换,我们通过空字符串(“”)进行标识。
channel.basicPublish("", QUEUE_NAME, null, "hello, world".getBytes(StandardCharsets.UTF_8));
第一个参数是交换机的名称。空字符串表示默认或无名称交换机。
默认交换机是direct类型,和所有队列都通过队列名作为routingKey。
Default exchange
The default exchange is implicitly bound to every queue, with a routing key equal to the queue name. It is not possible to explicitly bind to, or unbind from the default exchange. It also cannot be deleted.
队列的名称至关重要-需要指定消费者去消费哪个队列的消息。
做测试时,每当连接到 RabbitMq都需要一个全新的空队列,为此可以创建一个随机名称的队列。
其次一旦断开了消费者的连接,队列将被自动删除。
创建临时队列的方式如下:
String queueName = channel.queueDeclare().getQueue()
创建出来之后长成这样

绑定(binding) 是 exchange 和 queue 之间的桥梁,它表示exchange 和哪个队列进行了绑定关系。
比如说下面这张图是 X 与 Q1 和 Q2 进行了绑定。
交换机和队列是通过routingKey绑定的。

Fanout 这种类型非常简单,它将接收的消息广播到绑定的所有队列中。
消费者1/消费者2:
- public class Consumer1 {
- private static final String EXCHANGE_NAME = "FANOUT_PUBLISH_EXCHANGE";
-
- /**
- * fanout扇出模式,其实就是广播模式
- * 会把消息路由到所有与它绑定的队列中
- * 所以在这个模式中,routingKey其实没有作用
- */
- public static void main(String[] args) throws IOException, TimeoutException {
- Connection connection = RabbitMQUtil.getConnection();
- Channel channel = connection.createChannel();
-
- //声明交换机和队列(其实应该放在生产者侧,但为了简便放在这)
- channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT.getType());
- String queueName = channel.queueDeclare().getQueue();
- //fanout模式中routingKey无作用
- channel.queueBind(queueName, EXCHANGE_NAME, "456");
-
-
- //消费确认
- DeliverCallback deliverCallback = (consumerTag, message) -> {
- System.out.println("Consumer1接收到消息:" + consumerTag + " " + new String(message.getBody()));
- };
- CancelCallback cancelCallback = consumerTag -> System.out.println("消息取消消费");
-
- channel.basicConsume(queueName, true, deliverCallback, cancelCallback);
- }
- }
生产者:
- public class Producer {
- private static final String EXCHANGE_NAME = "FANOUT_PUBLISH_EXCHANGE";
-
- /**
- * fanout扇出模式,其实就是广播模式
- * 会把消息路由到所有与它绑定的队列中
- * 所以在这个模式中,routingKey其实没有作用
- */
- public static void main(String[] args) throws IOException, TimeoutException {
- Connection connection = RabbitMQUtil.getConnection();
- Channel channel = connection.createChannel();
-
- //声明交换机
- channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT.getType());
-
- //发送消息
- Scanner scanner = new Scanner(System.in);
- while (scanner.hasNextLine()){
- String message = scanner.nextLine();
- channel.basicPublish(EXCHANGE_NAME,"", null, message.getBytes(StandardCharsets.UTF_8));
- }
- }
- }
结果:
不管生产者生产什么消息,消费者1和消费者2都能收到。

消息只去到它绑定的 routingKey 队列中去。

在上面这张图中,我们可以看到 X 绑定了两个队列,绑定类型是 direct。
队列Q1 绑定键为 orange, 队列 Q2 绑定键有两个:一个绑定键为 black,另一个绑定键为 green.
在这种绑定情况下,生产者发布消息到 exchange 上:
消费者1
- public class Consumer1 {
- private static final String EXCHANGE_NAME = "DIRECT_PUBLISH_EXCHANGE";
-
- /**
- * direct直接模式,其实就是普通路由模式
- * 会把消息路由到绑定该routingKey的队列中
- * 在这个模式中,直接根据消息的routingKey,将消息路由到绑定该routingKey的队列中
- */
- public static void main(String[] args) throws IOException, TimeoutException {
- Connection connection = RabbitMQUtil.getConnection();
- Channel channel = connection.createChannel();
-
- //声明交换机和队列(其实应该放在生产者侧,但为了简便放在这)
- channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT.getType());
- String queueName = channel.queueDeclare().getQueue();
- //direct模式中routingKey很重要
- channel.queueBind(queueName, EXCHANGE_NAME, "router1");
-
-
- //消费确认
- DeliverCallback deliverCallback = (consumerTag, message) -> {
- System.out.println("Consumer1接收到消息:" + consumerTag + " " + new String(message.getBody()));
- };
- CancelCallback cancelCallback = consumerTag -> System.out.println("消息取消消费");
-
- channel.basicConsume(queueName, true, deliverCallback, cancelCallback);
- }
- }
消费者2:
channel.queueBind(queueName, EXCHANGE_NAME, "router2"); channel.queueBind(queueName, EXCHANGE_NAME, "router22");
生产者:
- public class Producer {
- private static final String EXCHANGE_NAME = "DIRECT_PUBLISH_EXCHANGE";
-
- /**
- * direct直接模式,其实就是普通路由模式
- * 会把消息路由到绑定该routingKey的队列中
- * 在这个模式中,直接根据消息的routingKey,将消息路由到绑定该routingKey的队列中
- */
- public static void main(String[] args) throws IOException, TimeoutException {
- Connection connection = RabbitMQUtil.getConnection();
- Channel channel = connection.createChannel();
-
- //声明交换机
- channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT.getType());
-
- //发送消息
- channel.basicPublish(EXCHANGE_NAME, "router1", null, "message1".getBytes(StandardCharsets.UTF_8));
- channel.basicPublish(EXCHANGE_NAME, "router2", null, "message2".getBytes(StandardCharsets.UTF_8));
- channel.basicPublish(EXCHANGE_NAME, "router22", null, "message3".getBytes(StandardCharsets.UTF_8));
- channel.basicPublish(EXCHANGE_NAME, "router3", null, "message4".getBytes(StandardCharsets.UTF_8));
- }
- }
结果:
消费者1和2各自收到对应routingKey的消息

发送到类型是 topic 交换机的消息的 routing_key 不能随意写,必须满足一定的要求,它必须是一个单词列表,以点号分隔开。
这些单词可以是任意单词,比如:"stock.usd.nyse", "nyse.vmw", "quick.orange.rabbit".这种类型的。然这个单词列表最多不能超过 255 个字节。
在这个规则列表中,其中有两个替换符需要注意的
下图绑定关系如下:

看看他们之间数据接收情况是怎么样的
注意:
消费者1
- /**
- * topic主题模式
- * 交换机和队列通过通配符的routingKey绑定
- * 消息的routingKey可能会匹配到多个绑定关系,那该消息就会被路由到多个队列中
- * topic模式就是通过routingKey实现模糊匹配
- */
- public static void main(String[] args) throws IOException, TimeoutException {
- Connection connection = RabbitMQUtil.getConnection();
- Channel channel = connection.createChannel();
-
- //声明交换机和队列(其实应该放在生产者侧,但为了简便放在这)
- channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC.getType());
- String queueName = channel.queueDeclare().getQueue();
- //topic模式中routingKey是通配符:#代表零或多个单词,*代表单个单词
- channel.queueBind(queueName, EXCHANGE_NAME, "*.*.orange");
- channel.queueBind(queueName, EXCHANGE_NAME, "orange.#");
-
-
- //消费确认
- DeliverCallback deliverCallback = (consumerTag, message) -> {
- System.out.println("Consumer1接收到消息:" + consumerTag + " " + new String(message.getBody()));
- };
- CancelCallback cancelCallback = consumerTag -> System.out.println("消息取消消费");
-
- channel.basicConsume(queueName, true, deliverCallback, cancelCallback);
- }
消费者2
- //topic模式中routingKey是通配符:#代表零或多个单词,*代表单个单词
- channel.queueBind(queueName, EXCHANGE_NAME, "lazy.*");
- channel.queueBind(queueName, EXCHANGE_NAME, "*.rabbit.#");
生产者:
- public class Producer {
- private static final String EXCHANGE_NAME = "TOPIC_PUBLISH_EXCHANGE";
-
- /**
- * direct直接模式,其实就是普通路由模式
- * 会把消息路由到指定绑定该routingKey的队列中
- * 在这个模式中,routingKey直接决定消息入哪个队列
- */
- public static void main(String[] args) throws IOException, TimeoutException {
- Connection connection = RabbitMQUtil.getConnection();
- Channel channel = connection.createChannel();
-
- //声明交换机
- channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC.getType());
-
- //发送消息
- //1.2都会收到 (注意匹配到了Consumer2的两条规则,但只会发送一次消息)
- channel.basicPublish(EXCHANGE_NAME, "lazy.rabbit.orange", null, "message1".getBytes(StandardCharsets.UTF_8));
- //2会收到
- channel.basicPublish(EXCHANGE_NAME, "lazy.123456", null, "message2".getBytes(StandardCharsets.UTF_8));
- //1会收到
- channel.basicPublish(EXCHANGE_NAME, "orange.123456", null, "message3".getBytes(StandardCharsets.UTF_8));
- //1会收到
- channel.basicPublish(EXCHANGE_NAME, "*.*.orange", null, "message4".getBytes(StandardCharsets.UTF_8));
- //都不会收到
- channel.basicPublish(EXCHANGE_NAME, "lazy**", null, "message4".getBytes(StandardCharsets.UTF_8));
- }
- }
结果:
