Rascal 是一个围绕amqplib 的丰富的 pub/sub 包装器。amqplib 最好的事情之一是它不会对您如何使用它做出假设。另一个是它不尝试抽象AMQP Concepts。因此,该库提供了大量的控制和灵活性,但您有责任采用适当的模式和配置。您需要注意的是:
Rascal 试图通过将以下内容添加到amqplib来解决这些问题,使它们更容易处理或引起您的注意
一、当连接或通道遇到问题时,amqplib会抛出错误事件。Rascal 将监听这些事件,并且如果您使用默认配置,则会尝试自动恢复(重新连接等),但是这些事件可能表明代码中存在错误,因此引起您的注意也很重要。Rascal 通过重新发出错误事件来做到这一点,这意味着如果您不处理它们,它们将冒泡到未捕获的错误处理程序并使您的应用程序崩溃。您应该在四个地方执行此操作:
1.获取broker实例后立即 broker.on('error', console.error);
2.订阅消息后 await broker.subscribe('s1').on('error', console.error)
3.发布消息后 await broker.publish('p1', 'some text').on('error', console.error)
4.转发消息后 await broker.forward('p1', message).on('error', console.error)
二、避免潜在的消息丢失
在三种情况下,Rascal 会在不重新排队的情况下确认消息,从而导致潜在的数据丢失。
1.当无法解析消息内容并且订阅者没有“invalid_content”侦听器时
2.当订阅者的(可选)重新传递限制已被超出并且订阅者既没有“redelivery_error”也没有“redelivery_exceeded”侦听器时
3.当尝试通过重新发布、转发进行恢复时,但恢复操作失败。
Rascal 拒绝消息的原因是因为替代方案是无限期地不确认消息,或者在无限紧密的循环中回滚并重试消息。这可能会对您的应用程序进行 DDOS,并导致您的基础设施出现问题。如果您已正确配置死信队列或侦听“invalid_content”和“redelivery_exceeded”订户事件,您的消息应该是安全的。
- const { MQ_HOST, HOST, MQ_PORT } = process.env;
- const mqHost = MQ_HOST || HOST || "127.0.0.1";
- const mqPort = MQ_PORT || 5672;
- const mqUsername = "root";
- const mqPassword = "paasword";
-
- const exchangeName = 'exchange_direct_saas'; //交换机
- const queueName = 'queue_direct_saas';
- const routingKey = 'saasIsolution';//路由key
-
- const config = {
- "vhosts": {
- "/": {
- "publicationChannelPools": { //使用池通道来发布消息.为每个虚拟主机创建两个池 一个用于确认通道,另一个用于常规通道。但在第一次使用之前不会创建两个池(默认autostart: false)空闲通道会自动从池中驱逐
- "regularPool": {
- "max": 10,
- "min": 5,
- "evictionRunIntervalMillis": 10000,
- "idleTimeoutMillis": 60000,
- "autostart": true
- },
- "confirmPool": {
- "max": 10,
- "min": 5,
- "evictionRunIntervalMillis": 10000,
- "idleTimeoutMillis": 60000,
- "autostart": true
- }
- },
- "connectionStrategy": "random",
- "connection": {
- "slashes": true,
- "protocol": "amqp",
- "hostname": mqHost,
- "user": mqUsername,
- "password": mqPassword,
- "port": mqPort,
- "vhost": "/",
- "options": {
- "heartbeat": 10,//心跳时间。 如果你的任务执行时间比较长,调大此配置。 rabbit-server的heartbeat 默认为60
- "connection_timeout": 10000,
- "channelMax": 100
- },
- "socketOptions": {
- "timeout": 10000
- },
- "management": {
- "options": {
- "timeout": 1000
- }
- },
- "retry": {
- "min": 1000,
- "max": 60000,
- "factor": 2,
- "strategy": "exponential" //exponential:指数配置将导致 rascal 以指数增加的间隔(最多一分钟)重试连接。间隔会随机调整,这样如果您有多个服务,它们就不会同时重新连接。 linear: 线性配置将导致 rascal 以线性增加的间隔(一到五秒之间)重试连接
- }
- },
- "exchanges": {//定义exchange
- [exchangeName]: {
- "type": "direct",
- "options": {
- "durable": true
- }
- }
- },
- "queues": { //定义queue
- [queueName]: {
- "options": {
- "autoDelete": false,
- "durable": true
- }
- }
- },
- "bindings": {//定义binding
- "b1": {
- "source": exchangeName,
- "destination": queueName,
- "destinationType": "queue",
- "bindingKey": routingKey
- }
- }
- }
- },
- "subscriptions": {//订阅消息
- "s1": {
- "queue": queueName,
- "vhost": "/",
- "prefetch": 1,
- "retry": {
- "delay": 1000
- }
- }
- },
- "publications": {//发布消息
- "p1": {
- "vhost": "/",
- "exchange": exchangeName,
- "routingKey": routingKey,
- "confirm": true,
- "options": {
- "persistent": true
- }
- }
- }
- }
- module.exports = config;
- const { BrokerAsPromised: Broker, withDefaultConfig } = require('rascal');
- const definitions = require('./config.js');
- const { getInitParams } = require('../lib')
- const params = getInitParams();
-
- async function product(msg) {
- let broker;
- try {
- broker = await Broker.create(withDefaultConfig(definitions));//withDefaultConfig附带了适用于生产和测试环境的合理默认值(针对可靠性而不是速度进行了优化)
- broker.on('error', console.error);
-
- // Publish a message
- const publication = await broker.publish('p1', msg);
- console.log("生产者消息发送完毕");
- publication.on('error', console.error);
- } catch (err) {
- console.error(err);
- }finally{
- await broker?.shutdown();
- }
- }
-
- product(JSON.stringify(params));
-
- const { BrokerAsPromised: Broker, withDefaultConfig } = require('rascal');
- const definitions = require('./config.js');
- const { getDoIsolation, getDoClear } = require("../lib");
-
- async function consumer(i) {
- try {
- const broker = await Broker.create(withDefaultConfig(definitions));//withDefaultConfig附带了适用于生产和测试环境的合理默认值(针对可靠性而不是速度进行了优化)
- broker.on('error', error => { console.error(error, "broker Error"); });
-
- // Consume a message
- const subscription = await broker.subscribe('s1'); //subscription 不存在会抛出异常
- subscription
- .on('message', async(message, content, ackOrNack) => {
- const params = JSON.parse(content);
- const doIsolation = getDoIsolation(params);
- console.log(`消费者${i}`, params, doIsolation);
- await doIsolation();
- ackOrNack();
- })
- .on('error', error => { console.error("subscribe Error",error); })
- .on('invalid_content', (err, message, ackOrNack) => { //若无法解析内容(例如,消息的内容类型为“application/json”,但内容不是json),它将发出“invalid_content”事件
- console.error('Invalid content', err);
- ackOrNack(err);//默认nack 策略
- })
- .on('redeliveries_exceeded', (err, message, ackOrNack) => { //如果重新传递的数量超过订阅者限制,订阅者将发出“redelivery_exceeded”事件,并且可以由您的应用程序处理
- console.error('Redeliveries exceeded', err);
- ackOrNack(err, [{ strategy: 'republish', defer: 1000, attempts: 10 }, { strategy: 'nack' }]); //将消息重新发布回其来自的队列。 当指定尝试次数时,始终应该链接一个后备策略,否则如果超出尝试次数,您的消息将不会被确认或拒绝
- });
- } catch (err) {
- console.error("其他Error",err);
- }
- console.log(`消费端${i}启动成功`)
- }
-
-
-
- for(i=0; i<=2; i++){
- consumer(i)
- }