• dubbo源码解析之服务发现


    文章系列

    一、dubbo源码解析之框架粗谈
    二、dubbo源码解析之dubbo配置解析
    三、dubbo源码解析之服务发布与注册
    四、dubbo源码解析之服务发现
    五、dubbo源码解析之服务调用(通信)流程
    六、dubbo获取服务提供者IP列表

    一、DubboNamespaceHandler

    Spring 启动过程中,会扫描所有包目录 resources/META-INF/spring.handlers 文件,将其对应的 DubboNamespaceHandler 装载到 Spring IoC 容器中,并调用调用其中的 init() 方法,通过注册一个BeanDefinitionParser 解析器,完成Bean对象的注册,如下:

    spring.handlers

    http\://dubbo.apache.org/schema/dubbo=org.apache.dubbo.config.spring.schema.DubboNamespaceHandler
    http\://code.alibabatech.com/schema/dubbo=org.apache.dubbo.config.spring.schema.DubboNamespaceHandler
    
    • 1
    • 2

    DubboNamespaceHandler

    public class DubboNamespaceHandler extends NamespaceHandlerSupport implements ConfigurableSourceBeanMetadataElement {
    	@Override
        public void init() {
        	// 注册 Bean 定义解析器
            registerBeanDefinitionParser("application", new DubboBeanDefinitionParser(ApplicationConfig.class, true));
            registerBeanDefinitionParser("module", new DubboBeanDefinitionParser(ModuleConfig.class, true));
            registerBeanDefinitionParser("registry", new DubboBeanDefinitionParser(RegistryConfig.class, true));
            registerBeanDefinitionParser("config-center", new DubboBeanDefinitionParser(ConfigCenterBean.class, true));
            registerBeanDefinitionParser("metadata-report", new DubboBeanDefinitionParser(MetadataReportConfig.class, true));
            registerBeanDefinitionParser("monitor", new DubboBeanDefinitionParser(MonitorConfig.class, true));
            registerBeanDefinitionParser("metrics", new DubboBeanDefinitionParser(MetricsConfig.class, true));
            registerBeanDefinitionParser("ssl", new DubboBeanDefinitionParser(SslConfig.class, true));
            registerBeanDefinitionParser("provider", new DubboBeanDefinitionParser(ProviderConfig.class, true));
            registerBeanDefinitionParser("consumer", new DubboBeanDefinitionParser(ConsumerConfig.class, true));
            registerBeanDefinitionParser("protocol", new DubboBeanDefinitionParser(ProtocolConfig.class, true));
            registerBeanDefinitionParser("service", new DubboBeanDefinitionParser(ServiceBean.class, true));
            registerBeanDefinitionParser("reference", new DubboBeanDefinitionParser(ReferenceBean.class, true));
            registerBeanDefinitionParser("annotation", new AnnotationBeanDefinitionParser());
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    dubbo 会向 Spring IoC 容器中,注入以上 Bean 对象,关于上面 Bean 对象的各种作用,请参照【Dubbo配置及属性详解】,这里不作过多阐述。

    本系列文章参照 dubbo-2.7.17 进行源码分析。
    在这里插入图片描述

    二、ReferenceBean

    其中 registerBeanDefinitionParser("reference", new DubboBeanDefinitionParser(ReferenceBean.class, true)); 代码,会向Spring IoC 容器注入一个 ReferenceBean 对象,该对象用于服务发现。

    ReferenceBean 类结构图如下:
    在这里插入图片描述
    看到这里,熟悉 Spring 的小伙伴都知道,这里应用了 Spring 很多的扩展接口,并且继承了一个 ReferenceConfig 对象(注意:重点,后续服务发现会用到)。

    • ApplicationContextAware:用于获取 Spring ApplicationContext 应用上下文对象
    • FactoryBean:Spring 默认都是懒加载的,并且在进行依赖注入的时候,通过 BeanFactor 工厂创建一个 FactoryBean 对象,然后通过其 getObject() 方法,获取对象实例,进行注入。
    • InitializingBean: 初始化Bean对象,Spring 在装载当前对象后,会调用其 void afterPropertiesSet() 方法,完成一些初始化动作
    • DisposableBean:销毁Bean对象,在销毁该对象时,会调用其 void destroy() 方法释放资源

    dubbo 通过 Spring 的 FactoryBean 机制,将每个需要注入的Bean,都解析封装为一个 ReferenceBean 放入IoC容器中,然后通过 Spring 在进行依赖注入时,返回一个代理对象,完成服务的发现。

    public class ReferenceBean<T> extends ReferenceConfig<T> implements FactoryBean,
            ApplicationContextAware, InitializingBean, DisposableBean {
        
        @Override
        public Object getObject() {
        	// 获取对象实例,返回一个代理对象
            return get();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    在这里插入图片描述

    三、服务引用时序图

    dubbo 官网有一个服务引用时序图,如下:
    在这里插入图片描述

    下面,我们根据 ReferenceConfigget() 方法具体看看做了什么。

    3.1 ReferenceConfig.get()

    public class ReferenceConfig<T> extends ReferenceConfigBase<T> {
    	// 获取服务实例:代理对象
    	public synchronized T get() {
            if (destroyed) {
                throw new IllegalStateException("The invoker of ReferenceConfig(" + url + ") has already destroyed!");
            }
            // 判断引用对象是否为空
            if (ref == null) {
            	// 为空,进行初始化创建
                init();
            }
            return ref;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    3.2 ReferenceConfig.init()

    初始化创建一个代理对象。

    public class ReferenceConfig<T> extends ReferenceConfigBase<T> {
    	// 初始化
    	public synchronized void init() {
            if (initialized) {
                return;
            }
    
    
            if (bootstrap == null) {
                bootstrap = DubboBootstrap.getInstance();
                // compatible with api call.
                if (null != this.getRegistries()) {
                    bootstrap.registries(this.getRegistries());
                }
                // 初始化dubbo上下文
                bootstrap.initialize();
            }
    
            checkAndUpdateSubConfigs();
    		
    		// 检查是否存在本地存根
            checkStubAndLocal(interfaceClass);
            // 检查是否存在本地Mock数据
            ConfigValidationUtils.checkMock(interfaceClass, this);
    
    		/**************step1:参数组装 start**************/
            Map<String, String> map = new HashMap<String, String>();
            map.put(SIDE_KEY, CONSUMER_SIDE);
    
            ReferenceConfigBase.appendRuntimeParameters(map);
            if (!ProtocolUtils.isGeneric(generic)) {
                String revision = Version.getVersion(interfaceClass, version);
                if (revision != null && revision.length() > 0) {
                    map.put(REVISION_KEY, revision);
                }
    
                String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames();
                if (methods.length == 0) {
                    logger.warn("No method found in service interface " + interfaceClass.getName());
                    map.put(METHODS_KEY, ANY_VALUE);
                } else {
                    map.put(METHODS_KEY, StringUtils.join(new HashSet<String>(Arrays.asList(methods)), COMMA_SEPARATOR));
                }
            }
            map.put(INTERFACE_KEY, interfaceName);
            AbstractConfig.appendParameters(map, getMetrics());
            AbstractConfig.appendParameters(map, getApplication());
            AbstractConfig.appendParameters(map, getModule());
            // remove 'default.' prefix for configs from ConsumerConfig
            // appendParameters(map, consumer, Constants.DEFAULT_KEY);
            AbstractConfig.appendParameters(map, consumer);
            AbstractConfig.appendParameters(map, this);
            MetadataReportConfig metadataReportConfig = getMetadataReportConfig();
            if (metadataReportConfig != null && metadataReportConfig.isValid()) {
                map.putIfAbsent(METADATA_KEY, REMOTE_METADATA_STORAGE_TYPE);
            }
            Map<String, AsyncMethodInfo> attributes = null;
            if (CollectionUtils.isNotEmpty(getMethods())) {
                attributes = new HashMap<>();
                for (MethodConfig methodConfig : getMethods()) {
                    AbstractConfig.appendParameters(map, methodConfig, methodConfig.getName());
                    String retryKey = methodConfig.getName() + ".retry";
                    if (map.containsKey(retryKey)) {
                        String retryValue = map.remove(retryKey);
                        if ("false".equals(retryValue)) {
                            map.put(methodConfig.getName() + ".retries", "0");
                        }
                    }
                    AsyncMethodInfo asyncMethodInfo = AbstractConfig.convertMethodConfig2AsyncInfo(methodConfig);
                    if (asyncMethodInfo != null) {
    //                    consumerModel.getMethodModel(methodConfig.getName()).addAttribute(ASYNC_KEY, asyncMethodInfo);
                        attributes.put(methodConfig.getName(), asyncMethodInfo);
                    }
                }
            }
    
            String hostToRegistry = ConfigUtils.getSystemProperty(DUBBO_IP_TO_REGISTRY);
            if (StringUtils.isEmpty(hostToRegistry)) {
                hostToRegistry = NetUtils.getLocalHost();
            } else if (isInvalidLocalHost(hostToRegistry)) {
                throw new IllegalArgumentException(
                        "Specified invalid registry ip from property:" + DUBBO_IP_TO_REGISTRY + ", value:" + hostToRegistry);
            }
            map.put(REGISTER_IP_KEY, hostToRegistry);
    
            serviceMetadata.getAttachments().putAll(map);
    		/**************step1:参数组装 end**************/
    
    		// step2:创建代理对象
            ref = createProxy(map);
    
            serviceMetadata.setTarget(ref);
            serviceMetadata.addAttribute(PROXY_CLASS_REF, ref);
            ConsumerModel consumerModel = repository.lookupReferredService(serviceMetadata.getServiceKey());
            consumerModel.setProxyObject(ref);
            consumerModel.init(attributes);
    
            initialized = true;
    
            checkInvokerAvailable();
    
            // dispatch a ReferenceConfigInitializedEvent since 2.7.4
            dispatch(new ReferenceConfigInitializedEvent(this, invoker));
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105

    可见,在 Reference.init() 中主要做了两件事:

    1. 参数组装(dubbo基于URL驱动)
    2. 创建代理对象

    3.3 Reference.createProxy()

    创建代理对象。

    public class ReferenceConfig<T> extends ReferenceConfigBase<T> {
    	
    	// 创建代理对象
        private T createProxy(Map<String, String> map) {
        	// 是否采用Jvm引用
            if (shouldJvmRefer(map)) {
            	// 进行本地引用
                URL url = new URL(LOCAL_PROTOCOL, LOCALHOST_VALUE, 0, interfaceClass.getName()).addParameters(map);
                invoker = REF_PROTOCOL.refer(interfaceClass, url);
                if (logger.isInfoEnabled()) {
                    logger.info("Using injvm service " + interfaceClass.getName());
                }
            } else {
                urls.clear();
                // 用户指定的 URL,可以是点对点地址,也可以是注册中心的地址。
                if (url != null && url.length() > 0) { // user specified URL, could be peer-to-peer address, or register center's address.
                	// 点对点
                    String[] us = SEMICOLON_SPLIT_PATTERN.split(url);
                    if (us != null && us.length > 0) {
                        for (String u : us) {
                            URL url = URL.valueOf(u);
                            if (StringUtils.isEmpty(url.getPath())) {
                                url = url.setPath(interfaceName);
                            }
                            if (UrlUtils.isRegistry(url)) {
                                urls.add(url.addParameterAndEncoded(REFER_KEY, StringUtils.toQueryString(map)));
                            } else {
                                urls.add(ClusterUtils.mergeUrl(url, map));
                            }
                        }
                    }
                } else { // assemble URL from register center's configuration
                    // if protocols not injvm checkRegistry
                    // 从注册中心获取
                    if (!LOCAL_PROTOCOL.equalsIgnoreCase(getProtocol())) {
                        checkRegistry();
    
    					// 获取所有注册中心地址
                        List<URL> us = ConfigValidationUtils.loadRegistries(this, false);
                        if (CollectionUtils.isNotEmpty(us)) {
                            for (URL u : us) {
                                URL monitorUrl = ConfigValidationUtils.loadMonitor(this, u);
                                if (monitorUrl != null) {
                                    map.put(MONITOR_KEY, URL.encode(monitorUrl.toFullString()));
                                }
                                urls.add(u.addParameterAndEncoded(REFER_KEY, StringUtils.toQueryString(map)));
                            }
                        }
                        if (urls.isEmpty()) {
                            throw new IllegalStateException(
                                    "No such any registry to reference " + interfaceName + " on the consumer " + NetUtils.getLocalHost() +
                                            " use dubbo version " + Version.getVersion() +
                                            ", please config  to your spring config.");
                        }
                    }
                }
    
    			// 只存在一个注册中心地址
                if (urls.size() == 1) {
                	// 使用不同协议,创建一个引用远程服务的invoker
                    invoker = REF_PROTOCOL.refer(interfaceClass, urls.get(0));
                } else {
                	// 多注册中心场景
                    List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
                    URL registryURL = null;
                    for (URL url : urls) {
                        // For multi-registry scenarios, it is not checked whether each referInvoker is available.
                        // Because this invoker may become available later.
                        invokers.add(REF_PROTOCOL.refer(interfaceClass, url));
    
                        if (UrlUtils.isRegistry(url)) {
                            registryURL = url; // use last registry url
                        }
                    }
    
                    if (registryURL != null) { // registry url is available
                        // for multi-subscription scenario, use 'zone-aware' policy by default
                        String cluster = registryURL.getParameter(CLUSTER_KEY, ZoneAwareCluster.NAME);
                        // The invoker wrap sequence would be: ZoneAwareClusterInvoker(StaticDirectory) -> FailoverClusterInvoker(RegistryDirectory, routing happens here) -> Invoker
                        invoker = Cluster.getCluster(cluster, false).join(new StaticDirectory(registryURL, invokers));
                    } else { // not a registry url, must be direct invoke.
                        String cluster = CollectionUtils.isNotEmpty(invokers)
                                ?
                                (invokers.get(0).getUrl() != null ? invokers.get(0).getUrl().getParameter(CLUSTER_KEY, ZoneAwareCluster.NAME) :
                                        Cluster.DEFAULT)
                                : Cluster.DEFAULT;
                        invoker = Cluster.getCluster(cluster).join(new StaticDirectory(invokers));
                    }
                }
            }
    
            if (logger.isInfoEnabled()) {
                logger.info("Refer dubbo service " + interfaceClass.getName() + " from url " + invoker.getUrl());
            }
    
            URL consumerURL = new URL(CONSUMER_PROTOCOL, map.remove(REGISTER_IP_KEY), 0, map.get(INTERFACE_KEY), map);
            MetadataUtils.publishServiceDefinition(consumerURL);
    
            // create service proxy
            // 创建接口代理类
            return (T) PROXY_FACTORY.getProxy(invoker, ProtocolUtils.isGeneric(generic));
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103

    所以,在 Reference.createProxy() 主要做了三件事:

    1. 获取所有注册中心地址
    2. 创建一个引用远程服务的invoker:REF_PROTOCOL.refer(interfaceClass, urls.get(0));
    3. 创建接口代理类:PROXY_FACTORY.getProxy(invoker, ProtocolUtils.isGeneric(generic))

    1. 创建一个可调用 Service 的抽象

    创建一个可调用 Service 的抽象。

    REF_PROTOCOL.refer(interfaceClass, urls.get(0));

    其中 urls.get(0) 获取的是注册中心的URL,格式如下:

    registry://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService
    ?application=dubbo-consumer-demo
    &dubbo=2.0.2
    &id=org.apache.dubbo.config.RegistryConfig#0
    &pid=15640
    &qos.enable=false
    &refer=application%3Ddubbo-consumer-demo%26check%3Dfalse%26dubbo%3D2.0.2%26init%3Dfalse%26interface%3Dcom.example.demo.provider.DemoProvider%26metadata-type%3Dremote%26methods%3Dmethod1%2Cmethod2%26pid%3D15640%26qos.enable%3Dfalse%26register.ip%3D192.168.0.4%26release%3D2.7.14%26revision%3D1.0-SNAPSHOT%26side%3Dconsumer%26sticky%3Dfalse%26timestamp%3D1661669152381
    &registry=zookeeper
    &release=2.7.14
    &timeout=300000
    &timestamp=1661669152394
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    所以,ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension().refer() 最终调用的是 RegistryProtocol 中的 refer() 方法。

    RegistryProtocol.refer()

    基于注册中心,创建一个引用远程服务的invoker.

    public class RegistryProtocol implements Protocol {
    	// 
    	public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
    		// step1:获取注册中心地址,例如以Zookeeper为注册中心,地址为:zookeeper://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?省略其他参数...
            url = getRegistryUrl(url);
            
    		// step2:基于url,获取一个注册中心实例
            Registry registry = getRegistry(url);
            if (RegistryService.class.equals(type)) {
                return proxyFactory.getInvoker((T) registry, type, url);
            }
    
            // group="a,b" or group="*"
            // step3:处理服务分组
            Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(REFER_KEY));
            String group = qs.get(GROUP_KEY);
            if (group != null && group.length() > 0) {
                if ((COMMA_SPLIT_PATTERN.split(group)).length > 1 || "*".equals(group)) {
                	// step5:返回一个引用远程服务的invoker
                    return doRefer(Cluster.getCluster(MergeableCluster.NAME), registry, type, url, qs);
                }
            }
    
    		// step4:通过SPI,创建一个集群容错类
            Cluster cluster = Cluster.getCluster(qs.get(CLUSTER_KEY));
            // step5:返回一个引用远程服务的invoker
            return doRefer(cluster, registry, type, url, qs);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29

    RegistryProtocol.refer() 中,存在几个步骤:

    1. step1:获取注册中心地址,例如以Zookeeper为注册中心,地址为:zookeeper://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?省略其他参数…
    2. step2:基于url,获取一个注册中心实例,例如以Zookeeper为注册中心,创建实例为 ZookeeperRegistry。
    3. step3:如果存在服务分组,创建一个 MergeableClusterInvoker
    4. step4:通过SPI,创建一个集群容错类,默认failover(失败重试,FailoverClusterInvoker)
    5. step5:返回一个引用远程服务的invoker

    在 step4:通过SPI,创建一个集群容错类 中,默认为 failover(失败重试),对应 dubbo 集群容错模式:

    • Failover Cluster:失败自动切换,当出现失败,重试其它服务器。
    • Failfast Cluster:快速失败,只发起一次调用,失败立即报错。通常用于非幂等性的写操作,比如新增记录。
    • Failsafe Cluster:失败安全,出现异常时,直接忽略。通常用于写入审计日志等操作。
      Failback Cluster:失败自动恢复,后台记录失败请求,定时重发。通常用于消息通知操作。
      Forking Cluster:并行调用多个服务器,只要一个成功即返回。通常用于实时性要求较高的读操作,但需要浪费更多服务资源。可通过 forks=“2” 来设置最大并行数。
      Broadcast Cluster:广播调用所有提供者,逐个调用,任意一台报错则报错。通常用于通知所有提供者更新缓存或日志等本地资源信息。

    下面我们详细分析一下 step5:返回一个引用远程服务的invoker,是什么创建一个invoker的。

    RegistryProtocol.doRefer()

    支持 Invoker 创建逻辑。

    public class RegistryProtocol implements Protocol {
    	// 创建并返回 Invoker
    	protected <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url, Map<String, String> parameters) {
    		// 构建一个consumer URL,结构如下:consumer://192.168.0.4/com.example.demo.provider.DemoProvider?省略其他参数...
            URL consumerUrl = new URL(CONSUMER_PROTOCOL, parameters.remove(REGISTER_IP_KEY), 0, type.getName(), parameters);
    
    		// 创建一个 ServiceDiscoveryMigrationInvoker:服务发现迁移调用Invoker
            ClusterInvoker<T> migrationInvoker = getMigrationInvoker(this, cluster, registry, type, url, consumerUrl);
            
            // 执行Invoker拦截动作,返回一个Invoker
            return interceptInvoker(migrationInvoker, url, consumerUrl);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    构建一个consumer URL,结构如下:

    consumer://192.168.0.4/com.example.demo.provider.DemoProvider?application=dubbo-consumer-demo		// 应用name
    &check=false		// 是否检查服务存在,true:不存在会服务启动异常
    &dubbo=2.0.2	// 版本
    &init=false
    &interface=com.example.demo.provider.DemoProvider	// 接口全路径
    &metadata-type=remote		// 远程
    &methods=method1,method2		// 接口方法名称
    &pid=16752
    &qos.enable=false
    &release=2.7.14
    &revision=1.0-SNAPSHOT
    &side=consumer
    &sticky=false
    &timestamp=1661675741769
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    interceptInvoker(migrationInvoker, url, consumerUrl); 执行Invoker拦截动作,返回一个Invoker

    public class RegistryProtocol implements Protocol {
    	// 执行Invoker拦截动作,返回一个Invoker
    	protected <T> Invoker<T> interceptInvoker(ClusterInvoker<T> invoker, URL url, URL consumerUrl) {
    		// SPI,返回激活扩展点:MigrationRuleListener
            List<RegistryProtocolListener> listeners = findRegistryProtocolListeners(url);
            if (CollectionUtils.isEmpty(listeners)) {
                return invoker;
            }
    
            for (RegistryProtocolListener listener : listeners) {
            	// 执行 MigrationRuleListener 中的 OnRefer 方法
                listener.onRefer(this, invoker, consumerUrl);
            }
            return invoker;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    MigrationRuleListener.onRefer(RegistryProtocol registryProtocol, ClusterInvoker invoker, URL url)

    @Activate    // 激活扩展点
    public class MigrationRuleListener implements RegistryProtocolListener, ConfigurationListener {
    	@Override
        public synchronized void onRefer(RegistryProtocol registryProtocol, ClusterInvoker<?> invoker, URL url) {
            MigrationInvoker<?> migrationInvoker = (MigrationInvoker<?>) invoker;
    
            MigrationRuleHandler<?> migrationListener = new MigrationRuleHandler<>(migrationInvoker);
            listeners.add(migrationListener);
    		
    		// rawRule:在MigrationRuleListener 构造函数中进行赋值,最终rawRule=INIT
            migrationListener.doMigrate(rawRule);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    MigrationRuleHandler.doMigrate(String rawRule)

    public class MigrationRuleHandler<T> {
        private static final Logger logger = LoggerFactory.getLogger(MigrationRuleHandler.class);
    
        private MigrationInvoker<T> migrationInvoker;
    
        public MigrationRuleHandler(MigrationInvoker<T> invoker) {
            this.migrationInvoker = invoker;
        }
    
        private MigrationStep currentStep;
    
    	// 执行迁移动作 rawRule=INIT
        public void doMigrate(String rawRule) {
        	// rawRule=INIT
            MigrationRule rule = MigrationRule.parse(rawRule);
    
            if (null != currentStep && currentStep.equals(rule.getStep())) {
                if (logger.isInfoEnabled()) {
                    logger.info("Migration step is not change. rule.getStep is " + currentStep.name());
                }
                return;
            } else {
                currentStep = rule.getStep();
            }
    
            migrationInvoker.setMigrationRule(rule);
    
            if (migrationInvoker.isMigrationMultiRegistry()) {
                if (migrationInvoker.isServiceInvoker()) {
                    migrationInvoker.refreshServiceDiscoveryInvoker();
                } else {
                    migrationInvoker.refreshInterfaceInvoker();
                }
            } else {
                switch (rule.getStep()) {
                	// 看到这里疑惑的小伙伴,可以通过断点调试的方式,进行代码跟踪
                	// 实际上我也是通过断点的方式跟踪到这里
                    case APPLICATION_FIRST: // 应用第一次启动
                        migrationInvoker.migrateToServiceDiscoveryInvoker(false);
                        break;
                    case FORCE_APPLICATION:
                        migrationInvoker.migrateToServiceDiscoveryInvoker(true);
                        break;
                    case FORCE_INTERFACE:
                    default:
                        migrationInvoker.fallbackToInterfaceInvoker();
                }
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    public class MigrationInvoker<T> implements MigrationClusterInvoker<T> {
    	@Override
        public synchronized void migrateToServiceDiscoveryInvoker(boolean forceMigrate) {
        	// 是否强制迁移,false
            if (!forceMigrate) {
            	// 刷新服务发现:关键代码
                refreshServiceDiscoveryInvoker();
                // 刷新接口
                refreshInterfaceInvoker();
                setListener(invoker, () -> {
                    this.compareAddresses(serviceDiscoveryInvoker, invoker);
                });
                setListener(serviceDiscoveryInvoker, () -> {
                    this.compareAddresses(serviceDiscoveryInvoker, invoker);
                });
            } else {
                refreshServiceDiscoveryInvoker();
                setListener(serviceDiscoveryInvoker, () -> {
                    this.destroyInterfaceInvoker(this.invoker);
                });
            }
        }
    
    	@Override
        public synchronized void refreshServiceDiscoveryInvoker() {
        	// // 刷新服务发现:关键代码
            clearListener(serviceDiscoveryInvoker);
            if (needRefresh(serviceDiscoveryInvoker)) {
                if (logger.isDebugEnabled()) {
                    logger.debug("Re-subscribing instance addresses, current interface " + type.getName());
                }
                
                // 获取服务发现Invoker对象
                serviceDiscoveryInvoker = registryProtocol.getServiceDiscoveryInvoker(cluster, registry, type, url);
    
                if (migrationMultiRegistry) {
                    setListener(serviceDiscoveryInvoker, () -> {
                        this.setAddressChanged();
                    });
                }
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    registryProtocol.getServiceDiscoveryInvoker()

    通过 registryProtocol.getServiceDiscoveryInvoker() 方法,获取一个服务发现 Invoker 对象。

    public class RegistryProtocol implements Protocol {
    	// 获取一个服务发现 Invoker 对象
    	public <T> ClusterInvoker<T> getServiceDiscoveryInvoker(Cluster cluster, Registry registry, Class<T> type, URL url) {
    		// 动态服务发现注册表目录:可通过简单注册中心进行动态变化,用于获取服务列表
            DynamicDirectory<T> directory = new ServiceDiscoveryRegistryDirectory<>(type, url);
            return doCreateInvoker(directory, cluster, registry, type);
        }
    	
    	/**
    	 * 创建
    	 * @param directory 动态服务发现注册表目录,用于获取服务列表
    	 * @param cluster 集群处理对象,包括集群容错处理,结构为MockClusterWrapper(FailoverCluster)
    	 * @param registry 注册中心对象,如 ZookeeperRegistry
    	 * @param type 接口Class
    	 */
    	protected <T> ClusterInvoker<T> doCreateInvoker(DynamicDirectory<T> directory, Cluster cluster, Registry registry, Class<T> type) {
            directory.setRegistry(registry);	// 设置注册中心对象
            directory.setProtocol(protocol);	// 设置协议对象
            // all attributes of REFER_KEY
            Map<String, String> parameters = new HashMap<String, String>(directory.getConsumerUrl().getParameters());
    
    		// 构建一个consumer://127.0.0.1/com.example.demo.provider.DemoProvider?....的URL
            URL urlToRegistry = new URL(CONSUMER_PROTOCOL, parameters.remove(REGISTER_IP_KEY), 0, type.getName(), parameters);
            if (directory.isShouldRegister()) {
            	// 向注册中心注册一个consumer目录内容
                directory.setRegisteredConsumerUrl(urlToRegistry);
                registry.register(directory.getRegisteredConsumerUrl());
            }
            // 构建一个路由链
            directory.buildRouterChain(urlToRegistry);
            // 订阅服务列表的变化:
            // 1. 从注册中心拿到 provider 地址
            // 2. 基于 provider 地址建立通信
            directory.subscribe(toSubscribeUrl(urlToRegistry));
    
    		// MockClusterWrapper(FailoverCluster)
            return (ClusterInvoker<T>) cluster.join(directory);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39

    可见,在 RegistryProtocel.doCreateInvoker() 方法中,主要做了一下几件事:

    1. 向注册中心注册一个 consumer://ip:port/interfaceName… 目录
    2. 从注册中心拿到 provider 地址
    3. 基于 provider 地址建立通信
    4. 构建并返回 Invoker 对象

    在这其中,贯穿了一个非常核心的对象 DynamicDirectory,关于该类的类关系图如下:
    在这里插入图片描述
    所以,在调用 directory.subscribe(toSubscribeUrl(urlToRegistry)); 进行服务订阅时,会进入 ServiceDiscoveryRegistryDirectory 类的 subscribe(URL url) 方法中。

    public class ServiceDiscoveryRegistryDirectory<T> extends DynamicDirectory<T> {
    	@Override
        public void subscribe(URL url) {
            super.subscribe(url);
            if (ApplicationModel.getEnvironment().getConfiguration().convert(Boolean.class, Constants.ENABLE_CONFIGURATION_LISTEN, true)) {
                enableConfigurationListen = true;
                CONSUMER_CONFIGURATION_LISTENER.addNotifyListener(this);
                referenceConfigurationListener = new ReferenceConfigurationListener(this, url);
            } else {
                enableConfigurationListen = false;
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    super.subscribe(url); 进入父类的 subscribe(url) 方法中

    public abstract class DynamicDirectory<T> extends AbstractDirectory<T> implements NotifyListener {
    	public void subscribe(URL url) {
            setConsumerUrl(url);
            // 调用 registry.subscribe
            registry.subscribe(url, this);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    到了这里,我们就需要搞懂,registry 到底是那个对象。在 RegistryProtocol.refer() 方法中,会通过 Registry registry = getRegistry(url); 获取 Registry 对象,然后一直传递,则如果采用 Zookeeper 为注册中心,Registry 对象为:

    RegistryFactoryWrapper.getRegistry(URL url) -> AbstractRegistryFactory.getRegistry(URL url) -> ZookeeperRegistryFactory.createRegistry(URL url)
    返回 ZookeeperRegistry 对象

    所以在 DynamicDirectory 构造函数方法中,registry.subscribe(url, this); 调用链路如下:

    ListenerRegistryWrapper.subscribe() -> FailbackRegistry.subscribe() -> ZookeeperRegistry.doSubscribe()

    ZookeeperRegistry.doSubscribe()
    public class ZookeeperRegistry extends FailbackRegistry {
    	
        @Override
        public void doSubscribe(final URL url, final NotifyListener listener) {
            try {
            	// "*".equals(url.getServiceInterface())
                if (ANY_VALUE.equals(url.getServiceInterface())) {
                    // 省略无关代码...
                } else {
                    CountDownLatch latch = new CountDownLatch(1);
                    List<URL> urls = new ArrayList<>();
                    
                    // toCategoriesPath(url):返回当前类目录下三个字目录路径,如下:
                    // - /dubbo/com.example.demo.provider.DemoProvider/providers
                    // - /dubbo/com.example.demo.provider.DemoProvider/configurators
                    // - /dubbo/com.example.demo.provider.DemoProvider/routers
                    for (String path : toCategoriesPath(url)) {
                        ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.computeIfAbsent(url, k -> new ConcurrentHashMap<>());
                        ChildListener zkListener = listeners.computeIfAbsent(listener, k -> new RegistryChildListenerImpl(url, k, latch));
                        if (zkListener instanceof RegistryChildListenerImpl) {
                            ((RegistryChildListenerImpl) zkListener).setLatch(latch);
                        }
                        // 创建
                        zkClient.create(path, false);
                        
                        // 为每个目录添加 Listener 监听,返回对应目录下的所有子节点
                        // 如果为 
                        List<String> children = zkClient.addChildListener(path, zkListener);
                        if (children != null) {
                            urls.addAll(toUrlsWithEmpty(url, path, children));
                        }
                    }
    
    				// 通知
                    notify(url, listener, urls);
                    latch.countDown();
                }
            } catch (Throwable e) {
                throw new RpcException("Failed to subscribe " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42

    调用 notify(url, listener, urls); 通知方法,进入父类的 notify 方法中,最终进入 AbstractRegistry.notify()

    public abstract class AbstractRegistry implements Registry {
        protected void notify(URL url, NotifyListener listener, List<URL> urls) {
            if (url == null) {
                throw new IllegalArgumentException("notify url == null");
            }
            if (listener == null) {
                throw new IllegalArgumentException("notify listener == null");
            }
            if ((CollectionUtils.isEmpty(urls))
                    && !ANY_VALUE.equals(url.getServiceInterface())) {
                logger.warn("Ignore empty notify urls for subscribe url " + url);
                return;
            }
            if (logger.isInfoEnabled()) {
                logger.info("Notify urls for subscribe url " + url + ", urls: " + urls);
            }
    
    		/**
    		 * urls 内容如下:
    		 * 0 = dubbo://127.0.0.1:20880/com.example.demo.provider.DemoProvider?...
    		 * 1 = empty://127.0.0.1/com.example.demo.provider.DemoProvider?...
    		 * 2 = empty://127.0.0.1/com.example.demo.provider.DemoProvider?...
    		 */
            // keep every provider's category.
            Map<String, List<URL>> result = new HashMap<>();
            for (URL u : urls) {
                if (UrlUtils.isMatch(url, u)) {
                    String category = u.getParameter(CATEGORY_KEY, DEFAULT_CATEGORY);
                    List<URL> categoryList = result.computeIfAbsent(category, k -> new ArrayList<>());
                    categoryList.add(u);
                }
            }
            if (result.size() == 0) {
                return;
            }
    
    		/**
    		 * result 内容如下
    		 * routers -> {ArrayList}
    		 * configurators -> {ArrayList}
    		 * providers -> {ArrayList}
    		 */
            Map<String, List<URL>> categoryNotified = notified.computeIfAbsent(url, u -> new ConcurrentHashMap<>());
            for (Map.Entry<String, List<URL>> entry : result.entrySet()) {
            	// 类型,存在四种类型:providers consumers configurators routers
                String category = entry.getKey();
                List<URL> categoryList = entry.getValue();
                categoryNotified.put(category, categoryList);
    
    			// 通知,注意 Listener为 RegistryDirectory
                listener.notify(categoryList);
                
                // 保存到文件中
                saveProperties(url);
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57

    注意:如果监听到服务节点发生变化,会执行 RegistryChildListenerImpl.RegistryChildListenerImpl.childChanged(String path, List children) 方法,最终也会调用到 AbstractRegistry.notify() 进而完成服务的动态感知。

    RegistryDirectory.notify()
    public class RegistryDirectory<T> extends DynamicDirectory<T> {
        // 缓存服务 url 到调用者映射
        protected volatile Map<URL, Invoker<T>> urlInvokerMap;
        
    	@Override
        public synchronized void notify(List<URL> urls) {
        	// key -> 类型 providers consumers configurators routers
        	// value  ->  URL List
            Map<String, List<URL>> categoryUrls = urls.stream()
                    .filter(Objects::nonNull)
                    .filter(this::isValidCategory)
                    .filter(this::isNotCompatibleFor26x)
                    .collect(Collectors.groupingBy(this::judgeCategory));
    
            List<URL> configuratorURLs = categoryUrls.getOrDefault(CONFIGURATORS_CATEGORY, Collections.emptyList());
            this.configurators = Configurator.toConfigurators(configuratorURLs).orElse(this.configurators);
    
            List<URL> routerURLs = categoryUrls.getOrDefault(ROUTERS_CATEGORY, Collections.emptyList());
            toRouters(routerURLs).ifPresent(this::addRouters);
    
            // providers
            List<URL> providerURLs = categoryUrls.getOrDefault(PROVIDERS_CATEGORY, Collections.emptyList());
            /**
             * 3.x added for extend URL address
             */
            ExtensionLoader<AddressListener> addressListenerExtensionLoader = ExtensionLoader.getExtensionLoader(AddressListener.class);
            List<AddressListener> supportedListeners = addressListenerExtensionLoader.getActivateExtension(getUrl(), (String[]) null);
            if (supportedListeners != null && !supportedListeners.isEmpty()) {
                for (AddressListener addressListener : supportedListeners) {
                    providerURLs = addressListener.notify(providerURLs, getConsumerUrl(), this);
                }
            }
            // 刷新本地 Invoker 对象
            refreshOverrideAndInvoker(providerURLs);
        }
        
        private synchronized void refreshOverrideAndInvoker(List<URL> urls) {
            // mock zookeeper://xxx?mock=return null
            overrideDirectoryUrl();
            // 刷新本地 Invoker 对象
            refreshInvoker(urls);
        }
        
        private void refreshInvoker(List<URL> invokerUrls) {
            Assert.notNull(invokerUrls, "invokerUrls should not be null");
    
            if (invokerUrls.size() == 1
                    && invokerUrls.get(0) != null
                    && EMPTY_PROTOCOL.equals(invokerUrls.get(0).getProtocol())) {
                this.forbidden = true; // Forbid to access
                this.invokers = Collections.emptyList();
                routerChain.setInvokers(this.invokers);
                destroyAllInvokers(); // Close all invokers
            } else {
            	// 服务启动urlInvokerMap=null
                Map<URL, Invoker<T>> oldUrlInvokerMap = this.urlInvokerMap; // local reference
                if (invokerUrls == Collections.<URL>emptyList()) {
                    invokerUrls = new ArrayList<>();
                }
                if (invokerUrls.isEmpty() && this.cachedInvokerUrls != null) {
                    invokerUrls.addAll(this.cachedInvokerUrls);
                } else {
                	// 设置缓存
                    this.cachedInvokerUrls = new HashSet<>();
                    this.cachedInvokerUrls.addAll(invokerUrls);//Cached invoker urls, convenient for comparison
                }
                if (invokerUrls.isEmpty()) {
                    return;
                }
                this.forbidden = false; // Allow to access
    
    			// 将 url 列表转换为 Invoker 映射
                Map<URL, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls);// Translate url list to Invoker map
    
                if (CollectionUtils.isEmptyMap(newUrlInvokerMap)) {
                    logger.error(new IllegalStateException("urls to invokers error .invokerUrls.size :" + invokerUrls.size() + ", invoker.size :0. urls :" + invokerUrls
                            .toString()));
                    return;
                }
    
                List<Invoker<T>> newInvokers = Collections.unmodifiableList(new ArrayList<>(newUrlInvokerMap.values()));
                
                routerChain.setInvokers(newInvokers);
                this.invokers = multiGroup ? toMergeInvokerList(newInvokers) : newInvokers;
                this.urlInvokerMap = newUrlInvokerMap;
    
                // Close the unused Invoker
                // 关闭未使用的 Invoker
                destroyUnusedInvokers(oldUrlInvokerMap, newUrlInvokerMap);
    
            }
    
            // notify invokers refreshed
            // 通知调用者刷新
            this.invokersChanged();
        }
        
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98

    分析到这里后,dubbo消费端在服务启动中,通过注册中心,获取到服务Provider URL 信息就完成了。

    核心对象为 Map> urlInvokerMap;,用于缓存服务 url 到调用者映射。

    • key:URL
    • Value:对应服务提端的连接
    RegistryDirectory.toInvokers(List urls)

    通过 Map> newUrlInvokerMap = toInvokers(invokerUrls); 将URL转换为 Invoker。

    public class RegistryDirectory<T> extends DynamicDirectory<T> {
    	private Map<URL, Invoker<T>> toInvokers(List<URL> urls) {
            Map<URL, Invoker<T>> newUrlInvokerMap = new ConcurrentHashMap<>();
            if (CollectionUtils.isEmpty(urls)) {
                return newUrlInvokerMap;
            }
            Set<URL> keys = new HashSet<>();
            String queryProtocols = this.queryMap.get(PROTOCOL_KEY);
            for (URL providerUrl : urls) {
                if (queryProtocols != null && queryProtocols.length() > 0) {
                    boolean accept = false;
                    String[] acceptProtocols = queryProtocols.split(",");
                    for (String acceptProtocol : acceptProtocols) {
                        if (providerUrl.getProtocol().equals(acceptProtocol)) {
                            accept = true;
                            break;
                        }
                    }
                    if (!accept) { // 不匹配,continue
                        continue;
                    }
                }
                // 空协议,continue
                if (EMPTY_PROTOCOL.equals(providerUrl.getProtocol())) {
                    continue;
                }
                // 非法协议,continue
                if (!ExtensionLoader.getExtensionLoader(Protocol.class).hasExtension(providerUrl.getProtocol())) {
                    logger.error(new IllegalStateException("Unsupported protocol " + providerUrl.getProtocol() +
                            " in notified url: " + providerUrl + " from registry " + getUrl().getAddress() +
                            " to consumer " + NetUtils.getLocalHost() + ", supported protocol: " +
                            ExtensionLoader.getExtensionLoader(Protocol.class).getSupportedExtensions()));
                    continue;
                }
                // 合并URL
                URL url = mergeUrl(providerUrl);
    
    			// 去重
                if (keys.contains(url)) { // Repeated url
                    continue;
                }
                keys.add(url);
                
                Map<URL, Invoker<T>> localUrlInvokerMap = this.urlInvokerMap; // local reference
                Invoker<T> invoker = localUrlInvokerMap == null ? null : localUrlInvokerMap.get(url);
                if (invoker == null) { // Not in the cache, refer again
                    try {
                        boolean enabled = true;
                        if (url.hasParameter(DISABLED_KEY)) {
                            enabled = !url.getParameter(DISABLED_KEY, false);
                        } else {
                            enabled = url.getParameter(ENABLED_KEY, true);
                        }
                        if (enabled) {
                        	// 核心代码:通过URL,创建一个Invoker
                            invoker = new InvokerDelegate<>(protocol.refer(serviceType, url), url, providerUrl);
                        }
                    } catch (Throwable t) {
                        logger.error("Failed to refer invoker for interface:" + serviceType + ",url:(" + url + ")" + t.getMessage(), t);
                    }
                    if (invoker != null) { // Put new invoker in cache
                        newUrlInvokerMap.put(url, invoker);
                    }
                } else {
                    newUrlInvokerMap.put(url, invoker);
                }
            }
            keys.clear();
            return newUrlInvokerMap;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    创建 Invoker 对象

    RegistryDirectory.toInvokers(List urls) 方法中,将 URL 转换为 Invoker,其中有一段非常核心的代码,如下:

    invoker = new InvokerDelegate<>(protocol.refer(serviceType, url), url, providerUrl);
    
    • 1

    通过 protocol.refer(serviceType, url),创建一个对应的 Invoker 对象,其 url 为 dubbo 协议,所有,最终进入 DubboProtocol 的 refer() 方法中,最后会调用其 protocolBindingRefer(Class type, URL url) 方法。

    public class DubboProtocol extends AbstractProtocol {
    	@Override
        public <T> Invoker<T> protocolBindingRefer(Class<T> serviceType, URL url) throws RpcException {
            optimizeSerialization(url);
    
            // create rpc invoker.
            DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
            invokers.add(invoker);
    
            return invoker;
        }
    
    	// 获取对应的连接
        private ExchangeClient[] getClients(URL url) {
            // whether to share connection
            int connections = url.getParameter(CONNECTIONS_KEY, 0);
            // if not configured, connection is shared, otherwise, one connection for one service
            if (connections == 0) {
                /*
                 * The xml configuration should have a higher priority than properties.
                 */
                String shareConnectionsStr = url.getParameter(SHARE_CONNECTIONS_KEY, (String) null);
                connections = Integer.parseInt(StringUtils.isBlank(shareConnectionsStr) ? ConfigUtils.getProperty(SHARE_CONNECTIONS_KEY,
                        DEFAULT_SHARE_CONNECTIONS) : shareConnectionsStr);
                return getSharedClient(url, connections).toArray(new ExchangeClient[0]);
            } else {
                ExchangeClient[] clients = new ExchangeClient[connections];
                for (int i = 0; i < clients.length; i++) {
                    clients[i] = initClient(url);
                }
                return clients;
            }
    
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35

    最后再 DubboProtocol#getClients(URL url) 会获取当前 url 对应的客户端连接,即 NettyClient。

    NettyClient
    public class NettyClient extends AbstractClient {
        private static final ChannelFactory CHANNEL_FACTORY = new NioClientSocketChannelFactory(Executors.newCachedThreadPool(new NamedThreadFactory("NettyClientBoss", true)),
                Executors.newCachedThreadPool(new NamedThreadFactory("NettyClientWorker", true)),
                Constants.DEFAULT_IO_THREADS);
        private ClientBootstrap bootstrap;
    
        private volatile Channel channel; // volatile, please copy reference to use
    
        public NettyClient(final URL url, final ChannelHandler handler) throws RemotingException {
            super(url, wrapChannelHandler(url, handler));
        }
    
        @Override
        protected void doOpen() throws Throwable {
            NettyHelper.setNettyLoggerFactory();
            bootstrap = new ClientBootstrap(CHANNEL_FACTORY);
            // config
            // @see org.jboss.netty.channel.socket.SocketChannelConfig
            bootstrap.setOption("keepAlive", true);
            bootstrap.setOption("tcpNoDelay", true);
            bootstrap.setOption("connectTimeoutMillis", getConnectTimeout());
            final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);
            bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
                @Override
                public ChannelPipeline getPipeline() {
                    NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyClient.this);
                    ChannelPipeline pipeline = Channels.pipeline();
                    pipeline.addLast("decoder", adapter.getDecoder());
                    pipeline.addLast("encoder", adapter.getEncoder());
                    pipeline.addLast("handler", nettyHandler);
                    return pipeline;
                }
            });
        }
    
        @Override
        protected void doConnect() throws Throwable {
            long start = System.currentTimeMillis();
            ChannelFuture future = bootstrap.connect(getConnectAddress());
            try {
                boolean ret = future.awaitUninterruptibly(getConnectTimeout(), TimeUnit.MILLISECONDS);
    
                if (ret && future.isSuccess()) {
                    Channel newChannel = future.getChannel();
                    newChannel.setInterestOps(Channel.OP_READ_WRITE);
                    try {
                        // Close old channel
                        Channel oldChannel = NettyClient.this.channel; // copy reference
                        if (oldChannel != null) {
                            try {
                                if (logger.isInfoEnabled()) {
                                    logger.info("Close old netty channel " + oldChannel + " on create new netty channel " + newChannel);
                                }
                                oldChannel.close();
                            } finally {
                                NettyChannel.removeChannelIfDisconnected(oldChannel);
                            }
                        }
                    } finally {
                        if (NettyClient.this.isClosed()) {
                            try {
                                if (logger.isInfoEnabled()) {
                                    logger.info("Close new netty channel " + newChannel + ", because the client closed.");
                                }
                                newChannel.close();
                            } finally {
                                NettyClient.this.channel = null;
                                NettyChannel.removeChannelIfDisconnected(newChannel);
                            }
                        } else {
                            NettyClient.this.channel = newChannel;
                        }
                    }
                } else if (future.getCause() != null) {
                    throw new RemotingException(this, "client(url: " + getUrl() + ") failed to connect to server "
                            + getRemoteAddress() + ", error message is:" + future.getCause().getMessage(), future.getCause());
                } else {
                    throw new RemotingException(this, "client(url: " + getUrl() + ") failed to connect to server "
                            + getRemoteAddress() + " client-side timeout "
                            + getConnectTimeout() + "ms (elapsed: " + (System.currentTimeMillis() - start) + "ms) from netty client "
                            + NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion());
                }
            } finally {
                if (!isConnected()) {
                    future.cancel();
                }
            }
        }
    
        @Override
        protected void doDisConnect() throws Throwable {
            try {
                NettyChannel.removeChannelIfDisconnected(channel);
            } catch (Throwable t) {
                logger.warn(t.getMessage());
            }
        }
    
        @Override
        protected void doClose() throws Throwable {
            /*try {
                bootstrap.releaseExternalResources();
            } catch (Throwable t) {
                logger.warn(t.getMessage());
            }*/
        }
    
        @Override
        protected org.apache.dubbo.remoting.Channel getChannel() {
            Channel c = channel;
            if (c == null || !c.isConnected()) {
                return null;
            }
            return NettyChannel.getOrAddChannel(c, getUrl(), this);
        }
    
        Channel getNettyChannel() {
            return channel;
        }
    
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122

    最终 Invoker 链路如下:
    在这里插入图片描述

    2. 创建接口代理类

    // Protocol REF_PROTOCOL = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension()
    // invoker = REF_PROTOCOL.refer(interfaceClass, urls.get(0));
    // ProxyFactory PROXY_FACTORY = ExtensionLoader.getExtensionLoader(ProxyFactory.class).getAdaptiveExtension();
    PROXY_FACTORY.getProxy(invoker, ProtocolUtils.isGeneric(generic));
    
    • 1
    • 2
    • 3
    • 4

    默认采用 JavassistProxyFactory 创建代理对象,如下:

    public class JavassistProxyFactory extends AbstractProxyFactory {
    
        @Override
        @SuppressWarnings("unchecked")
        public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
        	// jdk代理模式,InvokerInvocationHandler 处理类
            return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    3. 总结

    下面,我们根据上面的源码分析,画一个简单的流程图。
    在这里插入图片描述

    最终,创建的代理类结构如下:

    在这里插入图片描述

  • 相关阅读:
    联想集团:2022/23财年第一季度业绩
    【强大的数据迁移和恢复解决方案】KernelApps及其产品介绍
    论文阅读:Point-to-Voxel Knowledge Distillation for LiDAR Semantic Segmentation
    聊聊推荐系统的评测(下)
    【心得】来聊聊令人头疼的前端内存泄漏~
    验收测试做得好,质量验收没烦恼
    如何解决msvcp110.dll丢失问题,分享5个有效的解决方法
    一文带你看透手机号码归属地
    vue 监听页面卷去的高度,获取元素离页面顶部的距离
    pandas dataframe 统计某一列的值出现的次数并形成一列新的列
  • 原文地址:https://blog.csdn.net/qq_33375499/article/details/126567715