• nodejs 操作rabbitMQ rascal库(针对amqplib的封装)


    Rascal 是一个围绕amqplib 的丰富的 pub/sub 包装器amqplib 最好的事情之一是它不会对您如何使用它做出假设。另一个是它不尝试抽象AMQP Concepts。因此,该库提供了大量的控制和灵活性,但您有责任采用适当的模式和配置。您需要注意的是:

    • 默认情况下,消息不是持久的,如果您的代理重新启动,消息将会丢失
    • 导致应用程序崩溃的消息将被无限重试
    • 如果没有预取,突然的大量消息可能会破坏您的事件循环
    • 断开的连接和中断的通道不会自动恢复
    • 任何连接或通道错误都会作为“错误”事件发出。除非您处理它们或使用域,否则它们将导致您的应用程序崩溃
    • 如果使用确认通道发布消息,而代理未能确认,则执行流程可能会无限期阻塞

    Rascal 试图通过将以下内容添加到amqplib来解决这些问题,使它们更容易处理或引起您的注意

    • 配置驱动的虚拟主机、交换器、队列、绑定、生产者和消费者
    • 集群连接支持
    • 透明内容解析
    • 透明加密/解密
    • 自动重新连接和重新订阅
    • 高级错误处理,包括延迟、有限重试
    • 远程过程调用支持
    • 再次投递保护
    • 通道池
    • 流量控制
    • 发布超时
    • 安全默认值
    • Promise 和回调支持
    • 时分双工支持

    注意:

    一、当连接或通道遇到问题时,amqplib会抛出错误事件。Rascal 将监听这些事件,并且如果您使用默认配置,则会尝试自动恢复(重新连接等),但是这些事件可能表明代码中存在错误,因此引起您的注意也很重要。Rascal 通过重新发出错误事件来做到这一点,这意味着如果您不处理它们,它们将冒泡到未捕获的错误处理程序并使您的应用程序崩溃。您应该在四个地方执行此操作:

    1.获取broker实例后立即 broker.on('error', console.error);

    2.订阅消息后 await broker.subscribe('s1').on('error', console.error)

    3.发布消息后 await broker.publish('p1', 'some text').on('error', console.error)

    4.转发消息后 await broker.forward('p1', message).on('error', console.error)

    二、避免潜在的消息丢失

    在三种情况下,Rascal 会在不重新排队的情况下确认消息,从而导致潜在的数据丢失。

    1.当无法解析消息内容并且订阅者没有“invalid_content”侦听器时

    2.当订阅者的(可选)重新传递限制已被超出并且订阅者既没有“redelivery_error”也没有“redelivery_exceeded”侦听器时

    3.当尝试通过重新发布、转发进行恢复时,但恢复操作失败。

    Rascal 拒绝消息的原因是因为替代方案是无限期地不确认消息,或者在无限紧密的循环中回滚并重试消息。这可能会对您的应用程序进行 DDOS,并导致您的基础设施出现问题。如果您已正确配置死信队列或侦听“invalid_content”和“redelivery_exceeded”订户事件,您的消息应该是安全的。

    config.js

    1. const { MQ_HOST, HOST, MQ_PORT } = process.env;
    2. const mqHost = MQ_HOST || HOST || "127.0.0.1";
    3. const mqPort = MQ_PORT || 5672;
    4. const mqUsername = "root";
    5. const mqPassword = "paasword";
    6. const exchangeName = 'exchange_direct_saas'; //交换机
    7. const queueName = 'queue_direct_saas';
    8. const routingKey = 'saasIsolution';//路由key
    9. const config = {
    10. "vhosts": {
    11. "/": {
    12. "publicationChannelPools": { //使用池通道来发布消息.为每个虚拟主机创建两个池 一个用于确认通道,另一个用于常规通道。但在第一次使用之前不会创建两个池(默认autostart: false)空闲通道会自动从池中驱逐
    13. "regularPool": {
    14. "max": 10,
    15. "min": 5,
    16. "evictionRunIntervalMillis": 10000,
    17. "idleTimeoutMillis": 60000,
    18. "autostart": true
    19. },
    20. "confirmPool": {
    21. "max": 10,
    22. "min": 5,
    23. "evictionRunIntervalMillis": 10000,
    24. "idleTimeoutMillis": 60000,
    25. "autostart": true
    26. }
    27. },
    28. "connectionStrategy": "random",
    29. "connection": {
    30. "slashes": true,
    31. "protocol": "amqp",
    32. "hostname": mqHost,
    33. "user": mqUsername,
    34. "password": mqPassword,
    35. "port": mqPort,
    36. "vhost": "/",
    37. "options": {
    38. "heartbeat": 10,//心跳时间。 如果你的任务执行时间比较长,调大此配置。 rabbit-server的heartbeat 默认为60
    39. "connection_timeout": 10000,
    40. "channelMax": 100
    41. },
    42. "socketOptions": {
    43. "timeout": 10000
    44. },
    45. "management": {
    46. "options": {
    47. "timeout": 1000
    48. }
    49. },
    50. "retry": {
    51. "min": 1000,
    52. "max": 60000,
    53. "factor": 2,
    54. "strategy": "exponential" //exponential:指数配置将导致 rascal 以指数增加的间隔(最多一分钟)重试连接。间隔会随机调整,这样如果您有多个服务,它们就不会同时重新连接。 linear: 线性配置将导致 rascal 以线性增加的间隔(一到五秒之间)重试连接
    55. }
    56. },
    57. "exchanges": {//定义exchange
    58. [exchangeName]: {
    59. "type": "direct",
    60. "options": {
    61. "durable": true
    62. }
    63. }
    64. },
    65. "queues": { //定义queue
    66. [queueName]: {
    67. "options": {
    68. "autoDelete": false,
    69. "durable": true
    70. }
    71. }
    72. },
    73. "bindings": {//定义binding
    74. "b1": {
    75. "source": exchangeName,
    76. "destination": queueName,
    77. "destinationType": "queue",
    78. "bindingKey": routingKey
    79. }
    80. }
    81. }
    82. },
    83. "subscriptions": {//订阅消息
    84. "s1": {
    85. "queue": queueName,
    86. "vhost": "/",
    87. "prefetch": 1,
    88. "retry": {
    89. "delay": 1000
    90. }
    91. }
    92. },
    93. "publications": {//发布消息
    94. "p1": {
    95. "vhost": "/",
    96. "exchange": exchangeName,
    97. "routingKey": routingKey,
    98. "confirm": true,
    99. "options": {
    100. "persistent": true
    101. }
    102. }
    103. }
    104. }
    105. module.exports = config;

    生产者端

    1. const { BrokerAsPromised: Broker, withDefaultConfig } = require('rascal');
    2. const definitions = require('./config.js');
    3. const { getInitParams } = require('../lib')
    4. const params = getInitParams();
    5. async function product(msg) {
    6. let broker;
    7. try {
    8. broker = await Broker.create(withDefaultConfig(definitions));//withDefaultConfig附带了适用于生产和测试环境的合理默认值(针对可靠性而不是速度进行了优化)
    9. broker.on('error', console.error);
    10. // Publish a message
    11. const publication = await broker.publish('p1', msg);
    12. console.log("生产者消息发送完毕");
    13. publication.on('error', console.error);
    14. } catch (err) {
    15. console.error(err);
    16. }finally{
    17. await broker?.shutdown();
    18. }
    19. }
    20. product(JSON.stringify(params));

    消费者端

    1. const { BrokerAsPromised: Broker, withDefaultConfig } = require('rascal');
    2. const definitions = require('./config.js');
    3. const { getDoIsolation, getDoClear } = require("../lib");
    4. async function consumer(i) {
    5. try {
    6. const broker = await Broker.create(withDefaultConfig(definitions));//withDefaultConfig附带了适用于生产和测试环境的合理默认值(针对可靠性而不是速度进行了优化)
    7. broker.on('error', error => { console.error(error, "broker Error"); });
    8. // Consume a message
    9. const subscription = await broker.subscribe('s1'); //subscription 不存在会抛出异常
    10. subscription
    11. .on('message', async(message, content, ackOrNack) => {
    12. const params = JSON.parse(content);
    13. const doIsolation = getDoIsolation(params);
    14. console.log(`消费者${i}`, params, doIsolation);
    15. await doIsolation();
    16. ackOrNack();
    17. })
    18. .on('error', error => { console.error("subscribe Error",error); })
    19. .on('invalid_content', (err, message, ackOrNack) => { //若无法解析内容(例如,消息的内容类型为“application/json”,但内容不是json),它将发出“invalid_content”事件
    20. console.error('Invalid content', err);
    21. ackOrNack(err);//默认nack 策略
    22. })
    23. .on('redeliveries_exceeded', (err, message, ackOrNack) => { //如果重新传递的数量超过订阅者限制,订阅者将发出“redelivery_exceeded”事件,并且可以由您的应用程序处理
    24. console.error('Redeliveries exceeded', err);
    25. ackOrNack(err, [{ strategy: 'republish', defer: 1000, attempts: 10 }, { strategy: 'nack' }]); //将消息重新发布回其来自的队列。 当指定尝试次数时,始终应该链接一个后备策略,否则如果超出尝试次数,您的消息将不会被确认或拒绝
    26. });
    27. } catch (err) {
    28. console.error("其他Error",err);
    29. }
    30. console.log(`消费端${i}启动成功`)
    31. }
    32. for(i=0; i<=2; i++){
    33. consumer(i)
    34. }

  • 相关阅读:
    十大java应用服务器(web server)总结
    SpringMVC(JSR303和拦截器)
    【Kafka从成神到升仙系列 四】你真的了解 Kafka 的缓存池机制嘛
    接口开发知识点整理三
    头脑风暴之约瑟夫环问题
    深度学习(PyTorch)——循环神经网络(RNN)基础篇四
    Web大学生网页作业成品——环保垃圾分类网站设计与实现(HTML+CSS+JavaScript) web前端开发技术 web课程设计 网页规划与设计
    前端开发面试-css篇
    队列的实现---超详细
    【电视剧-长相思】经典语录
  • 原文地址:https://blog.csdn.net/qq_42152032/article/details/134271908