• 【Nacos】Nacos服务注册服务端源码分析(一)


    上篇简单看了下Nacos客户端在服务注册时做了什么
    本篇开始分析Nacos在服务注册时,服务端的相关逻辑。

    建议先阅读这篇文章:支持 gRPC 长链接,深度解读 Nacos 2.0 架构设计及新模型

    回顾一下,上篇我们看了Nacos在服务注册时,客户端的相关源码。Nacos2.X通过grpc支持了长链接,那么客户端发起调用,肯定就有一个grpc的服务端在接收请求。那么就从这个grpc的相关代码看起~

    grpc server

    abstract class BaseRpcServernacos-core中一个抽象类,有一个@PostConstruct 修饰的start方法。

        @PostConstruct
        public void start() throws Exception {
            String serverName = getClass().getSimpleName();
            String tlsConfig = JacksonUtils.toJson(grpcServerConfig);
            Loggers.REMOTE.info("Nacos {} Rpc server starting at port {} and tls config:{}", serverName, getServicePort(), tlsConfig);
            
            //启动grpc服务端
            startServer();
        
            Loggers.REMOTE.info("Nacos {} Rpc server started at port {} and tls config:{}", serverName, getServicePort(), tlsConfig);
            
            //钩子函数:处理退出信号
            Runtime.getRuntime().addShutdownHook(new Thread(() -> {
                Loggers.REMOTE.info("Nacos {} Rpc server stopping", serverName);
                try {
                    BaseRpcServer.this.stopServer();
                    Loggers.REMOTE.info("Nacos {} Rpc server stopped successfully...", serverName);
                } catch (Exception e) {
                    Loggers.REMOTE.error("Nacos {} Rpc server stopped fail...", serverName, e);
                }
            }));
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22

    这个startServer()是一个抽象方法,我们看下其实现。

        /**
         * Start sever.
         *
         * @throws Exception exception throw if start server fail.
         */
        public abstract void startServer() throws Exception;
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    追踪代码发现,这个方法是当前类BaseRpcServer的子类BaseGrpcServer实现的

    public abstract class BaseGrpcServer extends BaseRpcServer
    
    • 1

    看下startServer()的代码:

        @Override
        public void startServer() throws Exception {
            final MutableHandlerRegistry handlerRegistry = new MutableHandlerRegistry();
            
            //注册服务
            addServices(handlerRegistry, new GrpcConnectionInterceptor());
            
            NettyServerBuilder builder = NettyServerBuilder.forPort(getServicePort()).executor(getRpcExecutor());
    
            if (grpcServerConfig.getEnableTls()) {
                if (grpcServerConfig.getCompatibility()) {
                    builder.protocolNegotiator(new OptionalTlsProtocolNegotiator(getSslContextBuilder()));
                } else {
                    builder.sslContext(getSslContextBuilder());
                }
            }
    
            server = builder.maxInboundMessageSize(getMaxInboundMessageSize()).fallbackHandlerRegistry(handlerRegistry)
                    .compressorRegistry(CompressorRegistry.getDefaultInstance())
                    .decompressorRegistry(DecompressorRegistry.getDefaultInstance())
                    .addTransportFilter(new AddressTransportFilter(connectionManager))
                    .keepAliveTime(getKeepAliveTime(), TimeUnit.MILLISECONDS)
                    .keepAliveTimeout(getKeepAliveTimeout(), TimeUnit.MILLISECONDS)
                    .permitKeepAliveTime(getPermitKeepAliveTime(), TimeUnit.MILLISECONDS)
                    .build();
    
    		//启动grpc的server服务
            server.start();
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29

    研读框架源码时,先不要陷入细节当中,第一遍梳理清楚整个框架的即可,关于grpc-server就先看到这里。

    上文中我们提到了抽象类BaseRpcServer,简单分析下这个类。

    BaseRpcServer

    在这里插入图片描述
    BaseRpcServerBaseGrpcServer 都是抽象类,GrpcClusterServerGrpcSdkServer都是抽象实现类,并且这两个实现类都有@Service注解标注,那么就意味着这两个类会被注册为spring bean 。
    上文我们提过BaseRpcServer.start()有一个@PostConstruct注解,那么也就意味着具体调用时使用了GrpcClusterServerGrpcSdkServer的任何一个类,都会去调用BaseRpcServer.start()方法去启动grpc-server

    GrpcClusterServerGrpcSdkServer的区别

    从名字上可以看出,一个是Cluster服务调用,一个是SDK调用。
    那么客户端注册使用的是哪个Server?
    BaseRpcServer中定义了一个获取端口偏移量的方法:

        /**
         * the increase offset of nacos server port for rpc server port.
         *
         * @return delta port offset of main port.
         */
        public abstract int rpcPortOffset();
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    GrpcSdkServer对此给出的实现是返回一个常量定义:

    public static final Integer SDK_GRPC_PORT_DEFAULT_OFFSET = 1000;
    
    • 1

    GrpcClusterServer给出的常量中定义的端口是CLUSTER_GRPC_PORT_DEFAULT_OFFSET = 1001

    即然定义了端口,那么这个常量在创建这两个server的时候肯定会用到,我们通过这个常量去寻找到调用方,自然而然也就找到了server创建的逻辑,进而找到这两个server使用上的区别。

    通过常量Constants.SDK_GRPC_PORT_DEFAULT_OFFSET发现一个新的类GrpcSdkClientrpcPortOffset()方法使用了这个常量。
    在这里插入图片描述
    在这里插入图片描述

    通过观察构造方法的调用,我们找到了RpcClient的创建工厂RpcClientFactory

    	//本地缓存,存储client,key为clientName
    	private static final Map<String, RpcClient> CLIENT_MAP = new ConcurrentHashMap<>();
    
        public static RpcClient createClient(String clientName, ConnectionType connectionType, Integer threadPoolCoreSize,
                                             Integer threadPoolMaxSize, Map<String, String> labels, RpcClientTlsConfig tlsConfig) {
    
            if (!ConnectionType.GRPC.equals(connectionType)) {
                throw new UnsupportedOperationException("unsupported connection type :" + connectionType.getType());
            }
            
            //如果当前clientName不存在,那么执行GrpcSdkClient的创建逻辑
            return CLIENT_MAP.computeIfAbsent(clientName, clientNameInner -> {
                LOGGER.info("[RpcClientFactory] create a new rpc client of " + clientName);
                try {
                    return new GrpcSdkClient(clientNameInner, threadPoolCoreSize, threadPoolMaxSize, labels, tlsConfig);
                } catch (Throwable throwable) {
                    LOGGER.error("Error to init GrpcSdkClient for client name :" + clientName, throwable);
                    throw throwable;
                }
                
            });
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22

    那么再看下上边源码中createClient的调用方是谁,查看代码得知分别有两个调用方,ClientWorkerNamingGrpcClientProxy,第二个调用方是不是有点熟悉?
    对的,我们在上篇文章看客户端注册服务的代码时,看到过如下代码:

    @Override
    public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {
        getExecuteClientProxy(instance).registerService(serviceName, groupName, instance);
    }
    
    private NamingClientProxy getExecuteClientProxy(Instance instance) {
        return instance.isEphemeral() ? grpcClientProxy : httpClientProxy;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    那么这个grpcClientProxy是哪一个实现呢?

        @Override
        public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
            NamingUtils.checkInstanceIsLegal(instance);
            clientProxy.registerService(serviceName, groupName, instance);
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5

    再次观察创建服务实例的代码,我们可以看到在实际的注册服务时,使用了一个客户端代理clientProxy来做处理,我们来看下这个代理的是怎么创建的。

        private NamingClientProxy clientProxy;
    
        private void init(Properties properties) throws NacosException {
            final NacosClientProperties nacosClientProperties = NacosClientProperties.PROTOTYPE.derive(properties);
            
            ValidatorUtils.checkInitParam(nacosClientProperties);
            this.namespace = InitUtils.initNamespaceForNaming(nacosClientProperties);
            InitUtils.initSerialization();
            InitUtils.initWebRootContext(nacosClientProperties);
            initLogName(nacosClientProperties);
        
            this.notifierEventScope = UUID.randomUUID().toString();
            this.changeNotifier = new InstancesChangeNotifier(this.notifierEventScope);
            NotifyCenter.registerToPublisher(InstancesChangeEvent.class, 16384);
            NotifyCenter.registerSubscriber(changeNotifier);
            this.serviceInfoHolder = new ServiceInfoHolder(namespace, this.notifierEventScope, nacosClientProperties);
    		//一目了然
            this.clientProxy = new NamingClientProxyDelegate(this.namespace, serviceInfoHolder, nacosClientProperties, changeNotifier);
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    this.clientProxy = new NamingClientProxyDelegate,可以看到这个代理实际上是一个创建了一个委托类,那继续看下这个委托类的构造方法。

        public NamingClientProxyDelegate(String namespace, ServiceInfoHolder serviceInfoHolder, NacosClientProperties properties,
                InstancesChangeNotifier changeNotifier) throws NacosException {
            this.serviceInfoUpdateService = new ServiceInfoUpdateService(properties, serviceInfoHolder, this,
                    changeNotifier);
            this.serverListManager = new ServerListManager(properties, namespace);
            this.serviceInfoHolder = serviceInfoHolder;
            this.securityProxy = new SecurityProxy(this.serverListManager.getServerList(),
                    NamingHttpClientManager.getInstance().getNacosRestTemplate());
            initSecurityProxy(properties);
            this.httpClientProxy = new NamingHttpClientProxy(namespace, securityProxy, serverListManager, properties);
            //终于,我们找到了NamingGrpcClientProxy
            this.grpcClientProxy = new NamingGrpcClientProxy(namespace, securityProxy, serverListManager, properties,
                    serviceInfoHolder);
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    回过头来再次看NacosNamingServieregisterInstance方法。

        @Override
        public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
            NamingUtils.checkInstanceIsLegal(instance);
            clientProxy.registerService(serviceName, groupName, instance);
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5

    我们已经知道了clientProxyNamingClientProxyDelegate,那么就看下它是如何实现registerService即可。

    别着急,谜底马上揭开😎

        @Override
        public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {
            getExecuteClientProxy(instance).registerService(serviceName, groupName, instance);
        }
    
        private NamingClientProxy getExecuteClientProxy(Instance instance) {
            return instance.isEphemeral() ? grpcClientProxy : httpClientProxy;
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    instance.封装了客户端要注册的服务,ephemeral默认为true。所以默认会选择grpcClientProxy去注册服务。

    this.grpcClientProxy = new NamingGrpcClientProxy(namespace, securityProxy, serverListManager, properties,
                    serviceInfoHolder);
    
    • 1
    • 2

    回答我们自己提出的问题:GrpcClusterServerGrpcSdkServer的区别?
    后者是客户端注册使用的,那么前者肯定就是服务内部调用使用的了。

    总结

    梳理关键类和方法

    • 客户端
      • NacosDiscoveryAutoRegister
        • 是一个ApplicationListener,监听WebServerInitializedEvent事件后,注册当前实例
      • NacosNamingService
        • private void init(Properties properties) throws NacosException
          • 初始化时,定义了创建客户端的代理委托类clientProxyNamingClientProxyDelegate
        • registerInstance
          • 使用clientProxy注册服务
      • NamingClientProxyDelegate
        • 构造方法注入: grpcClientProxy = new NamingGrpcClientProxy
        • instance.isEphemeral() ? grpcClientProxy : httpClientProxy
      • NamingGrpcClientProxy
        • registerService
        • requestToServer
          • this.rpcClient.request(request) 这里的rpcClient其实就是GrpcSdkClient
    • 服务端
      • RpcClientFactory
        • createClient :返回GrpcSdkClient
      • GrpcSdkClient
        • 未完待续…

    本篇很大篇幅上讲了服务注册时,服务端grpc-server相关的一些逻辑。从中可以看到服务端使用了很多委托类、代理类来抽象、封装相关业务逻辑,所以刚开始看框架源码如果一头雾水的时候,不要着急。抓大放小,先建立整体认知后,再回过头来深入细节。

    下篇从源码上深入服务端在注册服务时的业务细节。

    下班!🕶️

  • 相关阅读:
    认识物联网
    umich cv-2-1
    GitHub神坛变动,10W字Spring Cloud Alibaba笔记,30W星标登顶第一
    网络基础(一)
    视频声音怎么翻译?这几个办法教你实现视频声音翻译成中文
    C语言实现各类排序算法
    解除word文档限制,快速操作,步骤简单,不可错过。
    如何在小程序中设置页面显示的文字
    react(Hooks)实现国际化
    AD9361寄存器功能笔记之本振频率设定
  • 原文地址:https://blog.csdn.net/Paranoia_ZK/article/details/132899139