• 简易消息队列实现 Nodejs + Redis =MQ


    前言

    消息队列是存储数据的一个中间件,可以理解为一个容器。生产者生产消息投递 到队列中,消费者可以拉取消息进行消费,如果消费者目前没有消费的打算,则消息队列会保留消息,直到消费者有消费的打算。

    设计思路

    生产者

    • 连接 redis
    • 向指定通道 通过 lpush 消息

    消费者

    • 连接 redis
    • 死循环通过 brpop 阻塞式获取消息
    • 拿到消息进行消费
    • 循环拿去下一个消息

    Redis

    安装及启动

    此步骤各位道友随意就好,不一定要用docker 。只要保证自己能连接到redis 服务即可。

    # 使用docker 拉取redis 镜像
    docker pull redis:latest
    
    # 启动redis服务 
    # --name 后面是容器名字方便后续维护和管理 
    # -p 后面是指映射容器服务的 6379 端口到宿主机的 6379 端口
    docker run -itd --name redis-mq -p 6379:6379 redis
    
    
    # ============ docker 常用基本操作(题外话) =================
    
    # 拉取镜像
    docker pull 镜像名称 
    
    # 查看镜像
    docker images
    
    # 删除镜像
    docker rmi 镜像名称
    
    # 查看运行容器(仅为启动中的)
    docker ps 
    
    # 查看运行容器(包含未启动)
    docker ps -a
    
    # 启动容器
    docker start 容器名称/容器id
    
    # 停止容器
    docker stop 容器名称/容器id
    
    复制代码

    Nodejs连接

    初始化工程

    # 创建文件夹并进入
    mkdir queue-node-redis && cd queue-node-redis
    
    # yarn 初始化
    yarn init -y
    
    # 下载redis包,
    # 指定版本的原因是尽量减少道友们的失败几率 毕竟前端的工具迭代太快了
    yarn add redis@4.2.0   
    复制代码

    创建 lib 与 utils 目录

    ├── .gitignore
    ├── lib
    ├── package.json
    ├── utils
    │   └── redis.js
    └── yarn.lock
    复制代码

    utils/redis.js

    const redis = require("redis");
    
    const redisCreateClient = async (config) => {
      try {
        const client = redis.createClient({
          url: `redis://${config.host}:${config.port}`,
        });
        await client.connect();
        await client.select(config.db);
        console.log("redis connect success");
        return client;
      } catch (err) {
        console.log("redis connect error");
        throw err;
      }
    };
    
    module.exports = {
      redisCreateClient,
    };
    复制代码

    index.js

    在项目根目录下创建此文件,测试redis连接是否成功

    const { redisCreateClient } = require("./utils/redis");
    const test = async () => {
      const client = await redisCreateClient({
        host: "127.0.0.1",
        port: 6379,
        db: 0,
      });
    };
    test();
    
    复制代码

    出现如下图所示即可

    01-redis GitHub地址

    minimist

    轻量级的命令行参数解析引擎。

    # 安装 minimist
    yarn add minimist@1.2.6
    复制代码

    使用方法

    const systemArg = require("minimist")(process.argv.slice(2));
    console.log(systemArg);
    
    复制代码
    # 运行 
    node index.js --name test
    
    # 输出
    { _: [], name: 'test' }
    复制代码

    02-minimist GitHub地址

    正文开始

    从目录结构及文件创建,手把手教程

    目录结构变更

    ├── config.js  	 # 配置文件
    ├── lib
    │   └── index.js # 主目录入口文件
    ├── package.json 
    ├── utils				 # 工具函数库
    │   └── redis.js
    └── yarn.lock
    复制代码

    config.js

    配置文件思路的重要性大于代码的实现

    module.exports = {
      // redis 配置
      redis: {
        default: {
          host: "127.0.0.1",
          port: 6379,
          password: "",
          db: 0,
        },
      },
      // 消息队列频道设置
      mqList: [
        {
          // 消息频道名称
          name: "QUEUE_MY_MQ",
          // 阻塞式取值超时配置
          brPopTimeout: 100,
          // 开启任务数 此配置需要 PM 启动生效
          instances: 1,
          // redis 配置key
          redis: "default",
        },
      ],
    };
    复制代码

    lib/index.js

    针对配置做程序异常处理

    const systemArg = require("minimist")(process.argv.slice(2));
    const config = require("../config");
    const { bootstrap } = require("./core");
    
    // 程序自检
    
    // 判断是否输入了 频道名称
    if (!systemArg.name) {
      console.error("ERROR: channel name cannot be empty");
      process.exit(99);
    }
    
    // 频道队列配置
    const mqConfig =
      config.mqList.find((item) => item.name === systemArg.name) ?? false;
    
    // 如果config不存在
    if (!mqConfig) {
      console.error("ERROR:  configuration not obtained");
      process.exit(99);
    }
    
    // redis 配置
    const redisConfig = config.redis[mqConfig.redis];
    if (!redisConfig) {
      console.error("ERROR: redis configuration not obtained");
      process.exit(99);
    }
    
    // node index.js --name QUEUE_MY_MQ
    bootstrap(mqConfig, redisConfig);
    
    复制代码

    lib/core.js

    后面的核心逻辑写在此处

    async function bootstrap(config) {
      console.log(config);
    }
    
    module.exports = {
      bootstrap,
    };
    
    复制代码

    03-config GitHub地址

    核心逻辑

    lib/core.js

    const { redisCreateClient } = require("../utils/redis");
    async function bootstrap(mqConfig, redisConfig) {
      try {
        // 创建redis连接
        const client = await redisCreateClient(redisConfig);
        // 通过死循环阻塞程序
        while (true) {
          let res = null;
          console.log("队列执行");
          try {
            // 从队列中获取任务, 采用阻塞式获取任务 最大阻塞时间为config.queue.timeout
            res = await client.brPop(mqConfig.name, mqConfig.brPopTimeout);
            if (res === null) {
              continue;
            }
            console.log("TODO:: Task processing", res);
          } catch (error) {
            console.log("ERROR: redis brPop error", error);
            continue;
          }
        }
      } catch (err) {
        // 处理程序异常
        console.log("ERROR: ", err);
        process.exit(1);
      }
    }
    module.exports = {
      bootstrap,
    };
    
    复制代码

    生成测试数据

    为了接下来的测试,我们先生成一些测试数据

    test/mockMq.js

    const { redisCreateClient } = require("../utils/redis");
    const config = require("../config");
    
    /** 生成 1000 条测试消息 */
    const mockMq = async (key) => {
      const client = await redisCreateClient(config.redis.default);
      for (let i = 0; i < 1000; i++) {
        // 向队列中 push 消息
        await client.lPush(key, "test" + i);
      }
      // 获取队列长度
      const count = await client.lLen(key);
      console.log(`生成1000条测试消息完成,目前共有${count}条消息`);
      // 关闭redis连接
      client.quit();
    };
    
    mockMq("QUEUE_MY_MQ");
    复制代码

    验证脚本有效性

    # 执行消息生成命令
    node ./test/mockMq.js
    
    # 程序输出
    # redis connect success
    # 生成 1000 条测试消息 完成,目前共有 1000 条消息
    
    # 执行开启消费者
    node ./lib/index.js --name QUEUE_MY_MQ 
    # TODO:: Task processing { key: 'QUEUE_MY_MQ', element: 'test0' }
    # TODO:: Task processing .......
    # TODO:: Task processing { key: 'QUEUE_MY_MQ', element: 'test999' }
    复制代码

    04-core GitHub地址

    定义Job

    后记

    到此为止建议队列就实现完成了,当然后面还有一些优化。例如通过配置文件 动态引入 Job 和如何使用 Pm2 启动消费队列并且可配置启动个数、启停控制。(ps:此处的坑会很快补上)

    当然除了这些,目前这个简易队列还有很多不足。例如任务执行失败如何处理,消费后如何ack , 没有用成熟的topic 协议,没有实现延时队列。这些坑因为个人水平以及redis本身的特性 可能很长一段时间都不会填了。建议生产用成熟的套件 例如 Kafka RabbitMq 以及一些其他更适合当前语言的套件。

     

     

  • 相关阅读:
    Mac机RVM安装,手动下载安装,经过验证可以正常使用
    护眼灯真的可以护眼吗?2022护眼台灯该怎样选择
    【数据库】如何利用Python中的petl将PostgreSQL中所有表的外键删除,迁移数据,再重建外键
    java代理相关知识
    C++ 中的 typeid 运算符和示例
    web3js实现通过合约方法进行代币交易查询余额
    矩阵问题(宏观调度)
    MFC扩展库BCGControlBar Pro v33.6亮点 - 流程图、Ribbon Bar功能升级
    git中rebase和merge的区别
    【数据密集型系统设计】软件系统的可靠性、可伸缩性、可维护性
  • 原文地址:https://blog.csdn.net/m0_71777195/article/details/125907266