• nodejs操作rabbitMQ amqplib库 消息持久化


    config.js

    1. const { MQ_HOST, HOST, MQ_PORT } = process.env;
    2. const mqHost = MQ_HOST || HOST || "127.0.0.1";
    3. const mqPort = MQ_PORT || 5672;
    4. const mqUsername = "root";
    5. const mqPassword = "password";
    6. const mqProtocol = "amqp";
    7. const exchangeName = 'exchange_direct_saas'; //交换机
    8. const queueName = 'queue_direct_saas';
    9. const routingKey = 'saasIsolution';//路由key
    10. const config = { mqHost, mqPort, mqUsername, mqPassword, mqProtocol, exchangeName, queueName, routingKey };
    11. module.exports = config;

    生产者端:

    1. const amqp = require('amqplib');
    2. const { getInitParams } = require('../../lib')
    3. const params = getInitParams();
    4. const { mqHost, mqPort, mqUsername, mqPassword, mqProtocol, exchangeName, queueName, routingKey } = require("./config");
    5. async function product(msg) {
    6. const connection = await amqp.connect({protocol: mqProtocol, hostname: mqHost, port: mqPort, username: mqUsername, password: mqPassword});
    7. connection.on('error', console.error)
    8. const channel = await connection.createConfirmChannel();
    9. channel.on('error', console.error)
    10. //关联交换机,交换机设置类型,并将消息持久化 { durable: true } 参数为将消息持久化
    11. await channel.assertExchange(exchangeName, 'direct', { durable: true });
    12. //设置公平调度 将prefetch count项的值配置为1,这将会指示 RabbitMQ 在同一时间不要发送超过一条消息给每个消费者。换句话说,直到消息被处理和应答之前都不会发送给该消费者任何消息。取而代之的是,它将会发送消息至下一个比较闲的消费者或工作进程。
    13. await channel.prefetch(1, false); //true 为 channel 上做限制,false 为消费端上做限制,默认为 false
    14. await channel.publish(exchangeName, routingKey, Buffer.from(msg));
    15. // console.log("生产者消息发送完毕");
    16. channel.waitForConfirms().then(async results => { //等待confirm完毕
    17. // console.log(results);
    18. const errors = results.filter(Boolean); //去掉假值
    19. if (errors.length) console.error('Errors', errors);
    20. else console.log(new Date().toISOString(), `Broker confirmed ${results.length} messages`);
    21. }).catch(e => {
    22. console.error(e, "error");
    23. }).finally( async()=>{
    24. //关闭通道
    25. channel.close();
    26. //关闭连接
    27. connection.close();
    28. } )
    29. }
    30. product(JSON.stringify(params));

    消费者端

    1. const amqp = require('amqplib');
    2. const { getDoIsolation } = require("../../lib");
    3. const { mqHost, mqPort, mqUsername, mqPassword, mqProtocol, exchangeName, queueName, routingKey } = require("./config");
    4. async function consumer() {
    5. const connection = await amqp.connect({protocol: mqProtocol, hostname: mqHost, port: mqPort, username: mqUsername, password: mqPassword});
    6. connection.on('error', console.error)
    7. const channel = await connection.createConfirmChannel();
    8. channel.on('error', console.error)
    9. //关联交换机,设置交换机类型,并将消息持久化 { durable: true } 参数为将消息持久化
    10. await channel.assertExchange(exchangeName, 'direct', { durable: true });
    11. //关联消息队列 autoDelete:true 设置队列为空时自动删除 队列持久化
    12. await channel.assertQueue(queueName,{autoDelete:false, durable: true});
    13. //绑定关系(队列,交换机,路由键)
    14. await channel.bindQueue(queueName, exchangeName, routingKey);
    15. //设置公平调度 将prefetch count项的值配置为1,这将会指示 RabbitMQ 在同一时间不要发送超过一条消息给每个消费者。换句话说,直到消息被处理和应答之前都不会发送给该消费者任何消息。取而代之的是,它将会发送消息至下一个比较闲的消费者或工作进程。
    16. await channel.prefetch(1, false);//true 为 channel 上做限制,false 为消费端上做限制,默认为 false
    17. //消费队列消息
    18. await channel.consume(queueName, async (msg) => {
    19. try{
    20. const params = JSON.parse(msg.content.toString());
    21. const doIsolation = getDoIsolation(params);
    22. console.log(params, doIsolation);
    23. await doIsolation();
    24. channel.ack(msg);
    25. }catch(e){
    26. console.error(e);
    27. }
    28. }, { noAck: false });//手动ack应答
    29. console.log("消费端启动成功")
    30. }
    31. for(i=0; i<=2; i++){
    32. consumer();
    33. }

    消息持久化 避免丢失

    1.避免broker消息中间件维护的消息丢失

    交换机的持久化其实就是相当于将交换机的属性在服务器内部保存。当MQ的服务器发生意外或关闭之后,重启RabbitMQ时不需要重新手动或执行代码去建立交换机,交换机会自动建立,相当于一直存在。 在声明交换器的时候,将 durable 属性设置为 true即可。

    队列的持久化也是在声明队列的时候,将durable参数设置为true。如果队列不设置持久化,那么 RabbitMQ服务重启之后,队列就会被删除,既然队列都不存在了,队列中的消息也会丢失。

    消息持久化要确保消息不会丢失,需要将消息设置为持久化。信息持久化则是将信息存在磁盘中。生产者在发布消息时,可以设置options的 deliveryMode 属性为 2,标记消息为持久化消息

    可以将所有的消息都设置为持久化,但是这样会严重影响 RabbitMQ 的性能。写入磁盘的速度比写入内存的速度慢得不只一点点。对于可靠性不是那么高的消息可以不采用持久化处理,以提高整体的吞吐量。在选择是否要将消息持久化时,需要在可靠性和吞吐量之间做权衡。

    一般的系统也用不到对消息进行持久化。不过交换机和队列的持久化还是要支持的。

    2.避免producer生产者 -> MQ过程中消息丢失

    生产者发送消息由于网络等原因并没有发送到RabbitMq

    解决方案:

    2-1、开启RabbitMq事务机制

    生产者发送数据之前开启 RabbitMQ 事务channel.txSelect,然后发送消 息,如果消息没有成功被 RabbitMQ 接收到,那么生产者会收到异常报错,此时就可以回滚事务channel.txRollback,然后重试发送消息;如果收到了消息,那么可以提交事务channel.txCommit,类似我们数据库数据库事务机制。

    2-2、开启 confirm 模式(推荐)

    在生产者端设置开启 confirm 模式之后,你每次写的消息都会分配一个唯一的 ID,然后如果写入了 RabbitMQ 中,RabbitMQ 会给你回传一个 ack 消息,告诉你说这个消息已经收到。如果 RabbitMQ 没能处理这个消息,会回调你的一个 nack 接口,告诉你这个消息接收失败,你可以重试。而且可以结合这个机制在自己业务里维护每个消息 ID 的状态,如果超过一定时间还没接收到这个消息的回调,那么可以业务主动重发。

    事务机制和 confirm 机制优劣:

    事务机制是同步的,提交一个事务之后会阻塞,吞吐量会下来,耗性能。

    confirm 机制是异步的,流程不会阻塞,吞吐量较高,性能较好。

    3.避免consumer消费者未处理完的消息丢失

    原因:消费者自动ack配置情况下(no_ack=true),消息发送后立即被认为已经传送成功,业务代码异常或者其他故障消息并没有处理完成也会自动ack。RabbitMq消息ack后就会丢弃,这就导致异常情况下的消息丢失了。

    解决方案:

    关闭RabbitMq自动ack(no_ack=false),业务代码成功消费了消息手动调用Mq ack,让Mq丢弃消息;如果业务代码异常则直接nack,让Mq重新推送消息进行处理。当然,在要求比较高的情况下也可以异常数据进入死信队列,保证数据的完整性。

  • 相关阅读:
    集合划分-递归分治
    排序 “叁” 之交换排序
    sql注入(5), sqlmap工具
    idea设置了maven会自动变回C盘那个
    【前端】css如何实现箭头>?
    高精度地图定位在高速公路自动驾驶系统中的应用
    node + sqlite + Sequelize (ORM:Object-Relational Mapping对象关系映射)
    【C语言实现简单的数据结构】栈和队列
    爬虫基本库的使用(urllib库的详细解析)
    【AIGC核心技术剖析】扩大富有表现力的人体姿势和形状估计SMPLer-X模型
  • 原文地址:https://blog.csdn.net/qq_42152032/article/details/134270914