• RabbitMQ官方案例学习记录


    官方文档RabbitMQ教程 — RabbitMQ (rabbitmq.com)

    一、安装RabbitMQ服务

    直接使用docker在服务器上安装

    docker run -it -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3.12-management

    安装完成后,访问15672端口,默认用户名和密码都是 guest,即可进入 

    二、hello world——梦开始的地方

    1. 介绍

            RabbitMQ 是一个消息代理:它接受并转发消息。 你可以把它想象成一个邮局:当你把你想要邮寄的邮件放在邮箱里时, 您可以确定,邮递员最终会将邮件递送给您的收件人。 在这个类比中,RabbitMQ是一个邮政信箱,一个邮局和一个信件载体。RabbitMQ和邮局的主要区别在于它不处理纸张, 相反,它接受、存储和转发数据的二进制 blob - 消息

    2. 一些术语

    生产者:消息的发送方

    队列queue:本质上是一个大型的消息缓冲区

    消费者:消息的使用方

    Channel 频道:理解为操作消息队列的 client(比如 jdbcClient、redisClient),提供了和消息队列 server 建立通信的传输方法(为了复用连接,提高传输效率)。程序通过 channel 操作 rabbitmq(收发消息)

    3. 编写代码

    用 Java 编写两个程序;一个发送单个消息的生产者,一个接收的使用者并将消息打印出来。

    (1)消息生产者:

    编码过程:

    先创建连接工厂,然后通过工厂创建连接,再通过连接创建channel。通过channel来绑定队列或者交换机,再用channel来生产或者消费消息。

    channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());

    对于这行代码,可以看到消息是根据QUEUE_NAME路由到对应的队列。

    1. import com.rabbitmq.client.ConnectionFactory;
    2. import com.rabbitmq.client.Connection;
    3. import com.rabbitmq.client.Channel;
    4. public class MQProducer {
    5. //设置队列名
    6. private final static String QUEUE_NAME = "hello-zy";
    7. public static void main(String[] argv) throws Exception {
    8. //创建连接工厂
    9. ConnectionFactory factory = new ConnectionFactory();
    10. //设置连接的服务器IP和端口号
    11. factory.setHost("123.249.112.12");
    12. factory.setPort(5672);
    13. //创建一个连接和通道,这里使用 try-with-resources语句,
    14. // 因为Connection和Channel都实现了java.lang.AutoCloseable,不需要在代码中再显式的关闭他们
    15. try(Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) {
    16. /*
    17. 创建队列 channel.queueDeclare()用于声明一个队列
    18. queue(队列名称):指定要声明的队列的名称。
    19. durable(持久化):指定队列是否是持久化的。当 RabbitMQ 重新启动时,持久化的队列将被保留下来。如果将该参数设置为 true,则队列将被持久化;如果设置为 false,则队列不会被持久化。注意,这里指的是队列本身的持久化,而不是队列中的消息。
    20. exclusive(排他性):指定队列是否是排他的。如果将该参数设置为 true,则该队列只能被当前连接的消费者使用,并且在连接关闭时会自动删除该队列。如果设置为 false,则队列可供多个消费者使用。
    21. autoDelete(自动删除):指定队列在不再被使用时是否自动删除。如果将该参数设置为 true,则当队列不再被消费者使用时,将自动删除该队列。如果设置为 false,则队列不会自动删除。
    22. arguments(参数):指定队列的其他属性和参数,以键值对的形式提供。
    23. */
    24. channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    25. //发送消息
    26. String msg = "hello,world! RabbitMQ!";
    27. //channel.basicPublish("", QUEUE_NAME, null, msg.getBytes())
    28. // 这行代码的作用是将 msg 消息发布到默认交换器(空字符串)并使用QUEUE_NAME作为路由键,
    29. // 消息的属性设置为默认值,消息的内容为 msg.getBytes(),即将 msg 转换为字节数组后发送。
    30. channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());
    31. System.out.println(" 生产者发送消息:'" + msg + "'");
    32. }
    33. }
    34. }

     然后运行这个生产者代码,去网页端查看:

    (2)消息消费者:

    1. import com.rabbitmq.client.Channel;
    2. import com.rabbitmq.client.Connection;
    3. import com.rabbitmq.client.ConnectionFactory;
    4. import com.rabbitmq.client.DeliverCallback;
    5. import java.io.IOException;
    6. import java.util.concurrent.TimeoutException;
    7. public class MQConsumer {
    8. //声明队列 和消息发送方保持一致
    9. private final static String QUEUE_NAME = "hello-zy";
    10. public static void main(String[] args) throws IOException, TimeoutException {
    11. ConnectionFactory factory = new ConnectionFactory();
    12. //设置rabbitMQ服务端ip
    13. factory.setHost("123.249.112.12");
    14. //这里不用try-with-resources 因为消费方需要一致保持监听,不要关闭
    15. Connection connection = factory.newConnection();
    16. Channel channel = connection.createChannel();
    17. channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    18. System.out.println(" 等待接收消息,退出请按 CTRL+C");
    19. //创建了一个消费者对象并实现了 handleDelivery() 方法作为回调方法。当消费者收到消息时,将自动执行该方法。
    20. DeliverCallback deliverCallback = (consumerTag, delivery) -> {
    21. String message = new String(delivery.getBody(), "UTF-8");
    22. System.out.println(" 消费了消息:'" + message + "'");
    23. };
    24. channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
    25. }
    26. }

    去网页端查看消息是否被消费,可以看到多了一个消费者,并且消息已经没了。 

    三、工作队列 WorkQueue

            工作队列(又名:任务队列)背后的主要思想是避免立即执行资源密集型任务,必须等待它要完成。相反,我们将任务安排在以后完成。我们将任务封装为消息并将其发送到队列。正在运行的工作进程 在后台将弹出任务并最终执行工作。当您运行许多工作线程时,任务将在它们之间共享。

    WorkQueue的模型跟前面第一个案例Hello,World!的模型,最明显的区别其实就是,第一个案例他只有一个消费者。我们知道RabbitMQ他的消息是阅完即焚,即消费者一旦接收,这个消息直接就从Queue中被弹出了。
    而现在这个案例,他有两个消费者(画两个只是方便,他当然也可以有3个、4个),他的消息应该是通过某种算法做负载均衡送到不同的消费者,让消费者进行处理,让消息不至于处理不过来,从而导致滞留在Queue中的消息被弹出。

    思路如下:
    1、我们先让Publish服务每秒发布50条消息到 simple.queue,来演示消息的频繁发送。
    2、在Consumer服务中定义两个消费者,来监听我们的 simple.queue队列。
    3、消费者1每秒处理40条消息,消费者2每秒处理30条消息。

    1. 循环调度

    (1)生产者:

    生成50条消息:

    1. public class Send {
    2. //设置队列名
    3. private final static String QUEUE_NAME = "hello-zy";
    4. public static void main(String[] argv) throws Exception {
    5. //创建连接
    6. ConnectionFactory factory = new ConnectionFactory();
    7. //设置连接的服务器IP和端口号
    8. factory.setHost("123.249.112.12");
    9. factory.setPort(5672);
    10. //创建一个通道,这里使用 try-with-resources语句,
    11. // 因为Connection和Channel都实现了java.lang.AutoCloseable,不需要在代码中再显式的关闭他们
    12. try(Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) {
    13. channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    14. //发送消息
    15. for(int i = 1; i <= 50; i++) {
    16. String msg = "hello, I am";
    17. msg = msg + i;
    18. channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());
    19. Thread.sleep(200);
    20. System.out.println(" 生产者发送消息:'" + msg + "'");
    21. }
    22. }
    23. }
    24. }

    (2)消费者1和消费者2

    1. public class Worker_01 {
    2. //声明队列 和消息发送方保持一致
    3. private final static String QUEUE_NAME = "hello-zy";
    4. public static void main(String[] args) throws IOException, TimeoutException {
    5. ConnectionFactory factory = new ConnectionFactory();
    6. //设置rabbitMQ服务端ip
    7. factory.setHost("123.249.112.12");
    8. //这里不用try-with-resources 因为消费方需要一致保持监听,不要关闭
    9. Connection connection = factory.newConnection();
    10. Channel channel = connection.createChannel();
    11. channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    12. System.out.println(" 等待接收消息,退出请按 CTRL+C");
    13. //创建了一个消费者对象并实现了 handleDelivery() 方法作为回调方法。当消费者收到消息时,将自动执行该方法。
    14. DeliverCallback deliverCallback = (consumerTag, delivery) -> {
    15. String message = new String(delivery.getBody(), "UTF-8");
    16. System.out.println(" Worker_01消费了消息:'" + message + "'");
    17. };
    18. channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
    19. }
    20. }
    21. public class Worker_02 {
    22. //声明队列 和消息发送方保持一致
    23. private final static String QUEUE_NAME = "hello-zy";
    24. public static void main(String[] args) throws IOException, TimeoutException {
    25. ConnectionFactory factory = new ConnectionFactory();
    26. //设置rabbitMQ服务端ip
    27. factory.setHost("123.249.112.12");
    28. //这里不用try-with-resources 因为消费方需要一致保持监听,不要关闭
    29. Connection connection = factory.newConnection();
    30. Channel channel = connection.createChannel();
    31. channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    32. System.out.println(" 等待接收消息,退出请按 CTRL+C");
    33. //创建了一个消费者对象并实现了 handleDelivery() 方法作为回调方法。当消费者收到消息时,将自动执行该方法。
    34. DeliverCallback deliverCallback = (consumerTag, delivery) -> {
    35. String message = new String(delivery.getBody(), "UTF-8");
    36. System.out.println(" Worker_02消费了消息:'" + message + "'");
    37. };
    38. channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
    39. }
    40. }

    启动两个消费者,然后开始生产消息,控制台打印如下:

     可以看到是交替消费的:

    2. 消息确认

            在上面的代码中,一旦消息传递给消费者,就会立即被删除。如果在消费过程中,消费者宕机,消息没消费成功,但是因为已经投递出去了,消息从队列删掉。就会出现:消息未消费,且丢失的问题。对于这种情况,我们希望没消费成功的消息,转交给其他的消费者消费。

            为了确保消息永远不会丢失,RabbitMQ 支持消息确认。确认由消费者告诉 RabbitMQ 已收到特定消息,并处理,RabbitMQ可以自由删除它。

            如果消费者没有发送确认消息,rabbitMQ可以知道消息没有消费成功,并将重新排队。

            在消费者返回确认消息时强制实施超时(默认为 30 分钟)。 这有助于检测从不确认交付的错误(卡住)消费者。 可以按照传递确认超时中所述增加此超时。

            默认情况下,手动消息确认处于打开状态。在上一个 示例,我们通过 autoAck=true 标志明确关闭了它们。是时候将此标志设置为 false ,来使消费者发送确认消息。

    (1)消费者

    生产者还是不变,生成10条消息到队列中:

    创建消费者,启动手动确认:

    1. public class Consumer {
    2. private final static String QUEUE_NAME = "hello-zy";
    3. public static void main(String[] argv) throws Exception {
    4. ConnectionFactory factory = new ConnectionFactory();
    5. factory.setHost("123.249.112.12");
    6. factory.setPort(5672);
    7. Connection connection = factory.newConnection();
    8. Channel channel = connection.createChannel();
    9. channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    10. // 将autoAck参数设置为false,关闭自动消息确认
    11. /*
    12. * new DefaultConsumer(channel) { ... }:这是一个匿名内部类,用于定义消息处理的逻辑。
    13. * 它继承自DefaultConsumer,并覆盖了 handleDelivery 方法,以自定义消息的处理方式。
    14. * */
    15. channel.basicConsume(QUEUE_NAME, false, new DefaultConsumer(channel) {
    16. /*handleDelivery 方法:这是 DefaultConsumer 类中的方法,用于处理从队列中接收的消息。
    17. consumerTag:标识消费者的标签。
    18. envelope:包含与消息相关的元数据,如交付标签、交付模式等。
    19. properties:包含消息的属性,如消息的头部信息。
    20. body:消息的内容,以字节数组形式提供。
    21. */
    22. @Override
    23. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    24. String message = new String(body, "UTF-8");
    25. System.out.println("消费者接收消息: '" + message + "'");
    26. // 在消息处理成功后,发送确认消息
    27. channel.basicAck(envelope.getDeliveryTag(), false);
    28. System.out.println("确认收到了消息:"+message);
    29. }
    30. });
    31. }
    32. }

    启动消费者:

    (2)验证未成功消费情况

    先生产6条消息到队列:

    简单修改一下消费者代码,在中间添加判断逻辑,当碰到消息5的时候,退出消费者:

    1. public class Consumer1 {
    2. private final static String QUEUE_NAME = "hello-zy";
    3. public static void main(String[] argv) throws Exception {
    4. ConnectionFactory factory = new ConnectionFactory();
    5. factory.setHost("123.249.112.12");
    6. Connection connection = factory.newConnection();
    7. Channel channel = connection.createChannel();
    8. channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    9. try {
    10. // 将autoAck参数设置为false,关闭自动消息确认
    11. channel.basicConsume(QUEUE_NAME, false, new DefaultConsumer(channel) {
    12. @Override
    13. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
    14. throws IOException {
    15. String message = new String(body, "UTF-8");
    16. System.out.println("消费者接收消息: '" + message + "'");
    17. // 模拟某个条件,例如消息处理成功
    18. if (!message.contains("5")) {
    19. // 在消息处理成功后,发送确认消息
    20. channel.basicAck(envelope.getDeliveryTag(), false);
    21. System.out.println("确认收到了消息:" + message);
    22. }
    23. // 模拟消费者退出
    24. if (message.contains("5")) {
    25. throw new RuntimeException("Consumer exiting...");
    26. }
    27. }
    28. });
    29. } catch (RuntimeException e) {
    30. // 当消费者退出时,捕获异常
    31. System.out.println("Consumer exited.");
    32. }
    33. }
    34. }

     

    可以看到,当消费者消费到第五条消息后,因为抛出了异常,所以后面的消息都未消费成功,所以会把第五条消息和第六条消息再放回消息队列,查看消息队列,可以看到还有两条消息在队列中:

    如果把参数改成true,也就是自动确认:这就会导致未消费的消息丢失

     

    3. 消息持久性

    我们已经保证了消息未消费的情况下不会丢失,但是如果RabbitMQ服务器宕机,消息还是会丢失。这就涉及到一个消息持久化的问题。

    需要做两件事来确保 消息不会丢失:我们需要将队列和消息都标记为durable。 (重新定义一个队列)

    此时,我们确信durable_queue队列不会丢失 即使 RabbitMQ 重新启动。现在我们需要将我们的消息标记为持久 - 通过设置消息属性(实现基本属性) 到值PERSISTENT_TEXT_PLAIN。 

     

    如果不设置未持久性,重启docker的rabbitmq容器,队列消息就丢了,设置durable=true后, 即使重启docker容器,队列和消息都不会丢失。

    四、发布/订阅模式

            在前面,都是一条消息由一个消费者消费。如果一条消息需要被多个消费者消费,那么就需要引入发布/订阅模式。

    1. 交换机

    关于交换机的概念():

    一个生产者给 多个 队列发消息,1 个生产者对多个队列。
    交换机的作用:提供消息转发功能,类似于网络路由器
    要解决的问题:怎么把消息转发到不同的队列上,好让消费者从不同的队列消费。

    交换机有多种类别:fanout、direct, topic, headers

    fanout(扇出)

    扇出、广播
    特点:消息会被转发到所有绑定到该交换机的队列
    场景:很适用于发布订阅的场景。比如写日志,可以多个系统间共享

    Direct 直接

    绑定:可以让交换机和队列进行关联,可以指定让交互机把什么样的消息发送给哪个队列(类似于计算机网络中,两个路由器,或者网络设备相互连接,也可以理解为网线)
    routingKey:路由键,控制消息要转发给哪个队列的(IP 地址) 

    特点:消息会根据路由键转发到指定的队列
    场景:特定的消息只交给特定的系统(程序)来处理
    绑定关系:完全匹配字符串

    比如发日志的场景,希望用独立的程序来处理不同级别的日志,比如 C1 系统处理 error 日志,C2 系统处理其他级别的日志

     

     topic 交换机

    特点:消息会根据一个 模糊的 路由键转发到指定的队列
    场景:特定的一类消息可以交给特定的一类系统(程序)来处理
    绑定关系:可以模糊匹配多个绑定
    ●*:匹配一个单词,比如 *.orange,那么 a.orange、b.orange 都能匹配
    ●#:匹配 0 个或多个单词,比如 a.#,那么 a.a、a.b、a.a.a 都能匹配

    注意,这里的匹配和 MySQL 的like 的 % 不一样,只能按照单词来匹配,每个 '.' 分隔单词,如果是 '#.',其实可以忽略,匹配 0 个词也 ok

    Headers 交换机 

    类似主题和直接交换机,可以根据 headers 中的内容来指定发送到哪个队列。使用消息头headers来路由消息。

     2. 使用fanout交换机来实现发布/订阅

    (1)生产者

    1. public class LogProducer {
    2. private final static String EXCHANGE_NAME = "logs";
    3. public static void main(String[] argv) throws Exception {
    4. ConnectionFactory factory = new ConnectionFactory();
    5. factory.setHost("123.249.112.12");
    6. factory.setPort(5672);
    7. try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) {
    8. //声明交换机类型为:FANOUT
    9. channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
    10. for (int i = 1; i <= 5; i++) {
    11. String message = "Log message " + i;
    12. //消息发到交换机中,而不是像之前点对点那样直接发到消息队列中
    13. channel.basicPublish(EXCHANGE_NAME, "", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
    14. System.out.println("生产者发送日志: " + message);
    15. }
    16. }
    17. }
    18. }

    查看交换机列表,可以看到新增了一个名为logs的交换机,类型为 fanout:

     (2)消费者

    创建两个消费者,都绑定一个交换机,名字为log

    1. public class LogConsumer {
    2. private final static String EXCHANGE_NAME = "logs";
    3. public static void main(String[] argv) throws Exception {
    4. ConnectionFactory factory = new ConnectionFactory();
    5. factory.setHost("123.249.112.12");
    6. Connection connection = factory.newConnection();
    7. Channel channel = connection.createChannel();
    8. channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
    9. // 创建一个临时队列
    10. String queueName = channel.queueDeclare().getQueue();
    11. // 将队列绑定到交换器
    12. channel.queueBind(queueName, EXCHANGE_NAME, "");
    13. System.out.println("等待接收日志消息...");
    14. DeliverCallback deliverCallback = (consumerTag, delivery) -> {
    15. String message = new String(delivery.getBody(), "UTF-8");
    16. System.out.println("消费者1接收消息: '" + message + "'");
    17. };
    18. channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
    19. }
    20. }
    21. public class LogConsumer2 {
    22. private final static String EXCHANGE_NAME = "logs";
    23. public static void main(String[] argv) throws Exception {
    24. ConnectionFactory factory = new ConnectionFactory();
    25. factory.setHost("123.249.112.12");
    26. Connection connection = factory.newConnection();
    27. Channel channel = connection.createChannel();
    28. channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
    29. // 创建一个临时队列
    30. String queueName = channel.queueDeclare().getQueue();
    31. // 将队列绑定到交换器
    32. channel.queueBind(queueName, EXCHANGE_NAME, "");
    33. System.out.println("等待接收日志消息...");
    34. DeliverCallback deliverCallback = (consumerTag, delivery) -> {
    35. String message = new String(delivery.getBody(), "UTF-8");
    36. System.out.println("消费者2接收消息: '" + message + "'");
    37. };
    38. channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {
    39. });
    40. }
    41. }

     启动两个消费者,然后启动生产者发送消息,可以看到两个消费者都消费了消息:

    五、路由

    在上面的例子中,我们实现了一个简单的日志记录系统。能够向多个消费者广播消息。

    下面要增加一个新的功能:

            例如,我们将只能将关键错误消息定向到 日志文件(以节省磁盘空间),同时仍然能够打印所有控制台上的日志消息。

    1. 绑定队列和交换机

    之前是这样绑定的:第三个参数就是路由键

    channel.queueBind(queueName, EXCHANGE_NAME, "");

    绑定是交换和队列之间的关系。

            绑定可以采用额外的路由键参数。为了避免 与basic_publish参数混淆,我们将它称为绑定键。绑定键的含义取决于交换类型。

    channel.queueBind(queueName, EXCHANGE_NAME, "black");

    2. 使用direct交换机绑定

    (1)生产者

    模拟三条不同的消息,指定消息1的路由键为orange,消息2的路由键为black,消息3的路由键为green。

    1. public class LogProducer {
    2. private final static String EXCHANGE_NAME = "direct_logs";
    3. public static void main(String[] argv) throws Exception {
    4. ConnectionFactory factory = new ConnectionFactory();
    5. factory.setHost("123.249.112.12");
    6. factory.setPort(5672);
    7. try (Connection connection = factory.newConnection();
    8. Channel channel = connection.createChannel()) {
    9. //声明交换机类型为 direct
    10. channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
    11. String message1 = "Message with routing key orange";
    12. String message2 = "Message with routing key black";
    13. String message3 = "Message with routing key green";
    14. // 发布消息到交换器,并指定不同的路由键
    15. channel.basicPublish(EXCHANGE_NAME, "orange", null, message1.getBytes());
    16. channel.basicPublish(EXCHANGE_NAME, "black", null, message2.getBytes());
    17. channel.basicPublish(EXCHANGE_NAME, "green", null, message3.getBytes());
    18. System.out.println("生产者发送消息完成.");
    19. }
    20. }
    21. }

    (2)消费者

    创建两个消费者

    消费者1和消费者2都绑定对应的交换机,其中消费者1对应路由键orange,消费者2对应路由键black和orange。

    1. public class LogConsumer1 {
    2. private final static String EXCHANGE_NAME = "direct_logs";
    3. public static void main(String[] argv) throws Exception {
    4. ConnectionFactory factory = new ConnectionFactory();
    5. factory.setHost("123.249.112.12");
    6. factory.setPort(5672);
    7. Connection connection = factory.newConnection();
    8. Channel channel = connection.createChannel();
    9. channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
    10. // 创建一个临时队列
    11. String queueName = channel.queueDeclare().getQueue();
    12. // 绑定队列到交换器,指定路由键为 "orange"
    13. channel.queueBind(queueName, EXCHANGE_NAME, "orange");
    14. System.out.println("等待接收 orange 消息...");
    15. DeliverCallback deliverCallback = (consumerTag, delivery) -> {
    16. String message = new String(delivery.getBody(), "UTF-8");
    17. System.out.println("消费者1接收消息: '" + message + "'");
    18. };
    19. channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {
    20. });
    21. }
    22. }
    23. public class LogConsumer2 {
    24. private final static String EXCHANGE_NAME = "direct_logs";
    25. public static void main(String[] argv) throws Exception {
    26. ConnectionFactory factory = new ConnectionFactory();
    27. factory.setHost("123.249.112.12");
    28. factory.setPort(5672);
    29. Connection connection = factory.newConnection();
    30. Channel channel = connection.createChannel();
    31. channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
    32. // 创建一个临时队列
    33. String queueName = channel.queueDeclare().getQueue();
    34. // 绑定队列到交换器,指定路由键为 "black" 和 "green"
    35. channel.queueBind(queueName, EXCHANGE_NAME, "black");
    36. channel.queueBind(queueName, EXCHANGE_NAME, "green");
    37. System.out.println("等待接收 black 和 green 消息...");
    38. DeliverCallback deliverCallback = (consumerTag, delivery) -> {
    39. String message = new String(delivery.getBody(), "UTF-8");
    40. System.out.println("消费者2接收消息: '" + message + "'");
    41. };
    42. channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {
    43. });
    44. }
    45. }

  • 相关阅读:
    用Python实现基于人脸识别的门禁管理系统(附源码)
    Docker 环境安装postgres+postgis扩展
    什么是构造函数?(JavaScript)
    mysql 不同版本 下载地址
    java 加密结果不一致的各个坑
    Hive mapjoin使用
    光模块厂家如何提高千兆光模块和万兆光模块的可靠性
    使用软路由(openWrt)安装openVPN搭建局域网连接
    基于FPGA的图像直方图统计实现,包括tb测试文件和MATLAB辅助验证
    goroutine 调度器
  • 原文地址:https://blog.csdn.net/weixin_49561506/article/details/133940098