• 【架构师视角系列】QConfig配置中心系列之Server端(三)


    声明

    原创文章,转载请标注。https://www.cnblogs.com/boycelee/p/18055933
    《码头工人的一千零一夜》是一位专注于技术干货分享的博主,追随博主的文章,你将深入了解业界最新的技术趋势,以及在Java开发和安全领域的实用经验分享。无论你是开发人员还是对逆向工程感兴趣的爱好者,都能在《码头工人的一千零一夜》找到有价值的知识和见解。

    配置中心系列文章

    《【架构师视角系列】Apollo配置中心之架构设计(一)》https://www.cnblogs.com/boycelee/p/17967590
    《【架构师视角系列】Apollo配置中心之Client端(二)》https://www.cnblogs.com/boycelee/p/17978027
    《【架构师视角系列】Apollo配置中心之Server端(ConfigSevice)(三)》https://www.cnblogs.com/boycelee/p/18005318
    《【架构师视角系列】QConfig配置中心系列之架构设计(一)》https://www.cnblogs.com/boycelee/p/18013653
    《【架构师视角系列】QConfig配置中心系列之Client端(二)》https://www.cnblogs.com/boycelee/p/18033286
    《【架构师视角系列】QConfig配置中心系列之Server端(三)》https://www.cnblogs.com/boycelee/p/18055933

    一、通知与配置拉取

    二、设计思考

    1、Admin如何通知Server所有实例配置发生变更?

    2、Server如何通知Client端配置发生变更?

    3、Client如何拉取配置?

    三、源码分析

    1、Admin配置推送

    1.1、主动推送

    1.1.1、逻辑描述

    QConfig的Server配置发现有两种方式,一种是主动推送,另一种是被动扫描。

    主动发现是Admin(管理平台)通过注册中心获取到已经注册的Server实例相关IP与Port信息,然后通过遍历的方式调用Server接口通知实例此时有配置更新。

    被动发现是Server实例中自主定时进行数据库扫描,当发现新版本时通知Client端有配置变更。

    1.1.2、时序图

    1.1.3、代码位置

    1.1.3.1、NotifyServiceImpl#notifyPush

    当用户在操作平台进行配置修改时,会调用该接口进行配置变更推送,由于需要通知所有已经部署的Servers有配置更新,所以需要从注册中心中获取到对应的Host信息,然后通过遍历的方式进行配置推送。

    @Service
    public class NotifyServiceImpl implements NotifyService, InitializingBean {
    
        /**
         * 管理平台操作,配置变更通知
         */
        @Override
        public void notifyPush(final ConfigMeta meta, final long version, List destinations) {
            // 从注册中心(Eureka)获取Server实例的Hosts信息
            List serverUrls = getServerUrls();
            if (serverUrls.isEmpty()) {
                logger.warn("notify push server, {}, version: {}, but no server, {}", meta, version, destinations);
                return;
            }
    
            // Server中接收变更推送的接口URL
            String uri = this.notifyPushUrl;
            logger.info("notify push server, {}, version: {}, uri: {}, servers: {}, {}", meta, version, uri, serverUrls, destinations);
            StringBuilder sb = new StringBuilder();
            for (PushItemWithHostName item : destinations) {
                sb.append(item.getHostname()).append(',')
                        .append(item.getIp()).append(',')
                        .append(item.getPort()).append(Constants.LINE);
            }
            final String destinationsStr = sb.toString();
            
            // 根据已注册Server的Host列表,配置信息、配置版本等信息,执行通知推送动作
            doNotify(serverUrls, uri, "push", new Function() {
                @Override
                public Request apply(String url) {
                    AsyncHttpClient.BoundRequestBuilder builder = getBoundRequestBuilder(url, meta, version, destinationsStr);
                    return builder.build();
                }
            });
        }
    
        /**
         * 获取注册中心中已注册的Server Hosts信息
         */
        private List getServerUrls() {
            return serverListService.getOnlineServerHosts();
        }
    
        private void doNotify(List serverUrls, String uri, String type, Function requestBuilder) {
            List> futures = Lists.newArrayListWithCapacity(serverUrls.size());
            for (String oneServer : serverUrls) {
                String url = "http://" + oneServer + "/" + uri;
                Request request = requestBuilder.apply(url);
                ListenableFuture future = HttpListenableFuture.wrap(httpClient.executeRequest(request));
                futures.add(future);
            }
    
            dealResult(futures, serverUrls, type);
        }
    
        
    }
    
    1.1.3.2、LongPollingStoreImpl#manualPush
    @Service
    public class LongPollingStoreImpl implements LongPollingStore {
    
        private static final ConcurrentMap> listenerMappings = Maps.newConcurrentMap();
    
        private static final int DEFAULT_THREAD_COUNT = 4;
    
        private static final long DEFAULT_TIMEOUT = 60 * 1000L;
    
        private static ScheduledExecutorService scheduledExecutor = Executors.newScheduledThreadPool(
                DEFAULT_THREAD_COUNT, new NamedThreadFactory("qconfig-config-listener-push"));
    
        private static ExecutorService onChangeExecutor = Executors.newFixedThreadPool(
                Runtime.getRuntime().availableProcessors(), new NamedThreadFactory("config-on-change"));
    
        @Override
        public void manualPush(ConfigMeta meta, long version, final Set ipAndPorts) {
            logger.info("push client file: {}, version {}, {}", meta, version, ipAndPorts);
            Set ips = Sets.newHashSetWithExpectedSize(ipAndPorts.size());
            for (IpAndPort ipAndPort : ipAndPorts) {
                ips.add(ipAndPort.getIp());
            }
    
            manualPushIps(meta, version, ips);
        }
    
        @Override
        public void manualPushIps(ConfigMeta meta, long version, final Set ips) {
            logger.info("push client file: {}, version {}, {}", meta, version, ips);
            Stopwatch stopwatch = Stopwatch.createStarted();
            try {
                doChange(meta, version, Constants.PULL, new Predicate() {
                    @Override
                    public boolean apply(Listener input) {
                        return ips.contains(input.getContextHolder().getIp());
                    }
                });
            } finally {
                Monitor.filePushOnChangeTimer.update(stopwatch.elapsed().toMillis(), TimeUnit.MILLISECONDS);
            }
        }
    
        @Override
        public void onChange(final ConfigMeta meta, final long version) {
            logger.info("file change: {}, version {}", meta, version);
            onChangeExecutor.execute(new Runnable() {
                @Override
                public void run() {
                    Stopwatch stopwatch = Stopwatch.createStarted();
                    try {
                        doChange(meta, version, Constants.UPDATE, Predicates.alwaysTrue());
                    } finally {
                        Monitor.fileOnChangeTimer.update(stopwatch.elapsed().toMillis(), TimeUnit.MILLISECONDS);
                    }
                }
            });
        }
    
        private void doChange(ConfigMeta meta, long newVersion, String type, Predicate needChange) {
            List listeners = getListeners(meta, needChange);
            if (listeners.isEmpty()) {
                return;
            }
    
            Changed change = new Changed(meta, newVersion);
            // 如果没超过直接推送数量,则直接推送
            if (listeners.size() <= pushConfig.getDirectPushLimit()) {
                directDoChange(listeners, change, type);
            } else {
                // 如果超过一定数量,则scheduled定时,通过一定节奏来推送,避免惊群
                PushItem pushItem = new PushItem(listeners, type, change);
                scheduledExecutor.execute(new PushRunnable(pushItem));
            }
        }
    
        private void directDoChange(List listeners, Changed change, String type) {
            Stopwatch stopwatch = Stopwatch.createStarted();
            try {
                for (Listener listener : listeners) {
                    logger.debug("return {}, {}", listener, change);
                    returnChange(change, listener, type);
                }
            } catch (Exception e) {
                Monitor.batchReturnChangeFailCounter.inc();
                logger.error("batch direct return changes error, type {}, change {}", type, change, e);
            } finally {
                Monitor.batchReturnChangeTimer.update(stopwatch.elapsed().toMillis(), TimeUnit.MILLISECONDS);
            }
        }
    
        private static class PushRunnable implements Runnable {
    
            private final PushItem pushItem;
    
            private PushRunnable(PushItem pushItem) {
                this.pushItem = pushItem;
            }
    
            @Override
            public void run() {
                Stopwatch stopwatch = Stopwatch.createStarted();
                try {
                    long start = System.currentTimeMillis();
                    PushConfig config = pushConfig;
                    int num = Math.min(pushItem.getListeners().size(), config.getPushMax());
                    for (int i = 0; i < num; ++i) {
                        Listener listener = pushItem.getListeners().poll();
                        returnChange(pushItem.getChange(), listener, pushItem.getType());
                    }
    
                    if (!pushItem.getListeners().isEmpty()) {
                        long elapsed = System.currentTimeMillis() - start;
                        long delay;
                        if (elapsed >= config.getPushInterval()) {
                            delay = 0;
                        } else {
                            delay = config.getPushInterval() - elapsed;
                        }
                        //一次推送后,以这次推送时间为起始时间,延迟一定时间后再次推送。这里的PushRunnable递归执行
                        scheduledExecutor.schedule(new PushRunnable(pushItem), delay, TimeUnit.MILLISECONDS);
                    }
                } catch (Exception e) {
                    Monitor.batchReturnChangeFailCounter.inc();
                    logger.error("batch return changes error, {}", pushItem, e);
                } finally {
                    Monitor.batchReturnChangeTimer.update(stopwatch.elapsed().toMillis(), TimeUnit.MILLISECONDS);
                }
            }
        }
    
        private static void returnChange(Changed change, Listener listener, String type) {
            Stopwatch stopwatch = Stopwatch.createStarted();
            try {
                // 通知注册的监听器,响应client,返回版本信息
                listener.onChange(change, type);
            } finally {
                Monitor.returnChangeTimer.update(stopwatch.elapsed().toMillis(), TimeUnit.MILLISECONDS);
            }
        }
    
    }
    

    1.2、被动推送

    1.2.1、逻辑描述

    首次启动或启动后每3分钟,刷新一次配置的最新版本,如果出现最新版本,则触发推送逻辑,将配置最新的版本推送至Client端中。

    1.2.2、代码位置

    1.2.2.1、CacheConfigVersionServiceImpl#freshConfigVersionCache
    @Service
    public class CacheConfigVersionServiceImpl implements CacheConfigVersionService {
    
        private volatile ConcurrentMap cache = Maps.newConcurrentMap();
    
        /**
         * 首次启动或启动后每3分钟,刷新一次配置的最新版本
         */
        @PostConstruct
        public void init() {
            freshConfigVersionCache();
    
            ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
            // 每3分钟执行一次缓存刷新,判断配置是否有最新版本
            scheduledExecutorService.scheduleWithFixedDelay(new Runnable() {
                @Override
                public void run() {
                    Thread.currentThread().setName("fresh-config-version-thread");
                    try {
                        freshConfigVersionCache();
                    } catch (Throwable e) {
                        logger.error("fresh config version error", e);
                    }
                }
            }, 3, 3, TimeUnit.MINUTES);
        }
    
        @Override
        public Optional getVersion(ConfigMeta meta) {
            return Optional.fromNullable(cache.get(meta));
        }
    
        /**
         * 定时刷新配置最新版本,如果出现最新版本,则触发推送逻辑
         */
        private void freshConfigVersionCache() {
            Stopwatch stopwatch = Stopwatch.createStarted();
            try {
                logger.info("fresh config version cache");
                List> configIds = configDao.loadAll();
    
                ConcurrentMap newCache = new ConcurrentHashMap(configIds.size());
                ConcurrentMap oldCache = this.cache;
    
                // 判断是否有最新版本
                synchronized (this) {
                    for (VersionData configId : configIds) {
                        long newVersion = configId.getVersion();
                        Long oldVersion = cache.get(configId.getData());
                        // 暂时不考虑delete的情况
                        // 从数据库load数据先于配置更新
                        if (oldVersion != null && oldVersion > newVersion) {
                            newVersion = oldVersion;
                        }
                        // 如果有最新版本则刷新缓存
                        newCache.put(configId.getData(), newVersion);
                    }
    
                    this.cache = newCache;
                }
    
                logger.info("fresh config version cache successOf, count [{}]", configIds.size());
                int updates = 0;
                for (Map.Entry oldEntry : oldCache.entrySet()) {
                    ConfigMeta meta = oldEntry.getKey();
                    Long oldVersion = oldEntry.getValue();
                    Long newVersion = newCache.get(meta);
                    if (newVersion != null && newVersion > oldVersion) {
                        updates += 1;
                        // 配置变更,通知Client端
                        longPollingStore.onChange(meta, newVersion);
                    }
                }
                logger.info("fresh size={} config version cache from db", updates);
            } finally {
                Monitor.freshConfigVersionCacheTimer.update(stopwatch.elapsed().toMillis(), TimeUnit.MILLISECONDS);
            }
        }
    }
    

    2、变更监听

    2.1.1、逻辑描述

    Client端与Server端建立长轮询,长轮询建立完成之后会为当前请求建立一个监听器,当配置发生变变更时就会触发监听器,然后通过监听机制结束长轮询并返回最新的配置版本。如果没有版本变更,长轮询会每分钟断开重新建立一次。

    2.1.2、时序图

    2.1.3、代码位置

    2.1.3.1、AbstractCheckVersionServlet#doPost
    public abstract class AbstractCheckVersionServlet extends AbstractServlet {
    
        private static final long serialVersionUID = -8278568383506314625L;
    
        @Override
        protected void doPost(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
            ...
            
            checkVersion(requests, req, resp);
        }
    }
    
    2.1.3.2、LongPollingCheckServlet#checkVersion
    public class LongPollingCheckServlet extends AbstractCheckVersionServlet {
    
        @Override
        protected void checkVersion(List checkRequests, HttpServletRequest req, HttpServletResponse resp)
        throws ServletException, IOException {
            ...
            try {
                // 异步
                AsyncContext context = req.startAsync();
                // (核心流程,重点关注),执行版本检查(长轮询)
                getLongPollingProcessService().process(context, checkRequests);
            } catch (Throwable e) {
                // never come here !!!
                logger.error("服务异常", e);
            }
        }
    }
    
    2.1.3.3、LongPollingProcessServiceImpl#process
    @Service
    public class LongPollingProcessServiceImpl implements LongPollingProcessService {
    
        @PostConstruct
        public void init() {
            MapConfig config = MapConfig.get("config.properties");
            config.asMap();
            // 向config中添加监听器
            config.addListener(new Configuration.ConfigListener>() {
                @Override
                public void onLoad(Map conf) {
                    String newTimeout = conf.get("longPolling.server.timeout");
                    if (!Strings.isNullOrEmpty(newTimeout)) {
                        timeout = Numbers.toLong(newTimeout, DEFAULT_TIMEOUT);
                    }
                }
            });
        }
    
        // 核心逻辑,重点关注
        @Override
        public void process(AsyncContext context, List requests) {
            IpAndPort address = new IpAndPort(clientInfoService.getIp(), clientInfoService.getPort());
            AsyncContextHolder contextHolder = new AsyncContextHolder(context, address);
            // 设置超时
            context.setTimeout(timeout);
            // 设置监听器
            context.addListener(new TimeoutServletListener(contextHolder));
            processCheckRequests(requests, clientInfoService.getIp(), contextHolder);
        }
    
        private void processCheckRequests(List requests, String ip, AsyncContextHolder contextHolder) {
            CheckResult result = checkService.check(requests, ip, qFileFactory);
            logger.info("profile:{}, result change list {} for check request {}", clientInfoService.getProfile(), result.getChanges(), requests);
    
            if (!result.getChanges().isEmpty()) {
                returnChanges(AbstractCheckConfigServlet.formatOutput(CheckUtil.processStringCase(result.getChanges())), contextHolder, Constants.UPDATE);
                return;
            }
            // 为该请求注册监听器,并存放至longPollingStore中
            addListener(result.getRequestsNoChange(), contextHolder);
            // 注册client
            registerOnlineClients(result, contextHolder);
        }
    
        private void addListener(Map requests, AsyncContextHolder contextHolder) {
            for (Map.Entry noChangeEntry : requests.entrySet()) {
                CheckRequest request = noChangeEntry.getKey();
                QFile qFile = noChangeEntry.getValue();
                if (!contextHolder.isComplete()) {
                    // 根据请求创建监听器
                    Listener listener = qFile.createListener(request, contextHolder);
                    // 将监听器存储至longPollingStore
                    longPollingStore.addListener(listener);
                }
            }
        }
    
        private void registerOnlineClients(CheckResult result, AsyncContextHolder contextHolder) {
            Map noChanges = Maps.newHashMapWithExpectedSize(
                    result.getRequestsNoChange().size() + result.getRequestsLockByFixVersion().size());
            noChanges.putAll(result.getRequestsNoChange());
            noChanges.putAll(result.getRequestsLockByFixVersion());
    
            for (Map.Entry noChangeEntry : noChanges.entrySet()) {
                CheckRequest request = noChangeEntry.getKey();
                QFile qFile = noChangeEntry.getValue();
                if (!contextHolder.isComplete()) {
                    long version = request.getVersion();
                    ConfigMeta meta = qFile.getRealMeta();
                    String ip = contextHolder.getIp();
                    if (qFile instanceof InheritQFileV2) {
                        InheritQFileV2 inheritQFile = (InheritQFileV2) qFile;
                        Optional optional = inheritQFile.getCacheConfigInfoService().getVersion(inheritQFile.getRealMeta());
                        version = optional.isPresent() ? optional.get() : version;
                        onlineClientListService.register(inheritQFile.getRealMeta(), ip, version);
                    } else {
                        // 注册client,admin(管理平台)获取已经连接的client信息,其中包括ip、配置版本
                        onlineClientListService.register(meta, ip, version);
                    }
                }
            }
        }
    
        /**
         * 配置变化,执行返回
         */
        private void returnChanges(String change, AsyncContextHolder contextHolder, String type) {
            contextHolder.completeRequest(new ChangeReturnAction(change, type));
        }
    }
    
    2.1.3.4、CheckService#check
    @Service
    public class CheckServiceImpl implements CheckService {
        ...
    
        @Override
        public CheckResult check(List requests, String ip, QFileFactory qFileFactory) {
            List requestsNoFile = Lists.newArrayList();
            Map changes = Maps.newHashMap();
            Map requestNoChange = Maps.newHashMap();
            Map requestsLockByFixVersion = Maps.newHashMap();
            for (CheckRequest request : requests) {
                ConfigMeta meta = new ConfigMeta(request.getGroup(), request.getDataId(), request.getProfile());
                Optional qFileOptional = qFileFactory.create(meta, cacheConfigInfoService);
                if (!qFileOptional.isPresent()) {
                    requestsNoFile.add(request);
                    continue;
                }
    
                QFile qFile = qFileOptional.get();
                // 核心逻辑,检测版本
                Optional changedOptional = qFile.checkChange(request, ip);
                if (changedOptional.isPresent()) {
                    Optional resultChange = repairChangeWithFixVersion(qFile, request, ip, changedOptional.get());
                    if (resultChange.isPresent()) {
                        changes.put(request, resultChange.get());
                    } else {
                        requestsLockByFixVersion.put(request, qFile);
                    }
                } else {
                    requestNoChange.put(request, qFile);
                }
            }
            return new CheckResult(requestsNoFile, changes, requestNoChange, requestsLockByFixVersion);
        }
    }
    
    2.1.3.5、QFileEntityV1#checkChange
    public class QFileEntityV1 extends AbstractQFileEntity implements QFile {
    
        public QFileEntityV1(ConfigMeta meta,
                             CacheConfigInfoService cacheConfigInfoService,
                             ConfigStore configStore,
                             LogService logService,
                             ClientInfoService clientInfoService) {
            super(meta, cacheConfigInfoService, configStore, logService, clientInfoService);
        }
    
        @Override
        public Optional checkChange(CheckRequest request, String ip) {
            ConfigMeta meta = getSourceMeta();
            // 从缓存中获取配置文件的最新版本
            Optional version = getCacheConfigInfoService().getVersion(meta, ip);
            if (!version.isPresent()) {
                return Optional.absent();
            }
    
            if (version.get() <= request.getVersion()) {
                return Optional.absent();
            }
    
            return Optional.of(new Changed(meta.getGroup(), meta.getDataId(), meta.getProfile(), version.get()));
        }
    }
    
    2.1.3.6、CacheConfigInfoService#getVersion
    @Service("cacheConfigInfoService")
    public class CacheConfigInfoService implements ConfigInfoService {
        ... 
        @Override
        public Optional getVersion(ConfigMeta meta, String ip) {
            // 获取配置已发布的最新版本
            Optional publishVersion = getVersion(meta);
            // 获取推送给该IP的配置的最新灰度版本
            Optional pushVersion = getPushVersion(meta, ip);
            return VersionUtil.getLoadVersion(publishVersion, pushVersion);
        }
    }
    

    3、Client配置拉取

    3.1.1、逻辑描述

    根据长轮询后Client端获取到的配置文件对应的最新版本信息,查询最新的配置数据。查询顺序是先查询缓存,如果查找不到则通过本地文件查找,如果再查不到则查询数据库。这样可以有效缓解数据库压力。

    3.1.2、代码位置

    3.1.2.1、ConfigStoreImpl#findConfig
    @Service
    public class ConfigStoreImpl implements ConfigStore {
    
        private LoadingCache, ChecksumData> configCache;
    
        @PostConstruct
        private void init() {
            configCache = CacheBuilder.newBuilder()
                    .maximumSize(5000) // 最大数量
                    .expireAfterAccess(10, TimeUnit.SECONDS) // 访问失效时间
                    .recordStats()
                    .build(new CacheLoader, ChecksumData>() {
                        @Override
                        public ChecksumData load(VersionData configId) throws ConfigNotFoundException {
                            
                            return loadConfig(configId);
                        }
                    });
    
            Metrics.gauge("configFile_notFound_cache_hitRate", new Supplier() {
                @Override
                public Double get() {
                    return configCache.stats().hitRate();
                }
            });
        }
    
        /**
         * 查本地guava cache
         */
        @Override
        public ChecksumData findConfig(VersionData configId) throws ConfigNotFoundException {
            try {
                return configCache.get(configId);
            } catch (ExecutionException e) {
                if (e.getCause() instanceof ConfigNotFoundException) {
                    throw (ConfigNotFoundException) e.getCause();
                } else {
                    log.error("find config error, configId:{}", configId, e);
                    throw new RuntimeException(e.getCause());
                }
            }
        }
    
        /**
         * 从本地文件或数据库中获取配置信息
         */
        private ChecksumData loadConfig(VersionData configId) throws ConfigNotFoundException {
            // 从本地配置文件中查询配置信息
            ChecksumData config = findFromDisk(configId);
            if (config != null) {
                return config;
            }
    
            String groupId = configId.getData().getGroup();
            Monitor.notFoundConfigFileFromDiskCounterInc(groupId);
            log.warn("config not found from disk: {}", configId);
            // 从数据库中加载配置数据
            config = findFromDb(configId);
            if (config != null) {
                return config;
            }
            Monitor.notFoundConfigFileFromDbCounterInc(groupId);
    
            throw new ConfigNotFoundException();
        }
    
        private ChecksumData findFromDb(VersionData configId) {
            ChecksumData config = configDao.loadFromCandidateSnapshot(configId);
            if (config != null) {
                saveToFile(configId, config);
            }
            return config;
        }
    }
    

    三、最后

    《码头工人的一千零一夜》是一位专注于技术干货分享的博主,追随博主的文章,你将深入了解业界最新的技术趋势,以及在Java开发和安全领域的实用经验分享。无论你是开发人员还是对逆向工程感兴趣的爱好者,都能在《码头工人的一千零一夜》找到有价值的知识和见解。

    懂得不多,做得太少。欢迎批评、指正。

  • 相关阅读:
    The 2020 ICPC Asia Macau Regional Contest
    8086 汇编笔记(八):转移指令的原理
    【OpenCV】—线性滤波:方框滤波、均值滤波、高斯滤波
    设计一款可扩展和基于windows系统的一键处理表格小工具思路
    深入剖析Tomcat(四) 剖析Tomcat的默认连接器
    一致性思维链(SELF-CONSISTENCY IMPROVES CHAIN OF THOUGHT REASONING IN LANGUAGE MODELS)
    MacOS ventura跳过配置锁
    SAP STMS请求重复传输
    MySQL源码解析之执行计划
    leetcode 44. 通配符匹配(困难,dp)
  • 原文地址:https://www.cnblogs.com/boycelee/p/18055933