RabbitMQ的事务:事务可以保证消息100%传递,可以通过事务的回滚去记录日志,后面定时再次发送当前消息。事务的操作,效率太低,加了事务操作后,比平时的操作效率至少要慢100倍。
RabbitMQ除了事务,还提供了Confirm的确认机制,这个效率比事务高很多。
Confirm保证消息到达exchange。
Return机制监听消息是否从exchange送到指定的queue中。
消费者在消费消息时,如果消费者宕机了,可以使用手动ACK,但是会导致重复消费的问题
直接在生产者里面加入下面代码就可以
普通Confirm方式
- //3.1 开启confirm
- channel.confirmSelect();
- //3.2 发送消息
- String msg = "Hello-World!";
- channel.basicPublish("","HelloWorld",null,msg.getBytes());
- //3.3 判断消息发送是否成功
- if(channel.waitForConfirms()){
- System.out.println("消息发送成功");
- }else{
- System.out.println("发送消息失败");
- }
批量Confirm方式
- //3.1 开启confirm
- channel.confirmSelect();
- //3.2 批量发送消息
- for (int i = 0; i < 1000; i++) {
- String msg = "Hello-World!" + i;
- channel.basicPublish("","HelloWorld",null,msg.getBytes());
- }
- //3.3 确定批量操作是否成功
- channel.waitForConfirmsOrDie(); // 当你发送的全部消息,有一个失败的时候,就直接全部失败 抛出异常IOException
异步Confirm方式。
- //3.1 开启confirm
- channel.confirmSelect();
- //3.2 批量发送消息
- for (int i = 0; i < 1000; i++) {
- String msg = "Hello-World!" + i;
- channel.basicPublish("","HelloWorld",null,msg.getBytes());
- }
- //3.3 开启异步回调
- channel.addConfirmListener(new ConfirmListener() {
-
- @Override
- public void handleAck(long deliveryTag, boolean multiple) throws IOException {
- System.out.println("消息发送成功,标识:" + deliveryTag + ",是否是批量" + multiple);
- }
-
- @Override
- public void handleNack(long deliveryTag, boolean multiple) throws IOException {
- System.out.println("消息发送失败,标识:" + deliveryTag + ",是否是批量" + multiple);
- }
- });

Return机制
Confirm只能保证消息到达exchange,无法保证消息可以被exchange分发到指定queue。
而且exchange是不能持久化消息的,queue是可以持久化消息。
采用Return机制来监听消息是否从exchange送到了指定的queue中

开启Return机制,并在发送消息时,指定mandatory为true 生产者里加
- // 开启return机制
- channel.addReturnListener(new ReturnListener() {
- @Override
- public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {
- // 当消息没有送达到queue时,才会执行。
- System.out.println(new String(body,"UTF-8") + "没有送达到Queue中!!");
- }
- });
-
- // 在发送消息时,指定mandatory参数为true
- channel.basicPublish("","HelloWorld",true,null,msg.getBytes());
编写配置文件
- spring:
- rabbitmq:
- publisher-confirm-type: simple
- publisher-returns: true
开启Confirm和Return
- @Component
- public class PublisherConfirmAndReturnConfig implements RabbitTemplate.ConfirmCallback ,RabbitTemplate.ReturnCallback {
-
- @Autowired
- private RabbitTemplate rabbitTemplate;
-
- @PostConstruct // init-method
- public void initMethod(){
- rabbitTemplate.setConfirmCallback(this);
- rabbitTemplate.setReturnCallback(this);
- }
-
- @Override
- public void confirm(CorrelationData correlationData, boolean ack, String cause) {
- if(ack){
- System.out.println("消息已经送达到Exchange");
- }else{
- System.out.println("消息没有送达到Exchange");
- }
- }
-
- @Override
- public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
- System.out.println("消息没有送达到Queue");
- }
- }
mave实现
重复消费消息,会对非幂等行操作造成问题
重复消费消息的原因是,消费者没有给RabbitMQ一个ack
为了解决消息重复消费的问题,可以采用Redis,在消费者消费消息之前,现将消息的id放到Redis中,
id-0(正在执行业务)
id-1(执行业务成功)
如果ack失败,在RabbitMQ将消息交给其他的消费者时,先执行setnx,如果key已经存在,获取他的值,如果是0,当前消费者就什么都不做,如果是1,直接ack。
极端情况:第一个消费者在执行业务时,出现了死锁,在setnx的基础上,再给key设置一个生存时间。
生产者,发送消息时,指定messageId
- AMQP.BasicProperties properties = new AMQP.BasicProperties().builder()
- .deliveryMode(1) //指定消息书否需要持久化 1 - 需要持久化 2 - 不需要持久化
- .messageId(UUID.randomUUID().toString())
- .build();
- String msg = "Hello-World!";
- channel.basicPublish("","HelloWorld",true,properties,msg.getBytes());
消费者,在消费消息时,根据具体业务逻辑去操作redis
- efaultConsumer consume = new DefaultConsumer(channel){
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
- Jedis jedis = new Jedis("192.168.199.109",6379);
- String messageId = properties.getMessageId();
- //1. setnx到Redis中,默认指定value-0
- String result = jedis.set(messageId, "0", "NX", "EX", 10);
- if(result != null && result.equalsIgnoreCase("OK")) {
- System.out.println("接收到消息:" + new String(body, "UTF-8"));
- //2. 消费成功,set messageId 1
- jedis.set(messageId,"1");
- channel.basicAck(envelope.getDeliveryTag(),false);
- }else {
- //3. 如果1中的setnx失败,获取key对应的value,如果是0,return,如果是1
- String s = jedis.get(messageId);
- if("1".equalsIgnoreCase(s)){
- channel.basicAck(envelope.getDeliveryTag(),false);
- }
- }
- }
- };
导入依赖
- <dependency>
- <groupId>org.springframework.bootgroupId>
- <artifactId>spring-boot-starter-data-redisartifactId>
- dependency>
配置文件
- spring:
- redis:
- host: 192.168.199.109
- port: 6379
修改生产者
- @Test
- void contextLoads() throws IOException {
- CorrelationData messageId = new CorrelationData(UUID.randomUUID().toString());
- rabbitTemplate.convertAndSend("boot-topic-exchange","slow.red.dog","红色大狼狗!!",messageId);
- System.in.read();
- }
修改消费者
- @Autowired
- private StringRedisTemplate redisTemplate;
-
-
- @RabbitListener(queues = "boot-queue")
- public void getMessage(String msg, Channel channel, Message message) throws IOException {
- //0. 获取MessageId
- String messageId = message.getMessageProperties().getHeader("spring_returned_message_correlation");
- //1. 设置key到Redis
- if(redisTemplate.opsForValue().setIfAbsent(messageId,"0",10, TimeUnit.SECONDS)) {
- //2. 消费消息
- System.out.println("接收到消息:" + msg);
-
- //3. 设置key的value为1
- redisTemplate.opsForValue().set(messageId,"1",10,TimeUnit.SECONDS);
- //4. 手动ack
- channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
- }else {
- //5. 获取Redis中的value即可 如果是1,手动ack
- if("1".equalsIgnoreCase(redisTemplate.opsForValue().get(messageId))){
- channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
- }
- }
- }