消息队列(Message Queue)是在消息的传输过程中保存消息的容器、 消息指的是两个应用间传递的数据。数据的类型有很多种形式

主要有三个作用
异步处理
场景说明: 用户注册后,需要发注册邮件和注册短信,传统的做法串行的
应用解耦
场景: 双11是购物狂节,用户下单后,订单系统需要通知库存系统,传统的做法就是订单系统调用库存系统的接口

流量削峰
场景: 秒杀活动,一般会因为流量过大,导致应用挂掉,为了解决这个问题,一般在应用前端加入消息队列。
关于消息队列的优点也就是上面列举的:解耦、异步、削峰。
缺点有以下几个:系统可用性降低、系统复杂度提高、一致性问题
AMQP,即Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制。Erlang中的实现有RabbitMQ等。
JMS即Java消息服务(Java Message Service)应用程序接口,是一个Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。Java消息服务是一个与具体平台无关的API,绝大多数MOM提供商都对JMS提供支持。
AMQP和JMS
MQ是消息通信的模型,并发具体实现。现在实现MQ的有两种主流方式:AMQP、JMS。
两者间的区别和联系:
常见MQ产品 kafka、ActiveMQ、RocketMQ、RabbitMQ
下面我们来看一下,他们之间有什么区别,他们分别应该用于什么场景
AMQP 中的消息路由
pom.xml里导入相关的依赖:
- <dependency>
- <groupId>org.springframework.bootgroupId>
- <artifactId>spring-boot-starter-amqpartifactId>
- dependency>
application.properties配置文件
- spring.rabbitmq.host=192.168.152.155
- spring.rabbitmq.port=5672
- spring.rabbitmq.virtual-host=/
使用 Direct exchange(直通交换机)Rabbitmq的发送和接受消息
- package com.beijing.gulimall.product.rabbitmq;
-
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.amqp.core.Binding;
- import org.springframework.amqp.core.DirectExchange;
- import org.springframework.amqp.core.Queue;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
-
- @Slf4j
- @Configuration
- public class DirectRabbitConfig {
-
- //创建队列
- @Bean
- public Queue TestDirectQueue() {
- //public Queue(String name, boolean durable, boolean exclusive, boolean autoDelete)
- // durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效
- // exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable
- // autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。
- log.info("Queue[{}]创建成功", "TestDirectQueue");
- return new Queue("TestDirectQueue", true, false, false);
- }
-
- //创建交换机
- @Bean
- DirectExchange TestDirectExchange() {
- log.info("Exchange[{}]创建成功", "TestDirectExchange");
- return new DirectExchange("TestDirectExchange", true, false);
- }
-
- //创建绑定关系
- @Bean
- Binding TestBindingDirect() {
- // public Binding(String destination, DestinationType destinationType, String exchange, String routingKey,
- // Map
arguments) { - log.info("Binding[{}]创建成功", "TestBindingDirect");
- return new Binding("TestDirectQueue", Binding.DestinationType.QUEUE, "TestDirectExchange", "direct.test", null);
- }
-
- }
然后写个接口进行消息推送 SendMessageController.java:
- package com.beijing.gulimall.product.rabbitmq;
-
-
- import org.springframework.amqp.rabbit.core.RabbitTemplate;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.web.bind.annotation.RequestMapping;
- import org.springframework.web.bind.annotation.RestController;
-
- import java.time.LocalDateTime;
- import java.time.format.DateTimeFormatter;
- import java.util.HashMap;
- import java.util.Map;
- import java.util.UUID;
-
- @RestController
- public class SendMessageController {
-
- @Autowired
- RabbitTemplate rabbitTemplate;
-
-
- @RequestMapping("/hello")
- public void testRabbitMQ() {
- String messageId = String.valueOf(UUID.randomUUID());
- String messageData = "test message, hello!";
- String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
- Map
map = new HashMap<>(); - map.put("messageId", messageId);
- map.put("messageData", messageData);
- map.put("createTime", createTime);
-
- //将消息携带绑定键值:TestDirectRouting 发送到交换机TestDirectExchange
- rabbitTemplate.convertAndSend("TestDirectExchange", "direct.test", map);
- }
- }
配置一下Rabbit序列化对象的方式
- package com.beijing.gulimall.product.rabbitmq;
-
- import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
- import org.springframework.amqp.support.converter.MessageConverter;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
-
- @Configuration
- public class RabbitmqConfig {
-
- @Bean
- public MessageConverter messageConverter(){
- return new Jackson2JsonMessageConverter();
- }
- }
然后写个接口进行消息接收消息 主启动类上必须加 @EnableRabbit
- package com.beijing.gulimall.product.rabbitmq;
-
- import com.rabbitmq.client.Channel;
- import org.springframework.amqp.core.Message;
- import org.springframework.amqp.rabbit.annotation.RabbitHandler;
- import org.springframework.amqp.rabbit.annotation.RabbitListener;
- import org.springframework.stereotype.Service;
-
- import java.util.Map;
-
- @RabbitListener(queues = "TestDirectQueue")
- @Service
- public class ReceiveRabbitMQ {
-
-
- @RabbitHandler
- public void ReceiveRabbitMQ(Map
entity, Channel channel, Message message) { - /**
- * MQ监听消息
- * @param
- * @return
- * queues:指定监听那个队列 可以监听多个 只要收到消息,队列删除消息,而且只能有一个收到此消息
- * 1、Message msg:原生的消息详细信息内容 包括头和体
- * 2、T<发送的消息类型>:内容,会根据发送的消息类型自动跳转不同的方法
- * 3、通道 channel 当前传输的数据的通道
- *
- * 场景:
- * 1、假设是集群部署当前项目 都有这段代码 同一个消息只能由一个服务消费
- * 2、只有一个消息处理完成之后(方法执行完)才能继续接受下一个消息
- *
- * RabbitListener (可以标记在方法(表示当前方法监听队列)和类上(配合 RabbitHandler,整个类中标记了改注解的都会监听指定的队列)):指定监听的队列,当发送的消息类型不同时,可以使用RabbitHandler标记不同的方法,接收的类型不同,进入的监听方法就不同
- * RabbitHandler 只能标记在方法上,用于监听接收同一个队列中的不同的消息类型的消息
- */
-
- System.out.println(entity);
- System.out.println(channel);
- System.out.println(message.getBody());
- System.out.println(message.getMessageProperties());
- }
-
-
-
-
- }
可靠抵达-ConfirmCallback
spring.rabbitmq.publisher-confirms=true
- #开启发送端确认 生产者Publisher 到服务器Broker
- spring.rabbitmq.publisher-confirms=true
-
- #开启发送端消息抵达队列的确认
- spring.rabbitmq.publisher-returns=true
- #只要抵达队列,以异步发送优先回调我们这个returnConfirm
- spring.rabbitmq.template.mandatory=true
实现代码:
- package com.beijing.gulimall.product.rabbitmq;
-
- import org.springframework.amqp.core.Message;
- import org.springframework.amqp.rabbit.connection.CorrelationData;
- import org.springframework.amqp.rabbit.core.RabbitTemplate;
- import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
- import org.springframework.amqp.support.converter.MessageConverter;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
-
- import javax.annotation.PostConstruct;
-
- @Configuration
- public class RabbitmqConfig {
-
- @Autowired
- private RabbitTemplate rabbitTemplate;
-
- @Bean
- public MessageConverter messageConverter(){
- return new Jackson2JsonMessageConverter();
- }
- /**
- * 定制RabbitTemplate
- * 1、服务收到消息就回调
- * 1、spring.rabbitmq.publisher-confirms=true
- * 2、设置确认回调 ConfirmCallback
- * 2、消息正确抵达队列进行回调
- * 1、spring.rabbitmq.publisher-returns=true
- * spring.rabbitmq.template.mandatory=true
- * 2、设置确认回调ReturnCallback
- * 3、消费端确认(保证每个消息被正确消费,此时才可以broker删除这个消息)
- * spring.rabbitmq.listener.simple.acknowledge-mode=manual #手动ack消息
- * 1、默认是自动确认的,只要消息接收到,客户端自动确认,服务端就会移除这个消息
- * 问题:
- * 我们收到很多消息,自动回复给服务器ack,只要一个消息处理成功,宕机了。发送消息丢失;
- * 手动确认。只要我们没有明确告诉MQ,货物被签收。没有ack,消息就一直是unacked状态。即使Consumer宕机。消息不会丢失,会重新变为Ready模式
- *
- * 2、如何签收:
- * channel. basicAck(deliveryTag, false);签收;业务成功完成就应该签收
- * channel。basicNack(deliveryTag, false, true);拒签;业务失败,拒签
- */
-
-
- //p->b
- @PostConstruct //MyRabbitConfig 对象创建完成以后,执行这个方法
- public void initRabbitTemplate() {
-
- rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
- /**
- *
- * @param correlationData 当前消息的唯一管理数据(这个是消息的唯一id)
- * @param ack 消息是否成功收到
- * @param cause 失败的原因
- */
- @Override
- public void confirm(CorrelationData correlationData, boolean ack, String cause) {
- /**
- * 1、做好消息确认机制(producer,consumer【手动ack】)
- * 2、每一个发送的消息都在数据库做好记录。定期将失败的消息再次发送一遍
- */
- //服务器收到了
- //修改消息状态
- System.out.println("confirm...correlationData[" + correlationData + "]" + "ack == [" + ack + "]" + "cause ==[" + cause + "]");
- }
- });
-
- //e->q
- //设置消息抵达队列的确认回调
- rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback(){
-
- /**
- * 只要消息没有投递给指定的队列,就触发这个失败回调
- * @param message 投递失败的消息详细消息
- * @param replyCode 回复的状态码
- * @param replyText 回复的文本内容
- * @param exchange 当时这个消息发给那个交换机
- * @param routingKey 当时这个消息用那个路由键
- */
- @Override
- public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
- //报错误了。修改数据库当前消息的状态 -> 错误
- System.out.println("Fail Message["+message+"]==>replyCode["+replyCode+"]==>replyText["+replyText+"]==>"+exchange+"==>[["+routingKey+"]]");
- }
- });
- }
-
- }
测一下 故意把路由写成 p->b 成功 e->失败
- @RequestMapping("/hello")
- public void testRabbitMQ() {
- String messageId = String.valueOf(UUID.randomUUID());
- String messageData = "test message, hello!";
- String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
- Map
map = new HashMap<>(); - map.put("messageId", messageId);
- map.put("messageData", messageData);
- map.put("createTime", createTime);
- //将消息携带绑定键值:TestDirectRouting 发送到交换机TestDirectExchange
- rabbitTemplate.convertAndSend("TestDirectExchange", "direct.te12312st", map);
- }
结果
Fail Message[(Body:'{"createTime":"2023-10-13 16:38:17","messageId":"b916a451-b96b-4a41-9229-c8b12d5710a0","messageData":"test message, hello!"}' MessageProperties [headers={__ContentTypeId__=java.lang.Object, X-B3-SpanId=174db242dccf34df, __KeyTypeId__=java.lang.Object, X-B3-ParentSpanId=f0a4cf84a15d8d32, X-B3-Sampled=0, X-B3-TraceId=f0a4cf84a15d8d32, __TypeId__=java.util.HashMap}, contentType=application/json, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0])]==>replyCode[312]==>replyText[NO_ROUTE]==>TestDirectExchange==>[[direct.te12312st]]
confirm...correlationData[null]ack == [true]cause ==[null]
spring.rabbitmq.listener.simple.acknowledge-mode=manual
-
- import com.rabbitmq.client.Channel;
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.amqp.core.Message;
- import org.springframework.amqp.rabbit.annotation.RabbitHandler;
- import org.springframework.amqp.rabbit.annotation.RabbitListener;
- import org.springframework.stereotype.Service;
-
- import java.io.IOException;
- import java.util.Map;
-
- import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
- import com.baomidou.mybatisplus.core.metadata.IPage;
- import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
- import com.lihailin.common.utils.PageUtils;
- import com.lihailin.common.utils.Query;
-
-
- @RabbitListener(queues = {"hello-java-queue"})
- @Service("orderItemService")
- @Slf4j
- public class OrderItemServiceImpl extends ServiceImpl
implements OrderItemService { -
- @Override
- public PageUtils queryPage(Map
params) { - IPage
page = this.page( - new Query
().getPage(params), - new QueryWrapper
() - );
-
- return new PageUtils(page);
- }
-
- /**
- * queues :声明需要监听的所有队列
- * 参数可以写一下类型
- * 1、Message message: 原生信息详细信息。 头 + 体
- * 2、T<发送的消息的类型> OrderReturnReasonEntity content
- * 3、Channel channel:当前传输数据的通道
- *
- * Queue: 可以很多人都来监听。只要收到消息,队列删除消息,而且只能有一个收到此消息
- * 场景:
- * 1)、订单服务启动多个;同一个消息,只能有一个客户端收到
- * 2)、只要一个消息完成处理完,方法运行结束,我们就可以接收收到下一个消息
- */
- @RabbitHandler
- public void receiveMessage(Message message, OrderReturnReasonEntity content, Channel channel) throws InterruptedException, IOException {
- //{"id":1,"name":"哈哈","sort":null,"status":null,"createTime":1678017004436}
- //byte[] body = message.getBody();
- //消息头属性信息
- log.info("OrderReturnReasonEntity 接收到消息。。。:{}", content);
- //System.out.println("OrderReturnReasonEntity 接收到消息。。。:"+content);
- //Thread.sleep(3000);
- //byte[] body = message.getBody();
- long deliveryTag = message.getMessageProperties().getDeliveryTag();
- log.info("deliveryTag =>{}:", deliveryTag);
-
- try {
- if (deliveryTag % 2 == 0){
- //验收货物,非配量模式
- channel.basicAck(deliveryTag, false);
- log.info(" 验收货物成功 deliveryTag =>{}:", deliveryTag);
- }else {
- //退货 requeue = false 丢弃 requeue= true 发回服务器,服务器重新入队
- channel.basicNack(deliveryTag,false,false);
- log.info("拒收了 货物");
- }
-
- } catch (Exception e) {
- //网络中断
- }
- }
-
- }
测试结果:
消息的TTL(Time To Live)
Dead Letter Exchanges(DLX)

SpringBoot中使用延时队列
演示:创建queue、exchange、binding
-
- import org.springframework.amqp.core.*;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
-
- import java.util.HashMap;
- import java.util.Map;
-
- @Configuration
- public class MyMQConfig {
-
-
-
-
-
- //@Bean Binding,Queue,Exchange
- /**
- * 容器中的Binding,Queue,Exchange 都会自动创建(RabbitMQ 没有的情况)
- *
- * RabbitMQ 只要有。@Bean声明属性发生变化也不会覆盖
- */
- @Bean
- public Queue orderDelayQueue() {
- Map
arguments = new HashMap<>(); - /**
- x-dead-letter-exchange: order-event-exchange
- x-dead-letter-routing-key: order.release.order
- x-message-ttl: 60000
- */
- arguments.put("x-dead-letter-exchange", "order-event-exchange");
- arguments.put("x-dead-letter-routing-key", "order.release.order");
- arguments.put("x-message-ttl", 60000);
-
- //(String name, boolean durable, boolean exclusive, boolean autoDelete, Map
arguments) { - Queue queue = new Queue("order.delay.queue", true, false, false, arguments);
- return queue;
- }
-
- @Bean
- public Queue orderReleaseOrderQueue() {
- Queue queue = new Queue("order.release.order.queue", true, false, false);
- return queue;
- }
-
- @Bean
- public Exchange orderEventExchange() {
- //String name, boolean durable, boolean autoDelete, Map
arguments - return new TopicExchange("order-event-exchange", true, false);
- }
-
- @Bean
- public Binding orderCreateOrderBinding() {
- //String destination, DestinationType destinationType, String exchange, String routingKey, Map
arguments - return new Binding("order.delay.queue",
- Binding.DestinationType.QUEUE,
- "order-event-exchange",
- "order.create.order",
- null);
- }
-
- @Bean
- public Binding orderReleaseOrderBinding() {
- return new Binding("order.release.order.queue",
- Binding.DestinationType.QUEUE,
- "order-event-exchange",
- "order.release.order",
- null);
- }
- }
发送消息
- @ResponseBody
- @GetMapping(value = "/test/createOrder")
- public String createOrderTest() {
-
- //订单下单成功
- OrderEntity orderEntity = new OrderEntity();
- orderEntity.setOrderSn(UUID.randomUUID().toString());
- orderEntity.setModifyTime(new Date());
- //给MQ发送消息
- rabbitTemplate.convertAndSend("order-event-exchange","order.create.order",orderEntity);
- return "ok";
- }
接收消息
- package com.lihailin.gulimall.order.listener;
-
- import com.lihailin.gulimall.order.entity.OrderEntity;
- import com.lihailin.gulimall.order.service.OrderService;
- import com.rabbitmq.client.Channel;
- import org.springframework.amqp.core.Message;
- import org.springframework.amqp.rabbit.annotation.RabbitHandler;
- import org.springframework.amqp.rabbit.annotation.RabbitListener;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.stereotype.Service;
-
- import java.io.IOException;
-
- @RabbitListener(queues = "order.release.order.queue")
- @Service
- public class OrderCloseListener {
-
-
- @Autowired
- OrderService orderService;
-
- @RabbitHandler
- public void listener(OrderEntity entity, Channel channel, Message message) throws IOException {
- System.out.println("收到过期的订单信息:准备关闭订单" + entity.getOrderSn());
-
- try {
- orderService.closeOrder(entity);
- channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
- }catch (Exception e){
- channel.basicReject(message.getMessageProperties().getDeliveryTag(),true);
- }
-
- }
- }
如何保证消息可靠性-消息丢失