• 5-Dubbo架构设计与底层原理-服务导出源码分析(中)


    导出 Dubbo 服务

    服务导出,分为导出到本地 (JVM),和导出到远程。在深入分析服务导出源码前,先来从宏观层面上看一下服务导出逻辑。

    private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
        
        // 省略无关代码
        
        if (ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
                .hasExtension(url.getProtocol())) {
            // 加载 ConfiguratorFactory,并生成 Configurator 配置 url
            url = ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
                    .getExtension(url.getProtocol()).getConfigurator(url).configure(url);
        }
    
        String scope = url.getParameter(Constants.SCOPE_KEY);
        // 如果 scope = none,则什么都不做
        if (!Constants.SCOPE_NONE.toString().equalsIgnoreCase(scope)) {
            // scope != remote,导出到本地
            if (!Constants.SCOPE_REMOTE.toString().equalsIgnoreCase(scope)) {
                exportLocal(url);
            }
    
            // scope != local,导出到远程
            if (!Constants.SCOPE_LOCAL.toString().equalsIgnoreCase(scope)) {
                if (registryURLs != null && !registryURLs.isEmpty()) {
                    for (URL registryURL : registryURLs) {
                        url = url.addParameterIfAbsent(Constants.DYNAMIC_KEY,                                             registryURL.getParameter(Constants.DYNAMIC_KEY));
                        // 加载监视器链接
                        URL monitorUrl = loadMonitor(registryURL);
                        if (monitorUrl != null) {
                            // 将监视器链接作为参数添加到 url 中
                            url = url.addParameterAndEncoded(Constants.MONITOR_KEY, 
                                                             monitorUrl.toFullString());
                        }
    
                        String proxy = url.getParameter(Constants.PROXY_KEY);
                        if (StringUtils.isNotEmpty(proxy)) {
                            registryURL = registryURL.addParameter(Constants.PROXY_KEY, 
                                                                   proxy);
                        }
    
                        // 为服务提供类(ref)生成 Invoker
                        Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) 								interfaceClass, registryURL.addParameterAndE                                                (Constants.EXPORT_KEY, url.toFullString()));
                        // DelegateProviderMetaDataInvoker 仅用于持有 Invoker 和 	
                        // ServiceConfig
                        DelegateProviderMetaDataInvoker wrapperInvoker = new 		
                            DelegateProviderMetaDataInvoker(invoker, this);
    
                        // 导出服务,并生成 Exporter
                        Exporter<?> exporter = protocol.export(wrapperInvoker);
                        exporters.add(exporter);
                    }
                } else {    // 不存在注册中心,仅导出服务
                    Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) 
                                                                 interfaceClass, url);
                    DelegateProviderMetaDataInvoker wrapperInvoker = new 
                        DelegateProviderMetaDataInvoker(invoker, this);
                    Exporter<?> exporter = protocol.export(wrapperInvoker);
                    exporters.add(exporter);
                }
            }
        }
        this.urls.add(url);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61

    上面代码根据 url 中的 scope 参数决定服务导出方式,分别如下:

    • scope = none,不导出服务
    • scope != remote,导出到本地
    • scope != local,导出到远程

    不管是导出到本地,还是远程。进行服务导出之前,均需要先创建 Invoker。

    Invoker 创建过程

    在 Dubbo 中,Invoker 是一个非常重要的模型。在服务提供端,以及服务引用端均会出现 Invoker。Invoker 是实体域,它是 Dubbo 的核心模型,其它模型都向它靠扰,或转换成它,它代表一个可执行体,可向它发起 invoke 调用,它有可能是一个本地的实现,也可能是一个远程的实现,也可能一个集群实现。

    Invoker 是由 ProxyFactory 创建而来,Dubbo 默认的 ProxyFactory 实现类是 JavassistProxyFactory。到 JavassistProxyFactory 代码中,探索 Invoker 的创建过程。如下:

    //  -- JavassistProxyFactory
    public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
    	// 为目标类创建 Wrapper
        final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
        // 创建匿名 Invoker 类对象,并实现 doInvoke 方法。
        return new AbstractProxyInvoker<T>(proxy, type, url) {
            @Override
            protected Object doInvoke(T proxy, String methodName,
                                      Class<?>[] parameterTypes,
                                      Object[] arguments) throws Throwable {
    			// 调用 Wrapper 的 invokeMethod 方法,invokeMethod 最终会调用目标方法
                return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
            }
        };
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    JavassistProxyFactory 创建了一个继承自 AbstractProxyInvoker 类的匿名对象,并覆写了抽象方法 doInvoke。覆写后的 doInvoke 逻辑比较简单,仅是将调用请求转发给了 Wrapper 类的 invokeMethod 方法。Wrapper 用于“包裹”目标类,Wrapper 是一个抽象类,仅可通过 getWrapper(Class) 方法创建子类。在创建 Wrapper 子类的过程中,子类代码生成逻辑会对 getWrapper 方法传入的 Class 对象进行解析,拿到诸如类方法,类成员变量等信息。以及生成 invokeMethod 方法代码,和其他一些方法代码。代码生成完毕后,通过 Javassist 生成 Class 对象,最后再通过反射创建 Wrapper 实例。

     public static Wrapper getWrapper(Class<?> c) {
        while (ClassGenerator.isDynamicClass(c))
            c = c.getSuperclass();
    
        if (c == Object.class)
            return OBJECT_WRAPPER;
    
        // 访存
        Wrapper ret = WRAPPER_MAP.get(c);
        if (ret == null) {
            // 缓存未命中,创建 Wrapper
            ret = makeWrapper(c);
            // 写入缓存
            WRAPPER_MAP.put(c, ret);
        }
        return ret;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    getWrapper 方法只是包含了一些缓存操作逻辑,非重点。重点关注 makeWrapper 方法。

    private static Wrapper makeWrapper(Class<?> c) {
        // 检测 c 是否为私有类型,若是则抛出异常
        if (c.isPrimitive())
            throw new IllegalArgumentException("Can not create wrapper for primitive 				type: " + c);
    
        String name = c.getName();
        ClassLoader cl = ClassHelper.getClassLoader(c);
    
        // c1 用于存储 setPropertyValue 方法代码
        StringBuilder c1 = new StringBuilder("public void setPropertyValue(Object o, 			String n, Object v){ ");
        // c2 用于存储 getPropertyValue 方法代码
        StringBuilder c2 = new StringBuilder("public Object getPropertyValue(Object o, 			String n){ ");
        // c3 用于存储 invokeMethod 方法代码
        StringBuilder c3 = new StringBuilder("public Object invokeMethod(Object o, 				String n, Class[] p, Object[] v) throws " + 								
            InvocationTargetException.class.getName() + "{ ");
    
        // 生成类型转换代码及异常捕捉代码,比如:
        //   DemoService w; try { w = ((DemoServcie) $1); }}catch(Throwable e){ throw 			new IllegalArgumentException(e); }
        c1.append(name).append(" w; try{ w = ((").append(name).append(")$1); 					}catch(Throwable e){ throw new IllegalArgumentException(e); }");
        c2.append(name).append(" w; try{ w = ((").append(name).append(")$1);  					}catch(Throwable e){ throw new IllegalArgumentException(e); }");
        c3.append(name).append(" w; try{ w = ((").append(name).append(")$1); 					}catch(Throwable e){ throw new IllegalArgumentException(e); }");
    
        // pts 用于存储成员变量名和类型
        Map<String, Class<?>> pts = new HashMap<String, Class<?>>();
        // ms 用于存储方法描述信息(可理解为方法签名)及 Method 实例
        Map<String, Method> ms = new LinkedHashMap<String, Method>();
        // mns 为方法名列表
        List<String> mns = new ArrayList<String>();
        // dmns 用于存储定义在当前类中的方法的名称
        List<String> dmns = new ArrayList<String>();
    
        // 1-------------------------------------------------------
    
        // 获取 public 访问级别的字段,并为所有字段生成条件判断语句
        for (Field f : c.getFields()) {
            String fn = f.getName();
            Class<?> ft = f.getType();
            if (Modifier.isStatic(f.getModifiers()) || 	
                Modifier.isTransient(f.getModifiers()))
                // 忽略关键字 static 或 transient 修饰的变量
                continue;
    
            // 生成条件判断及赋值语句,比如:
            // if( $2.equals("name") ) { w.name = (java.lang.String) $3; return;}
            // if( $2.equals("age") ) { w.age = ((Number) $3).intValue(); return;}
            c1.append(" if( $2.equals(\"").append(fn).append("\") ){ 					 			w.").append(fn).append("=").append(arg(ft, "$3")).append("; return; }");
    
            // 生成条件判断及返回语句,比如:
            // if( $2.equals("name") ) { return ($w)w.name; }
            c2.append(" if( $2.equals(\"").append(fn).append("\") ){ return  		  				($w)w.").append(fn).append("; }");
    
            // 存储 <字段名, 字段类型> 键值对到 pts 中
            pts.put(fn, ft);
        }
    
        // 2---------------------------------------------------------
    
        Method[] methods = c.getMethods();
        // 检测 c 中是否包含在当前类中声明的方法
        boolean hasMethod = hasMethods(methods);
        if (hasMethod) {
            c3.append(" try{");
        }
        for (Method m : methods) {
            if (m.getDeclaringClass() == Object.class)
                // 忽略 Object 中定义的方法
                continue;
    
            String mn = m.getName();
            // 生成方法名判断语句,示例如下:
            // if ( "sayHello".equals( $2 )
            c3.append(" if( \"").append(mn).append("\".equals( $2 ) ");
            int len = m.getParameterTypes().length;
            // 生成运行时传入参数的数量与方法的参数列表长度判断语句,示例如下:
            // && $3.length == 2
            c3.append(" && ").append(" $3.length == ").append(len);
    
            boolean override = false;
            for (Method m2 : methods) {
                // 检测方法是否存在重载情况,条件为:方法对象不同 && 方法名相同
                if (m != m2 && m.getName().equals(m2.getName())) {
                    override = true;
                    break;
                }
            }
            // 对重载方法进行处理,考虑下面的方法:
            //    1. void sayHello(Integer, String)
            //    2. void sayHello(Integer, Integer)
            // 方法名相同,参数列表长度也相同,因此不能仅通过这两项判断两个方法是否相等。
            // 需要进一步判断方法的参数类型
            if (override) {
                if (len > 0) {
                    for (int l = 0; l < len; l++) {
                        // && $3[0].getName().equals("java.lang.Integer") 
                        //    && $3[1].getName().equals("java.lang.String")
                        c3.append(" && ").append("  			 			 										$3[").append(l).append("].getName().equals(\"")
                                .append(m.getParameterTypes()			
                                [l].getName()).append("\")");
                    }
                }
            }
    
            // 添加 ) {,完成方法判断语句,此时生成的方法可能如下(已格式化):
            // if ("sayHello".equals($2) 
            //     && $3.length == 2
            //     && $3[0].getName().equals("java.lang.Integer") 
            //     && $3[1].getName().equals("java.lang.String")) {
            c3.append(" ) { ");
    
            // 根据返回值类型生成目标方法调用语句
            if (m.getReturnType() == Void.TYPE)
                // w.sayHello((java.lang.Integer)$4[0], (java.lang.String)$4[1]); return 				null;
                c3.append("w.").append(mn).append('(')
                .append(args(m.getParameterTypes(), "$4"))
                .append(");").append(" return null;");
            else
                // return w.sayHello((java.lang.Integer)$4[0], (java.lang.String)$4[1]);
                c3.append(" return ($w)w.").append(mn).append('(')
                .append(args(m.getParameterTypes(), "$4")).append(");");
    
            // 添加 }, 当前”方法判断条件“代码生成完毕,示例代码如下(已格式化):
            // if ("sayHello".equals($2) 
            //     && $3.length == 2
            //     && $3[0].getName().equals("java.lang.Integer") 
            //     && $3[1].getName().equals("java.lang.String")) {
            //
            //     w.sayHello((java.lang.Integer)$4[0], (java.lang.String)$4[1]); 
            //     return null;
            // }
            c3.append(" }");
    
            // 添加方法名到 mns 集合中
            mns.add(mn);
            // 检测当前方法是否在 c 中被声明的
            if (m.getDeclaringClass() == c)
                // 若是,则将当前方法名添加到 dmns 中
                dmns.add(mn);
            ms.put(ReflectUtils.getDesc(m), m);
        }
        if (hasMethod) {
            // 添加异常捕捉语句
            c3.append(" } catch(Throwable e) { ");
            c3.append("  throw new java.lang.reflect.InvocationTargetException(e); ");
            c3.append(" }");
        }
    
        // 添加 NoSuchMethodException 异常抛出代码
        c3.append(" throw new " + NoSuchMethodException.class.getName() + "(\"Not found 		method \\\"\"+$2+\"\\\" in class " + c.getName() + ".\"); }");
    
        // 3----------------------------------------------------------
    
        Matcher matcher;
        // 处理 get/set 方法
        for (Map.Entry<String, Method> entry : ms.entrySet()) {
            String md = entry.getKey();
            Method method = (Method) entry.getValue();
            // 匹配以 get 开头的方法
            if ((matcher = ReflectUtils.GETTER_METHOD_DESC_PATTERN.matcher(md))
                .matches()) {
                // 获取属性名
                String pn = propertyName(matcher.group(1));
                // 生成属性判断以及返回语句,示例如下:
                // if( $2.equals("name") ) { return ($w).w.getName(); }
                c2.append(" if( $2.equals(\"").append(pn).append("\") ){ return 		 				($w)w.").append(method.getName()).append("(); }");
                pts.put(pn, method.getReturnType());
    
            // 匹配以 is/has/can 开头的方法
            } else if ((matcher = ReflectUtils.IS_HAS_CAN_METHOD_DESC_PATTERN
                        .matcher(md)).matches()) {
                String pn = propertyName(matcher.group(1));
                // 生成属性判断以及返回语句,示例如下:
                // if( $2.equals("dream") ) { return ($w).w.hasDream(); }
                c2.append(" if( $2.equals(\"").append(pn).append("\") ){ return 						($w)w.").append(method.getName()).append("(); }");
                pts.put(pn, method.getReturnType());
    
            // 匹配以 set 开头的方法
            } else if ((matcher = ReflectUtils.SETTER_METHOD_DESC_PATTERN
                        .matcher(md)).matches()) {
                Class<?> pt = method.getParameterTypes()[0];
                String pn = propertyName(matcher.group(1));
                // 生成属性判断以及 setter 调用语句,示例如下:
                // if( $2.equals("name") ) { w.setName((java.lang.String)$3); return; }
                c1.append(" if( $2.equals(\"").append(pn).append("\") ){ w.")
                    .append(method.getName()).append("(")
                    .append(arg(pt, "$3")).append("); return; }");
                pts.put(pn, pt);
            }
        }
    
        // 添加 NoSuchPropertyException 异常抛出代码
        c1.append(" throw new " + NoSuchPropertyException.class.getName() 
                  + "(\"Not found property \\\"\"+$2+\"\\\" filed or setter method in 	 				class " + c.getName() + ".\"); }");
        c2.append(" throw new " + NoSuchPropertyException.class.getName()
                  + "(\"Not found property \\\"\"+$2+\"\\\" filed or setter method in 					class " + c.getName() + ".\"); }");
    
        // 4----------------------------------------------------------
    
        long id = WRAPPER_CLASS_COUNTER.getAndIncrement();
        // 创建类生成器
        ClassGenerator cc = ClassGenerator.newInstance(cl);
        // 设置类名及超类
        cc.setClassName((Modifier.isPublic(c.getModifiers()) ? Wrapper.class.getName() : 
                         c.getName() + "$sw") + id);
        cc.setSuperClass(Wrapper.class);
    
        // 添加默认构造方法
        cc.addDefaultConstructor();
    
        // 添加字段
        cc.addField("public static String[] pns;");
        cc.addField("public static " + Map.class.getName() + " pts;");
        cc.addField("public static String[] mns;");
        cc.addField("public static String[] dmns;");
        for (int i = 0, len = ms.size(); i < len; i++)
            cc.addField("public static Class[] mts" + i + ";");
    
        // 添加方法代码
        cc.addMethod("public String[] getPropertyNames(){ return pns; }");
        cc.addMethod("public boolean hasProperty(String n){ return pts.containsKey($1); 		}");
        cc.addMethod("public Class getPropertyType(String n){ return (Class)pts.get($1); 		}");
        cc.addMethod("public String[] getMethodNames(){ return mns; }");
        cc.addMethod("public String[] getDeclaredMethodNames(){ return dmns; }");
        cc.addMethod(c1.toString());
        cc.addMethod(c2.toString());
        cc.addMethod(c3.toString());
    
        try {
            // 生成类
            Class<?> wc = cc.toClass();
            
            // 设置字段值
            wc.getField("pts").set(null, pts);
            wc.getField("pns").set(null, pts.keySet().toArray(new String[0]));
            wc.getField("mns").set(null, mns.toArray(new String[0]));
            wc.getField("dmns").set(null, dmns.toArray(new String[0]));
            int ix = 0;
            for (Method m : ms.values())
                wc.getField("mts" + ix++).set(null, m.getParameterTypes());
    
            // 创建 Wrapper 实例
            return (Wrapper) wc.newInstance();
        } catch (RuntimeException e) {
            throw e;
        } catch (Throwable e) {
            throw new RuntimeException(e.getMessage(), e);
        } finally {
            cc.release();
            ms.clear();
            mns.clear();
            dmns.clear();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133
    • 134
    • 135
    • 136
    • 137
    • 138
    • 139
    • 140
    • 141
    • 142
    • 143
    • 144
    • 145
    • 146
    • 147
    • 148
    • 149
    • 150
    • 151
    • 152
    • 153
    • 154
    • 155
    • 156
    • 157
    • 158
    • 159
    • 160
    • 161
    • 162
    • 163
    • 164
    • 165
    • 166
    • 167
    • 168
    • 169
    • 170
    • 171
    • 172
    • 173
    • 174
    • 175
    • 176
    • 177
    • 178
    • 179
    • 180
    • 181
    • 182
    • 183
    • 184
    • 185
    • 186
    • 187
    • 188
    • 189
    • 190
    • 191
    • 192
    • 193
    • 194
    • 195
    • 196
    • 197
    • 198
    • 199
    • 200
    • 201
    • 202
    • 203
    • 204
    • 205
    • 206
    • 207
    • 208
    • 209
    • 210
    • 211
    • 212
    • 213
    • 214
    • 215
    • 216
    • 217
    • 218
    • 219
    • 220
    • 221
    • 222
    • 223
    • 224
    • 225
    • 226
    • 227
    • 228
    • 229
    • 230
    • 231
    • 232
    • 233
    • 234
    • 235
    • 236
    • 237
    • 238
    • 239
    • 240
    • 241
    • 242
    • 243
    • 244
    • 245
    • 246
    • 247
    • 248
    • 249
    • 250
    • 251
    • 252

    首先分割线1之上的代码,这段代码主要用于进行一些初始化操作。比如创建 c1、c2、c3 以及 pts、ms、mns 等变量,以及向 c1、c2、c3 中添加方法定义和类型类型转换代码。接下来是分割线1到分割线2之间的代码,这段代码用于为 public 级别的字段生成条件判断取值与赋值代码。分割线2和分隔线3之间的代码用于为定义在当前类中的方法生成判断语句,和方法调用语句。接下来是分割线3和分隔线4之间的代码,这段代码用于处理 getter、setter 以及以 is/has/can 开头的方法。处理方式是通过正则表达式获取方法类型(get/set/is/…),以及属性名。之后为属性名生成判断语句,然后为方法生成调用语句。最后我们再来看一下分隔线4以下的代码,这段代码通过 ClassGenerator 为刚刚生成的代码构建 Class 类,并通过反射创建对象。ClassGenerator 是 Dubbo 自己封装的,该类的核心是 toClass() 的重载方法 toClass(ClassLoader, ProtectionDomain),该方法通过 javassist 构建 Class。

    导出服务到本地

    按照代码执行顺序,先来分析导出服务到本地的过程。相关代码如下:

    private void exportLocal(URL url) {
        // 如果 URL 的协议头等于 injvm,说明已经导出到本地了,无需再次导出
        if (!Constants.LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) {
            URL local = URL.valueOf(url.toFullString())
                .setProtocol(Constants.LOCAL_PROTOCOL)    // 设置协议头为 injvm
                .setHost(LOCALHOST)
                .setPort(0);
            ServiceClassHolder.getInstance().pushServiceClass(getServiceClass(ref));
            //创建Invoker,并导出服务,这里的protocol会在运行时调用 InjvmProtocol 的 export 方法
            Exporter<?> exporter = protocol.export(
                proxyFactory.getInvoker(ref, (Class) interfaceClass, local));
            exporters.add(exporter);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    exportLocal 方法比较简单,首先根据 URL 协议头决定是否导出服务。若需导出,则创建一个新的 URL 并将协议头、主机名以及端口设置成新的值。然后创建 Invoker,并调用 InjvmProtocol 的 export 方法导出服务。

    public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
        // 创建 InjvmExporter
        return new InjvmExporter<T>(invoker, invoker.getUrl().getServiceKey(), 
                                    exporterMap);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    导出服务到远程

    导出服务到远程包含了服务导出与服务注册两个过程。先来分析服务导出逻辑,服务注册逻辑将在下一节进行分析。RegistryProtocol 的 export 方法。

    public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
        // 导出服务
        final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);
    
        // 获取注册中心 URL,以 zookeeper 注册中心为例,得到的示例 URL 如下:
        // zookeeper://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService?			// application=demo provider&dubbo=2.0.2&export=dubbo%3A%2F%2F172.17.48.52
        //%3A20880%2Fcom.alibaba.dubbo.demo.DemoService%3Fanyhost%3Dtrue%26
        //application%3Ddemo-provider
        URL registryUrl = getRegistryUrl(originInvoker);
    
        // 根据 URL 加载 Registry 实现类,比如 ZookeeperRegistry
        final Registry registry = getRegistry(originInvoker);
        
        // 获取已注册的服务提供者 URL,比如:
        // dubbo://172.17.48.52:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true
        //&application=demo-provider&dubbo=2.0.2&generic=false&interface=
        //com.alibaba.dubbo.demo.DemoService&methods=sayHello
        final URL registeredProviderUrl = getRegisteredProviderUrl(originInvoker);
    
        // 获取 register 参数
        boolean register = registeredProviderUrl.getParameter("register", true);
    
        // 向服务提供者与消费者注册表中注册服务提供者
        ProviderConsumerRegTable.registerProvider(originInvoker, registryUrl, 
                                                  registeredProviderUrl);
    
        // 根据 register 的值决定是否注册服务
        if (register) {
            // 向注册中心注册服务
            register(registryUrl, registeredProviderUrl);
            ProviderConsumerRegTable.getProviderWrapper(originInvoker).setReg(true);
        }
    
        // 获取订阅 URL,比如:
        // provider://172.17.48.52:20880/com.alibaba.dubbo.demo.DemoService?
        // category=configurators&check=false&anyhost=true&application=
        //demo-provider&dubbo=2.0.2&generic=false&interface=
        //com.alibaba.dubbo.demo.DemoService&methods=sayHello
        final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registeredProviderUrl);
        // 创建监听器
        final OverrideListener overrideSubscribeListener = new 
            OverrideListener(overrideSubscribeUrl, originInvoker);
        overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
        // 向注册中心进行订阅 override 数据
        registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
        // 创建并返回 DestroyableExporter
        return new DestroyableExporter<T>(exporter, originInvoker, overrideSubscribeUrl, 
                                          registeredProviderUrl);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49

    主要做如下一些操作:

    1. 调用 doLocalExport 导出服务
    2. 向注册中心注册服务
    3. 向注册中心进行订阅 override 数据
    4. 创建并返回 DestroyableExporter

    分析 doLocalExport 方法的逻辑:

    private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker) {
        String key = getCacheKey(originInvoker);
        // 访问缓存
        ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>) 
            bounds.get(key);
        if (exporter == null) {
            synchronized (bounds) {
                exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
                if (exporter == null) {
                    // 创建 Invoker 为委托类对象
                    final Invoker<?> invokerDelegete = new InvokerDelegete<T>
                        (originInvoker, getProviderUrl(originInvoker));
                    // 调用 protocol 的 export 方法导出服务
                    exporter = new ExporterChangeableWrapper<T>((Exporter<T>) 
                    protocol.export(invokerDelegete), originInvoker);
                    
                    // 写缓存
                    bounds.put(key, exporter);
                }
            }
        }
        return exporter;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23

    假设运行时协议为 dubbo,此处的 protocol 会在运行时加载 DubboProtocol,并调用 DubboProtocol 的 export 方法。

    public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
        URL url = invoker.getUrl();
    
        // 获取服务标识,理解成服务坐标也行。由服务组名,服务名,服务版本号以及端口组成。比如:
        // demoGroup/com.alibaba.dubbo.demo.DemoService:1.0.1:20880
        String key = serviceKey(url);
        // 创建 DubboExporter
        DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
        // 将  键值对放入缓存中
        exporterMap.put(key, exporter);
    
        // 以下代码应该和本地存根有关,代码不难看懂,但具体用途暂时不清楚,先忽略
        Boolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY, 
                                                      Constants.DEFAULT_STUB_EVENT);
        Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, 
                                                     false);
        if (isStubSupportEvent && !isCallbackservice) {
            String stubServiceMethods = url.getParameter
                (Constants.STUB_EVENT_METHODS_KEY);
            if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
                // 省略日志打印代码
            } else {
                stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);
            }
        }
    
        // 启动服务器
        openServer(url);
        // 优化序列化
        optimizeSerialization(url);
        return exporter;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32

    分析 openServer 方法

    private void openServer(URL url) {
        // 获取 host:port,并将其作为服务器实例的 key,用于标识当前的服务器实例
        String key = url.getAddress();
        boolean isServer = url.getParameter(Constants.IS_SERVER_KEY, true);
        if (isServer) {
            // 访问缓存
            ExchangeServer server = serverMap.get(key);
            if (server == null) {
                // 创建服务器实例
                serverMap.put(key, createServer(url));
            } else {
                // 服务器已创建,则根据 url 中的配置重置服务器
                server.reset(url);
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    分析服务器实例的创建过程

    private ExchangeServer createServer(URL url) {
        url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY,
        // 添加心跳检测配置到 url 中
        url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, 
                                       String.valueOf(Constants.DEFAULT_HEARTBEAT));
    	// 获取 server 参数,默认为 netty
        String str = url.getParameter(Constants.SERVER_KEY, 
                                      Constants.DEFAULT_REMOTING_SERVER);
    
    	// 通过 SPI 检测是否存在 server 参数所代表的 Transporter 拓展,不存在则抛出异常
        if (str != null && str.length() > 0 && 
            !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str))
            throw new RpcException("Unsupported server type: " + str + ", url: " + url);
    
        // 添加编码解码器参数
        url = url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME);
        ExchangeServer server;
        try {
            // 创建 ExchangeServer
            server = Exchangers.bind(url, requestHandler);
        } catch (RemotingException e) {
            throw new RpcException("Fail to start server...");
        }
                                       
    	// 获取 client 参数,可指定 netty,mina
        str = url.getParameter(Constants.CLIENT_KEY);
        if (str != null && str.length() > 0) {
            // 获取所有的 Transporter 实现类名称集合,比如 supportedTypes = [netty, mina]
            Set<String> supportedType=ExtensionLoader.getExtensionLoader
                (Transporter.class).getSupportedExtensions();
            // 检测当前 Dubbo 所支持的 Transporter 实现类名称列表中,
            // 是否包含 client 所表示的 Transporter,若不包含,则抛出异常
            if (!supportedTypes.contains(str)) {
                throw new RpcException("Unsupported client type...");
            }
        }
        return server;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38

    createServer 包含三个核心的操作。第一是检测是否存在 server 参数所代表的 Transporter 拓展,不存在则抛出异常。第二是创建服务器实例。第三是检测是否支持 client 参数所表示的 Transporter 拓展,不存在也是抛出异常。

    public static ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
        if (url == null) {
            throw new IllegalArgumentException("url == null");
        }
        if (handler == null) {
            throw new IllegalArgumentException("handler == null");
        }
        url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
        // 获取 Exchanger,默认为 HeaderExchanger。
        // 紧接着调用 HeaderExchanger 的 bind 方法创建 ExchangeServer 实例
        return getExchanger(url).bind(url, handler);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    HeaderExchanger 的 bind 方法

    public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
    	// 创建 HeaderExchangeServer 实例,该方法包含了多步操作,本别如下:
    	//   1. new HeaderExchangeHandler(handler)
    	//	 2. new DecodeHandler(new HeaderExchangeHandler(handler))
    	//   3. Transporters.bind(url, new DecodeHandler(new 				            
        //   HeaderExchangeHandler(handler)))
        return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new 
            HeaderExchangeHandler(handler))));
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    Transporters 的 bind 方法

    public static Server bind(URL url, ChannelHandler... handlers) throws RemotingException {
        if (url == null) {
            throw new IllegalArgumentException("url == null");
        }
        if (handlers == null || handlers.length == 0) {
            throw new IllegalArgumentException("handlers == null");
        }
        ChannelHandler handler;
        if (handlers.length == 1) {
            handler = handlers[0];
        } else {
        	// 如果 handlers 元素数量大于1,则创建 ChannelHandler 分发器
            handler = new ChannelHandlerDispatcher(handlers);
        }
        // 获取自适应 Transporter 实例,并调用实例方法
        return getTransporter().bind(url, handler);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    getTransporter() 方法获取的 Transporter 是在运行时动态创建的,类名为 Transporter A d a p t i v e ,也就是自适应拓展类。 T r a n s p o r t e r Adaptive,也就是自适应拓展类。Transporter Adaptive,也就是自适应拓展类。TransporterAdaptive 会在运行时根据传入的 URL 参数决定加载什么类型的 Transporter,默认为 NettyTransporter。分析NettyTransporter 的 bind 方法。

    public Server bind(URL url, ChannelHandler listener) throws RemotingException {
    	// 创建 NettyServer
    	return new NettyServer(url, listener);
    }
    
    • 1
    • 2
    • 3
    • 4

    NettyServer

    public class NettyServer extends AbstractServer implements Server {
        public NettyServer(URL url, ChannelHandler handler) throws RemotingException {
            // 调用父类构造方法
            super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, 
            SERVER_THREAD_POOL_NAME)));
        }
    }
    
    
    public abstract class AbstractServer extends AbstractEndpoint implements Server {
        public AbstractServer(URL url, ChannelHandler handler) throws RemotingException {
            // 调用父类构造方法,这里就不用跟进去了,没什么复杂逻辑
            super(url, handler);
            localAddress = getUrl().toInetSocketAddress();
    
            // 获取 ip 和端口
            String bindIp = getUrl().getParameter(Constants.BIND_IP_KEY, 
                                                  getUrl().getHost());
            int bindPort = getUrl().getParameter(Constants.BIND_PORT_KEY, 
                                                 getUrl().getPort());
            if (url.getParameter(Constants.ANYHOST_KEY, false) || 
                NetUtils.isInvalidLocalHost(bindIp)) {
                // 设置 ip 为 0.0.0.0
                bindIp = NetUtils.ANYHOST;
            }
            bindAddress = new InetSocketAddress(bindIp, bindPort);
            // 获取最大可接受连接数
            this.accepts = url.getParameter(Constants.ACCEPTS_KEY, 
                                            Constants.DEFAULT_ACCEPTS);
            this.idleTimeout = url.getParameter(Constants.IDLE_TIMEOUT_KEY, 
                                                Constants.DEFAULT_IDLE_TIMEOUT);
            try {
                // 调用模板方法 doOpen 启动服务器
                doOpen();
            } catch (Throwable t) {
                throw new RemotingException("Failed to bind ");
            }
    
            DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class)
                .getDefaultExtension();
            executor = (ExecutorService)dataStore.get(Constants.
                       EXECUTOR_SERVICE_COMPONENT_KEY, Integer.toString(url.getPort()));
        }
        
        protected abstract void doOpen() throws Throwable;
    
        protected abstract void doClose() throws Throwable;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48

    关注 doOpen 抽象方法

    protected void doOpen() throws Throwable {
        NettyHelper.setNettyLoggerFactory();
        // 创建 boss 和 worker 线程池
        ExecutorService boss = Executors.newCachedThreadPool(new 
            NamedThreadFactory("NettyServerBoss", true));
        ExecutorService worker = Executors.newCachedThreadPool(new          					NamedThreadFactory("NettyServerWorker", true));
        ChannelFactory channelFactory = new NioServerSocketChannelFactory(boss, worker, 
            getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, 
                                          Constants.DEFAULT_IO_THREADS));
        
        // 创建 ServerBootstrap
        bootstrap = new ServerBootstrap(channelFactory);
    
        final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);
        channels = nettyHandler.getChannels();
        bootstrap.setOption("child.tcpNoDelay", true);
        // 设置 PipelineFactory
        bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
            @Override
            public ChannelPipeline getPipeline() {
                NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), 
                                                                  NettyServer.this);
                ChannelPipeline pipeline = Channels.pipeline();
                pipeline.addLast("decoder", adapter.getDecoder());
                pipeline.addLast("encoder", adapter.getEncoder());
                pipeline.addLast("handler", nettyHandler);
                return pipeline;
            }
        });
        // 绑定到指定的 ip 和端口上
        channel = bootstrap.bind(getBindAddress());
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32

  • 相关阅读:
    CentOS 7 源码安装 Zabbix 6.0
    pdf怎么转换成ppt呢?看完每个人都会
    qt 虚拟键盘中的几个瑕疵
    Mac搭建vue环境
    小程序如何实现下拉刷新
    店铺营收未达标,是客单价的问题吗?
    【Json】——jsoncpp的序列化以及反序列化
    【光学】Matlab模拟等倾干涉仿真
    数据库设计(火车订票系统)
    Dubbo源码篇07---SPI神秘的面纱---原理篇---下
  • 原文地址:https://blog.csdn.net/xianghanscce/article/details/126065558