• Rpc-实现Client对ZooKeeper的服务监听


    1、前言

    在上一篇文章中,完成了ZooKeeper注册中心,添加了一个简单的本地缓存

    但是,存在一些问题:

    1. 当本地缓存OK,ZooKeeper对应服务有新的实例时,本地缓存不会自动更新
    2. 当ZooKeeper对应服务实例关闭,本地缓存不会监控到实例消失

    2、编写

    之前我们是将缓存直接放在ZooKeeperClientUtils中的,维护一个Map集合。我们将缓存部分移动到ZooKeeperClientCache中,缓存数据从这里获取:

    我们监听树上所有节点的变化情况,对于包含实例的变化,每次获取对应的服务信息,然后通过Clinet查询现存的对应服务的实例,进行更新。

    watchPathSet维护了Client调用过的服务集合,对于调用过的服务才开启本地的缓存,并且进行更新。

    instances即为本地缓存集合

    @Slf4j
    public class ZookeeperClientCache {
    private static final Map> instances=new ConcurrentHashMap<>();
    private static final Set watchPathSet=new ConcurrentHashSet<>();
    private static CuratorFramework zookeeperClient;
    private static boolean isListening=false;
    //将服务加入监听set中
    public static void addListenService(String service){
    //开启服务监听
    openListen();
    //path路径放入
    watchPathSet.add(ZookeeperUtil.serviceName2Path(service));
    }
    //添加本地缓存,同时开启监听服务
    public static void addLocalCache(String serviceName,List addressList){
    //直接替换原本的缓存
    instances.put(serviceName,addressList);
    //将服务加入监听set
    addListenService(serviceName);
    }
    public static void cleanLocalCache(String serviceName){
    log.info("服务调用失败,清除本地缓存,重新获取实例===>{}",serviceName);
    instances.remove(serviceName);
    }
    public static boolean containsKey(String serviceName){
    return instances.containsKey(serviceName);
    }
    public static List getOrDefault(String serviceName){
    return instances.getOrDefault(serviceName,null);
    }
    public static List getInstances(String serviceName){
    try {
    String path = ZookeeperUtil.serviceName2Path(serviceName);
    //获取路径下所有的实现
    List instancePaths = zookeeperClient.getChildren().forPath(path);
    List addressList = new ArrayList<>();
    for (String instancePath : instancePaths) {
    byte[] bytes = zookeeperClient.getData().forPath(path+"/"+instancePath);
    String json = new String(bytes);
    InetSocketAddress instance = InetSocketAddressSerializerUtil.getInetSocketAddressByJson(json);
    addressList.add(instance);
    }
    return addressList;
    } catch (Exception e) {
    log.error("服务获取失败====>{}",e);
    throw new RpcException(RpcError.SERVICE_NONE_INSTANCE);
    }
    }
    private static synchronized void openListen(){
    //已初始化过
    if (isListening){
    return;
    }
    //注入client
    if (zookeeperClient==null) {
    zookeeperClient=ZookeeperUtil.getZookeeperClient();
    }
    TreeCache cache = TreeCache.newBuilder(zookeeperClient, "/cn/zko0/myRpc/api").setCacheData(true).build();
    cache.getListenable().addListener((c, event) -> {
    if ( event.getData() != null )
    {
    System.out.println("type=" + event.getType() + " path=" + event.getData().getPath());
    //可以通过event.type来进行节点的处理,我这里直接多节点每次行为做reload
    if (event.getData().getPath().contains("Service/")){
    //是服务节点,做更新
    String path = event.getData().getPath();
    //去除尾部实例段
    path=path.substring(0,path.lastIndexOf("/"));
    String serviceName = ZookeeperUtil.path2ServiceName(path);
    if (watchPathSet.contains(path)) {
    log.info("更新本地缓存");
    List addressList = getInstances(serviceName);
    addLocalCache(serviceName,addressList);
    }
    }
    }
    else
    {
    System.out.println("type=" + event.getType());
    }
    });
    try {
    cache.start();
    } catch (Exception e) {
    throw new RuntimeException(e);
    }
    isListening=true;
    }
    }

    创建完Cache类,只需要修改之前ZooKeeperClientUtils中,从当前类改为Cache类获取即可:

    image-20230220133343196

    完整代码:

    @Slf4j
    public class ZookeeperClientUtils {
    private static CuratorFramework client = ZookeeperUtil.getZookeeperClient();
    public static InetSocketAddress searchService(String serviceName, LoadBalancer loadBalancer) {
    InetSocketAddress address;
    //本地缓存查询
    if (ZookeeperClientCache.containsKey(serviceName)){
    List addressList = ZookeeperClientCache.getOrDefault(serviceName);
    if (!addressList.isEmpty()){
    //使用lb进行负载均衡
    return loadBalancer.select(addressList);
    }
    }
    try {
    String path = ZookeeperUtil.serviceName2Path(serviceName);
    //获取路径下所有的实现
    List instancePaths = client.getChildren().forPath(path);
    List addressList = new ArrayList<>();
    for (String instancePath : instancePaths) {
    byte[] bytes = client.getData().forPath(path+"/"+instancePath);
    String json = new String(bytes);
    InetSocketAddress instance = InetSocketAddressSerializerUtil.getInetSocketAddressByJson(json);
    addressList.add(instance);
    }
    ZookeeperClientCache.addLocalCache(serviceName,addressList);
    return loadBalancer.select(addressList);
    } catch (Exception e) {
    log.error("服务获取失败====>{}",e);
    throw new RpcException(RpcError.SERVICE_NONE_INSTANCE);
    }
    }
    }

    3、测试

    实现上述代码,下面是服务监听的简单测试

    开启Server,Client:

    image-20230220134850987

    关闭Server,Server自动进行服务的注销:

    image-20230220135154126

    Client服务监控:

    image-20230220135320003

  • 相关阅读:
    [NOI2020统一省选 A] 组合数问题 (推式子)
    微服务学习第三十二节
    vs2022的下载及安装教程(Visual Studio 2022)
    JVM第六讲:JVM 基础 - Java 内存模型引入
    GitHub操作之跨团队操作
    OTG 配置为U盘
    RFID固定资产定位管理系统-智慧资产人员可视化管理系统
    我的测试文章
    LQ0149 排序【枚举】
    ElementUI组件-日期时间控件设置禁用日期
  • 原文地址:https://www.cnblogs.com/zko0/p/17137124.html