• [Architect's Perspective Series] Apollo Configuration Center: The Server Side (ConfigService) (Part 3)


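    Statement

    This is an original article; please credit the source when reposting: https://www.cnblogs.com/boycelee/p/17993697
    "码头工人的一千零一夜" is a blog dedicated to practical technical content. Its articles cover current industry trends along with hands-on experience in Java development and security. Whether you are a developer or an enthusiast interested in reverse engineering, you will find useful knowledge and insights there.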

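    Configuration center series

    "[Architect's Perspective Series] Apollo Configuration Center: Architecture Design (Part 1)" https://www.cnblogs.com/boycelee/p/17967590
    "[Architect's Perspective Series] Apollo Configuration Center: The Client Side (Part 2)" https://www.cnblogs.com/boycelee/p/17978027
    "[Architect's Perspective Series] Apollo Configuration Center: The Server Side (ConfigService) (Part 3)" https://www.cnblogs.com/boycelee/p/18005318
    "[Architect's Perspective Series] QConfig Configuration Center Series: Architecture Design (Part 1)" https://www.cnblogs.com/boycelee/p/18013653
    "[Architect's Perspective Series] QConfig Configuration Center Series: The Client Side (Part 2)" https://www.cnblogs.com/boycelee/p/18033286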

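    I. Notification Mechanism

    II. Architectural Considerations

    1. How are clients notified of configuration changes?

    (1) How is the long poll established?

    2. How does the client pull data?

    (1) How is the data pulled?

    3. How are changed data detected?

    (1) Why does Config Service scan ReleaseMessage on a schedule?

    (2) Why not have the Client query Config Service directly?

    III. Source Code Walkthrough

    1. Configuration Listening

    1.1 Establishing the long poll

    1.1.1 Logic description

    1.1.2 Sequence diagram

    1.1.3 Code location

    1.1.3.1 NotificationControllerV2#pollNotification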
    @RestController
    @RequestMapping("/notifications/v2")
    public class NotificationControllerV2 implements ReleaseMessageListener {
        ...
        
        private final Multimap<String, DeferredResultWrapper> deferredResults = Multimaps.synchronizedSetMultimap(TreeMultimap.create(String.CASE_INSENSITIVE_ORDER, Ordering.natural()));
        
        @GetMapping
        public DeferredResult<ResponseEntity<List<ApolloConfigNotification>>> pollNotification(
                @RequestParam(value = "appId") String appId,
                @RequestParam(value = "cluster") String cluster,
                @RequestParam(value = "notifications") String notificationsAsString,
                @RequestParam(value = "dataCenter", required = false) String dataCenter,
                @RequestParam(value = "ip", required = false) String clientIp) {
            List<ApolloConfigNotification> notifications = null;
    
            // Deserialize the notifications sent by the client
            try {
                notifications =
                        gson.fromJson(notificationsAsString, notificationsTypeReference);
            } catch (Throwable ex) {
                Tracer.logError(ex);
            }
            // (Not core; can be skipped)
            Map<String, ApolloConfigNotification> filteredNotifications = filterNotifications(appId, notifications);
    
            // (Core flow; pay close attention)
            // Wrap the DeferredResult in a wrapper; long polling is implemented with Spring's DeferredResult on top of Tomcat.
            DeferredResultWrapper deferredResultWrapper = new DeferredResultWrapper(bizConfig.longPollingTimeoutInMilli());
            Set<String> namespaces = Sets.newHashSetWithExpectedSize(filteredNotifications.size());
            Map<String, Long> clientSideNotifications = Maps.newHashMapWithExpectedSize(filteredNotifications.size());
    
            for (Map.Entry<String, ApolloConfigNotification> notificationEntry : filteredNotifications.entrySet()) {
                String normalizedNamespace = notificationEntry.getKey();
                ApolloConfigNotification notification = notificationEntry.getValue();
                namespaces.add(normalizedNamespace);
                clientSideNotifications.put(normalizedNamespace, notification.getNotificationId());
                if (!Objects.equals(notification.getNamespaceName(), normalizedNamespace)) {
                    // Mapping between the original and the normalized namespace name (not core; can be skipped)
                    deferredResultWrapper.recordNamespaceNameNormalizedResult(notification.getNamespaceName(), normalizedNamespace);
                }
            }
    
            // watchedKeysMap format: namespace : appId_cluster_namespace
            Multimap<String, String> watchedKeysMap = watchKeysUtil.assembleAllWatchKeys(appId, cluster, namespaces, dataCenter);
    
            Set<String> watchedKeys = Sets.newHashSet(watchedKeysMap.values());
    
            /**
             * 1、set deferredResult before the check, for avoid more waiting
             * If the check before setting deferredResult,it may receive a notification the next time
             * when method handleMessage is executed between check and set deferredResult.
             */
            deferredResultWrapper
                    .onTimeout(() -> logWatchedKeys(watchedKeys, "Apollo.LongPoll.TimeOutKeys"));
            
            // Runs on completion: removes this deferredResultWrapper from deferredResults, marking the end of this long poll
            deferredResultWrapper.onCompletion(() -> {
                //unregister all keys
                for (String key : watchedKeys) {
                    deferredResults.remove(key, deferredResultWrapper);
                }
                logWatchedKeys(watchedKeys, "Apollo.LongPoll.CompletedKeys");
            });
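            // (Core flow; pay close attention)
            // watchedKey format: appId_cluster_namespace
            // register all keys
            // Register each watchedKey -> deferredResultWrapper pair in deferredResults (the Multimap container).
            // Several keys map to the same deferredResultWrapper. When a namespace changes, the matching wrapper is looked up in deferredResults and the client is notified.
            // Food for thought: what is DeferredResult, how does it relate to long polling, and how does it differ from other async tools?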
            for (String key : watchedKeys) {
                this.deferredResults.put(key, deferredResultWrapper);
            }
    
            /**
             * 2、check new release
             */
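            // (Core flow; pay close attention)
            // Synchronously check for a newer version: look up the latest notifications by watchedKeys (format: appId_cluster_namespace). If something has changed, return at once instead of waiting for a later notification.
            List<ReleaseMessage> latestReleaseMessages = releaseMessageService.findLatestReleaseMessagesGroupByMessages(watchedKeys);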
    
            /**
             * Manually close the entity manager.
             * Since for async request, Spring won't do so until the request is finished,
             * which is unacceptable since we are doing long polling - means the db connection would be hold
             * for a very long time
             */
            entityManagerUtil.closeEntityManager();
    
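            // (Core flow; pay close attention)
            // Convert latestReleaseMessages into ApolloConfigNotification objects.
            // Only namespaces whose configuration has changed are returned, each with its notificationId.
            List<ApolloConfigNotification> newNotifications = getApolloConfigNotifications(namespaces, clientSideNotifications, watchedKeysMap, latestReleaseMessages);
    
            // (Core flow; pay close attention)
            // Note: this is the synchronous path. If the check above found a newer version, respond right away without waiting for a notification.
            // A DeferredResult sends nothing to the client until code explicitly calls setResult on it (or it times out), even after the request thread has returned.
            // If nothing is new yet, a later change reaches the client through handleMessage, which calls deferredResultWrapper#setResult.
            if (!CollectionUtils.isEmpty(newNotifications)) {
                // Not a pushed change: the synchronous check found a newer version, so respond here.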
                deferredResultWrapper.setResult(newNotifications);
            }
    
            return deferredResultWrapper.getResult();
        }
    
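        /**
        * Only namespaces whose configuration has changed are returned, each with its notificationId.
        **/
        private List<ApolloConfigNotification> getApolloConfigNotifications(Set<String> namespaces,
                                                                            Map<String, Long> clientSideNotifications,
                                                                            Multimap<String, String> watchedKeysMap,
                                                                            List<ReleaseMessage> latestReleaseMessages) {
            List<ApolloConfigNotification> newNotifications = Lists.newArrayList();
            // Check whether any latest release messages were found for the watched keys
            if (!CollectionUtils.isEmpty(latestReleaseMessages)) {
                Map<String, Long> latestNotifications = Maps.newHashMap();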
                for (ReleaseMessage releaseMessage : latestReleaseMessages) {
                    latestNotifications.put(releaseMessage.getMessage(), releaseMessage.getId());
                }
    
                // Iterate over the namespaces
                for (String namespace : namespaces) {
                    long clientSideId = clientSideNotifications.get(namespace);
                    long latestId = ConfigConsts.NOTIFICATION_ID_PLACEHOLDER;
                    Collection<String> namespaceWatchedKeys = watchedKeysMap.get(namespace);
                    for (String namespaceWatchedKey : namespaceWatchedKeys) {
                        // Get the latest notificationId for this watched key
                        long namespaceNotificationId =
                                latestNotifications.getOrDefault(namespaceWatchedKey, ConfigConsts.NOTIFICATION_ID_PLACEHOLDER);
                        if (namespaceNotificationId > latestId) {
                            latestId = namespaceNotificationId;
                        }
                    }
                    // If Config Service's notificationId for the namespace is greater than the one the Client sent, the configuration has changed: wrap the latest notificationId for the namespace and return it
                    if (latestId > clientSideId) {
                        ApolloConfigNotification notification = new ApolloConfigNotification(namespace, latestId);
                        namespaceWatchedKeys.stream().filter(latestNotifications::containsKey).forEach(namespaceWatchedKey ->
                                notification.addMessage(namespaceWatchedKey, latestNotifications.get(namespaceWatchedKey)));
                        newNotifications.add(notification);
                    }
                }
            }
            return newNotifications;
        }
    
    }
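
The register-then-check ordering in pollNotification is easier to see in a stripped-down form. The sketch below is not Apollo code: it assumes a plain java.util.concurrent.CompletableFuture as a stand-in for Spring's DeferredResult, and the names LongPollRegistry, poll, publish, pendingCount, and the "304" placeholder string are all hypothetical, chosen only for illustration.

```java
import java.util.ArrayList;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the long-poll bookkeeping in NotificationControllerV2.
class LongPollRegistry {
    // watched key -> pending polls (plays the role of the deferredResults multimap)
    private final Map<String, Set<CompletableFuture<String>>> pending = new ConcurrentHashMap<>();
    // watched key -> latest notification id (plays the role of the release message cache)
    private final Map<String, Long> latestIds = new ConcurrentHashMap<>();

    /** Registers the poll first and checks for news afterwards, so a change arriving in between is not lost. */
    CompletableFuture<String> poll(String key, long clientId, long timeoutMillis) {
        CompletableFuture<String> result = new CompletableFuture<>();
        // "304" is a placeholder for a "nothing changed" answer once the poll times out.
        result.completeOnTimeout("304", timeoutMillis, TimeUnit.MILLISECONDS);
        pending.computeIfAbsent(key, k -> ConcurrentHashMap.newKeySet()).add(result);
        // Unregister on completion, whether by change, by timeout, or by the immediate answer below.
        result.whenComplete((value, error) -> pending.get(key).remove(result));
        long latest = latestIds.getOrDefault(key, -1L);
        if (latest > clientId) {
            result.complete(key + ":" + latest); // synchronous path: newer version already known
        }
        return result;
    }

    /** Called when a change for the key is detected; completes every poll waiting on that key. */
    void publish(String key, long id) {
        latestIds.merge(key, id, Long::max);
        Set<CompletableFuture<String>> waiters = pending.get(key);
        if (waiters == null) {
            return;
        }
        // Copy first: completing a waiter removes it from the live set.
        for (CompletableFuture<String> waiter : new ArrayList<>(waiters)) {
            waiter.complete(key + ":" + id);
        }
    }

    int pendingCount(String key) {
        Set<CompletableFuture<String>> waiters = pending.get(key);
        return waiters == null ? 0 : waiters.size();
    }
}
```

A caller that is already behind gets its answer from poll immediately; a caller that is up to date stays parked until publish or the timeout completes its future.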
    
    1.1.3.2 ReleaseMessageServiceWithCache#findLatestReleaseMessagesGroupByMessages
    @Service
    public class ReleaseMessageServiceWithCache implements ReleaseMessageListener, InitializingBean {
    
      private ConcurrentMap<String, ReleaseMessage> releaseMessageCache;
    
      ...
      public List<ReleaseMessage> findLatestReleaseMessagesGroupByMessages(Set<String> messages) {
        // messages format: appId_cluster_namespace
        if (CollectionUtils.isEmpty(messages)) {
          return Collections.emptyList();
        }
        List<ReleaseMessage> releaseMessages = Lists.newArrayList();
    
        // A name such as "namespaces" would suit this parameter better than "messages"
        for (String message : messages) {
          // Look up the cached ReleaseMessage (version information) for this key
          ReleaseMessage releaseMessage = releaseMessageCache.get(message);
          if (releaseMessage != null) {
            releaseMessages.add(releaseMessage);
          }
        }
    
        return releaseMessages;
      }
      ...
        
    }
    

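    1.2 Refreshing the ReleaseMessage cache

    1.2.1 Logic description

    Keeps releaseMessageCache up to date as new ReleaseMessages arrive. The key is appId_cluster_namespace and the value is the latest ReleaseMessage for that key, whose id serves as the notificationId.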
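
The merge rule behind this cache can be sketched in a few lines. This is an illustrative reduction, not the Apollo class: ReleaseMessageCacheSketch and its methods are hypothetical names, the cache stores only the id rather than a full ReleaseMessage, and maxIdScanned is tracked with Math.max, which is an assumption of this sketch.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical reduction of the "keep the newest id per key" rule.
class ReleaseMessageCacheSketch {
    // key (appId_cluster_namespace) -> highest message id seen for that key
    private final Map<String, Long> latestIdByKey = new ConcurrentHashMap<>();
    private long maxIdScanned = 0;

    /** Stores the id only when it is newer than what is cached for the key. */
    synchronized void merge(long id, String key) {
        Long old = latestIdByKey.get(key);
        if (old == null || id > old) {
            latestIdByKey.put(key, id);
        }
        // Assumption of this sketch: remember the highest id seen across all keys.
        maxIdScanned = Math.max(maxIdScanned, id);
    }

    /** Returns the latest id for the key, or null if the key was never seen. */
    Long latestId(String key) {
        return latestIdByKey.get(key);
    }

    synchronized long maxIdScanned() {
        return maxIdScanned;
    }
}
```

Because an older id never overwrites a newer one, messages may be merged out of order, which is what makes gap-filling reloads safe.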

    1.2.2 Code location

    1.2.2.1 ReleaseMessageServiceWithCache#afterPropertiesSet
    @Service
    public class ReleaseMessageServiceWithCache implements ReleaseMessageListener, InitializingBean {
    
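      // Largest ReleaseMessage id merged so far
      private volatile long maxIdScanned;
      // Holds the latest ReleaseMessage per key: appId_cluster_namespace -> ReleaseMessage (its id is the notificationId)
      private ConcurrentMap<String, ReleaseMessage> releaseMessageCache;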
    
      private AtomicBoolean doScan;
      private ExecutorService executorService;
    
      private void initialize() {
        releaseMessageCache = Maps.newConcurrentMap();
        doScan = new AtomicBoolean(true);
        // This is a single-threaded pool, not a scheduled executor.
        executorService = Executors.newSingleThreadExecutor(ApolloThreadFactory
            .create("ReleaseMessageServiceWithCache", true));
      }
    
      public List<ReleaseMessage> findLatestReleaseMessagesGroupByMessages(Set<String> messages) {
        // messages format: appId_cluster_namespace
        if (CollectionUtils.isEmpty(messages)) {
          return Collections.emptyList();
        }
        List<ReleaseMessage> releaseMessages = Lists.newArrayList();
    
        for (String message : messages) {
          ReleaseMessage releaseMessage = releaseMessageCache.get(message);
          if (releaseMessage != null) {
            releaseMessages.add(releaseMessage);
          }
        }
    
        return releaseMessages;
      }
    
      @Override
      public void handleMessage(ReleaseMessage message, String channel) {
        //Could stop once the ReleaseMessageScanner starts to work
        doScan.set(false);
        logger.info("message received - channel: {}, message: {}", channel, message);
    
        String content = message.getMessage();
        Tracer.logEvent("Apollo.ReleaseMessageService.UpdateCache", String.valueOf(message.getId()));
        if (!Topics.APOLLO_RELEASE_TOPIC.equals(channel) || Strings.isNullOrEmpty(content)) {
          return;
        }
    
        // Compute the gap between the incoming ReleaseMessage id and the largest id scanned locally
        long gap = message.getId() - maxIdScanned;
        // A gap of exactly 1 means nothing was missed: merge directly
        if (gap == 1) {
          mergeReleaseMessage(message);
        } else if (gap > 1) {
          //gap found!
          // A gap larger than 1 means messages were missed: load them from the database and merge each one, so that releaseMessageCache keeps the latest ReleaseMessage id (notificationId) per key
          loadReleaseMessages(maxIdScanned);
        }
      }
    
      @Override
      public void afterPropertiesSet() throws Exception {
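        // Read the scan-interval configuration
        populateDataBaseInterval();
        //block the startup process until load finished
        //this should happen before ReleaseMessageScanner due to autowire
        // Initial load: pull all ReleaseMessages
        loadReleaseMessages(0);
        // Pull incremental ReleaseMessages asynchronously. (Note: executorService is a single-threaded pool, not a scheduled task.)
        // Purpose: pick up messages that the initial load missed
        // Think of it as a stopgap; the loop ends once handleMessage sets doScan to false (can be skipped)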
        executorService.submit(() -> {
          while (doScan.get() && !Thread.currentThread().isInterrupted()) {
            Transaction transaction = Tracer.newTransaction("Apollo.ReleaseMessageServiceWithCache",
                "scanNewReleaseMessages");
            try {
              // Load ReleaseMessages newer than maxIdScanned
              loadReleaseMessages(maxIdScanned);
              transaction.setStatus(Transaction.SUCCESS);
            } catch (Throwable ex) {
              transaction.setStatus(ex);
              logger.error("Scan new release messages failed", ex);
            } finally {
              transaction.complete();
            }
            try {
              scanIntervalTimeUnit.sleep(scanInterval);
            } catch (InterruptedException e) {
              //ignore
            }
          }
        });
      }
    
      /**
       *
       * @param releaseMessage
       */
      private synchronized void mergeReleaseMessage(ReleaseMessage releaseMessage) {
        ReleaseMessage old = releaseMessageCache.get(releaseMessage.getMessage());
        // Update the cache only if this ReleaseMessage's id is greater than the cached one's
        if (old == null || releaseMessage.getId() > old.getId()) {
          // message content: appId_cluster_namespace
          releaseMessageCache.put(releaseMessage.getMessage(), releaseMessage);
          maxIdScanned = releaseMessage.getId();
        }
      }
    
      private void loadReleaseMessages(long startId) {
        boolean hasMore = true;
        while (hasMore && !Thread.currentThread().isInterrupted()) {
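          //current batch is 500
          // Same logic as in AppNamespaceServiceWithCache
          // Fetch up to 500 ReleaseMessages with id greater than startId, in ascending order
          // Food for thought: the latest message id can only be found by scanning, which is not an ideal design
          List<ReleaseMessage> releaseMessages = releaseMessageRepository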
              .findFirst500ByIdGreaterThanOrderByIdAsc(startId);
          if (CollectionUtils.isEmpty(releaseMessages)) {
            break;
          }
          // Merge each newly loaded ReleaseMessage so that releaseMessageCache keeps the latest ReleaseMessage id (notificationId) per key
          releaseMessages.forEach(this::mergeReleaseMessage);
          // Advance startId to the last id in this batch so the next query continues from there
          int scanned = releaseMessages.size();
          startId = releaseMessages.get(scanned - 1).getId();
          // A full batch of 500 means more rows may remain, so loop again; otherwise exit
          hasMore = scanned == 500;
          logger.info("Loaded {} release messages with startId {}", scanned, startId);
        }
      }
    }
    

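    2. Change Push

    Admin Service sends the published configuration to Config Service as a message, and Config Service then notifies the relevant Clients. A message broker could handle producing and consuming these messages, but introducing one more piece of middleware also brings extra operational risk, so Apollo produces and consumes the messages through the database instead.

    2.1 Triggering a change

    2.1.1 Logic description

    After a user publishes a change, Admin Service inserts a row into the ReleaseMessage table in the database; the row's auto-increment id serves as the notification id for the change.

    2.1.2 Code location

    2.1.2.1 DatabaseMessageSender#sendMessage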
    @Component
    public class DatabaseMessageSender implements MessageSender {
      private BlockingQueue<Long> toClean = Queues.newLinkedBlockingQueue(CLEAN_QUEUE_MAX_SIZE);
      private final ExecutorService cleanExecutorService;
      private final AtomicBoolean cleanStopped;
    
      private final ReleaseMessageRepository releaseMessageRepository;
    
      public DatabaseMessageSender(final ReleaseMessageRepository releaseMessageRepository) {
        cleanExecutorService = Executors.newSingleThreadExecutor(ApolloThreadFactory.create("DatabaseMessageSender", true));
        cleanStopped = new AtomicBoolean(false);
        this.releaseMessageRepository = releaseMessageRepository;
      }
    
      @Override
      @Transactional
      public void sendMessage(String message, String channel) {
        logger.info("Sending message {} to channel {}", message, channel);
        // Only messages on APOLLO_RELEASE_TOPIC are published
        if (!Objects.equals(channel, Topics.APOLLO_RELEASE_TOPIC)) {
          logger.warn("Channel {} not supported by DatabaseMessageSender!", channel);
          return;
        }
    
        Tracer.logEvent("Apollo.AdminService.ReleaseMessage", message);
        Transaction transaction = Tracer.newTransaction("Apollo.AdminService", "sendMessage");
        try {
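          // Persist the ReleaseMessage
          ReleaseMessage newMessage = releaseMessageRepository.save(new ReleaseMessage(message));
          // Offer the id to the blocking queue; offer returns immediately whether or not it succeeds (a thread started at initialization drains toClean)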
          toClean.offer(newMessage.getId());
          transaction.setStatus(Transaction.SUCCESS);
        } catch (Throwable ex) {
          logger.error("Sending message to database failed", ex);
          transaction.setStatus(ex);
          throw ex;
        } finally {
          transaction.complete();
        }
      }
      ...
    }
    
    2.1.2.2 ReleaseMessage
    @Entity
    @Table(name = "ReleaseMessage")
    public class ReleaseMessage {
      // Auto-increment ID
      @Id
      @GeneratedValue(strategy = GenerationType.IDENTITY)
      @Column(name = "Id")
      private long id;
    
      // message content: appId_cluster_namespace
      @Column(name = "Message", nullable = false)
      private String message;
    
      // Last modified time
      @Column(name = "DataChange_LastTime")
      private Date dataChangeLastModifiedTime;
    }
    

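    2.2 Detecting a change

    2.2.1 Logic description

    Once the bean has been initialized, ReleaseMessageScanner#afterPropertiesSet runs: it loads the current largest ReleaseMessage id and then scans the database at a fixed interval. Whenever a scan finds new ReleaseMessages, the listeners are notified right away through fireMessageScanned.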
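
The scan loop can be illustrated without a database. The sketch below is a simplified model under stated assumptions: a NavigableMap stands in for the ReleaseMessage table, the batch size is 3 rather than 500 to keep the example small, and ScannerSketch, addListener, and scanMessages are hypothetical names. It leaves out the missing-id bookkeeping and the scheduling.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.function.BiConsumer;

// Hypothetical model of "fetch rows with id > maxIdScanned in batches and notify listeners".
class ScannerSketch {
    static final int BATCH_SIZE = 3; // assumption: small on purpose; the excerpt fetches 500 per query

    private final NavigableMap<Long, String> table; // stands in for the ReleaseMessage table: id -> message
    private final List<BiConsumer<Long, String>> listeners = new ArrayList<>();
    private long maxIdScanned;

    ScannerSketch(NavigableMap<Long, String> table) {
        this.table = table;
        // Start from the largest id present, so only later rows are reported.
        this.maxIdScanned = table.isEmpty() ? 0L : table.lastKey();
    }

    void addListener(BiConsumer<Long, String> listener) {
        listeners.add(listener);
    }

    /** One scan tick: keeps fetching until a batch comes back short. Returns the number of rows delivered. */
    int scanMessages() {
        int delivered = 0;
        boolean hasMore = true;
        while (hasMore) {
            List<Map.Entry<Long, String>> batch = nextBatch();
            for (Map.Entry<Long, String> row : batch) {
                for (BiConsumer<Long, String> listener : listeners) {
                    listener.accept(row.getKey(), row.getValue());
                }
            }
            if (!batch.isEmpty()) {
                maxIdScanned = batch.get(batch.size() - 1).getKey();
            }
            delivered += batch.size();
            hasMore = batch.size() == BATCH_SIZE; // a full batch means more rows may remain
        }
        return delivered;
    }

    /** Up to BATCH_SIZE rows with id greater than maxIdScanned, in ascending id order. */
    private List<Map.Entry<Long, String>> nextBatch() {
        List<Map.Entry<Long, String>> batch = new ArrayList<>();
        for (Map.Entry<Long, String> row : table.tailMap(maxIdScanned, false).entrySet()) {
            if (batch.size() == BATCH_SIZE) {
                break;
            }
            batch.add(row);
        }
        return batch;
    }
}
```

In a real deployment scanMessages would be driven by a scheduled executor; here it is called directly so that the batching behaviour is easy to follow.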

    2.2.2 Sequence diagram

    2.2.3 Code location

    2.2.3.1 ReleaseMessageScanner#afterPropertiesSet
    public class ReleaseMessageScanner implements InitializingBean {
      @Autowired
      private ReleaseMessageRepository releaseMessageRepository;
      private int databaseScanInterval;
      private final List<ReleaseMessageListener> listeners;
      private final ScheduledExecutorService executorService;
      private final Map<Long, Integer> missingReleaseMessages; // missing release message id => age counter
      private long maxIdScanned;
    
      public ReleaseMessageScanner() {
        listeners = Lists.newCopyOnWriteArrayList();
        executorService = Executors.newScheduledThreadPool(1, ApolloThreadFactory
            .create("ReleaseMessageScanner", true));
        missingReleaseMessages = Maps.newHashMap();
      }
    
      @Override
      public void afterPropertiesSet() throws Exception {
        databaseScanInterval = bizConfig.releaseMessageScanIntervalInMilli();
        // Load the current largest ReleaseMessage id
        maxIdScanned = loadLargestMessageId();
        executorService.scheduleWithFixedDelay(() -> {
          Transaction transaction = Tracer.newTransaction("Apollo.ReleaseMessageScanner", "scanMessage");
          try {
            scanMissingMessages();
            // Scan the database for new ReleaseMessages
            scanMessages();
            transaction.setStatus(Transaction.SUCCESS);
          } catch (Throwable ex) {
            transaction.setStatus(ex);
            logger.error("Scan and send message failed", ex);
          } finally {
            transaction.complete();
          }
        }, databaseScanInterval, databaseScanInterval, TimeUnit.MILLISECONDS);
    
      }
    
      /**
       * Scan messages, continue scanning until there is no more messages
       */
      private void scanMessages() {
        boolean hasMoreMessages = true;
        while (hasMoreMessages && !Thread.currentThread().isInterrupted()) {
          // Keep fetching the latest ReleaseMessages until a batch comes back short
          hasMoreMessages = scanAndSendMessages();
        }
      }
    
      /**
       * scan messages and send
       *
       * @return whether there are more messages
       */
      private boolean scanAndSendMessages() {
        // Fetch up to 500 ReleaseMessages in ascending id order
        //current batch is 500
        List<ReleaseMessage> releaseMessages =
            releaseMessageRepository.findFirst500ByIdGreaterThanOrderByIdAsc(maxIdScanned);
        if (CollectionUtils.isEmpty(releaseMessages)) {
          return false;
        }
        // Notify the listeners
        fireMessageScanned(releaseMessages);
        int messageScanned = releaseMessages.size();
        long newMaxIdScanned = releaseMessages.get(messageScanned - 1).getId();
        // check id gaps, possible reasons are release message not committed yet or already rolled back
        if (newMaxIdScanned - maxIdScanned > messageScanned) {
          recordMissingReleaseMessageIds(releaseMessages, maxIdScanned);
        }
        maxIdScanned = newMaxIdScanned;
        // Each query returns at most 500 rows; a full batch means more may remain, so fetch again
        return messageScanned == 500;
      }
    
      private void scanMissingMessages() {
        Set<Long> missingReleaseMessageIds = missingReleaseMessages.keySet();
        Iterable<ReleaseMessage> releaseMessages = releaseMessageRepository
            .findAllById(missingReleaseMessageIds);
        fireMessageScanned(releaseMessages);
        releaseMessages.forEach(releaseMessage -> {
          missingReleaseMessageIds.remove(releaseMessage.getId());
        });
        growAndCleanMissingMessages();
      }
    
      /**
       * Notify listeners with messages loaded
       * @param messages
       */
      private void fireMessageScanned(Iterable<ReleaseMessage> messages) {
        for (ReleaseMessage message : messages) {
          for (ReleaseMessageListener listener : listeners) {
            try {
              // Notify the listener
              listener.handleMessage(message, Topics.APOLLO_RELEASE_TOPIC);
            } catch (Throwable ex) {
              Tracer.logError(ex);
              logger.error("Failed to invoke message listener {}", listener.getClass(), ex);
            }
          }
        }
      }
    }
    

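    2.3 Pushing a change

    2.3.1 Logic description

    Pushes the change notification to every Client that holds a long poll open against Config Service. To avoid a thundering-herd effect, a thread pool sends the notifications in batches when there are many clients.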
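
The batching idea can be shown in isolation. The following is a minimal sketch, assuming CompletableFuture as a stand-in for the waiting long polls; BatchNotifierSketch, notifyInBatches, and its parameters are hypothetical names, and the return value (the number of pauses taken) exists only to make the behaviour observable.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration of notifying many waiters in batches with a pause in between.
class BatchNotifierSketch {
    /**
     * Completes every waiter with the payload, sleeping for pauseMillis before each new batch
     * after the first. Returns how many pauses were taken.
     */
    static int notifyInBatches(List<CompletableFuture<String>> waiters, String payload,
                               int batchSize, long pauseMillis) {
        int pauses = 0;
        for (int i = 0; i < waiters.size(); i++) {
            if (i > 0 && i % batchSize == 0) {
                try {
                    TimeUnit.MILLISECONDS.sleep(pauseMillis);
                } catch (InterruptedException e) {
                    // Restore the flag and stop early; the remaining waiters stay pending.
                    Thread.currentThread().interrupt();
                    return pauses;
                }
                pauses++;
            }
            waiters.get(i).complete(payload);
        }
        return pauses;
    }
}
```

Spreading the completions out means the clients do not all come back for the new configuration at the same instant, at the cost of a slightly later notification for the last batches.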

    2.3.3 Code location

    2.3.3.1 NotificationControllerV2#handleMessage
    @RestController
    @RequestMapping("/notifications/v2")
    public class NotificationControllerV2 implements ReleaseMessageListener {
    
        @Override
        public void handleMessage(ReleaseMessage message, String channel) {
            logger.info("message received - channel: {}, message: {}", channel, message);
    
            String content = message.getMessage();
            Tracer.logEvent("Apollo.LongPoll.Messages", content);
            if (!Topics.APOLLO_RELEASE_TOPIC.equals(channel) || Strings.isNullOrEmpty(content)) {
                return;
            }
    
            // Extract the namespace from the message content
            String changedNamespace = retrieveNamespaceFromReleaseMessage.apply(content);
    
            if (Strings.isNullOrEmpty(changedNamespace)) {
                logger.error("message format invalid - {}", content);
                return;
            }
    
            if (!deferredResults.containsKey(content)) {
                return;
            }
    
            //create a new list to avoid ConcurrentModificationException
            List<DeferredResultWrapper> results = Lists.newArrayList(deferredResults.get(content));
    
            ApolloConfigNotification configNotification = new ApolloConfigNotification(changedNamespace, message.getId());
            configNotification.addMessage(content, message.getId());
    
            //do async notification if too many clients
            // Push in batches on a thread pool to avoid a thundering-herd effect
            if (results.size() > bizConfig.releaseMessageNotificationBatch()) {
                largeNotificationBatchExecutorService.submit(() -> {
                    logger.debug("Async notify {} clients for key {} with batch {}", results.size(), content,
                            bizConfig.releaseMessageNotificationBatch());
                    for (int i = 0; i < results.size(); i++) {
                        if (i > 0 && i % bizConfig.releaseMessageNotificationBatch() == 0) {
                            try {
                                TimeUnit.MILLISECONDS.sleep(bizConfig.releaseMessageNotificationBatchIntervalInMilli());
                            } catch (InterruptedException e) {
                                //ignore
                            }
                        }
                        logger.debug("Async notify {}", results.get(i));
                        results.get(i).setResult(configNotification);
                    }
                });
                return;
            }
    
            logger.debug("Notify {} clients for key {}", results.size(), content);
    
            // Set the change notification on each DeferredResult
            for (DeferredResultWrapper result : results) {
                result.setResult(configNotification);
            }
            logger.debug("Notification completed");
        }
    }
    

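    3. Configuration Fetching

    3.1 Building the caches

    3.1.1 Logic description

    To reduce query load on the database, hot data is cached in Guava caches. ConfigServiceWithCache builds two of them, configCache and configIdCache. configCache looks up the latest release by its appId_cluster_namespace key. configIdCache looks up a specific release by id; its key is passed to releaseService.findActiveOne, so it is a Release id rather than a notificationId.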
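
To make the expire-after-access behaviour concrete without pulling in Guava, here is a hand-rolled sketch. It is a swapped-in simplification and not how Guava implements LoadingCache: ExpiringLoadingCache, its constructor parameters, and the injected clock are all hypothetical, and eviction happens lazily on lookup only.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import java.util.function.LongSupplier;

// Hypothetical minimal loading cache whose entries expire a fixed time after their last access.
class ExpiringLoadingCache<K, V> {
    private static final class Entry<V> {
        final V value;
        volatile long lastAccessMillis;

        Entry(V value, long lastAccessMillis) {
            this.value = value;
            this.lastAccessMillis = lastAccessMillis;
        }
    }

    private final Map<K, Entry<V>> entries = new ConcurrentHashMap<>();
    private final Function<K, V> loader;   // invoked on a miss or after expiry (e.g. a database query)
    private final long ttlMillis;
    private final LongSupplier clock;      // injected so that time can be controlled in examples

    ExpiringLoadingCache(Function<K, V> loader, long ttlMillis, LongSupplier clock) {
        this.loader = loader;
        this.ttlMillis = ttlMillis;
        this.clock = clock;
    }

    /** Returns the cached value, reloading it when absent or not accessed for ttlMillis. */
    V get(K key) {
        long now = clock.getAsLong();
        Entry<V> entry = entries.compute(key, (k, existing) -> {
            if (existing == null || now - existing.lastAccessMillis >= ttlMillis) {
                return new Entry<>(loader.apply(k), now);
            }
            existing.lastAccessMillis = now; // every access extends the entry's lifetime
            return existing;
        });
        return entry.value;
    }
}
```

With expire-after-access, a key that keeps being read stays cached indefinitely, so hot namespaces stay in memory while cold ones drop out.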

    3.1.2 Code location

    3.1.2.1 ConfigServiceWithCache#initialize

    public class ConfigServiceWithCache extends AbstractConfigService {
    
      ...
    
      private LoadingCache<String, ConfigCacheEntry> configCache;
    
      private LoadingCache<Long, Optional<Release>> configIdCache;
    
      /**
       * Initializes the configuration caches
       */
      @PostConstruct
      void initialize() {
        // Key: appId_cluster_namespace; value: a ConfigCacheEntry holding the notificationId and the matching Release
        // Entries expire 60 minutes after last access
        configCache = CacheBuilder.newBuilder()
            .expireAfterAccess(DEFAULT_EXPIRED_AFTER_ACCESS_IN_MINUTES, TimeUnit.MINUTES)
            .build(new CacheLoader<String, ConfigCacheEntry>() {
              @Override
              public ConfigCacheEntry load(String key) throws Exception {
                List<String> namespaceInfo = STRING_SPLITTER.splitToList(key);
                if (namespaceInfo.size() != 3) {
                  Tracer.logError(
                      new IllegalArgumentException(String.format("Invalid cache load key %s", key)));
                  return nullConfigCacheEntry;
                }
    
                Transaction transaction = Tracer.newTransaction(TRACER_EVENT_CACHE_LOAD, key);
                try {
                  // Load the latest ReleaseMessage, i.e. the notificationId. A ReleaseMessage holds id, message (content: appId_cluster_namespace) and dataChangeLastModifiedTime
                  ReleaseMessage latestReleaseMessage = releaseMessageService.findLatestReleaseMessageForMessages(Lists
                      .newArrayList(key));
                  // Load the latest active Release (the actual configuration)
                  Release latestRelease = releaseService.findLatestActiveRelease(namespaceInfo.get(0), namespaceInfo.get(1),
                      namespaceInfo.get(2));
    
                  transaction.setStatus(Transaction.SUCCESS);
    
                  // Determine the notificationId
                  long notificationId = latestReleaseMessage == null ? ConfigConsts.NOTIFICATION_ID_PLACEHOLDER : latestReleaseMessage
                      .getId();
    
                  if (notificationId == ConfigConsts.NOTIFICATION_ID_PLACEHOLDER && latestRelease == null) {
                    return nullConfigCacheEntry;
                  }
    
                  // The cache entry pairs the notificationId with the corresponding Release
                  return new ConfigCacheEntry(notificationId, latestRelease);
                } catch (Throwable ex) {
                  transaction.setStatus(ex);
                  throw ex;
                } finally {
                  transaction.complete();
                }
              }
            });
        // Key: Release id (it is passed to releaseService.findActiveOne below); value: the Release, if any
        configIdCache = CacheBuilder.newBuilder()
            .expireAfterAccess(DEFAULT_EXPIRED_AFTER_ACCESS_IN_MINUTES, TimeUnit.MINUTES)
            .build(new CacheLoader<Long, Optional<Release>>() {
              @Override
              public Optional<Release> load(Long key) throws Exception {
                Transaction transaction = Tracer.newTransaction(TRACER_EVENT_CACHE_LOAD_ID, String.valueOf(key));
                try {
                  // Look up the active Release by id
                  Release release = releaseService.findActiveOne(key);
    
                  transaction.setStatus(Transaction.SUCCESS);
    
                  return Optional.ofNullable(release);
                } catch (Throwable ex) {
                  transaction.setStatus(ex);
                  throw ex;
                } finally {
                  transaction.complete();
                }
              }
            });
      }
    }
    

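    3.2 Querying configuration

    3.2.1 Logic description

    After the long poll above reports a change, the Client uses the latest notificationId it received for the namespace to query the latest configuration.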
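
The decision the query endpoint makes (not found, not modified, or a full response) comes down to comparing a merged release key with the one the client sent. The sketch below isolates that comparison. ReleaseKeyCheckSketch, mergeReleaseKeys, and statusFor are hypothetical names, and the "+" separator is an assumption standing in for ConfigConsts.CLUSTER_NAMESPACE_SEPARATOR.

```java
import java.util.List;

// Hypothetical reduction of the release-key comparison performed when a client queries configuration.
class ReleaseKeyCheckSketch {
    // Assumption: stands in for ConfigConsts.CLUSTER_NAMESPACE_SEPARATOR.
    static final String SEPARATOR = "+";

    /** Joins the release keys of all releases that apply to the request, in order. */
    static String mergeReleaseKeys(List<String> releaseKeys) {
        return String.join(SEPARATOR, releaseKeys);
    }

    /** Returns the HTTP status the endpoint would answer with. */
    static int statusFor(List<String> serverReleaseKeys, String clientReleaseKey) {
        if (serverReleaseKeys.isEmpty()) {
            return 404; // no release could be loaded for this namespace
        }
        if (mergeReleaseKeys(serverReleaseKeys).equals(clientReleaseKey)) {
            return 304; // client already holds this exact combination of releases
        }
        return 200; // something differs, so the full configuration is returned
    }
}
```

A client that has never fetched anything sends the default release key of -1, which never matches, so its first request always receives the full configuration.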

    3.2.2 Sequence diagram

    3.2.3 Code location

    3.2.3.1 ConfigController#queryConfig
    public class ConfigController {
        
      private final ConfigService configService;
      private final AppNamespaceServiceWithCache appNamespaceService;
      
      ...
    
      @GetMapping(value = "/{appId}/{clusterName}/{namespace:.+}")
      public ApolloConfig queryConfig(@PathVariable String appId, @PathVariable String clusterName,
                                      @PathVariable String namespace,
                                      @RequestParam(value = "dataCenter", required = false) String dataCenter,
                                      @RequestParam(value = "releaseKey", defaultValue = "-1") String clientSideReleaseKey,
                                      @RequestParam(value = "ip", required = false) String clientIp,
                                      @RequestParam(value = "messages", required = false) String messagesAsString,
                                      HttpServletRequest request, HttpServletResponse response) throws IOException {
        String originalNamespace = namespace;
        //strip out .properties suffix
        namespace = namespaceUtil.filterNamespaceName(namespace);
        //fix the character case issue, such as FX.apollo <-> fx.apollo
        namespace = namespaceUtil.normalizeNamespace(appId, namespace);
    
        if (Strings.isNullOrEmpty(clientIp)) {
          clientIp = tryToGetClientIp(request);
        }
    
        // Deserialize the notification messages sent by the client
        ApolloNotificationMessages clientMessages = transformMessages(messagesAsString);
    
        List<Release> releases = Lists.newLinkedList();
    
        String appClusterNameLoaded = clusterName;
        if (!ConfigConsts.NO_APPID_PLACEHOLDER.equalsIgnoreCase(appId)) {
          // (Core logic; pay close attention) Load the configuration
          Release currentAppRelease = configService.loadConfig(appId, clientIp, appId, clusterName, namespace,
              dataCenter, clientMessages);
    
          if (currentAppRelease != null) {
            releases.add(currentAppRelease);
            //we have cluster search process, so the cluster name might be overridden
            appClusterNameLoaded = currentAppRelease.getClusterName();
          }
        }
    
        //if namespace does not belong to this appId, should check if there is a public configuration
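        // The namespace does not belong to this appId, so it may be a public one. Typical use case: application A shares its configuration with other applications by marking the namespace as public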
        if (!namespaceBelongsToAppId(appId, namespace)) {
          Release publicRelease = this.findPublicConfig(appId, clientIp, clusterName, namespace,
              dataCenter, clientMessages);
          if (Objects.nonNull(publicRelease)) {
            releases.add(publicRelease);
          }
        }
    
        if (releases.isEmpty()) {
          response.sendError(HttpServletResponse.SC_NOT_FOUND,
              String.format(
                  "Could not load configurations with appId: %s, clusterName: %s, namespace: %s",
                  appId, clusterName, originalNamespace));
          Tracer.logEvent("Apollo.Config.NotFound",
              assembleKey(appId, clusterName, originalNamespace, dataCenter));
          return null;
        }
    
        auditReleases(appId, clusterName, dataCenter, clientIp, releases);
    
        // Format: the app's own (private) release key, then the public release key, joined by ConfigConsts.CLUSTER_NAMESPACE_SEPARATOR
        String mergedReleaseKey = releases.stream().map(Release::getReleaseKey)
                .collect(Collectors.joining(ConfigConsts.CLUSTER_NAMESPACE_SEPARATOR));
    
        // If the client's release key equals the server's merged key, the configuration has not changed
        if (mergedReleaseKey.equals(clientSideReleaseKey)) {
          // Client side configuration is the same with server side, return 304
          response.setStatus(HttpServletResponse.SC_NOT_MODIFIED);
          Tracer.logEvent("Apollo.Config.NotModified",
              assembleKey(appId, appClusterNameLoaded, originalNamespace, dataCenter));
          return null;
        }
    
        ApolloConfig apolloConfig = new ApolloConfig(appId, appClusterNameLoaded, originalNamespace,
            mergedReleaseKey);
        // Merge the configurations of all loaded releases
        apolloConfig.setConfigurations(mergeReleaseConfigurations(releases));
    
        Tracer.logEvent("Apollo.Config.Found", assembleKey(appId, appClusterNameLoaded,
            originalNamespace, dataCenter));
        return apolloConfig;
      }
    }
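
The release-key comparison and the merge step at the end of the method above can be looked at in isolation. The following is a minimal sketch rather than Apollo's code: `ReleaseMergeSketch`, `SimpleRelease` and their methods are hypothetical names, the `+` separator is assumed from the comment on the merged key, and the rule that the app's own release wins over the public one on duplicate keys is an assumption about what `mergeReleaseConfigurations` does.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical sketch; none of these names come from Apollo.
class ReleaseMergeSketch {

  // Stand-in for a Release: its key plus already-parsed key/value pairs.
  static final class SimpleRelease {
    final String releaseKey;
    final Map<String, String> configurations;

    SimpleRelease(String releaseKey, Map<String, String> configurations) {
      this.releaseKey = releaseKey;
      this.configurations = configurations;
    }
  }

  // Assumed value of the separator used when joining release keys.
  static final String SEPARATOR = "+";

  // Join the release keys in list order (app's own release first, public release second).
  static String mergedReleaseKey(List<SimpleRelease> releases) {
    return releases.stream().map(r -> r.releaseKey).collect(Collectors.joining(SEPARATOR));
  }

  // True when the client already holds exactly this combination of releases (the 304 case).
  static boolean notModified(List<SimpleRelease> releases, String clientSideReleaseKey) {
    return mergedReleaseKey(releases).equals(clientSideReleaseKey);
  }

  // Apply releases from last to first so that earlier releases win on duplicate keys (assumption).
  static Map<String, String> mergeConfigurations(List<SimpleRelease> releases) {
    Map<String, String> merged = new LinkedHashMap<>();
    for (int i = releases.size() - 1; i >= 0; i--) {
      merged.putAll(releases.get(i).configurations);
    }
    return merged;
  }

  public static void main(String[] args) {
    List<SimpleRelease> releases = List.of(
        new SimpleRelease("app-key-1", Map.of("timeout", "100")),
        new SimpleRelease("public-key-7", Map.of("timeout", "200", "retries", "3")));

    System.out.println(mergedReleaseKey(releases));
    System.out.println(notModified(releases, "app-key-1+public-key-7"));
    System.out.println(mergeConfigurations(releases).get("timeout"));
  }
}
```

Because the merged key is a plain concatenation, a new release of either the private or the public namespace changes the key, so a client that sends its last known key gets a 304 only when neither side has changed.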
    
    3.2.3.2、AbstractConfigService#loadConfig
    public abstract class AbstractConfigService implements ConfigService {
      @Autowired
      private GrayReleaseRulesHolder grayReleaseRulesHolder;
    
      /**
       * Load the configuration (release) for the given namespace.
       * @return
       */
      @Override
      public Release loadConfig(String clientAppId, String clientIp, String configAppId, String configClusterName,
          String configNamespace, String dataCenter, ApolloNotificationMessages clientMessages) {
        // load from specified cluster first
        // Load the configuration from the specified (non-default) cluster
        if (!Objects.equals(ConfigConsts.CLUSTER_NAME_DEFAULT, configClusterName)) {
          // Look up the release
          Release clusterRelease = findRelease(clientAppId, clientIp, configAppId, configClusterName, configNamespace,
              clientMessages);
    
          if (Objects.nonNull(clusterRelease)) {
            return clusterRelease;
          }
        }
    
        // try to load via data center
        // Load the configuration from the cluster named after the given dataCenter
        if (!Strings.isNullOrEmpty(dataCenter) && !Objects.equals(dataCenter, configClusterName)) {
          Release dataCenterRelease = findRelease(clientAppId, clientIp, configAppId, dataCenter, configNamespace,
              clientMessages);
          if (Objects.nonNull(dataCenterRelease)) {
            return dataCenterRelease;
          }
        }
    
        // fallback to default release
        // Nothing matched above, so use the default cluster
        return findRelease(clientAppId, clientIp, configAppId, ConfigConsts.CLUSTER_NAME_DEFAULT, configNamespace,
            clientMessages);
      }
    
      /**
       * Find the release (configuration) to return for one cluster.
       */
      private Release findRelease(String clientAppId, String clientIp, String configAppId, String configClusterName,
          String configNamespace, ApolloNotificationMessages clientMessages) {
        // Get the release id of the namespace's gray (canary) release, if a gray rule matches this client
        Long grayReleaseId = grayReleaseRulesHolder.findReleaseIdFromGrayReleaseRule(clientAppId, clientIp, configAppId,
            configClusterName, configNamespace);
    
        Release release = null;
    
        // Load the actual release by the gray release id
        if (grayReleaseId != null) {
          release = findActiveOne(grayReleaseId, clientMessages);
        }
    
        // If there is no gray release, fetch the namespace's latest release directly
        if (release == null) {
          // (Core logic, pay close attention) fetch the latest release by appId + cluster + namespace
          release = findLatestActiveRelease(configAppId, configClusterName, configNamespace, clientMessages);
        }
    
        return release;
      }
    }
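
The lookup order implemented by `loadConfig` (specified cluster, then the data-center cluster, then the default cluster) may be easier to see without the surrounding plumbing. This is a minimal sketch under stated assumptions: `ClusterFallbackSketch`, `lookupOrder` and the `Function`-based `findRelease` parameter are hypothetical, and `"default"` is assumed to be the value of `ConfigConsts.CLUSTER_NAME_DEFAULT`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Function;

// Hypothetical sketch of the cluster fallback order; not Apollo's implementation.
class ClusterFallbackSketch {

  // Assumed value of ConfigConsts.CLUSTER_NAME_DEFAULT.
  static final String DEFAULT_CLUSTER = "default";

  // Clusters to try, in order, mirroring the three steps of loadConfig.
  static List<String> lookupOrder(String clusterName, String dataCenter) {
    List<String> order = new ArrayList<>();
    // 1. The specified cluster, unless it is already the default one.
    if (!Objects.equals(DEFAULT_CLUSTER, clusterName)) {
      order.add(clusterName);
    }
    // 2. The cluster named after the data center, unless it was just tried.
    if (dataCenter != null && !dataCenter.isEmpty() && !Objects.equals(dataCenter, clusterName)) {
      order.add(dataCenter);
    }
    // 3. The default cluster as the last resort.
    order.add(DEFAULT_CLUSTER);
    return order;
  }

  // Return the first non-null release found along the lookup order, or null.
  static <R> R loadConfig(String clusterName, String dataCenter, Function<String, R> findRelease) {
    for (String cluster : lookupOrder(clusterName, dataCenter)) {
      R release = findRelease.apply(cluster);
      if (release != null) {
        return release;
      }
    }
    return null;
  }

  public static void main(String[] args) {
    // Only the data-center cluster and the default cluster have a release here.
    Map<String, String> releasesByCluster =
        Map.of("SHAJQ", "release-dc", "default", "release-default");

    System.out.println(lookupOrder("cluster-a", "SHAJQ"));
    System.out.println(loadConfig("cluster-a", "SHAJQ", releasesByCluster::get));
  }
}
```

In the real method each step additionally checks for a gray release before falling back to the latest active release; the sketch leaves that out to keep the ordering visible.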
    
    3.2.3.3、ConfigServiceWithCache#findLatestActiveRelease

    Query the latest release of the namespace from the cache, falling back to the DB on a miss.

    public class ConfigServiceWithCache extends AbstractConfigService {
    
      ...
        
      private LoadingCache<String, ConfigCacheEntry> configCache;
    
      private LoadingCache<Long, Optional<Release>> configIdCache;
    
      @Override
      protected Release findActiveOne(long id, ApolloNotificationMessages clientMessages) {
        Tracer.logEvent(TRACER_EVENT_CACHE_GET_ID, String.valueOf(id));
        return configIdCache.getUnchecked(id).orElse(null);
      }
    
      /**
       * (Core logic, pay close attention) fetch the release; load it from the DB when the local cache has no entry
       * @return
       */
      @Override
      protected Release findLatestActiveRelease(String appId, String clusterName, String namespaceName,
                                                ApolloNotificationMessages clientMessages) {
        String key = ReleaseMessageKeyGenerator.generate(appId, clusterName, namespaceName);
    
        Tracer.logEvent(TRACER_EVENT_CACHE_GET, key);
    
        // Get the cached entry for the namespace. The key is appId+clusterName+namespaceName; the entry holds the notification id (notificationId) and the release (Release)
        ConfigCacheEntry cacheEntry = configCache.getUnchecked(key);
    
        //cache is out-dated
        // The cache is stale: the client has already seen a newer notification id than the cached one
        if (clientMessages != null && clientMessages.has(key) &&
            clientMessages.get(key) > cacheEntry.getNotificationId()) {
          //invalidate the cache and try to load from db again
          // Evict the entry (Guava cache)
          invalidate(key);
          // Reload the entry from the DB: the namespace's latest notification id (notificationId) and its release
          cacheEntry = configCache.getUnchecked(key);
        }
    
        // If the cached notification id is not older than the one reported by the client, the cached release is returned directly
        return cacheEntry.getRelease();
      }
    
      private void invalidate(String key) {
        configCache.invalidate(key);
        Tracer.logEvent(TRACER_EVENT_CACHE_INVALIDATE, key);
      }
    
      @Override
      public void handleMessage(ReleaseMessage message, String channel) {
        logger.info("message received - channel: {}, message: {}", channel, message);
        if (!Topics.APOLLO_RELEASE_TOPIC.equals(channel) || Strings.isNullOrEmpty(message.getMessage())) {
          return;
        }
    
        try {
          // Evict the cache entry
          invalidate(message.getMessage());
    
          //warm up the cache
          // Reload the entry from the DB: the namespace's latest notification id (notificationId) and its release
          configCache.getUnchecked(message.getMessage());
        } catch (Throwable ex) {
          //ignore
        }
      }
    
      private static class ConfigCacheEntry {
        private final long notificationId;
        private final Release release;
    
        public ConfigCacheEntry(long notificationId, Release release) {
          this.notificationId = notificationId;
          this.release = release;
        }
    
        public long getNotificationId() {
          return notificationId;
        }
    
        public Release getRelease() {
          return release;
        }
      }
    }
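
The staleness check in `findLatestActiveRelease` compares the notification id reported by the client with the one stored in the cache entry. The following is a minimal sketch of that idea using only the JDK: a `ConcurrentHashMap` stands in for the Guava `LoadingCache`, and `NotificationAwareCacheSketch`, `Entry`, `dbLoader` and `dbLoads` are hypothetical names introduced for illustration.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

// Hypothetical sketch; a plain map replaces the Guava LoadingCache used by Apollo.
class NotificationAwareCacheSketch {

  // Stand-in for ConfigCacheEntry: notification id plus the release content.
  static final class Entry {
    final long notificationId;
    final String release;

    Entry(long notificationId, String release) {
      this.notificationId = notificationId;
      this.release = release;
    }
  }

  private final Map<String, Entry> cache = new ConcurrentHashMap<>();
  private final Function<String, Entry> dbLoader;
  // Counts how often the "DB" is hit, for demonstration only.
  final AtomicInteger dbLoads = new AtomicInteger();

  // The loader is assumed to return a non-null entry for every key it is asked for.
  NotificationAwareCacheSketch(Function<String, Entry> dbLoader) {
    this.dbLoader = dbLoader;
  }

  // Return the cached entry, loading it through dbLoader on a miss.
  private Entry getOrLoad(String key) {
    return cache.computeIfAbsent(key, k -> {
      dbLoads.incrementAndGet();
      return dbLoader.apply(k);
    });
  }

  // Reload when the client has seen a newer notification id than the cached one.
  String findLatestRelease(String key, Long clientNotificationId) {
    Entry entry = getOrLoad(key);
    if (clientNotificationId != null && clientNotificationId > entry.notificationId) {
      cache.remove(key);
      entry = getOrLoad(key);
    }
    return entry.release;
  }

  // On a release message: evict the entry, then warm the cache up again.
  void handleMessage(String key) {
    cache.remove(key);
    getOrLoad(key);
  }

  public static void main(String[] args) {
    Map<String, Entry> db = new ConcurrentHashMap<>();
    String key = "app1+default+application";
    db.put(key, new Entry(1L, "release-1"));

    NotificationAwareCacheSketch sketch = new NotificationAwareCacheSketch(db::get);
    System.out.println(sketch.findLatestRelease(key, null));

    // A new release lands in the DB, but this instance has not processed the message yet.
    db.put(key, new Entry(2L, "release-2"));
    System.out.println(sketch.findLatestRelease(key, 1L));
    System.out.println(sketch.findLatestRelease(key, 2L));
  }
}
```

The point of the comparison is that a client may be notified by one Config Service instance and then fetch from another whose cache has not been refreshed yet; sending the notification id along lets the second instance detect that its entry is behind and reload it instead of serving the old release.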
    

    IV. Closing

    I know little and have done too little. Criticism and corrections are welcome.

  • Original article: https://www.cnblogs.com/boycelee/p/18005318