rabbitMQ的exchange、bindingkey、routingkey的关系:
Java面试常见问题:RabbitMQ基本概念与工作原理 - 知乎
nestjs的mq使用引导(最后没按照官网的那种方式使用,而是直接编程的方式,这里看下了解下):
RabbitMQ | NestJS 中文文档 | NestJS 中文网
其中,使用的amqplib的API(非常重要,查看了官方的API才能更精准地使用!):
amqplib | Channel API reference
nestjs 8.*.*
使用RabbitMQ需要增加依赖(为什么?见上面的NestJS官方RabbitMQ文档,其中要求安装这两个包):
"amqp-connection-manager": "^4.1.4", "amqplib": "^0.10.2",
怎么安装? 不想全局就在当前项目的根目录下执行:
npm install --save amqp-connection-manager amqplib
然后,开发,上代码
- import {Injectable, Logger, OnModuleInit} from '@nestjs/common';
- import amqp from 'amqplib';
- import * as acm from 'amqp-connection-manager';
- import {MqMsgHandler} from './MQMsgHandler';
- import {GLoggerService} from '../../framework/logs/glogger';
- import ChannelWrapper from "amqp-connection-manager/dist/esm/ChannelWrapper";
-
- const logger = new GLoggerService('RabbitMQ');
-
- // type OnEventHandler = (data: any) => Promise<void>;
-
- // @Injectable()
/**
 * Connection settings read by {@link RabbitMQClient}.
 *
 * Any object exposing these properties (plain fields or getters) is accepted.
 */
export interface RabbitMQClientConfig {
  host: string;
  port: number;
  username: string;
  password: string;
  virtualHost: string;
  exchange: string;
  bindKey: string;
  queue: string;
}

/**
 * Thin wrapper around amqp-connection-manager that owns one connection and one
 * channel bound to a single direct exchange / routing key.
 *
 * When a message handler is supplied to the constructor, the channel also
 * declares the configured queue, binds it to the exchange and consumes from it.
 * Without a handler the client is publish-only.
 */
export class RabbitMQClient {
  private connection?: acm.AmqpConnectionManager;
  private channelWrapper?: acm.ChannelWrapper;
  private readonly rabbitMqConfig: RabbitMQClientConfig;
  private readonly url: string;
  private readonly exchange: string;
  private readonly bindKey: string;
  private readonly virtualHost: string;
  private readonly host: string;
  private readonly port: number;
  private readonly username: string;
  private readonly password: string;
  /** Consumer callback; when undefined no queue is declared or consumed. */
  private readonly msgHandler: MqMsgHandler | undefined;
  private readonly queue: string;

  /**
   * @param rabbitMqConfig connection, exchange and queue settings
   * @param msgHandler optional callback for consumed messages; omit for a publish-only client
   */
  constructor(rabbitMqConfig: RabbitMQClientConfig, msgHandler?: MqMsgHandler) {
    this.rabbitMqConfig = rabbitMqConfig;
    this.virtualHost = rabbitMqConfig.virtualHost;
    this.exchange = rabbitMqConfig.exchange;
    this.bindKey = rabbitMqConfig.bindKey;
    this.host = rabbitMqConfig.host;
    this.port = rabbitMqConfig.port;
    this.username = rabbitMqConfig.username;
    this.password = rabbitMqConfig.password;
    this.msgHandler = msgHandler;
    this.queue = rabbitMqConfig.queue;
    // `%2F` is a percent-encoded "/", so the path segment decodes to "/" + virtualHost.
    // NOTE(review): username, password and virtualHost are inserted verbatim; values
    // containing reserved URL characters must already be percent-encoded by the caller.
    this.url = 'amqp://' + rabbitMqConfig.username + ':' + rabbitMqConfig.password + '@'
      + rabbitMqConfig.host + ':' + rabbitMqConfig.port + '/%2F' + rabbitMqConfig.virtualHost;
  }

  /**
   * Creates the managed connection and registers connection-level log listeners.
   *
   * The returned promise settles on the FIRST `connect` or `connectFailed` event.
   * A failed first attempt still resolves, because the connection manager keeps
   * retrying in the background and channels are set up once it gets through.
   */
  public async connect(): Promise<void> {
    return new Promise<void>((resolve) => {
      const connection = acm.connect(this.url, {
        heartbeatIntervalInSeconds: 3,
        reconnectTimeInSeconds: 3,
        connectionOptions: {
          rejectUnauthorized: false,
        },
      });
      this.connection = connection;

      connection.addListener('connect', () => {
        logger.debug('[客户端]连接成功');
        resolve();
      });
      // The manager passes an object payload ({ err, url }), not the error itself.
      connection.addListener('connectFailed', ({ err }) => {
        logger.error('[客户端]连接失败!', String(err));
        resolve();
      });
      // A disconnect can only follow a successful connect, by which time the
      // promise has already settled, so it is only logged here.
      connection.addListener('disconnect', ({ err }) => {
        logger.log('[客户端]断开连接: ' + String(err));
      });
    });
  }

  /**
   * Creates the channel wrapper on the current connection and attaches
   * channel-level log listeners.
   *
   * @returns the created channel wrapper (also stored for {@link publish})
   * @throws Error when called before {@link connect}
   */
  async initChannel(): Promise<acm.ChannelWrapper> {
    const connection = this.connection;
    if (!connection) {
      throw new Error('连接没有初始化!');
    }

    const { exchange, bindKey, queue, msgHandler } = this;
    const description = '初始化通道[exchange=' + exchange + ', bindKey=' + bindKey + ', queue=' + queue + ']';

    let channelWrapper: acm.ChannelWrapper;
    if (msgHandler) {
      logger.log(description + ',并增加通道监听器...');
      channelWrapper = connection.createChannel({
        json: false,
        // The setup function is handed to the connection manager, which invokes it with the channel.
        setup: (channel: amqp.Channel): Promise<void> => this.setupConsumer(channel, msgHandler),
      });
    } else {
      logger.log(description);
      channelWrapper = connection.createChannel({
        json: false,
      });
    }
    logger.log('完成!');

    channelWrapper.on('connect', () => {
      logger.log('[通道]连接成功');
    });
    channelWrapper.on('close', () => {
      logger.log('[通道]连接关闭');
    });
    channelWrapper.on('error', (err) => {
      logger.error('[通道]连接错误: ', err.toString());
    });

    this.channelWrapper = channelWrapper;
    return channelWrapper;
  }

  /**
   * Serialises `data` as JSON and publishes it to the configured exchange with
   * the configured routing key.
   *
   * @returns `true` when the publish succeeded, `false` when it was rejected
   * @throws rejects with the string '通道没有初始化!' when no channel has been created yet
   */
  // eslint-disable-next-line @typescript-eslint/no-explicit-any -- payload shape is caller-defined
  async publish(data: any): Promise<boolean> {
    if (!this.channelWrapper) {
      logger.error('通道没有初始化!', '');
      return Promise.reject('通道没有初始化!');
    }
    // Serialise outside the try block so that a non-serialisable payload still
    // rejects the call instead of being reported as a failed publish.
    const payload = Buffer.from(JSON.stringify(data));
    try {
      await this.channelWrapper.publish(this.exchange, this.bindKey, payload);
      logger.log('publish Message was sent!');
      return true;
    } catch (err) {
      logger.error('publish Message was rejected... Boo!', String(err));
      return false;
    }
  }

  /** Connects and then initialises the channel; call once after construction. */
  public async start(): Promise<void> {
    await this.connect();
    await this.initChannel();
  }

  /** Declares exchange and queue, binds them, and starts consuming with manual acks. */
  private async setupConsumer(channel: amqp.Channel, msgHandler: MqMsgHandler): Promise<void> {
    await channel.assertExchange(this.exchange, 'direct', { durable: true });
    await channel.assertQueue(this.queue, {
      durable: false,
      autoDelete: true,
    });
    await channel.bindQueue(this.queue, this.exchange, this.bindKey);

    // At most one unacknowledged message at a time.
    await channel.prefetch(1);
    // Awaited so that a failure to register the consumer fails the setup.
    await channel.consume(
      this.queue,
      (msg: amqp.ConsumeMessage | null): void => {
        void this.handleDelivery(channel, msg, msgHandler);
      },
      { noAck: false },
    );
  }

  /**
   * Parses one delivery as JSON, passes it to the handler and acknowledges it.
   * The message is acknowledged even when parsing or the handler fails, so a
   * bad message is dropped rather than redelivered. Never rejects.
   */
  private async handleDelivery(
    channel: amqp.Channel,
    msg: amqp.ConsumeMessage | null,
    msgHandler: MqMsgHandler,
  ): Promise<void> {
    if (msg === null) {
      // A null delivery means the consumer was cancelled; there is nothing to ack.
      logger.debug('事件数据异常 ' + String(msg));
      return;
    }

    try {
      const data: unknown = JSON.parse(msg.content.toString('utf8'));
      await msgHandler(data);
    } catch (error) {
      logger.debug('[consume]' + String(error));
    }

    try {
      channel.ack(msg);
    } catch (error) {
      // ack may throw (e.g. on a channel that is no longer open); log instead of rejecting.
      logger.error('[consume]ack失败', String(error));
    }
  }
}
说明:
1. 我封装的这个工具用来创建一个客户端出来,通过publish来发布数据,通过创建之初执行初始化方法start()来初始化连接(没交给nest的inject是因为我需要具备自己的特殊控制)。
2. 创建时构造函数传入参数:第一个参数是初始化参数,结构见RabbitConfig.ts;第二个参数是订阅的回调函数,用于处理收听到的消息,如果不传就不会创建监听。这个回调函数定义了一个类型,见MQMsgHandler.ts。
/**
 * Callback invoked with each consumed message after its body has been parsed as JSON.
 *
 * RabbitMQClient awaits the returned promise before acknowledging the message;
 * the resolved boolean itself is not inspected by that consumer.
 */
export type MqMsgHandler = (data: any) => Promise<boolean>;
说明:消息的回调函数
/**
 * Mutable holder for the RabbitMQ settings passed to RabbitMQClient:
 * broker address, credentials, virtual host, exchange, routing key and queue.
 *
 * Each setting is stored in a private backing field and exposed through a
 * plain getter/setter pair with no validation.
 */
export class RabbitConfig {
  private _host: string;
  private _port: number;
  private _queue: string;
  private _bindKey: string;
  private _exchange: string;
  private _username: string;
  private _password: string;
  private _virtualHost: string;

  /**
   * @param host broker host name
   * @param port broker port
   * @param queue queue name
   * @param bindKey key used to bind the queue to the exchange
   * @param exchange exchange name
   * @param username login user
   * @param password login password
   * @param virtualHost virtual host name
   */
  constructor(
    host: string,
    port: number,
    queue: string,
    bindKey: string,
    exchange: string,
    username: string,
    password: string,
    virtualHost: string,
  ) {
    this._host = host;
    this._port = port;
    this._queue = queue;
    this._bindKey = bindKey;
    this._exchange = exchange;
    this._username = username;
    this._password = password;
    this._virtualHost = virtualHost;
  }

  get host(): string { return this._host; }
  set host(value: string) { this._host = value; }

  get port(): number { return this._port; }
  set port(value: number) { this._port = value; }

  get queue(): string { return this._queue; }
  set queue(value: string) { this._queue = value; }

  get bindKey(): string { return this._bindKey; }
  set bindKey(value: string) { this._bindKey = value; }

  get exchange(): string { return this._exchange; }
  set exchange(value: string) { this._exchange = value; }

  get username(): string { return this._username; }
  set username(value: string) { this._username = value; }

  get password(): string { return this._password; }
  set password(value: string) { this._password = value; }

  get virtualHost(): string { return this._virtualHost; }
  set virtualHost(value: string) { this._virtualHost = value; }
}
注意:代码经过一些删除处理,可能直接粘贴无法运行,但是核心逻辑都在里面。
- import {GLoggerService} from '../framework/logs/glogger';
- import {RabbitMQClient} from '../util/mq/mq.util';
- import {Injectable} from '@nestjs/common';
- import * as config from 'config';
- import {RabbitConfig} from '../util/mq/RabbitConfig';
- import {MqMsgHandler} from '../util/mq/MQMsgHandler';
- import {data} from 'cheerio/lib/api/attributes';
-
- const rabbitMQ = config['rabbitMQ'];
-
/**
 * Service that owns two RabbitMQ clients built from the `rabbitMQ` config
 * section: one bound to the `subscribe` queue/key and one bound to the `send`
 * queue/key. Both clients are started from the constructor.
 */
@Injectable()
export class NetDiskService {
  private readonly glogger = new GLoggerService();
  /** Client bound to `rabbitMQ.subscribe`; consumed messages go to onNetDiskUploadGeomFile. */
  private readonly mQClientSubscriber: RabbitMQClient;
  /** Client bound to `rabbitMQ.send`; used by publishMTo1. */
  private readonly mQClientSender: RabbitMQClient;

  /**
   * Subscribing and publishing use different queues, so two clients are created.
   */
  constructor() {
    this.mQClientSubscriber = new RabbitMQClient(
      NetDiskService.buildConfig(rabbitMQ.subscribe),
      // NOTE(review): onNetDiskUploadGeomFile is not defined in this excerpt of the class.
      async (msg: any) => {
        console.log('收到0发送的消息:' + JSON.stringify(msg));
        return await this.onNetDiskUploadGeomFile(msg);
      },
    );

    // NOTE(review): passing a handler makes this "sender" client also consume from
    // the send queue; confirm that is intended and not leftover debugging.
    this.mQClientSender = new RabbitMQClient(
      NetDiskService.buildConfig(rabbitMQ.send),
      async (msg: any) => {
        console.log('收到1发送的消息:' + JSON.stringify(msg));
        return true;
      },
    );

    this.startClient(this.mQClientSubscriber);
    this.startClient(this.mQClientSender);
  }

  /**
   * Publishes `metaData` through the sender client.
   *
   * @returns 0 on success, 1 when the publish failed or threw
   */
  public async publishMTo1(metaData: any): Promise<number> {
    console.log("发送消息给1")
    return this.publishWith(this.mQClientSender, metaData);
  }

  /**
   * Publishes `metaData` through the subscriber client (i.e. with the subscribe routing key).
   *
   * @returns 0 on success, 1 when the publish failed or threw
   */
  public async publishTo0(metaData: any): Promise<number> {
    console.log("发送消息给0")
    return this.publishWith(this.mQClientSubscriber, metaData);
  }

  /** Builds a client config from the shared broker settings plus one queue/key pair. */
  private static buildConfig(route: { queue: string; bindKey: string }): RabbitConfig {
    return new RabbitConfig(
      rabbitMQ.host,
      rabbitMQ.port,
      route.queue,
      route.bindKey,
      rabbitMQ.exchange,
      rabbitMQ.username,
      rabbitMQ.password,
      rabbitMQ.virtualHost,
    );
  }

  /** Starts a client without awaiting it, logging a start-up failure instead of leaving the promise unhandled. */
  private startClient(client: RabbitMQClient): void {
    client.start().catch((e: unknown) => {
      this.glogger.error('启动MQ客户端失败!', String(e));
    });
  }

  /** Publishes through `client` and maps the outcome to the 0 (ok) / 1 (failed) status code. */
  private async publishWith(client: RabbitMQClient, metaData: any): Promise<number> {
    try {
      const sent = await client.publish(metaData);
      return sent ? 0 : 1;
    } catch (e) {
      this.glogger.error('失败!', e);
      return 1;
    }
  }
}