
目录
大家好,我是月夜枫,消息队列的6篇文章目前就先写到这了,因为个人学习重点的调整,写完这篇文章后,要和小伙伴们暂时告别一段时间了,预计在9月初为大家带来新知识点的分享!!!
这篇文章主要讲述RabbitMQ常用的使用姿势,希望在面试和工作中能给小伙伴们带来帮助!
封装工厂类:
- public class RabbitUtil {
- public static ConnectionFactory getConnectionFactory() {
- //创建连接工程,下面给出的是默认的case
- ConnectionFactory factory = new ConnectionFactory();
- factory.setHost("127.0.0.1");
- factory.setPort(5672);
- factory.setUsername("admin");
- factory.setPassword("admin");
- factory.setVirtualHost("/");
- return factory;
- }
- }
封装生成者:
- public class MsgProducer {
- public static void publishMsg(String exchange, BuiltinExchangeType exchangeType, String toutingKey, String message) throws IOException, TimeoutException {
- ConnectionFactory factory = RabbitUtil.getConnectionFactory();
- //创建连接
- Connection connection = factory.newConnection();
- //创建消息通道
- Channel channel = connection.createChannel();
- // 声明exchange中的消息为可持久化,不自动删除
- channel.exchangeDeclare(exchange, exchangeType, true, false, null);
- // 发布消息
- channel.basicPublish(exchange, toutingKey, null, message.getBytes());
- System.out.println("Sent '" + message + "'");
- channel.close();
- connection.close();
- }
- }
封装消费者:
- public class MsgConsumer {
- public static void consumerMsg(String exchange, String queue, String routingKey)
- throws IOException, TimeoutException {
- ConnectionFactory factory = RabbitUtil.getConnectionFactory();
- //创建连接
- Connection connection = factory.newConnection();
- //创建消息信道
- final Channel channel = connection.createChannel();
- //消息队列
- channel.queueDeclare(queue, true, false, false, null);
- //绑定队列到交换机
- channel.queueBind(queue, exchange, routingKey);
- System.out.println("[*] Waiting for message. To exist press CTRL+C");
-
- Consumer consumer = new DefaultConsumer(channel) {
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
- byte[] body) throws IOException {
- String message = new String(body, "UTF-8");
- try {
- System.out.println(" [x] Received '" + message);
- } finally {
- System.out.println(" [x] Done");
- channel.basicAck(envelope.getDeliveryTag(), false);
- }
- }
- };
- // 取消自动ack
- channel.basicConsume(queue, false, consumer);
- }
- }

生产者:
- public class DirectConsumer {
- private static final String exchangeName = "direct.exchange";
- public void msgConsumer(String queueName, String routingKey) {
- try {
- MsgConsumer.consumerMsg(exchangeName, queueName, routingKey);
- } catch (IOException e) {
- e.printStackTrace();
- } catch (TimeoutException e) {
- e.printStackTrace();
- }
- }
- public static void main(String[] args) throws InterruptedException {
- DirectConsumer consumer = new DirectConsumer();
- String[] routingKey = new String[]{"aaa", "bbb", "ccc"};
- String[] queueNames = new String[]{"qa", "qb", "qc"};
-
- for (int i = 0; i < 3; i++) {
- consumer.msgConsumer(queueNames[i], routingKey[i]);
- }
- Thread.sleep(1000 * 60 * 100);
- }
- }
执行生产者,往消息队列中放入10条消息,其中key分别为“aaa”、“bbb”和“ccc”,分别放入qa、qb、qc三个队列:
下面是qa队列的信息:
消费者:
- public class DirectProducer {
- private static final String EXCHANGE_NAME = "direct.exchange";
- public void publishMsg(String routingKey, String msg) {
- try {
- MsgProducer.publishMsg(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, routingKey, msg);
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- public static void main(String[] args) throws InterruptedException {
- DirectProducer directProducer = new DirectProducer();
- String[] routingKey = new String[]{"aaa", "bbb", "ccc"};
- String msg = "hello >>> ";
- for (int i = 0; i < 10; i++) {
- directProducer.publishMsg(routingKey[i % 3], msg + i);
- }
- System.out.println("----over-------");
- Thread.sleep(1000 * 60 * 100);
- }
- }
执行后的输出:
- [*] Waiting for message. To exist press CTRL+C
- [x] Received 'hello >>> 0
- [x] Done
- [x] Received 'hello >>> 3
- [x] Done
- [x] Received 'hello >>> 6
- [x] Done
- [x] Received 'hello >>> 9
- [x] Done
- [*] Waiting for message. To exist press CTRL+C
- [x] Received 'hello >>> 1
- [x] Done
- [x] Received 'hello >>> 4
- [x] Done
- [x] Received 'hello >>> 7
- [x] Done
- [*] Waiting for message. To exist press CTRL+C
- [x] Received 'hello >>> 2
- [x] Done
- [x] Received 'hello >>> 5
- [x] Done
- [x] Received 'hello >>> 8
- [x] Done
可以看到,分别从qa、qb、qc中将不同的key的数据消费掉。
有个疑问:这个队列的名称qa、qb和qc是RabbitMQ自动生成的么,我们可以指定队列名称么?
我做了个简单的实验,我把消费者代码修改了一下:
执行后入下图所示:
我们可以发现,多了一个qc1,所以可以判断这个界面中的queues,是消费者执行时,会将消费者指定的队列名称和direct.exchange绑定,绑定的依据就是key。
当我们把队列中的数据全部消费掉,然后重新执行生成者后,会发现qc和qc1中都有3条待消费的数据,因为绑定的key都是“ccc”,所以两者的数据是一样的:
绑定关系如下:
注意:当没有Queue绑定到Exchange时,往Exchange中写入的消息也不会重新分发到之后绑定的queue上。
思考:不执行消费者,看不到这个Queres中信息,我其实可以把这个界面理解为消费者信息界面。不过感觉还是怪怪的,这个queues如果是消费者信息,就不应该叫queues,我理解queues应该是RabbitMQ中实际存放数据的queues,难道是我理解错了?

生产者封装:
- public class FanoutProducer {
- private static final String EXCHANGE_NAME = "fanout.exchange";
- public void publishMsg(String routingKey, String msg) {
- try {
- MsgProducer.publishMsg(EXCHANGE_NAME, BuiltinExchangeType.FANOUT, routingKey, msg);
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- public static void main(String[] args) {
- FanoutProducer directProducer = new FanoutProducer();
- String msg = "hello >>> ";
- for (int i = 0; i < 10; i++) {
- directProducer.publishMsg("", msg + i);
- }
- }
- }
消费者:
- public class FanoutConsumer {
- private static final String EXCHANGE_NAME = "fanout.exchange";
- public void msgConsumer(String queueName, String routingKey) {
- try {
- MsgConsumer.consumerMsg(EXCHANGE_NAME, queueName, routingKey);
- } catch (IOException e) {
- e.printStackTrace();
- } catch (TimeoutException e) {
- e.printStackTrace();
- }
- }
- public static void main(String[] args) {
- FanoutConsumer consumer = new FanoutConsumer();
- String[] queueNames = new String[]{"qa-2", "qb-2", "qc-2"};
- for (int i = 0; i < 3; i++) {
- consumer.msgConsumer(queueNames[i], "");
- }
- }
- }
执行生成者,结果如下:
我们发现,生产者生产的10条数据,在每个消费者中都可以消费,这个是和Direct不同的地方,但是使用Fanout方式时,有几个点需要注意一下:
生产者的routkey可以为空,因为生产者的所有数据,会下放到每一个队列,所以不会通过routkey去路由;
消费者需要指定queues,因为消费者需要绑定到指定的queues才能消费。
这幅图就画出了Fanout的精髓之处,exchange会和所有的queue进行绑定,不区分路由,消费者需要绑定指定的queue才能发起消费。
注意:往队列塞数据时,可能通过界面看不到消息个数的增加,可能是你之前已经开启了消费进程,导致增加的消息马上被消费了。
上面我们是指定了队列,这个方式其实很不友好,比如对于Fanout,我其实根本无需关心队列的名字,如果还指定对应队列进行消费,感觉这个很冗余,所以我们这里就采用随机获取队列名字的方式,下面代码直接Copy官网。
生成者封装:
- public static void publishMsgV2(String exchange, BuiltinExchangeType exchangeType, String message) throws IOException, TimeoutException {
- ConnectionFactory factory = RabbitUtil.getConnectionFactory();
- //创建连接
- Connection connection = factory.newConnection();
- //创建消息通道
- Channel channel = connection.createChannel();
-
- // 声明exchange中的消息
- channel.exchangeDeclare(exchange, exchangeType);
-
- // 发布消息
- channel.basicPublish(exchange, "", null, message.getBytes("UTF-8"));
-
- System.out.println("Sent '" + message + "'");
- channel.close();
- connection.close();
- }
消费者封装:
- public static void consumerMsgV2(String exchange) throws IOException, TimeoutException {
- ConnectionFactory factory = RabbitUtil.getConnectionFactory();
- Connection connection = factory.newConnection();
- final Channel channel = connection.createChannel();
-
- channel.exchangeDeclare(exchange, "fanout");
- String queueName = channel.queueDeclare().getQueue();
- channel.queueBind(queueName, exchange, "");
-
- System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
-
- DeliverCallback deliverCallback = (consumerTag, delivery) -> {
- String message = new String(delivery.getBody(), "UTF-8");
- System.out.println(" [x] Received '" + message + "'");
- };
- channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
- }
生产者:
- public class FanoutProducer {
- private static final String EXCHANGE_NAME = "fanout.exchange.v2";
- public void publishMsg(String msg) {
- try {
- MsgProducer.publishMsgV2(EXCHANGE_NAME, BuiltinExchangeType.FANOUT, msg);
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- public static void main(String[] args) {
- FanoutProducer directProducer = new FanoutProducer();
- String msg = "hello >>> ";
- for (int i = 0; i < 10000; i++) {
- directProducer.publishMsg(msg + i);
- }
- }
- }
消费者:
- public class FanoutConsumer {
- private static final String EXCHANGE_NAME = "fanout.exchange.v2";
- public void msgConsumer() {
- try {
- MsgConsumer.consumerMsgV2(EXCHANGE_NAME);
- } catch (IOException e) {
- e.printStackTrace();
- } catch (TimeoutException e) {
- e.printStackTrace();
- }
- }
- public static void main(String[] args) {
- FanoutConsumer consumer = new FanoutConsumer();
- for (int i = 0; i < 3; i++) {
- consumer.msgConsumer();
- }
- }
- }
执行后,管理界面如下:



代码详见官网:https://www.rabbitmq.com/tutorials/tutorial-five-java.html
更多方式,请直接查看官网:https://www.rabbitmq.com/getstarted.html

参考文章:https://liuyueyi.github.io/hexblog/2018/05/29/RabbitMQ%E5%9F%BA%E7%A1%80%E6%95%99%E7%A8%8B%E4%B9%8B%E4%BD%BF%E7%94%A8%E8%BF%9B%E9%98%B6%E7%AF%87/
在定义Queue时,可以指定这两个参数:
- /**
- * Declare an exchange.
- * @see com.rabbitmq.client.AMQP.Exchange.Declare
- * @see com.rabbitmq.client.AMQP.Exchange.DeclareOk
- * @param exchange the name of the exchange
- * @param type the exchange type
- * @param durable true if we are declaring a durable exchange (the exchange will survive a server restart)
- * @param autoDelete true if the server should delete the exchange when it is no longer in use
- * @param arguments other properties (construction arguments) for the exchange
- * @return a declaration-confirm method to indicate the exchange was successfully declared
- * @throws java.io.IOException if an error is encountered
- */
- Exchange.DeclareOk exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete,
- Map
arguments) throws IOException; -
- /**
- * Declare a queue
- * @see com.rabbitmq.client.AMQP.Queue.Declare
- * @see com.rabbitmq.client.AMQP.Queue.DeclareOk
- * @param queue the name of the queue
- * @param durable true if we are declaring a durable queue (the queue will survive a server restart)
- * @param exclusive true if we are declaring an exclusive queue (restricted to this connection)
- * @param autoDelete true if we are declaring an autodelete queue (server will delete it when no longer in use)
- * @param arguments other properties (construction arguments) for the queue
- * @return a declaration-confirm method to indicate the queue was successfully declared
- * @throws java.io.IOException if an error is encountered
- */
- Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,
- Map
arguments) throws IOException;
持久化,保证RabbitMQ在退出或者crash等异常情况下数据没有丢失,需要将queue,exchange和Message都持久化。
若是将queue的持久化标识durable设置为true,则代表是一个持久的队列,那么在服务重启之后,会重新读取之前被持久化的queue。
虽然队列可以被持久化,但是里面的消息是否为持久化,还要看消息的持久化设置。即重启queue,但是queue里面还没有发出去的消息,那队列里面还存在该消息么?这个取决于该消息的设置。
自动删除,如果该队列没有任何订阅的消费者的话,该队列会被自动删除。这种队列适用于临时队列。
当一个Queue被设置为自动删除时,当消费者断掉之后,queue会被删除,这个主要针对的是一些不是特别重要的数据,不希望出现消息积累的情况。
小节
当一个Queue已经声明好了之后,不能更新durable或者autoDelted值;当需要修改时,需要先删除再重新声明
消费的Queue声明应该和投递的Queue声明的 durable,autoDelted属性一致,否则会报错
对于重要的数据,一般设置 durable=true, autoDeleted=false
对于设置 autoDeleted=true 的队列,当没有消费者之后,队列会自动被删除
执行一个任务可能需要花费几秒钟,你可能会担心如果一个消费者在执行任务过程中挂掉了。一旦RabbitMQ将消息分发给了消费者,就会从内存中删除。在这种情况下,如果正在执行任务的消费者宕机,会丢失正在处理的消息和分发给这个消费者但尚未处理的消息。
但是,我们不想丢失任何任务,如果有一个消费者挂掉了,那么我们应该将分发给它的任务交付给另一个消费者去处理。
为了确保消息不会丢失,RabbitMQ支持消息应答。消费者发送一个消息应答,告诉RabbitMQ这个消息已经接收并且处理完毕了。RabbitMQ就可以删除它了。
因此手动ACK的常见手段:
- // 接收消息之后,主动ack/nak
- Consumer consumer = new DefaultConsumer(channel) {
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
- byte[] body) throws IOException {
- String message = new String(body, "UTF-8");
- try {
- System.out.println(" [ " + queue + " ] Received '" + message);
- channel.basicAck(envelope.getDeliveryTag(), false);
- } catch (Exception e) {
- channel.basicNack(envelope.getDeliveryTag(), false, true);
- }
- }
- };
- // 取消自动ack
- channel.basicConsume(queue, false, consumer);
总算是把“债”给还上了,感觉RabbitMQ搭建非常容易,然后官方文档也非常全,上手也很快,关于Kafka和RocketMQ,就不再搭建环境和实操了,等后续工作中有使用它们时,我再去倒腾。
最后说一句(求关注,别白嫖我)
如果这篇文章对您有所帮助,或者有所启发的话,帮忙关注一下,您的支持是我坚持写作最大的动力。
求一键三连:点赞、转发、在看。