• nestjs使用rabbitMQ


    基础知识

    rabbitMQ的exchange、bindingkey、routingkey的关系:

    Java面试常见问题:RabbitMQ基本概念与工作原理 - 知乎

    nestjs的mq使用引导(最后没按照官网的那种方式使用,而是直接编程的方式,这里看下了解下):

    RabbitMQ | NestJS 中文文档 | NestJS 中文网

    其中,使用的mqlib的API(非常重要,查看了官方的API才能更精准的使用!):

    amqplib | Channel API reference

    软件

    nestjs 8.*.*

    使用rabbitMQ需要增加依赖(为什么?看官网:):

    "amqp-connection-manager": "^4.1.4",
    "amqplib": "^0.10.2",

    怎么安装? 不想全局就在当前项目的根目录下执行:

    npm install --save amqp-connection-manager amqplib

    然后,开发,上代码

    代码

    mt.tuil.ts

    1. import {Injectable, Logger, OnModuleInit} from '@nestjs/common';
    2. import amqp from 'amqplib';
    3. import * as acm from 'amqp-connection-manager';
    4. import {MqMsgHandler} from './MQMsgHandler';
    5. import {GLoggerService} from '../../framework/logs/glogger';
    6. import ChannelWrapper from "amqp-connection-manager/dist/esm/ChannelWrapper";
    7. const logger = new GLoggerService('RabbitMQ');
    8. // type OnEventHandler = (data: any) => Promise;
    9. // @Injectable()
    10. export class RabbitMQClient {
    11. private connection: acm.AmqpConnectionManager;
    12. private channelWrapper: acm.ChannelWrapper;
    13. private rabbitMqConfig: {};
    14. private url: string;
    15. private exchange: string;// = 'exchange_event_consume';
    16. private bindKey: string;
    17. private virtualHost: string;
    18. private host: string;
    19. private port: number;
    20. private username: string;
    21. private password: string;
    22. private msgHandler: MqMsgHandler | undefined;// 是否添加监听者,用于消费使用
    23. private queue: string;
    24. constructor(rabbitMqConfig, msgHandler?: MqMsgHandler) {
    25. this.rabbitMqConfig = rabbitMqConfig;
    26. this.virtualHost = rabbitMqConfig.virtualHost;
    27. this.exchange = rabbitMqConfig.exchange;
    28. this.bindKey = rabbitMqConfig.bindKey;
    29. this.host = rabbitMqConfig.host;
    30. this.port = rabbitMqConfig.port;
    31. this.username = rabbitMqConfig.username;
    32. this.password = rabbitMqConfig.password;
    33. this.msgHandler = msgHandler;
    34. this.queue = rabbitMqConfig.queue;
    35. this.url = 'amqp://' + rabbitMqConfig.username + ':' + rabbitMqConfig.password + '@'
    36. + rabbitMqConfig.host + ':' + rabbitMqConfig.port + '/%2F' + rabbitMqConfig.virtualHost;
    37. }
    38. public async connect():Promise<void> {
    39. return new Promise<void>((resolve, reject) => {
    40. this.connection = acm.connect(this.url, {
    41. heartbeatIntervalInSeconds: 3,
    42. reconnectTimeInSeconds: 3,
    43. connectionOptions: {
    44. rejectUnauthorized: false
    45. },
    46. });
    47. this.connection.addListener('connect', () => {
    48. logger.debug('[客户端]连接成功');
    49. return resolve();
    50. });
    51. this.connection.addListener('connectFailed', (err) => {
    52. logger.error('[客户端]连接失败!', err.toString());
    53. return resolve();
    54. });
    55. this.connection.addListener('disconnect', (err) => {
    56. logger.log('[客户端]断开连接%O');
    57. return reject();
    58. });
    59. });
    60. }
    61. /**
    62. * 初始化通道
    63. */
    64. async initChannel():Promise<ChannelWrapper> {
    65. const bindKey = this.bindKey;
    66. const exchange = this.exchange;
    67. let channelWrapper;
    68. const queueName = this.queue;
    69. if (this.msgHandler) {
    70. const msgHandler = this.msgHandler;
    71. logger.log('初始化通道[exchange=' + exchange + ', bindKey=' + bindKey + ', queue=' + queueName + '],并增加通道监听器...');
    72. channelWrapper = this.connection.createChannel({
    73. json: false,
    74. setup: async (channel: amqp.Channel): Promise<any> => {
    75. // 初始化事件Exchange
    76. await channel.assertExchange(exchange, 'direct', {durable: true});
    77. // 初始化事件队列
    78. await channel.assertQueue(queueName, {
    79. durable: false,
    80. autoDelete: true
    81. });
    82. // 绑定事件队列
    83. await channel.bindQueue(queueName, exchange, bindKey);
    84. // 设置预消费
    85. await channel.prefetch(1);
    86. // 消费事件
    87. // tslint:disable-next-line:only-arrow-functions
    88. channel.consume(queueName, async function (msg: amqp.ConsumeMessage | null): Promise<void> {
    89. let data: any;
    90. try {
    91. if (msg !== undefined && msg !== null && msg.content !== undefined && msg.content !== null) {
    92. data = msg.content.toString('utf8') as any;
    93. data = JSON.parse(data);
    94. if (msgHandler !== undefined && msgHandler !== null) {
    95. await msgHandler(data);
    96. } else {
    97. logger.debug(data)
    98. }
    99. channel.ack(msg);// 应答
    100. } else {
    101. logger.debug('事件数据异常 ' + data);
    102. return Promise.reject(`事件数据异常`);
    103. }
    104. } catch (error) {
    105. logger.debug('[consume]' + error);
    106. channel.ack(msg);
    107. }
    108. }, {noAck: false});// 需要应答
    109. }
    110. });
    111. logger.log('完成!');
    112. } else {
    113. logger.log('初始化通道[exchange=' + exchange + ', bindKey=' + bindKey + ', queue=' + queueName + ']');
    114. channelWrapper = this.connection.createChannel({
    115. json: false
    116. });
    117. logger.log('完成!');
    118. }
    119. channelWrapper.on('connect', () => {
    120. logger.log('[通道]连接成功');
    121. });
    122. channelWrapper.on('close', () => {
    123. logger.log('[通道]连接关闭');
    124. });
    125. channelWrapper.on('error', (err) => {
    126. logger.error('[通道]连接错误: ', err.toString());
    127. });
    128. this.channelWrapper = channelWrapper;
    129. return channelWrapper;
    130. }
    131. // tslint:disable-next-line:no-unused-expression
    132. async publish(data: any | {}): Promise<boolean> {
    133. if (!this.channelWrapper) {
    134. logger.error('通道没有初始化!', '');
    135. return Promise.reject('通道没有初始化!');
    136. }
    137. const BIND_KEY = this.bindKey;
    138. const exchange = this.exchange;
    139. return this.channelWrapper
    140. .publish(exchange, BIND_KEY, Buffer.from(JSON.stringify(data)))
    141. // .sendToQueue(queue, Buffer.from(JSON.stringify(data)))
    142. .then(() => {
    143. logger.log('publish Message was sent!')
    144. return true;
    145. })
    146. .catch((err) => {
    147. logger.error('publish Message was rejected... Boo!', err.toString());
    148. return false;
    149. });
    150. }
    151. // tslint:disable-next-line:no-unused-expression
    152. public async start() {
    153. await this.connect();
    154. await this.initChannel();
    155. }
    156. }

    说明:

    1. 我封装的这个工具用来创建一个客户端出来,通过publish来发布数据,通过创建之初执行初始化方法start()来初始化连接(没交给nest的inject是因为我需要具备自己的特殊控制)。

    2. 创建时构造函数传入参数,第一个参数是初始化参数,结构见rabbitconfig.ts.第二个参数是订阅的回调函数,用于处理收听到的消息,如果不传就不会创建监听。这个回调函数定义了一个类型,见mqMsgHandler.ts.

    mqmsghandler.ts

    export type MqMsgHandler = (data: any) => Promise<boolean>;

    说明:消息的回调函数

    rabbitconfig.ts

    1. export class RabbitConfig{
    2. private _host:string;
    3. private _port:number;
    4. private _queue:string;
    5. private _bindKey:string;
    6. private _exchange:string;
    7. private _username:string;
    8. private _password:string;
    9. private _virtualHost:string;
    10. constructor(host: string, port: number, queue: string, bindKey: string, exchange: string, username: string, password: string, virtualHost: string) {
    11. this._host = host;
    12. this._port = port;
    13. this._queue = queue;
    14. this._bindKey = bindKey;
    15. this._exchange = exchange;
    16. this._username = username;
    17. this._password = password;
    18. this._virtualHost = virtualHost;
    19. }
    20. get host(): string {
    21. return this._host;
    22. }
    23. set host(value: string) {
    24. this._host = value;
    25. }
    26. get port(): number {
    27. return this._port;
    28. }
    29. set port(value: number) {
    30. this._port = value;
    31. }
    32. get queue(): string {
    33. return this._queue;
    34. }
    35. set queue(value: string) {
    36. this._queue = value;
    37. }
    38. get bindKey(): string {
    39. return this._bindKey;
    40. }
    41. set bindKey(value: string) {
    42. this._bindKey = value;
    43. }
    44. get exchange(): string {
    45. return this._exchange;
    46. }
    47. set exchange(value: string) {
    48. this._exchange = value;
    49. }
    50. get username(): string {
    51. return this._username;
    52. }
    53. set username(value: string) {
    54. this._username = value;
    55. }
    56. get password(): string {
    57. return this._password;
    58. }
    59. set password(value: string) {
    60. this._password = value;
    61. }
    62. get virtualHost(): string {
    63. return this._virtualHost;
    64. }
    65. set virtualHost(value: string) {
    66. this._virtualHost = value;
    67. }
    68. }

    net_disk.service.ts

    注意:代码经过一些删除处理,可能直接粘贴无法运行,但是核心逻辑都在里面。

    1. import {GLoggerService} from '../framework/logs/glogger';
    2. import {RabbitMQClient} from '../util/mq/mq.util';
    3. import {Injectable} from '@nestjs/common';
    4. import * as config from 'config';
    5. import {RabbitConfig} from '../util/mq/RabbitConfig';
    6. import {MqMsgHandler} from '../util/mq/MQMsgHandler';
    7. import {data} from 'cheerio/lib/api/attributes';
    8. const rabbitMQ = config['rabbitMQ'];
    9. @Injectable()
    10. export class NetDiskService {
    11. private readonly glogger = new GLoggerService();
    12. private mQClientSubscriber: RabbitMQClient;//订阅者
    13. private mQClientSender: RabbitMQClient;//发布者
    14. /**
    15. * 由于定于与发布不是同一个queue,所以需要建立两个client
    16. */
    17. constructor() {
    18. const subscribe = new RabbitConfig(
    19. rabbitMQ.host,
    20. rabbitMQ.port,
    21. rabbitMQ.subscribe.queue,
    22. rabbitMQ.subscribe.bindKey,
    23. rabbitMQ.exchange,
    24. rabbitMQ.username,
    25. rabbitMQ.password,
    26. rabbitMQ.virtualHost
    27. );
    28. this.mQClientSubscriber = new RabbitMQClient(subscribe,
    29. // tslint:disable-next-line:no-empty
    30. async (msg: any) => {
    31. console.log('收到0发送的消息:' + JSON.stringify(msg));
    32. return await this.onNetDiskUploadGeomFile(msg);
    33. }
    34. );
    35. const send = new RabbitConfig(
    36. rabbitMQ.host,
    37. rabbitMQ.port,
    38. rabbitMQ.send.queue,
    39. rabbitMQ.send.bindKey,
    40. rabbitMQ.exchange,
    41. rabbitMQ.username,
    42. rabbitMQ.password,
    43. rabbitMQ.virtualHost
    44. );
    45. this.mQClientSender = new RabbitMQClient(send,async (msg: any) => {
    46. console.log('收到1发送的消息:' + JSON.stringify(msg));
    47. return Promise.resolve(true);
    48. });
    49. this.mQClientSubscriber.start();
    50. this.mQClientSender.start();
    51. }
    52. public async publishMTo1(metaData: any): Promise<number> {
    53. try {
    54. console.log("发送消息给1")
    55. let result = await this.mQClientSender.publish(metaData);
    56. if (!result) {
    57. return 1
    58. }
    59. return 0
    60. } catch (e) {
    61. this.glogger.error('失败!', e);
    62. }
    63. return 1
    64. }
    65. public async publishTo0(metaData: any): Promise<number> {
    66. try {
    67. console.log("发送消息给0")
    68. let result = await this.mQClientSubscriber.publish(metaData);
    69. if (!result) {
    70. return 1
    71. }
    72. return 0
    73. } catch (e) {
    74. this.glogger.error('失败!', e);
    75. }
    76. return 1
    77. }
    78. }

    完成

  • 相关阅读:
    java-php-python-springboot网上订餐系统计算机毕业设计
    【LeetCode】611.有效三角形的个数
    CentOS Stream 9 配置静态 IP
    [附源码]Python计算机毕业设计java高校社团管理系统
    学校介绍静态HTML网页设计作品 DIV布局学校官网模板代码 DW大学网站制作成品下载 HTML5期末大作业
    电脑小白也能修电脑?这几个维修技巧快记住
    敏捷12原则
    Django 用户认证
    EthernetIP 转MODBUS RTU协议网关连接FANUC机器人作为EthernetIP通信从站
    HIVE伪分布安装
  • 原文地址:https://blog.csdn.net/jfqqqqq/article/details/126281403