• 消息队列系列6-RabbitMQ使用姿势 (荣耀典藏版)


    7275bf0f814141158850c243d44d6098.png

     

    目录

    前言

    1.基本使用姿势

    1.1.公共代码封装

    1.2.Direct方式​

    1.3.Fanout方式(指定队列)​

    1.4.Fanout方式(随机获取队列)

    1.5.Topic方式​

    2.RabbitMQ进阶

    2.1.durable和autoDeleted

    2.1.1.durable

    2.1.2.autoDeleted

    2.2.ACK

     


     

     

    前言

    大家好,我是月夜枫,消息队列的6篇文章目前就先写到这了,因为个人学习重点的调整,写完这篇文章后,要和小伙伴们暂时告别一段时间了,预计在9月初为大家带来新知识点的分享!!!

    这篇文章主要讲述RabbitMQ常用的使用姿势,希望在面试和工作中能给小伙伴们带来帮助!

     

    1.基本使用姿势

    1.1.公共代码封装

    封装工厂类:

    1. public class RabbitUtil {
    2.     public static ConnectionFactory getConnectionFactory() {
    3.         //创建连接工程,下面给出的是默认的case
    4.         ConnectionFactory factory = new ConnectionFactory();
    5.         factory.setHost("127.0.0.1");
    6.         factory.setPort(5672);
    7.         factory.setUsername("admin");
    8.         factory.setPassword("admin");
    9.         factory.setVirtualHost("/");
    10.         return factory;
    11.     }
    12. }

    封装生成者:

    1. public class MsgProducer {
    2.     public static void publishMsg(String exchange, BuiltinExchangeType exchangeType, String toutingKey, String message) throws IOException, TimeoutException {
    3.         ConnectionFactory factory = RabbitUtil.getConnectionFactory();
    4.         //创建连接
    5.         Connection connection = factory.newConnection();
    6.         //创建消息通道
    7.         Channel channel = connection.createChannel();
    8.         // 声明exchange中的消息为可持久化,不自动删除
    9.         channel.exchangeDeclare(exchange, exchangeType, truefalsenull);
    10.         // 发布消息
    11.         channel.basicPublish(exchange, toutingKey, null, message.getBytes());
    12.         System.out.println("Sent '" + message + "'");
    13.         channel.close();
    14.         connection.close();
    15.     }
    16. }

    封装消费者:

    1. public class MsgConsumer {
    2.     public static void consumerMsg(String exchange, String queue, String routingKey)
    3.             throws IOException, TimeoutException {
    4.         ConnectionFactory factory = RabbitUtil.getConnectionFactory();
    5.         //创建连接
    6.         Connection connection = factory.newConnection();
    7.         //创建消息信道
    8.         final Channel channel = connection.createChannel();
    9.         //消息队列
    10.         channel.queueDeclare(queue, truefalsefalsenull);
    11.         //绑定队列到交换机
    12.         channel.queueBind(queue, exchange, routingKey);
    13.         System.out.println("[*] Waiting for message. To exist press CTRL+C");
    14.         Consumer consumer = new DefaultConsumer(channel) {
    15.             @Override
    16.             public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
    17.                                        byte[] body) throws IOException {
    18.                 String message = new String(body, "UTF-8");
    19.                 try {
    20.                     System.out.println(" [x] Received '" + message);
    21.                 } finally {
    22.                     System.out.println(" [x] Done");
    23.                     channel.basicAck(envelope.getDeliveryTag(), false);
    24.                 }
    25.             }
    26.         };
    27.         // 取消自动ack
    28.         channel.basicConsume(queue, false, consumer);
    29.     }
    30. }

    1.2.Direct方式

    c5a16faa191946159ac5126e4956e19b.png

     

    Direct示例

    生产者:

    1. public class DirectConsumer {
    2.     private static final String exchangeName = "direct.exchange";
    3.     public void msgConsumer(String queueName, String routingKey) {
    4.         try {
    5.             MsgConsumer.consumerMsg(exchangeName, queueName, routingKey);
    6.         } catch (IOException e) {
    7.             e.printStackTrace();
    8.         } catch (TimeoutException e) {
    9.             e.printStackTrace();
    10.         }
    11.     }
    12.     public static void main(String[] args) throws InterruptedException {
    13.         DirectConsumer consumer = new DirectConsumer();
    14.         String[] routingKey = new String[]{"aaa""bbb""ccc"};
    15.         String[] queueNames = new String[]{"qa""qb""qc"};
    16.         for (int i = 0; i < 3; i++) {
    17.             consumer.msgConsumer(queueNames[i], routingKey[i]);
    18.         }
    19.         Thread.sleep(1000 * 60 * 100);
    20.     }
    21. }

    执行生产者,往消息队列中放入10条消息,其中key分别为“aaa”、“bbb”和“ccc”,分别放入qa、qb、qc三个队列:8597b80aa0c847e7b64cd3e0f85a02cb.png

    下面是qa队列的信息:02e35db43970403ebd1bab6e5ed375be.png 

     

    消费者:

    1. public class DirectProducer {
    2.     private static final String EXCHANGE_NAME = "direct.exchange";
    3.     public void publishMsg(String routingKey, String msg) {
    4.         try {
    5.             MsgProducer.publishMsg(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, routingKey, msg);
    6.         } catch (Exception e) {
    7.             e.printStackTrace();
    8.         }
    9.     }
    10.     public static void main(String[] args) throws InterruptedException {
    11.         DirectProducer directProducer = new DirectProducer();
    12.         String[] routingKey = new String[]{"aaa""bbb""ccc"};
    13.         String msg = "hello >>> ";
    14.         for (int i = 0; i < 10; i++) {
    15.             directProducer.publishMsg(routingKey[i % 3], msg + i);
    16.         }
    17.         System.out.println("----over-------");
    18.         Thread.sleep(1000 * 60 * 100);
    19.     }
    20. }

    执行后的输出:

    1. [*] Waiting for message. To exist press CTRL+C
    2.  [x] Received 'hello >>> 0
    3.  [x] Done
    4.  [x] Received 'hello >>> 3
    5.  [x] Done
    6.  [x] Received 'hello >>> 6
    7.  [x] Done
    8.  [x] Received 'hello >>> 9
    9.  [x] Done
    10. [*] Waiting for message. To exist press CTRL+C
    11.  [x] Received 'hello >>> 1
    12.  [x] Done
    13.  [x] Received 'hello >>> 4
    14.  [x] Done
    15.  [x] Received 'hello >>> 7
    16.  [x] Done
    17. [*] Waiting for message. To exist press CTRL+C
    18.  [x] Received 'hello >>> 2
    19.  [x] Done
    20.  [x] Received 'hello >>> 5
    21.  [x] Done
    22.  [x] Received 'hello >>> 8
    23.  [x] Done

    可以看到,分别从qa、qb、qc中将不同的key的数据消费掉。

    问题探讨

    有个疑问:这个队列的名称qa、qb和qc是RabbitMQ自动生成的么,我们可以指定队列名称么?

    我做了个简单的实验,我把消费者代码修改了一下:

    执行后入下图所示:b9be9179ef384707a7bad123dfa4a295.png

    我们可以发现,多了一个qc1,所以可以判断这个界面中的queues,是消费者执行时,会将消费者指定的队列名称和direct.exchange绑定,绑定的依据就是key。

    当我们把队列中的数据全部消费掉,然后重新执行生成者后,会发现qc和qc1中都有3条待消费的数据,因为绑定的key都是“ccc”,所以两者的数据是一样的:3e0a96eee122423db414e4bfac16cad8.png

     绑定关系如下:b2104423c10b4503a87b6a13f174cca9.png

     

    注意:当没有Queue绑定到Exchange时,往Exchange中写入的消息也不会重新分发到之后绑定的queue上。

    思考:不执行消费者,看不到这个Queres中信息,我其实可以把这个界面理解为消费者信息界面。不过感觉还是怪怪的,这个queues如果是消费者信息,就不应该叫queues,我理解queues应该是RabbitMQ中实际存放数据的queues,难道是我理解错了?

    1.3.Fanout方式(指定队列)e52c146eaf884969a5bbebaeb68b374b.png

    生产者封装:

    1. public class FanoutProducer {
    2.     private static final String EXCHANGE_NAME = "fanout.exchange";
    3.     public void publishMsg(String routingKey, String msg) {
    4.         try {
    5.             MsgProducer.publishMsg(EXCHANGE_NAME, BuiltinExchangeType.FANOUT, routingKey, msg);
    6.         } catch (Exception e) {
    7.             e.printStackTrace();
    8.         }
    9.     }
    10.     public static void main(String[] args) {
    11.         FanoutProducer directProducer = new FanoutProducer();
    12.         String msg = "hello >>> ";
    13.         for (int i = 0; i < 10; i++) {
    14.             directProducer.publishMsg("", msg + i);
    15.         }
    16.     }
    17. }

    消费者:

    1. public class FanoutConsumer {
    2.     private static final String EXCHANGE_NAME = "fanout.exchange";
    3.     public void msgConsumer(String queueName, String routingKey) {
    4.         try {
    5.             MsgConsumer.consumerMsg(EXCHANGE_NAME, queueName, routingKey);
    6.         } catch (IOException e) {
    7.             e.printStackTrace();
    8.         } catch (TimeoutException e) {
    9.             e.printStackTrace();
    10.         }
    11.     }
    12.     public static void main(String[] args) {
    13.         FanoutConsumer consumer = new FanoutConsumer();
    14.         String[] queueNames = new String[]{"qa-2""qb-2""qc-2"};
    15.         for (int i = 0; i < 3; i++) {
    16.             consumer.msgConsumer(queueNames[i], "");
    17.         }
    18.     }
    19. }

    执行生成者,结果如下:6ed910b2ac3342aba272acb689536c3a.png

     

    我们发现,生产者生产的10条数据,在每个消费者中都可以消费,这个是和Direct不同的地方,但是使用Fanout方式时,有几个点需要注意一下:

    • 生产者的routkey可以为空,因为生产者的所有数据,会下放到每一个队列,所以不会通过routkey去路由;

    • 消费者需要指定queues,因为消费者需要绑定到指定的queues才能消费。202945f110a344f78dc3511188e4f716.png

       

    这幅图就画出了Fanout的精髓之处,exchange会和所有的queue进行绑定,不区分路由,消费者需要绑定指定的queue才能发起消费。

    注意:往队列塞数据时,可能通过界面看不到消息个数的增加,可能是你之前已经开启了消费进程,导致增加的消息马上被消费了。

    1.4.Fanout方式(随机获取队列)

    上面我们是指定了队列,这个方式其实很不友好,比如对于Fanout,我其实根本无需关心队列的名字,如果还指定对应队列进行消费,感觉这个很冗余,所以我们这里就采用随机获取队列名字的方式,下面代码直接Copy官网。

    生成者封装:

    1. public static void publishMsgV2(String exchange, BuiltinExchangeType exchangeType, String message) throws IOException, TimeoutException {
    2.     ConnectionFactory factory = RabbitUtil.getConnectionFactory();
    3.     //创建连接
    4.     Connection connection = factory.newConnection();
    5.     //创建消息通道
    6.     Channel channel = connection.createChannel();
    7.     // 声明exchange中的消息
    8.     channel.exchangeDeclare(exchange, exchangeType);
    9.     // 发布消息
    10.     channel.basicPublish(exchange, ""null, message.getBytes("UTF-8"));
    11.     System.out.println("Sent '" + message + "'");
    12.     channel.close();
    13.     connection.close();
    14. }

    消费者封装:

    1. public static void consumerMsgV2(String exchange) throws IOException, TimeoutException {
    2.     ConnectionFactory factory = RabbitUtil.getConnectionFactory();
    3.     Connection connection = factory.newConnection();
    4.     final Channel channel = connection.createChannel();
    5.     channel.exchangeDeclare(exchange, "fanout");
    6.     String queueName = channel.queueDeclare().getQueue();
    7.     channel.queueBind(queueName, exchange, "");
    8.     System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
    9.     DeliverCallback deliverCallback = (consumerTag, delivery) -> {
    10.         String message = new String(delivery.getBody(), "UTF-8");
    11.         System.out.println(" [x] Received '" + message + "'");
    12.     };
    13.     channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
    14. }

    生产者:

    1. public class FanoutProducer {
    2.     private static final String EXCHANGE_NAME = "fanout.exchange.v2";
    3.     public void publishMsg(String msg) {
    4.         try {
    5.             MsgProducer.publishMsgV2(EXCHANGE_NAME, BuiltinExchangeType.FANOUT, msg);
    6.         } catch (Exception e) {
    7.             e.printStackTrace();
    8.         }
    9.     }
    10.     public static void main(String[] args) {
    11.         FanoutProducer directProducer = new FanoutProducer();
    12.         String msg = "hello >>> ";
    13.         for (int i = 0; i < 10000; i++) {
    14.             directProducer.publishMsg(msg + i);
    15.         }
    16.     }
    17. }

    消费者:

    1. public class FanoutConsumer {
    2.     private static final String EXCHANGE_NAME = "fanout.exchange.v2";
    3.     public void msgConsumer() {
    4.         try {
    5.             MsgConsumer.consumerMsgV2(EXCHANGE_NAME);
    6.         } catch (IOException e) {
    7.             e.printStackTrace();
    8.         } catch (TimeoutException e) {
    9.             e.printStackTrace();
    10.         }
    11.     }
    12.     public static void main(String[] args) {
    13.         FanoutConsumer consumer = new FanoutConsumer();
    14.         for (int i = 0; i < 3; i++) {
    15.             consumer.msgConsumer();
    16.         }
    17.     }
    18. }

    执行后,管理界面如下:8e5f31531e1f4eddbc460ec7c49b85f1.png

     37f2cc9c906a4ed7baf4d5b964fd704b.png

    52bbefa22fdf4c40baaa416e3e21a18c.png

     

    1.5.Topic方式7f1e4ae8d53c45cd9e7fed6a1b4152ca.png

    代码详见官网:https://www.rabbitmq.com/tutorials/tutorial-five-java.html

    更多方式,请直接查看官网:https://www.rabbitmq.com/getstarted.html

     c1f0f929966a453a940c856aa056a3c0.png

     

    2.RabbitMQ进阶

    参考文章:https://liuyueyi.github.io/hexblog/2018/05/29/RabbitMQ%E5%9F%BA%E7%A1%80%E6%95%99%E7%A8%8B%E4%B9%8B%E4%BD%BF%E7%94%A8%E8%BF%9B%E9%98%B6%E7%AF%87/

    2.1.durable和autoDeleted

    在定义Queue时,可以指定这两个参数:

    1. /**
    2.  * Declare an exchange.
    3.  * @see com.rabbitmq.client.AMQP.Exchange.Declare
    4.  * @see com.rabbitmq.client.AMQP.Exchange.DeclareOk
    5.  * @param exchange the name of the exchange
    6.  * @param type the exchange type
    7.  * @param durable true if we are declaring a durable exchange (the exchange will survive a server restart)
    8.  * @param autoDelete true if the server should delete the exchange when it is no longer in use
    9.  * @param arguments other properties (construction arguments) for the exchange
    10.  * @return a declaration-confirm method to indicate the exchange was successfully declared
    11.  * @throws java.io.IOException if an error is encountered
    12.  */
    13. Exchange.DeclareOk exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete,
    14.     Map arguments) throws IOException;
    15.     
    16. /**
    17. * Declare a queue
    18. * @see com.rabbitmq.client.AMQP.Queue.Declare
    19. * @see com.rabbitmq.client.AMQP.Queue.DeclareOk
    20. * @param queue the name of the queue
    21. * @param durable true if we are declaring a durable queue (the queue will survive a server restart)
    22. * @param exclusive true if we are declaring an exclusive queue (restricted to this connection)
    23. * @param autoDelete true if we are declaring an autodelete queue (server will delete it when no longer in use)
    24. * @param arguments other properties (construction arguments) for the queue
    25. * @return a declaration-confirm method to indicate the queue was successfully declared
    26. * @throws java.io.IOException if an error is encountered
    27. */
    28. Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,
    29.     Map arguments) throws IOException;

    2.1.1.durable

    持久化,保证RabbitMQ在退出或者crash等异常情况下数据没有丢失,需要将queue,exchange和Message都持久化。

    若是将queue的持久化标识durable设置为true,则代表是一个持久的队列,那么在服务重启之后,会重新读取之前被持久化的queue。

    虽然队列可以被持久化,但是里面的消息是否为持久化,还要看消息的持久化设置。即重启queue,但是queue里面还没有发出去的消息,那队列里面还存在该消息么?这个取决于该消息的设置。

    2.1.2.autoDeleted

    自动删除,如果该队列没有任何订阅的消费者的话,该队列会被自动删除。这种队列适用于临时队列。

    当一个Queue被设置为自动删除时,当消费者断掉之后,queue会被删除,这个主要针对的是一些不是特别重要的数据,不希望出现消息积累的情况。

    小节

    • 当一个Queue已经声明好了之后,不能更新durable或者autoDelted值;当需要修改时,需要先删除再重新声明

    • 消费的Queue声明应该和投递的Queue声明的 durable,autoDelted属性一致,否则会报错

    • 对于重要的数据,一般设置 durable=true, autoDeleted=false

    • 对于设置 autoDeleted=true 的队列,当没有消费者之后,队列会自动被删除

    2.2.ACK

    执行一个任务可能需要花费几秒钟,你可能会担心如果一个消费者在执行任务过程中挂掉了。一旦RabbitMQ将消息分发给了消费者,就会从内存中删除。在这种情况下,如果正在执行任务的消费者宕机,会丢失正在处理的消息和分发给这个消费者但尚未处理的消息。

    但是,我们不想丢失任何任务,如果有一个消费者挂掉了,那么我们应该将分发给它的任务交付给另一个消费者去处理。

    为了确保消息不会丢失,RabbitMQ支持消息应答。消费者发送一个消息应答,告诉RabbitMQ这个消息已经接收并且处理完毕了。RabbitMQ就可以删除它了。

    因此手动ACK的常见手段:

    1. // 接收消息之后,主动ack/nak
    2. Consumer consumer = new DefaultConsumer(channel) {
    3.     @Override
    4.     public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
    5.             byte[] body) throws IOException {
    6.         String message = new String(body, "UTF-8");
    7.         try {
    8.             System.out.println(" [ " + queue + " ] Received '" + message);
    9.             channel.basicAck(envelope.getDeliveryTag(), false);
    10.         } catch (Exception e) {
    11.             channel.basicNack(envelope.getDeliveryTag(), falsetrue);
    12.         }
    13.     }
    14. };
    15. // 取消自动ack
    16. channel.basicConsume(queue, false, consumer);

     

    总算是把“债”给还上了,感觉RabbitMQ搭建非常容易,然后官方文档也非常全,上手也很快,关于Kafka和RocketMQ,就不再搭建环境和实操了,等后续工作中有使用它们时,我再去倒腾。

     

    最后说一句(求关注,别白嫖我)

    如果这篇文章对您有所帮助,或者有所启发的话,帮忙关注一下,您的支持是我坚持写作最大的动力。

    求一键三连:点赞、转发、在看。

     

     

     

  • 相关阅读:
    分布式应用全链路跟踪实现
    Linux信号量
    入职java学习总结
    redis数据结构
    在UE内添加控制台工程(Programs)
    39.cuBLAS开发指南中文版--cuBLAS中的Level-2函数hpr()
    Python中的函数
    SpringMVC:获取请求数据
    为什么使用Golang而非Rust开发桌面应用?
    计算机组成原理 学习笔记(持续学习中)
  • 原文地址:https://blog.csdn.net/weixin_48321993/article/details/126401366