1、前言
在上一篇文章中,完成了ZooKeeper注册中心,添加了一个简单的本地缓存
但是,存在一些问题:
- 当本地缓存OK,ZooKeeper对应服务有新的实例时,本地缓存不会自动更新
- 当ZooKeeper对应服务实例关闭,本地缓存不会监控到实例消失
2、编写
之前我们是将缓存直接放在ZooKeeperClientUtils中的,维护一个Map集合。我们将缓存部分移动到ZooKeeperClientCache中,缓存数据从这里获取:
我们监听树上所有节点的变化情况,对于包含实例的变化,每次获取对应的服务信息,然后通过Clinet查询现存的对应服务的实例,进行更新。
watchPathSet维护了Client调用过的服务集合,对于调用过的服务才开启本地的缓存,并且进行更新。
instances即为本地缓存集合
@Slf4j public class ZookeeperClientCache { private static final Map> instances=new ConcurrentHashMap<>(); private static final Set watchPathSet=new ConcurrentHashSet<>(); private static CuratorFramework zookeeperClient; private static boolean isListening=false; //将服务加入监听set中 public static void addListenService(String service){ //开启服务监听 openListen(); //path路径放入 watchPathSet.add(ZookeeperUtil.serviceName2Path(service)); } //添加本地缓存,同时开启监听服务 public static void addLocalCache(String serviceName,List addressList) { //直接替换原本的缓存 instances.put(serviceName,addressList); //将服务加入监听set addListenService(serviceName); } public static void cleanLocalCache(String serviceName){ log.info("服务调用失败,清除本地缓存,重新获取实例===>{}",serviceName); instances.remove(serviceName); } public static boolean containsKey(String serviceName){ return instances.containsKey(serviceName); } public static List getOrDefault(String serviceName){ return instances.getOrDefault(serviceName,null); } public static List getInstances(String serviceName){ try { String path = ZookeeperUtil.serviceName2Path(serviceName); //获取路径下所有的实现 List instancePaths = zookeeperClient.getChildren().forPath(path); List addressList = new ArrayList<>(); for (String instancePath : instancePaths) { byte[] bytes = zookeeperClient.getData().forPath(path+"/"+instancePath); String json = new String(bytes); InetSocketAddress instance = InetSocketAddressSerializerUtil.getInetSocketAddressByJson(json); addressList.add(instance); } return addressList; } catch (Exception e) { log.error("服务获取失败====>{}",e); throw new RpcException(RpcError.SERVICE_NONE_INSTANCE); } } private static synchronized void openListen(){ //已初始化过 if (isListening){ return; } //注入client if (zookeeperClient==null) { zookeeperClient=ZookeeperUtil.getZookeeperClient(); } TreeCache cache = TreeCache.newBuilder(zookeeperClient, "/cn/zko0/myRpc/api").setCacheData(true).build(); cache.getListenable().addListener((c, event) -> { if ( event.getData() != null ) { System.out.println("type=" + event.getType() + " path=" + event.getData().getPath()); //可以通过event.type来进行节点的处理,我这里直接多节点每次行为做reload if (event.getData().getPath().contains("Service/")){ //是服务节点,做更新 String path = event.getData().getPath(); //去除尾部实例段 path=path.substring(0,path.lastIndexOf("/")); String serviceName = ZookeeperUtil.path2ServiceName(path); if (watchPathSet.contains(path)) { log.info("更新本地缓存"); List addressList = getInstances(serviceName); addLocalCache(serviceName,addressList); } } } else { System.out.println("type=" + event.getType()); } }); try { cache.start(); } catch (Exception e) { throw new RuntimeException(e); } isListening=true; } }
创建完Cache类,只需要修改之前ZooKeeperClientUtils中,从当前类改为Cache类获取即可:
完整代码:
@Slf4j public class ZookeeperClientUtils { private static CuratorFramework client = ZookeeperUtil.getZookeeperClient(); public static InetSocketAddress searchService(String serviceName, LoadBalancer loadBalancer) { InetSocketAddress address; //本地缓存查询 if (ZookeeperClientCache.containsKey(serviceName)){ List addressList = ZookeeperClientCache.getOrDefault(serviceName); if (!addressList.isEmpty()){ //使用lb进行负载均衡 return loadBalancer.select(addressList); } } try { String path = ZookeeperUtil.serviceName2Path(serviceName); //获取路径下所有的实现 List instancePaths = client.getChildren().forPath(path); List addressList = new ArrayList<>(); for (String instancePath : instancePaths) { byte[] bytes = client.getData().forPath(path+"/"+instancePath); String json = new String(bytes); InetSocketAddress instance = InetSocketAddressSerializerUtil.getInetSocketAddressByJson(json); addressList.add(instance); } ZookeeperClientCache.addLocalCache(serviceName,addressList); return loadBalancer.select(addressList); } catch (Exception e) { log.error("服务获取失败====>{}",e); throw new RpcException(RpcError.SERVICE_NONE_INSTANCE); } } }
3、测试
实现上述代码,下面是服务监听的简单测试
开启Server,Client:
关闭Server,Server自动进行服务的注销:
Client服务监控: