• RabbitMQ: Routing结构


    生产者 

    1. package com.qf.mq2302.routing;
    2. import com.qf.mq2302.utils.MQUtils;
    3. import com.rabbitmq.client.Channel;
    4. import com.rabbitmq.client.Connection;
    5. public class EmitLog {
    6. public static final String EXCHANGE_NAME="emitlogs";
    7. public static void main(String[] args) throws Exception {
    8. Connection connection = MQUtils.getConnection();
    9. Channel channel = connection.createChannel();
    10. //创建一个路由模式的交换机,默认创出来,不持久化,不自动删除,不是内部交换机
    11. channel.exchangeDeclare(EXCHANGE_NAME,"direct");
    12. String msg="hello routing!!";
    13. //准备routingKey
    14. String routingKey="info";
    15. //发送消息
    16. channel.basicPublish(EXCHANGE_NAME,routingKey,null,msg.getBytes("utf-8"));
    17. channel.close();
    18. connection.close();
    19. }
    20. }

    消费者1号

    1. package com.qf.mq2302.routing;
    2. import com.qf.mq2302.utils.MQUtils;
    3. import com.rabbitmq.client.Channel;
    4. import com.rabbitmq.client.Connection;
    5. import com.rabbitmq.client.DeliverCallback;
    6. import com.rabbitmq.client.Delivery;
    7. import java.io.IOException;
    8. public class ReceiveError {
    9. private static final String EXCHANGE_NAME="emitlogs";
    10. public static void main(String[] args) throws Exception {
    11. Connection connection = MQUtils.getConnection();
    12. Channel channel = connection.createChannel();
    13. channel.exchangeDeclare(EXCHANGE_NAME,"direct");
    14. //该消费者创建一个自己独占的队列,绑定到指定交换机接收消息。
    15. String queueName = channel.queueDeclare().getQueue();
    16. //准备号要绑定时使用的routingkey
    17. String routingKey = "error";
    18. //绑定该队列到交换机
    19. channel.queueBind(queueName,EXCHANGE_NAME,routingKey);
    20. //设置预留消息队列,也就是,RabbitMQ发过来,我可以存几个。当确认一个就会又发过来一个,
    21. // 但是这些相当于线程池里的线程,然后每个线程又去开辟一个新的线程去执行,回调方法,
    22. // 当回调方法确认完事,才会释放当前这个线程,然后去队列里在消费一个过来。
    23. channel.basicQos(1);
    24. //autoAck :false不自动确认,需要手动确认,如果手动不确认,就会按照 channel.basicQos(1);的数量,给多少就消费多少,不会再给你发了。
    25. channel.basicConsume(queueName, false, new DeliverCallback() {
    26. @Override
    27. public void handle(String consumerTag, Delivery message) throws IOException {
    28. byte[] body = message.getBody();
    29. String msg = new String(body, "utf-8");
    30. System.out.println(msg);
    31. //手动ACK
    32. channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
    33. }
    34. },consumerTag -> {});
    35. }
    36. }

    消费者2号

    1. package com.qf.mq2302.routing;
    2. import com.qf.mq2302.utils.MQUtils;
    3. import com.rabbitmq.client.Channel;
    4. import com.rabbitmq.client.Connection;
    5. import com.rabbitmq.client.DeliverCallback;
    6. import com.rabbitmq.client.Delivery;
    7. import java.io.IOException;
    8. public class ReceiveIOther {
    9. private static final String EXCHANGE_NAME="emitlogs";
    10. public static void main(String[] args) throws Exception {
    11. Connection connection = MQUtils.getConnection();
    12. Channel channel = connection.createChannel();
    13. channel.exchangeDeclare(EXCHANGE_NAME,"direct");
    14. //该消费者创建一个自己独占的队列,绑定到指定交换机接收消息。
    15. String queueName = channel.queueDeclare().getQueue();
    16. //准备号要绑定时使用的routingkey
    17. String routingKey1 = "error";
    18. String routingKey2 = "info";
    19. String routingKey3 = "warn";
    20. //绑定该队列到交换机
    21. channel.queueBind(queueName,EXCHANGE_NAME,routingKey1);
    22. channel.queueBind(queueName,EXCHANGE_NAME,routingKey2);
    23. channel.queueBind(queueName,EXCHANGE_NAME,routingKey3);
    24. channel.basicQos(1);
    25. channel.basicConsume(queueName, false, new DeliverCallback() {
    26. @Override
    27. public void handle(String consumerTag, Delivery message) throws IOException {
    28. byte[] body = message.getBody();
    29. //获取routingKey
    30. String routingKey = message.getEnvelope().getRoutingKey();
    31. String msg = new String(body, "utf-8");
    32. System.out.println(msg);
    33. //手动ACK
    34. channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
    35. }
    36. },consumerTag -> {});
    37. }
    38. }

  • 相关阅读:
    DBA常用命令
    小满nestjs(第十三章 nestjs 上传图片-静态目录)
    服务器之间传文件夹,文件夹内容为空
    什么是AI管道和MLOps?
    2023年中国数据存储市场现状及发展前景预测分析
    GC-垃圾回收
    whistle 的使用
    链接脚本(Linker Script)解析
    前端项目打包体积分析与优化
    电脑重装系统后DirectX12旗舰版禁用了怎么解决?
  • 原文地址:https://blog.csdn.net/qq_53374893/article/details/132742958