• Nacos配置中心集群原理及源码分析


    Nacos作为配置中心,必然需要保证服务节点的高可用性,那么Nacos是如何实现集群的呢?

    下面这个图,表示Nacos集群的部署图。

    Nacos集群工作原理

    Nacos作为配置中心的集群结构中,是一种无中心化节点的设计,由于没有主从节点,也没有选举机制,所以为了能够实现热备,就需要增加虚拟IP(VIP)。

    Nacos的数据存储分为两部分

    /data/program/nacos-1/data/config-data/${GROUP}
    

    在Nacos的设计中,Mysql是一个中心数据仓库,且认为在Mysql中的数据是绝对正确的。 除此之外,Nacos在启动时会把Mysql中的数据写一份到本地磁盘。

    这么设计的好处是可以提高性能,当客户端需要请求某个配置项时,服务端会想Ian从磁盘中读取对应文件返回,而磁盘的读取效率要比数据库效率高。

    当配置发生变更时:

    1. Nacos会把变更的配置保存到数据库,然后再写入本地文件。
    2. 接着发送一个HTTP请求,给到集群中的其他节点,其他节点收到事件后,从Mysql中dump刚刚写入的数据到本地文件中。

    另外,NacosServer启动后,会同步启动一个定时任务,每隔6小时,会dump一次全量数据到本地文件

    配置变更同步入口

    当配置发生修改、删除、新增操作时,通过发布一个 notifyConfigChange 事件。

    1. @PostMapping
    2. @Secured(action = ActionTypes.WRITE, parser = ConfigResourceParser.class)
    3. public Boolean publishConfig(HttpServletRequest request, HttpServletResponse response,
    4. @RequestParam(value = "dataId") String dataId, @RequestParam(value = "group") String group,
    5. @RequestParam(value = "tenant", required = false, defaultValue = StringUtils.EMPTY) String tenant,
    6. @RequestParam(value = "content") String content, @RequestParam(value = "tag", required = false) String tag,
    7. @RequestParam(value = "appName", required = false) String appName,
    8. @RequestParam(value = "src_user", required = false) String srcUser,
    9. @RequestParam(value = "config_tags", required = false) String configTags,
    10. @RequestParam(value = "desc", required = false) String desc,
    11. @RequestParam(value = "use", required = false) String use,
    12. @RequestParam(value = "effect", required = false) String effect,
    13. @RequestParam(value = "type", required = false) String type,
    14. @RequestParam(value = "schema", required = false) String schema) throws NacosException {
    15. //省略..
    16. if (StringUtils.isBlank(betaIps)) {
    17. if (StringUtils.isBlank(tag)) {
    18. persistService.insertOrUpdate(srcIp, srcUser, configInfo, time, configAdvanceInfo, true);
    19. ConfigChangePublisher
    20. .notifyConfigChange(new ConfigDataChangeEvent(false, dataId, group, tenant, time.getTime()));
    21. } else {
    22. persistService.insertOrUpdateTag(configInfo, tag, srcIp, srcUser, time, true);
    23. ConfigChangePublisher.notifyConfigChange(
    24. new ConfigDataChangeEvent(false, dataId, group, tenant, tag, time.getTime()));
    25. }
    26. }//省略
    27. return true;
    28. }

    AsyncNotifyService

    配置数据变更事件,专门有一个监听器AsyncNotifyService,它会处理数据变更后的同步事件。

    1. @Autowired
    2. public AsyncNotifyService(ServerMemberManager memberManager) {
    3. this.memberManager = memberManager;
    4. // Register ConfigDataChangeEvent to NotifyCenter.
    5. NotifyCenter.registerToPublisher(ConfigDataChangeEvent.class, NotifyCenter.ringBufferSize);
    6. // Register A Subscriber to subscribe ConfigDataChangeEvent.
    7. NotifyCenter.registerSubscriber(new Subscriber() {
    8. @Override
    9. public void onEvent(Event event) {
    10. // Generate ConfigDataChangeEvent concurrently
    11. if (event instanceof ConfigDataChangeEvent) {
    12. ConfigDataChangeEvent evt = (ConfigDataChangeEvent) event;
    13. long dumpTs = evt.lastModifiedTs;
    14. String dataId = evt.dataId;
    15. String group = evt.group;
    16. String tenant = evt.tenant;
    17. String tag = evt.tag;
    18. Collection<Member> ipList = memberManager.allMembers(); //得到集群中的ip列表
    19. // 构建NotifySingleTask,并添加到队列中。
    20. Queue<NotifySingleTask> queue = new LinkedList<NotifySingleTask>();
    21. for (Member member : ipList) { //遍历集群中的每个节点
    22. queue.add(new NotifySingleTask(dataId, group, tenant, tag, dumpTs, member.getAddress(),
    23. evt.isBeta));
    24. }
    25. //异步执行任务 AsyncTask
    26. ConfigExecutor.executeAsyncNotify(new AsyncTask(nacosAsyncRestTemplate, queue));
    27. }
    28. }
    29. @Override
    30. public Classextends Event> subscribeType() {
    31. return ConfigDataChangeEvent.class;
    32. }
    33. });
    34. }

    AsyncTask

    1. @Override
    2. public void run() {
    3. executeAsyncInvoke();
    4. }
    5. private void executeAsyncInvoke() {
    6. while (!queue.isEmpty()) {//遍历队列中的数据,直到数据为空
    7. NotifySingleTask task = queue.poll(); //获取task
    8. String targetIp = task.getTargetIP(); //获取目标ip
    9. if (memberManager.hasMember(targetIp)) { //如果集群中的ip列表包含目标ip
    10. // start the health check and there are ips that are not monitored, put them directly in the notification queue, otherwise notify
    11. //判断目标ip的健康状态
    12. boolean unHealthNeedDelay = memberManager.isUnHealth(targetIp); //
    13. if (unHealthNeedDelay) { //如果目标服务是非健康,则继续添加到队列中,延后再执行。
    14. // target ip is unhealthy, then put it in the notification list
    15. ConfigTraceService.logNotifyEvent(task.getDataId(), task.getGroup(), task.getTenant(), null,
    16. task.getLastModified(), InetUtils.getSelfIP(), ConfigTraceService.NOTIFY_EVENT_UNHEALTH,
    17. 0, task.target);
    18. // get delay time and set fail count to the task
    19. asyncTaskExecute(task);
    20. } else {
    21. //构建header
    22. Header header = Header.newInstance();
    23. header.addParam(NotifyService.NOTIFY_HEADER_LAST_MODIFIED, String.valueOf(task.getLastModified()));
    24. header.addParam(NotifyService.NOTIFY_HEADER_OP_HANDLE_IP, InetUtils.getSelfIP());
    25. if (task.isBeta) {
    26. header.addParam("isBeta", "true");
    27. }
    28. AuthHeaderUtil.addIdentityToHeader(header);
    29. //通过restTemplate发起远程调用,如果调用成功,则执行AsyncNotifyCallBack的回调方法
    30. restTemplate.get(task.url, header, Query.EMPTY, String.class, new AsyncNotifyCallBack(task));
    31. }
    32. }
    33. }
    34. }

    目标节点接收请求

    数据同步的请求地址为,task.url=
    http://192.168.8.16:8848/nacos/v1/cs/communication/dataChange?dataId=log.yaml&group=DEFAULT_GROUP

    1. @GetMapping("/dataChange")
    2. public Boolean notifyConfigInfo(HttpServletRequest request, @RequestParam("dataId") String dataId,
    3. @RequestParam("group") String group,
    4. @RequestParam(value = "tenant", required = false, defaultValue = StringUtils.EMPTY) String tenant,
    5. @RequestParam(value = "tag", required = false) String tag) {
    6. dataId = dataId.trim();
    7. group = group.trim();
    8. String lastModified = request.getHeader(NotifyService.NOTIFY_HEADER_LAST_MODIFIED);
    9. long lastModifiedTs = StringUtils.isEmpty(lastModified) ? -1 : Long.parseLong(lastModified);
    10. String handleIp = request.getHeader(NotifyService.NOTIFY_HEADER_OP_HANDLE_IP);
    11. String isBetaStr = request.getHeader("isBeta");
    12. if (StringUtils.isNotBlank(isBetaStr) && trueStr.equals(isBetaStr)) {
    13. dumpService.dump(dataId, group, tenant, lastModifiedTs, handleIp, true);
    14. } else {
    15. //
    16. dumpService.dump(dataId, group, tenant, tag, lastModifiedTs, handleIp);
    17. }
    18. return true;
    19. }

    dumpService.dump用来实现配置的更新,代码如下

    当前任务会被添加到DumpTaskMgr中管理。

    1. public void dump(String dataId, String group, String tenant, String tag, long lastModified, String handleIp,
    2. boolean isBeta) {
    3. String groupKey = GroupKey2.getKey(dataId, group, tenant);
    4. String taskKey = String.join("+", dataId, group, tenant, String.valueOf(isBeta), tag);
    5. dumpTaskMgr.addTask(taskKey, new DumpTask(groupKey, tag, lastModified, handleIp, isBeta));
    6. DUMP_LOG.info("[dump-task] add task. groupKey={}, taskKey={}", groupKey, taskKey);
    7. }

    TaskManager.addTask, 先调用父类去完成任务添加。

    1. @Override
    2. public void addTask(Object key, AbstractDelayTask newTask) {
    3. super.addTask(key, newTask);
    4. MetricsMonitor.getDumpTaskMonitor().set(tasks.size());
    5. }

    在这种场景设计中,一般都会采用生产者消费者模式来完成,因此这里不难猜测到,任务会被保存到一个队列中,然后有另外一个线程来执行。

    NacosDelayTaskExecuteEngine

    TaskManager的父类是
    NacosDelayTaskExecuteEngine,

    这个类中有一个成员属性 protected final ConcurrentHashMap tasks; ,专门来保存延期执行的任务类型AbstractDelayTask.

    在这个类的构造方法中,初始化了一个延期执行的任务,其中具体的任务是ProcessRunnable.

    1. public NacosDelayTaskExecuteEngine(String name, int initCapacity, Logger logger, long processInterval) {
    2. super(logger);
    3. tasks = new ConcurrentHashMap(initCapacity);
    4. processingExecutor = ExecutorFactory.newSingleScheduledExecutorService(new NameThreadFactory(name));
    5. processingExecutor
    6. .scheduleWithFixedDelay(new ProcessRunnable(), processInterval, processInterval, TimeUnit.MILLISECONDS);
    7. }

    ProcessRunnable

    1. private class ProcessRunnable implements Runnable {
    2. @Override
    3. public void run() {
    4. try {
    5. processTasks();
    6. } catch (Throwable e) {
    7. getEngineLog().error(e.toString(), e);
    8. }
    9. }
    10. }

    processTasks

    1. protected void processTasks() {
    2. //获取所有的任务
    3. Collection<Object> keys = getAllTaskKeys();
    4. for (Object taskKey : keys) {
    5. AbstractDelayTask task = removeTask(taskKey);
    6. if (null == task) {
    7. continue;
    8. }
    9. //获取任务处理器,这里返回的是DumpProcessor
    10. NacosTaskProcessor processor = getProcessor(taskKey);
    11. if (null == processor) {
    12. getEngineLog().error("processor not found for task, so discarded. " + task);
    13. continue;
    14. }
    15. try {
    16. // ReAdd task if process failed
    17. //执行具体任务
    18. if (!processor.process(task)) {
    19. retryFailedTask(taskKey, task);
    20. }
    21. } catch (Throwable e) {
    22. getEngineLog().error("Nacos task execute error : " + e.toString(), e);
    23. retryFailedTask(taskKey, task);
    24. }
    25. }
    26. }

    DumpProcessor.process

    读取数据库的最新数据,然后更新本地缓存和磁盘。

  • 相关阅读:
    js 事件的委派
    applicationContext.getBeansOfType 获取一个接口下所有实现类 执行方法或者获取实现类对象等 操作应用场景学习总结
    为React Ant-Design Table增加字段设置 | 京东云技术团队
    全面讲解GRASP原则
    vue拖拽删除实现
    源码分析:Websocket 和前端交互
    中间件(nginx,网关)对性能的影响的测试
    【Python】约瑟夫环问题
    【AUTOSAR】【通信安全】E2EXf
    kubernetes popeye 巡检
  • 原文地址:https://blog.csdn.net/weixin_62710048/article/details/125872699