• C# 对RabbitMQ使用


    1、安装NuGet包RabbitMQ.Client

     2    生产者-确认机制   

    (1). 含义:就是应答模式,生产者发送一条消息之后,Rabbitmq服务器做了个响应,表示收到了。

    (2). 特点:异步模式,在响应之前,可以继续发送消息,单条消息、批量消息均可继续发送。

    (3). 核心代码:单条消息确认: channel.waitForConfirms()

             批量消息确认: channel.waitForConfirmsOrDie()

    大致流程:channel.ConfirmSelect();  开启确认模式→发送消息→提供一个回执方法WaitForConfirms();   返回一个bool 值

    1. using RabbitMQ.Client;
    2. using RabbitMQ.Client.Events;
    3. using System.Text;
    4. var factory = new ConnectionFactory();
    5. factory.HostName = "127.0.0.1";//RabbitMQ服务器地址
    6. factory.DispatchConsumersAsync = true;//支持异步发送消息
    7. string exchangeName = "exchange1";//交换机的名字
    8. string eventName = "myEvent";// routingKey的值
    9. using var conn = factory.CreateConnection();
    10. while (true)
    11. {
    12. string msg = DateTime.Now.TimeOfDay.ToString();//待发送消息
    13. using (var channel = conn.CreateModel())//创建信道
    14. {
    15. try
    16. {
    17. var properties = channel.CreateBasicProperties();
    18. properties.DeliveryMode = 2;//1非持久化、2是持久化
    19. //交换机 Name:交换机名称。
    20. // Type: 交换机类型——direct、topic、fanout、headers、sharding
    21. //Durable:消息代理重启后,交换机是否还存在。
    22. //Auto-delete :当所有与之绑定的消息队列都完成了对此交换机的使用后,删掉它。
    23. //Arguments:依赖代理本身。
    24. channel.ExchangeDeclare(exchange: exchangeName, type: "direct", true, false);//声明交换机
    25. byte[] body = Encoding.UTF8.GetBytes(msg);
    26. channel.ConfirmSelect();//开启消息确认模式
    27. //发布消息
    28. //exchange:交换机名称
    29. //mandatory为true时,表示如果消息没有被正确路由,消息将退回消息的生产者 如果设置为false,那么broker端自动删除该消息。
    30. //routingKey:路由键
    31. //props:消息属性字段,比如消息头部信息等等
    32. //body:消息主体部分
    33. channel.BasicPublish(exchange: exchangeName, routingKey: eventName,
    34. mandatory: true, basicProperties: properties, body: body);
    35. Console.WriteLine("发布了消息:" + msg);
    36. /*首先开启Confirm模式,通知消息生产者成功推送到RabbitMQ中*/
    37. if (channel.WaitForConfirms()) //单条消息确认
    38. {
    39. //表示消息发送成功(已经存入队列)
    40. Console.WriteLine($"【{msg}】成功发送到RabbitMQ!");
    41. }
    42. else
    43. {
    44. Console.WriteLine($"【{msg}】发送到RabbitMQ失败!");
    45. }
    46. //channel.WaitForConfirmsOrDie();//如果所有消息发送成功 就正常执行, 如果有消息发送失败;就抛出异常;
    47. }
    48. catch (Exception ex)
    49. {
    50. //表示消息发送失败
    51. Console.WriteLine($"【{msg}】发送到RabbitMQ失败!");
    52. }
    53. }
    54. Thread.Sleep(1000);
    55. }

    运行结果:

    2. TX事务模式

    (1). 含义:基于AMPQ协议;可以让信道设置成一个带事务的信道,分为三步:开启事务、提交事务、事务回滚

    (2). 特点:同步模式,在事务提交之前不能继续发送消息,该模式相比Confirm模式效率差一点。

    (3). 核心代码:channel.TxSelect();   开启一个事务

             channel.TxCommit();  提交事务, 这一步成功后,消息才真正的写入队列

           channel.TxRollback();   事务回滚

    1. using RabbitMQ.Client;
    2. using RabbitMQ.Client.Events;
    3. using System.Text;
    4. var factory = new ConnectionFactory();
    5. factory.HostName = "127.0.0.1";
    6. factory.DispatchConsumersAsync = true;
    7. string exchangeName = "exchange1";
    8. string eventName = "myEvent";
    9. using var conn = factory.CreateConnection();
    10. while (true)
    11. {
    12. string msg = DateTime.Now.TimeOfDay.ToString();
    13. using (var channel = conn.CreateModel())
    14. {
    15. try
    16. {
    17. var properties = channel.CreateBasicProperties();
    18. properties.DeliveryMode = 2;
    19. channel.ExchangeDeclare(exchange: exchangeName, type: "direct", true, false);
    20. byte[] body = Encoding.UTF8.GetBytes(msg);
    21. channel.TxSelect(); //开启事务
    22. channel.BasicPublish(exchange: exchangeName, routingKey: eventName,
    23. mandatory: true, basicProperties: properties, body: body);
    24. Console.WriteLine($"【{msg}】成功发送到RabbitMQ!");
    25. channel.TxCommit(); //只有事务提交成功以后,才会真正的写入到队列里面去
    26. }
    27. catch (Exception ex)
    28. {
    29. //表示消息发送失败
    30. Console.WriteLine($"【{msg}】发送到RabbitMQ失败!");
    31. channel.TxRollback();
    32. }
    33. }
    34. Thread.Sleep(1000);
    35. }

    3、消费者(手动确认)

    (1) 含义:消费者消费一条,回执给RabbitMq一条消息,Rabbitmq 只删除当前这一条消息;相当于是一条消费了,删除一条消息,性能稍微低一些;

    (2) 特点:消费1条应答一次,可以告诉RabbitMq消费成功or失败,消费成功,服务器删除该条消息,消费失败,可以删除也可以重新写入。

    (3) 核心代码:autoAck: false,表示不自动确认

     然后:channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);    表示消费成功

     channel.BasicReject(deliveryTag: ea.DeliveryTag, requeue: false);         表示消费失败, 可以配置:requeue: true:重新写入到队列里去; false: 删除消息

    1. using RabbitMQ.Client;
    2. using RabbitMQ.Client.Events;
    3. using System.Text;
    4. var factory = new ConnectionFactory();
    5. factory.HostName = "127.0.0.1";//服务器地址
    6. factory.DispatchConsumersAsync = true;//支持异步接受
    7. string exchangeName = "exchange1";//交换机的名称
    8. string eventName = "myEvent";//路由键
    9. using var conn = factory.CreateConnection();
    10. using var channel = conn.CreateModel();//创建信道
    11. string queueName = "queue1";//队列名称
    12. //声明了交换机
    13. channel.ExchangeDeclare(exchange: exchangeName, type: "direct",true,false);
    14. //声明一个队列
    15. //queuename: 队列的名称
    16. //durable 是否持久化。true消息会持久化到本地,保证重启服务后消息不会丢失
    17. //exclusive :表示独占方式,设置为true 在某些情景下有必要,例如:顺序消费。表示只有一个channel可以去监听,其他channel都不能够监听。目的就是为了保证顺序消费。
    18. //autoDelete:队列如果与Exchange未绑定,则自动删除
    19. //arguments:扩展参数
    20. channel.QueueDeclare(queue: queueName, durable: true,
    21. exclusive: false, autoDelete: false, arguments: null);
    22. //绑定队列
    23. channel.QueueBind(queue: queueName,
    24. exchange: exchangeName, routingKey: eventName);
    25. //创建一个消费者
    26. var consumer = new AsyncEventingBasicConsumer(channel);
    27. consumer.Received += Consumer_Received;
    28. //注册消费者订阅
    29. //autoAck 是否自动确认消息, true自动确认,false 不自动要手动调用, 建立设置为false
    30. channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);
    31. Console.ReadLine();
    32. async Task Consumer_Received(object sender, BasicDeliverEventArgs args)
    33. {
    34. try
    35. {
    36. var bytes = args.Body.ToArray();
    37. string msg = Encoding.UTF8.GetString(bytes);
    38. Console.WriteLine(DateTime.Now + "收到了消息" + msg);
    39. //DeliveryTag: 唯一的编号
    40. //multiple:是否批量确认.true:将一次性ack所有小于deliveryTag的消息。
    41. channel.BasicAck(args.DeliveryTag, multiple: false);手动确认
    42. await Task.Delay(800);
    43. }
    44. catch (Exception ex)
    45. {
    46. //异常重试
    47. //DeliveryTag: 唯一的编号
    48. //requeue:如果 requeue 参数设置为 true,则 RabbitMQ 会重新将这条消息存入队列,以便发送给下一个订阅的消费者; 如果 requeue 参数设置为 false,则 RabbitMQ 立即会还把消息从队列中移除,而不会把它发送给新的消费者。
    49. channel.BasicReject(args.DeliveryTag, true);
    50. Console.WriteLine("处理收到的消息出错" + ex);
    51. }
    52. }

    运行结果

    参考

    【Windows安装RabbitMQ详细教程】_慕之寒的博客-CSDN博客_rabbitmq安装windows

    第四节:RabbitMq剖析生产者、消费者的几种消息确认机制(Confirm、事务、自动、手动) - Yaopengfei - 博客园

  • 相关阅读:
    LLM强势挺进端侧,AI大语言模型端侧部署如何影响超自动化?
    node的个个版本的下载
    笔试选择题-图
    筋膜枪控制、驱动方案推荐:有感方波、无感方波控制
    yolox转rknn
    如何快速清理c盘缓存垃圾(最简单的c盘清理方法)
    【无标题】
    Java(二):IDEA使用教程
    【信号去噪】基于麻雀搜索算法优化VM实现信号去噪附matlab代码
    理解React Hooks看这一篇就够了
  • 原文地址:https://blog.csdn.net/qq_42335551/article/details/127302909