目录
消息消费时的幂等性(消息不被重复消费),同一个消息,第一次接收,正常处理业务,如果该消息第二次再接收,那就不能再处理业务,否则就处理重复了;
幂等性:对于一个资源,不管你请求一次还是请求多次,对该资源本身造成的影响应该是相同的,不能因为重复的请求而对该资源重复造成影响;
接口幂等性是指:一个接口用同样的参数反复调用,不会造成业务错误,那么这个接口就是具有幂等性的,比如:注册接口;发送短信验证码接口;还有比如同一个订单我支付两次,但是只会扣款一次,第二次支付不会扣款,这说明这个支付接口是具有幂等性的。
幂等性是分布式系统设计中的一个重要概念,是在做系统或者接口设计时要着重考虑的问题,尤其像支付宝、银行、互联网金融等涉及钱的系统,既要高效,数据也要准确,绝对不能出现多扣款,多打款等问题,幂等性的设计就显得更为重要了。
在RabbitMQ也回出现这个问题,RabbitMQ把消息发送给消费者消费,消费者消费成功并返回ack消息,但这时候网络中断了,RabbitMQ没有收到ack消息,这就让RabbitMQ误以为消息消费失败了,然后RabbitMQ会重新把该条消息发送给其他的消费者,或者等网络重连后再次发送给该候消费者,这时候就会造成重复消费的问题。
消息发送到代理并持久化后,由于网络断开或者客户端崩溃,代理未能回复客户端,导致生产者认为代理没有收到消息而重新发送,结果消费者收到两条具有相同内容和消息ID的消息
消息发送给消费者后,由于网络断开等原因,消费者客户端没有向broker返回ACK响应,代理不知道消息是否被消费,为了确保消息至少被消费一次,代理在网络恢复后再次传递消息,结果消费者就收到了两条具有相同内容和消息ID的消息。
全局唯一ID + Redis
生产者在发送消息时,为每条消息设置一个全局唯一的messageId,消费者拿到消息后,使用setnx命令,将messageId作为key放到redis中:setnx(messageId, 1),若返回1,说明之前没有消费过,正常消费;若返回0,说明这条消息之前已消费过,抛弃;
- @RabbitListener(queues = "durable-queue")
- @RabbitHandler
- public void processIde(Message message, Channel channel) throws IOException {
-
- if (stringRedisTemplate.opsForValue().setIfAbsent(message.getMessageProperties().getMessageId(),"1")){
- // 业务操作...
- System.out.println("消费消息:"+ new String(message.getBody(), "UTF-8"));
-
- // 手动确认
- channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
- }
- }
Message
在消息传递的过程中,实际上传递的对象为
org.springframework.amqp.core.Message
,它主要由两部分组成:
MessageProperties // 消息属性
byte[] body // 消息内容
@RabbitListener
使用 @RabbitListener 注解标记方法,当监听到队列 debug 中有消息时则会进行接收并处理
消息处理方法参数是由 MessageConverter 转化,若使用自定义 MessageConverter 则需要在 RabbitListenerContainerFactory 实例中去设置(默认 Spring 使用的实现是 SimpleRabbitListenerContainerFactory)
消息的 content_type 属性表示消息 body 数据以什么数据格式存储,接收消息除了使用 Message 对象接收消息(包含消息属性等信息)之外,还可直接使用对应类型接收消息 body 内容,但若方法参数类型不正确会抛异常:
- application/octet-stream:二进制字节数组存储,使用 byte[]
- application/x-java-serialized-object:java 对象序列化格式存储,使用 Object、相应类型(反序列化时类型应该同包同名,否者会抛出找不到类异常)
- text/plain:文本数据类型存储,使用 String
- application/json:JSON 格式,使用 Object、相应类型