• Nodejs 使用 ZooKeeper 做服务发现


    将单体服务拆分为微服务后,为了服务高可用,一般会做集群多实例。但在分布式下,怎么进行高效、便捷的进行服务访问问题,出现了各类服务注册和服务发现框架。这里使用的是Zookeeper。ZooKeeper 官网 https://zookeeper.apache.org
    我们的业务系统使用的开发语言是JAVA,但是部分页面请求是先到nodejs 做的webportal服务,进行权限校验,校验通过后调用Java提供的API。当前阶段Java端已经微服务化,使用Zookeeper作为注册中心,目前只需要让nodejs端,也接入到Zookeeper,作为服务消费者,就能搭建机器环境。

    找轮子

    通过查找,发现npm有现成的库 node-zookeeper-client ,避免重复造轮子,就用它了。

    接入思路

    由于我们只是作为服务消费者,不需要使用服务注册的api,大部分可以直接在文档中找到API。

    编码过程 

    npm 安装

     1 npm i node-zookeeper-client 

    连接ZK

    复制代码
     1 const Zookeeper = require('node-zookeeper-client');
     2 const CONNECTION_STRING = "127.0.0.1:2181"; // ZK的服务地址
     3 const OPTIONS = {
     4   sessionTimeout: 5000 
     5 }
     6 const zk = Zookeeper.createClient(CONNECTION_STRING, OPTIONS);
     7   zk.on('connected', function(){
     8   console.log("zk=====", zk);
     9 });
    10 //获取根节点下的子节点数据
    11 zk.getChildren('/', function(error, children, stat){
    12   if(error){
    13     console.log(error.stack);
    14     return;
    15   }
    16   console.log(children);
    17 })
    18 zk.connect();
    复制代码

    其他API(仅供参考)

    复制代码
     1 // 判断节点是否已存在
     2 zk.exists('/phpnode',function(error,stat){
     3    if(stat){
     4         console.log("节点存在");
     5    }else{
     6        console.log("节点不存在");
     7    }
     8 })
     9  
    10  // 创建/注册节点
    11 zk.create('/phpnode',new Buffer('hello'),function(error,path){
    12    console.log(path);
    13 })
    14 
    15 // 获取节点数据
    16 zk.getData('/phpnode',function(error,data,stat){
    17    console.log(data.toString());
    18 });
    19 
    20 //节点删除
    21 zk.remove('/phpnode',function(error){
    22    if(!error){
    23        console.log('node 节点删除成功');
    24     }
    25 })
    复制代码

    于是有了第一版本代码

    复制代码
     1 const zookeeper = require('node-zookeeper-client');
     2 
     3 
     4 // ZK基础配置信息,正式项目需要从环境文件导入
     5 export const ZK = {
     6     clientAddress: 'localhost:2181/zk/test', // ZK地址
     7     servicePath: '/test-service', // 服务路径
     8 };
     9 
    10 let zkClient = null;
    11 
    12 // 获取服务ip+port
    13 export const getZKServiceBaseUrl = (servicePath) => {
    14     return new Promise((resolve, reject) => {
    15         try {
    16             // 防止重复连接
    17             if (zkClient) {
    18                 disconnectZKService();
    19             }
    20 
    21             // 新建连接
    22             zkClient = zookeeper.createClient(ZK.clientAddress);
    23             // 连接后执行一次
    24             zkClient.once('connected', async function () {
    25                 // 获取服务节点信息
    26                 const res = await listChildren(zkClient, servicePath);
    27                 res.message ? reject(res) : resolve(res);
    28             });
    29 
    30             zkClient.connect();
    31         } catch (error) {
    32             reject(error);
    33         }
    34     });
    35 };
    36 
    37 // 断开链接
    38 export const disconnectZKService = () => {
    39     if (zkClient) {
    40         zkClient.close();
    41     }
    42 };
    43 
    44 // 获取节点信息,ip+port
    45 function listChildren(client, path) {
    46     return new Promise((resolve, reject) => {
    47         client.getChildren(path,
    48             function () {},
    49             function (error, children) {
    50                 if (error) {
    51                     reject({
    52                         ...error,
    53                         message: `获取ZK节点error,Path: ${path}`
    54                     });
    55                 }
    56                 try {
    57                     let addressPath = path + '/';
    58                     if (children.length > 1) {
    59                         //若存在多个地址,则随机获取一个地址
    60                         addressPath += children[Math.floor(Math.random() * children.length)];
    61                     } else {
    62                         //若只有唯一地址,则获取该地址
    63                         addressPath += children[0];
    64                     }
    65                     //获取服务地址
    66                     client.getData(addressPath, function (err, data) {
    67                         if (err) {
    68                             reject({
    69                                 ...error,
    70                                 message: `获取ZK服务地址error,Stack: ${err.stack}`
    71                             });
    72                         }
    73                         if (!data) {
    74                             reject({
    75                                 ...error,
    76                                 message: `ZK data is not exist`
    77                             });
    78                         }
    79                         const serviceInfo = JSON.parse(data);
    80 
    81                         const url = serviceInfo.address + ':' + serviceInfo.port;
    82                         resolve(url);
    83                     });
    84                 } catch (error) {
    85                     reject({
    86                         ...error,
    87                         message: `list ZK children error`
    88                     });
    89                 }
    90             }
    91         );
    92     });
    93 }
    复制代码

    通过测试代码,可以实现调用Java服务。可能一般的程序员实现功能了就好了,可是作为一个有点追求的,感觉代码哪里有问题。具体是哪里呢,盯着屏幕瞅了两分钟,发现每次获取服务都取 ZK 注册中心获取,这个过程涉及到的网络请求而且还不是一次HTTP,如果只是这么简单的改造,程序单纯在性能响应上很有可能还不如老版本。我们可以在获取服务的真实远程地址前,添加一个本地缓存。通过ZK订阅机制,更新本地缓存数据。

    思路虽然明确了,可以api扫了扫,没有我们想要的监听器,如下所示

     

    这怎么办,按理说的应该会有一个,节点数据改变推送的监听器,例如新增,删除,修改等等。找了半天也没找到合适的。

    没办法,接着看源码吧,看了一会,忽然,看到一个似乎可用的,类

     

     这不就是我需要的类吗,但是居然在一方法中注入监听器,先试试吧。

     

     试了一下,嘿,真的可以了,当服务端节点数据发生变动后,会自动触发监听器 watcher 的回调逻辑。这就好办了,改造开始。

    改进后的代码

    复制代码
    const zookeeper = require('node-zookeeper-client');
    var ZK = require('../config/env.js').zk;
    
    const client = Object.freeze({
        zkClient: zookeeper.createClient(ZK.connectionString),
        serviceSet: [], //
        serviceCache: Object.freeze({
            map: new Map(),
    
            /**
             * 更新缓存
             * @param {String} path 服务路径
             * @param {Array} arr 真实访问集合
             */
            updateCache: function (path, arr) {
                this.map.set(path, arr);
            },
    
            /**
             * 从缓存中获取访问地址
             * 
             * @param {String} path 服务路径
             * @returns String 真实访问地址
             */
            getRealPath: function (path) {
                let arr = this.map.get(path);
    
                if (arr.length > 1) //若存在多个地址,则随机获取一个地址
                    return arr[Math.floor(Math.random() * arr.length)];
                else //若只有唯一地址,则获取该地址
                    return arr[0];
            }
        }),
    
        connect: function () {
            console.info("连接 zookeeper");
    
            this.zkClient.once('connected', function () {
                console.info("连接成功");
            });
    
            this.zkClient.connect();
        },
    
        getRealPath: function (serviceName) {
            return new Promise(async (resolve, reject) => {
                if (this.serviceSet.includes(serviceName)) {
                    resolve(this.serviceCache.getRealPath(serviceName));
                } else {
                    // 加载服务节点信息
                    this.loadChildren(serviceName).then(url => resolve(url)).catch(error => reject(error));
                }
            });
        },
    
        loadChildren: function (path) {
            console.info("进入 loadChildren ");
            return new Promise((resolve, reject) => {
                this.zkClient.getChildren(path, (event) => {
                    console.info(" loadChildren watcher ", path, event);
    
                    this.getChildren(event.path);
                }, (error, ids) => {
                    console.info(" loadChildren callback ", path, error, ids);
                    if (error) {
                        reject({
                            ...error,
                            message: `获取ZK节点error,Path: ${path}`
                        });
                    } else {
                        resolve(this.getData(path, ids));
                    }
                });
            });
        },
    
        getChildren: function (path) {
            console.info("进入 getChildren ");
            return new Promise((resolve, reject) => {
                this.zkClient.getChildren(path, (error, ids) => {
                    console.info(" getChildren callback ", path, error, ids);
                    if (error) {
                        reject({
                            ...error,
                            message: `获取ZK节点error,Path: ${path}`
                        });
                    }
    
                    resolve(this.getData(path, ids));
                });
            });
        },
    
        getData: function (path, ids) {
            console.info("进入 getData ");
    
            let pros = ids.map(id => new Promise((resolve, reject) => {
                //获取服务地址
                this.zkClient.getData(path + "/" + id, (error, data) => {
                    console.info(" getData callback ", path, id);
                    if (error) {
                        reject({
                            ...error,
                            message: `获取ZK服务地址error,Stack: ${err.stack}`
                        });
                    }
                    if (!data) {
                        reject({
                            ...error,
                            message: `ZK data is not exist`
                        });
                    }
    
                    const node = JSON.parse(data).payload;
                    const protocol = node.ssl ? "https://" : "http://";
    
                    resolve(`${protocol}${node.host}:${node.port}`);
                });
            }));
    
            return Promise.all(pros).then(arr => this.serviceCache.updateCache(path, arr)).then(() => this.serviceCache.getRealPath(path));
        },
    
        disconnect: function () { //断开连接
            console.info("进入 disconnect ")
            if (this.zkClient) {
                console.info("执行 close")
                this.zkClient.close();
            }
        },
    });
    
    client.connect();
    
    module.exports = {
        getServiceUrl: (path) => client.getRealPath(path),
        disconnect: () => client.disconnect(),
    }
    复制代码

    这样终于,好一点了。

    未完待续(多节点的选择问题)

    多节点选择策略:随机,轮转,粘性 等等,一般不同的项目使用的策略也不太一样,实例中使用的是简单随机策略,后续再进行节点选择的策略问题优化啦。

    关机,收工!!!

     

  • 相关阅读:
    过控Matlab-实验法建立被控过程的数学模型(一)
    刚开始测试自动化? 这些错误不要犯,一定要看!别踩坑!!
    李迟2022年7月工作生活总结
    以数据为中心 的AI v.s. 以模型为中心的AI
    面试官不讲武德,产品经理如何“耗子尾汁”?
    NetCore自带的IOC依赖注入如何实现一个接口多个实现类的注入
    OpenCV-Python实战(4) —— OpenCV 五角星各点在坐标系上面的坐标计算(以重心为原点)
    【数据结构】链表的学习和介绍
    【后端框架】MyBatis(3)
    分布式调度Elastic-job
  • 原文地址:https://www.cnblogs.com/songyz/p/17131345.html