• rabbitMQ:消费者确认模式


    为了保证消息从队列可靠地到达消费者,RabbitMQ提供消息确认机制(message acknowledgment)。消费者在声明队列时,可以指定noAck参数,当noAck=false时,RabbitMQ会等待消费者显式发回ack信号后才从内存(和磁盘,如果是持久化消息的话)中移去消息。否则,RabbitMQ会在队列中消息被消费后立即删除它。

    在Consumer中Confirm模式中分为手动确认和自动确认。

    手动确认主要并使用以下方法:

    basicAck(): 用于肯定确认,multiple参数用于多个消息确认。

    basicRecover():是路由不成功的消息可以使用recovery重新发送到队列中。

    basicReject():是接收端告诉服务器这个消息我拒绝接收,不处理,可以设置是否放回到队列中还是丢掉,而且只能一次拒绝一个消息,官网中有明确说明不能批量拒绝消息,为解决批量拒绝消息才有了basicNack。

    basicNack():可以一次拒绝N条消息,客户端可以设置basicNack方法的multiple参数为true。

    消费者确认模式:自动确认

    1. package com.it.rabbitmq.confirm;
    2. import com.rabbitmq.client.*;
    3. import java.io.IOException;
    4. import java.util.concurrent.TimeoutException;
    5. public class Receive {
    6. public static void main(String[] args) {
    7. //创建链接工厂对象
    8. ConnectionFactory factory=new ConnectionFactory();
    9. factory.setUsername("root");
    10. factory.setPassword("123456");
    11. factory.setHost("192.168.174.129");
    12. factory.setPort(5672);
    13. Connection connection=null;//定义链接对象
    14. Channel channel=null;//定义通道对象
    15. try {
    16. connection=factory.newConnection();//实例化链接对象
    17. channel=connection.createChannel();//实例化通道对象
    18. //指定Exchange的类型
    19. //参数1为 交换机名称
    20. //参数2为交换机类型取值为 direct、queue、topic、headers
    21. //参数3 为是否为持久化消息 true表示持久化消息 false表示非持久化
    22. channel.queueDeclare("confirmQueue", true, false, false, null);
    23. channel.exchangeDeclare("directConfirmExchange", "direct", true);
    24. channel.queueBind("confirmQueue","directConfirmExchange","confirmRoutingKey");
    25. /**
    26. * 接收消息
    27. * 参数2为消息的确认机制,true表示自动消息确认,确认以后消息会从队列中被移除
    28. * 参数为false表示手动确认,必须显示的调用一些方法
    29. * 注意:
    30. * 1.如果我们只是接收了消息但是还没有来得及处理,当前应用或数据库方面出现问题
    31. * 但当前消息已经自动确认并删除,这就会丢失消息
    32. */
    33. channel.basicConsume("confirmQueue",true, "",new DefaultConsumer(channel) {
    34. public void handleDelivery(String consumerTag,
    35. Envelope envelope,
    36. AMQP.BasicProperties properties,
    37. byte[] body) throws IOException {
    38. //获取消息数据
    39. String bodyStr = new String(body);
    40. System.out.println("消费者处理了消息:"+bodyStr);
    41. }
    42. });
    43. } catch (IOException e) {
    44. e.printStackTrace();
    45. } catch (TimeoutException e) {
    46. e.printStackTrace();
    47. }
    48. }
    49. }

     进入控制台查看队列,消息已经被全部取出

    消费者确认模式:手动确认

    好处是不会因为程序有问题,导致自动确认消息后,造成部分数据丢失。

    1. package com.it.rabbitmq.exchange.confirm.addConfirmListener;
    2. import com.rabbitmq.client.Channel;
    3. import com.rabbitmq.client.ConfirmListener;
    4. import com.rabbitmq.client.Connection;
    5. import com.rabbitmq.client.ConnectionFactory;
    6. import java.io.IOException;
    7. import java.util.concurrent.TimeoutException;
    8. public class Send {
    9. public static void main(String[] args) {
    10. //创建连接工厂
    11. ConnectionFactory factory=new ConnectionFactory();
    12. //配置rabbitMQ的连接信息
    13. factory.setHost("192.168.174.129");
    14. factory.setPort(5672);
    15. factory.setUsername("root");
    16. factory.setPassword("123456");
    17. //定义连接
    18. Connection connection=null;
    19. //定义通道
    20. Channel channel=null;
    21. try {
    22. connection=factory.newConnection();
    23. channel=connection.createChannel();
    24. channel.queueDeclare("confirmQueue",true,false,false,null);
    25. channel.exchangeDeclare("directConfirmExchange","direct",true);
    26. channel.queueBind("confirmQueue","directConfirmExchange","confirmRoutingKey");
    27. String message="普通发送者确认模式的测试消息!";
    28. //启动发送者确认模式
    29. channel.confirmSelect();
    30. //异步消息确认监听器,需要在消息发送前启动
    31. channel.addConfirmListener(new ConfirmListener() {
    32. //消息确认以后的回调方法
    33. //参数1:被确认的消息编号,从1开始自动递增,用于标记当前是第几个消息
    34. //参数2:当前消息是否同时确定了多个
    35. //参数2为true,表示小于等于当前消息编号的所有消息全部被确认,为false,表示只确定了当前消息
    36. public void handleAck(long l, boolean b) throws IOException {
    37. System.out.println("消息被确认了--消息编号"+l+" 是否确认了多条:"+b);
    38. }
    39. //消息没有确认的回调方法
    40. //如果这个方法被执行,表示当前消息没有被确认,需要进行消息的补发
    41. //参数1:没有确认的消息编号,从1开始自动递增,用于标记当前是第几个消息
    42. //参数2:当前消息是否同时没有确定了多个
    43. //如果参数2位true,表示小于当前编号的所有消息都没有发送成功,需要进行消息的补发
    44. //为false,表示当前编号消息没有发送成功,需要进行补发
    45. public void handleNack(long l, boolean b) throws IOException {
    46. System.out.println("消息没有被确认了--消息编号"+l+" 是否没有确认多条:"+b);
    47. }
    48. });
    49. for (int i=0;i<100;i++){
    50. channel.basicPublish("directConfirmExchange","confirmRoutingKey",null,message.getBytes("utf-8"));
    51. }
    52. System.out.println("消息发送成功");
    53. } catch (IOException e) {
    54. e.printStackTrace();
    55. } catch (TimeoutException e) {
    56. e.printStackTrace();
    57. } finally {
    58. if (channel!=null){
    59. try {
    60. channel.close();
    61. } catch (IOException e) {
    62. e.printStackTrace();
    63. } catch (TimeoutException e) {
    64. e.printStackTrace();
    65. }
    66. }
    67. if (connection!=null){
    68. try {
    69. connection.close();
    70. } catch (IOException e) {
    71. e.printStackTrace();
    72. }
    73. }
    74. }
    75. }
    76. }

    运行代码

    成功的全部取出

    如果不加basicAck方法会导致消息确认会全部转入上图的unacked队列,表示消息可以重复使用

  • 相关阅读:
    iTOP3399开发板Qt蜂鸣器和LED测试
    vim学习手册
    【继承顺序和方式,子类构造方法,protected 关键字,final 关键字】
    anroid html5 拍照扫码
    Pretrain-finetune、Prompting、Instruct-tuning训练方法的区别
    Java面试题初级准备
    详细介绍c++中的类
    并发编程学习笔记 之 Lock锁及其实现类ReentrantLock、ReentrantReadWriteLock和StampedLock的基本用法
    2021年认证杯SPSSPRO杯数学建模C题(第一阶段)破局共享汽车求解全过程文档及程序
    CCF CSP认证 历年题目自练Day36
  • 原文地址:https://blog.csdn.net/weixin_59334478/article/details/127737762