• [Dubbo3.0.8源码解析系列]-25-为调用器对象增加容错和过滤器等功能


    25-为调用器对象增加容错和过滤器等功能

    25.1 简介

    将思路拉回到RegistryProtocol的创建Invoker对象的doCreateInvoker代码

     protected <T> ClusterInvoker<T> doCreateInvoker(DynamicDirectory<T> directory, Cluster cluster, Registry registry, Class<T> type) {
            directory.setRegistry(registry);
            directory.setProtocol(protocol);
            // all attributes of REFER_KEY
            Map<String, String> parameters = new HashMap<>(directory.getConsumerUrl().getParameters());
            URL urlToRegistry = new ServiceConfigURL(
                parameters.get(PROTOCOL_KEY) == null ? CONSUMER : parameters.get(PROTOCOL_KEY),
                parameters.remove(REGISTER_IP_KEY),
                0,
                getPath(parameters, type),
                parameters
            );
            urlToRegistry = urlToRegistry.setScopeModel(directory.getConsumerUrl().getScopeModel());
            urlToRegistry = urlToRegistry.setServiceModel(directory.getConsumerUrl().getServiceModel());
            if (directory.isShouldRegister()) {
                directory.setRegisteredConsumerUrl(urlToRegistry);
                registry.register(directory.getRegisteredConsumerUrl());
            }
            directory.buildRouterChain(urlToRegistry);
            directory.subscribe(toSubscribeUrl(urlToRegistry));
    
            return (ClusterInvoker<T>) cluster.join(directory, true);
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23

    不论是接口级还是应用级注册都会调用代码

     return (ClusterInvoker<T>) cluster.join(directory, true);
    
    • 1

    25.2 MockClusterWrapper类型的join方法

    我们详细看下:
    MockClusterWrapper类型的join方法

    buildFilterChain参数为true

      public <T> Invoker<T> join(Directory<T> directory, boolean buildFilterChain) throws RpcException {
            return new MockClusterInvoker<T>(directory,
                    this.cluster.join(directory, buildFilterChain));
        }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5

    默认的cluster类型为FailoverCluster

    25.3 FailoverCluster的join方法

    我们看FailoverCluster的join方法

    FailoverCluster没有实现join方法需要先调用它的父类型AbstractCluster的join方法如下:

       public <T> Invoker<T> join(Directory<T> directory, boolean buildFilterChain) throws RpcException {
            //带有过滤器将走上面这个逻辑
            if (buildFilterChain) {
                return buildClusterInterceptors(doJoin(directory));
            } else {
                return doJoin(directory);
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    //过滤器参数为true将走上面这个逻辑 先创建Invoker对象再创建过滤器

           if (buildFilterChain) {
               return buildClusterInterceptors(doJoin(directory));
    
    
    • 1
    • 2
    • 3

    doJoin方法将有默认类AbstractCluster的doJoin抽象方法调用具体方法FailoverCluster的doJoin方法如下
    FailoverCluster的doJoin方法

     public <T> AbstractClusterInvoker<T> doJoin(Directory<T> directory) throws RpcException {
            return new FailoverClusterInvoker<>(directory);
        }
    
    • 1
    • 2
    • 3

    可以看到这里调用链路创建了一个失效转移的Invoker对象FailoverClusterInvoker

    FailoverClusterInvoker的构造器

      public FailoverClusterInvoker(Directory<T> directory) {
            super(directory);
        }
    
    • 1
    • 2
    • 3

    父调用器AbstractClusterInvoker的构造器

     public AbstractClusterInvoker(Directory<T> directory) {
            this(directory, directory.getUrl());
        
    }
    
    • 1
    • 2
    • 3
    • 4

    重载的构造器如下:

    
        public AbstractClusterInvoker(Directory<T> directory, URL url) {
            if (directory == null) {
                throw new IllegalArgumentException("service directory == null");
            }
    
            this.directory = directory;
            //sticky: invoker.isAvailable() should always be checked before using when availablecheck is true.
            this.availableCheck = url.getParameter(CLUSTER_AVAILABLE_CHECK_KEY, DEFAULT_CLUSTER_AVAILABLE_CHECK);
            Configuration configuration = ConfigurationUtils.getGlobalConfiguration(url.getOrDefaultModuleModel());
            this.reselectCount = configuration.getInt(RESELECT_COUNT, DEFAULT_RESELECT_COUNT);
            this.enableConnectivityValidation = configuration.getBoolean(ENABLE_CONNECTIVITY_VALIDATION, true);
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    初始化过滤器链路的代码AbstractCluster类型的

    25.4 buildClusterInterceptors

    buildClusterInterceptors方法如下:

      private <T> Invoker<T> buildClusterInterceptors(AbstractClusterInvoker<T> clusterInvoker) {
    //        AbstractClusterInvoker last = clusterInvoker;
            AbstractClusterInvoker<T> last = buildInterceptorInvoker(new ClusterFilterInvoker<>(clusterInvoker));
    
            if (Boolean.parseBoolean(ConfigurationUtils.getProperty(clusterInvoker.getDirectory().getConsumerUrl().getScopeModel(), CLUSTER_INTERCEPTOR_COMPATIBLE_KEY, "false"))) {
                return build27xCompatibleClusterInterceptors(clusterInvoker, last);
            }
            return last;
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    25.5 ClusterFilterInvoker构造器

    ClusterFilterInvoker构造器

      public ClusterFilterInvoker(AbstractClusterInvoker invoker) {
                //过滤器构造链DefaultFilterChainBuilder
                List builders = ScopeModelUtil.getApplicationModel(invoker.getUrl().getScopeModel()).getExtensionLoader(FilterChainBuilder.class).getActivateExtensions();
                if (CollectionUtils.isEmpty(builders)) {
                    filterInvoker = invoker;
                } else {
                    ClusterInvoker tmpInvoker = invoker;
                    for (FilterChainBuilder builder : builders) {
                        tmpInvoker = builder.buildClusterInvokerChain(tmpInvoker, REFERENCE_FILTER_KEY, CommonConstants.CONSUMER);
                    }
                    filterInvoker = tmpInvoker;
                }
            }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    DefaultFilterChainBuilder类型的buildClusterInvokerChain构造过滤器链路
    
    ```java
    public  ClusterInvoker buildClusterInvokerChain(final ClusterInvoker originalInvoker, String key, String group) {
        ClusterInvoker last = originalInvoker;
        URL url = originalInvoker.getUrl();
        List moduleModels = getModuleModelsFromUrl(url);
        List filters;
        if (moduleModels != null && moduleModels.size() == 1) {
            //通过扩展查询匹配的消费者过滤器列表这里可以查询4个
            filters = ScopeModelUtil.getExtensionLoader(ClusterFilter.class, moduleModels.get(0)).getActivateExtension(url, key, group);
        } else if (moduleModels != null && moduleModels.size() > 1) {
            filters = new ArrayList<>();
            List directors = new ArrayList<>();
            for (ModuleModel moduleModel : moduleModels) {
                List tempFilters = ScopeModelUtil.getExtensionLoader(ClusterFilter.class, moduleModel).getActivateExtension(url, key, group);
                filters.addAll(tempFilters);
                directors.add(moduleModel.getExtensionDirector());
            }
            filters = sortingAndDeduplication(filters, directors);
    
        } else {
            filters = ScopeModelUtil.getExtensionLoader(ClusterFilter.class, null).getActivateExtension(url, key, group);
        }
        //过滤器不为空则拼接到调用链表之中
        if (!CollectionUtils.isEmpty(filters)) {
            for (int i = filters.size() - 1; i >= 0; i--) {
                final ClusterFilter filter = filters.get(i);
                final Invoker next = last;
                last = new CopyOfClusterFilterChainNode<>(originalInvoker, next, filter);
            }
            return new ClusterCallbackRegistrationInvoker<>(originalInvoker, last, filters);
        }
    
        return last;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    
    默认为4个过滤器:
    - ConsumerContextFilter
    - FutureFilter
    - MonitorClusterFilter
    - RouterSnapshotFilter
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
  • 相关阅读:
    21 Spring Boot整合Redis
    【读书笔记】【Effective C++】继承与面向对象设计
    【计算机网络笔记】网络应用的体系结构
    基于云边协同架构的五大应用场景革新
    力扣:第81场双周赛
    PHP 获取当前时间戳,精确到毫秒
    Embind进阶用法(vector)
    java计算机毕业设计框架的电脑测评系统源码+数据库+lw文档+系统
    面试网络-0x01 http中的GET和POST区别?
    综合管廊UWB人员定位系统
  • 原文地址:https://blog.csdn.net/songjunyan/article/details/126573599