• dubbo 核心源码分析


    Dubbo 核源码分析

    Dubbo的有很多的设计值得学习和借鉴。需要理解的几个点:

    • SPI 机制
    • 自适应扩展点
    • Ioc 和 Aop
    • Dubbo 如何与 Spring 集成

    Dubbo 核心之SPI

    ​ 在Dubbo的源码中,很多地方会存在下面这样三种代码,分别是自适应扩展点,指定名称的扩展点,激活扩展点:

    ExtensionLoader.getExtensionLoader(XXX.class).getAdaptiveExtension();
    ExtensionLoader.getExtensionLoader(XXX.class).getExtension(name);
    ExtensionLoader.getExtensionLoader(XXX.class).getActivateExtension(url,key);
    
    • 1
    • 2
    • 3

    ​ 这种扩展点实际上就是Dubbo 中的 SPI机制。关于SPI,不知道大家是否还有印象,Springboot 自动装配的SpringFactoiesLoader,它也是一种SPI机制。

    java SPI 扩展点实现

    SPI 全称是 Service Provider Interface,原本是JDK 内置的一种服务提供发现机制,它主要用来做服务的扩展实现。SPI 机制在很多场景中都有运用,比如数据库连接,JDK提供了 java.sql.Driver 接口,这个驱动类在JDK中并没有实现,而是由不同的数据库厂商来实现,比如Oracle、Mysql这些数据库驱动包都会实现这个接口,然后JDK利用SPI 机制从classpath 下找到相应的驱动来获取得到指定数据库的连接。这种插拔式的扩展加载方式,也同样遵循一定的协议约定,比如所有的扩展点必须要放在 resources/META-INF/services 目录下,SPI机制会默认扫描这个路径下的属性文件来完成加载。

    下面举个栗子:

    • 创建一个普通的Maven 工程 Driver,定义一个接口。这个接口只是一个规范,并没有实现,由第三方厂商来提供实现。
    public interface Driver {
        String connect();
    }
    
    • 1
    • 2
    • 3
    • 创建另一个普通的Maven工程 Mysql-Driver,添加Driver 的 Maven依赖
    <dependency>
        <groupId>com.scmpe.book.spi</groupId>
        <artifactId>Driver</artifactId>
        <version>1.0-SNAPSHOT</version>
    </dependency>
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 创建MysqlDriver,实现Driver接口,这个接口表示一个第三方的扩展实现
    public class MysqlDriver implements Driver {
        @Override
        public String connect() {
            return "连接Mysql 数据库";
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 在 resources/META-INF/services 目录下穿件一个以Driver 接口全路径命名的文件com.scmpe.book.spi.Driver ,在里面填写这个Driver的实现类

      com.scmpe.book.spi.MysqlDriver
      
      • 1
    • 创建一个测试类,使用ServiceLoader 加载

    @Test
    public void connectTest() {
        ExtensionLoader<Driver> extensionLoader = ExtensionLoader.getExtensionLoder(Driver.class);
        Driver driver = extensionLoader.getExtension("mysqlDriver");
        System.out.println(driver.connect());
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    Dubbo SPI 扩展点源码

    ​ 前面我们用 ExtensionLoader.getExtensionLoader.getExtension() 演示了Dubbo中 SPI 用法,下面我们基于这个方法来分析Dubbo源码中是如何实现SPI的.

    ​ 这段代码分为两个部分:首先我们通过ExtensionLoader.getExtensionLoader 来获得一个ExtensionLoader 实例,然后通过getExtension() 方法获得指定名称的扩展点。先来分析第一部分。

    ExtensionLoder.getExtensionLoader

    这个方法用于返回一个ExtensionLoader实例,主要逻辑为:

    1. 先从缓存中获取与扩展类对应的ExtensionLoader;
    2. 如果缓存未命中,则创建一个新的实例,保存到EXTENSION_LOADERS集合中缓存起来。
    3. 在 ExtensionLoader 构造方法中,初始化一个objectFactory,后续会用到,暂时先不管
    public static <T> ExtensionLoader<T> getExtensionLoader(Class<T> type) {
        // 省略部分代码
        ExtensionLoder<T> loder = (ExtensionLoader<T>) EXTENSION_LOADERS.get(type);
        if (loder == null) {
            EXTENSION_LOADERS.putIfAbsent(type,new ExtensionLoader<T>(type));
            loder = (ExtensinoLoder<T>) EXTENSION_LOADERS.get(type);
        }
        return loader;
    }
    //构造方法
    private ExtensionLoader(Class<?> type) {
        this.type = type;
        ObjectFactory = (type == ExtensionFactory.class ? null : ExtensionLoader.getExtensionLoader(ExtensionFactory.class).getAdaptiveExtension());
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    getExtension()

    ​ 这个方法用于 根据指定名称获得对应的扩展点并返回。在前面的演示案例中,如果name是mysqlDriver,那么返回的实现类应该MysqlDriver。

    • name 用于参数的判断,其中,如果 name=“true”, 则返回一个默认的扩展实现。

    • 创建一个Holder对象,用户缓存该扩展点的实例。

    • 如果缓存中不存在,则通过createExtension(name) 创建一个扩展点。

    public T getExtension(String name) {
        if(StringUtils.isEpty(name)) {
            throw new IllegalArgumentException("Extension name = null");
        }
        if ("true".equals(name)) {
            return getDefaultExtension();
        }
        // 创建或者返回一个Holder对象,用于缓存实例
        final Holder<Object> holder = getOrCreateHolder(name);
        Object instance = holder.get();
        if (instance == null) { //如果缓存不存在,则创建一个实例
     		synchronized (holder) {
                instance = holder.get();
    			if (instance == null) {
                    instance = createExtension(name);
                    holder.set(instance);
                }            
            }       
           
        }
        return (T)instance;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22

    上面代码的意思就是先查缓存,缓存未命中,则创建一个扩展对象。 不难猜出,createExtension() 应该就是去指定路径下找name对应的扩展点的实现,并且实例化之后返回。

    • 通过 getExtensionClasses().get(name)获取一个扩展类
    • 通过反射实例化之后缓存到EXTENSION_INSTANCES集合中。
    • injectExtension 实现依赖注入
    • 把扩展类对象通过Wrapper进行包装。
    private T createExtension(String name) {
        Class<?> clazz = getExtensionClasses().get(name);
        if (clzz == null) {
            throw findException(name);
        }
        try {
            T instance = (T) EXTENSION_INSTANCES.get(clazz);
            if (instance == null) {
                EXTENSION_INSTANCES.putIfAbsent(clazz,clazz.newInstance());
                instance = (T) EXTENSION_INSTANCES.get(clazz);
            }
            //依赖注入
            injectExtension(instance);
            //通过Wrapper包装
            Set<Class<?>> WrapperClasses = cachedWrapperClasses;
            if (CollectUtils.isNotEmpty(wrapperClasses)) {
            	for(Class<?> wrapperClass : wrapperClasses) {
                    instance = injectExtension((T) WrapperClass.getConstructor(type).newInstance(instance))
                }
            }
            initExtension(instance);
            return instance;
        } catch(Throwable t) {
            throw new IllegalStateException()
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 从·缓存中获取已经被加载的扩展类
    • 如果未命中缓存,则调用loadExtensionClasses 加载扩展类
    private Map<String,Class<?>> getExtensionClasses() {
        Map<String,Class<?>> classes = cachedClasses.get();
        if (classes == null) {
            synchronized (cachedClasses) {
                classes = cachedClasses.get();
                if (classes == null) {
                    classes = loadExtensionClasses();
                    cachedClasses.set(classes);
                }
            }
        }
        return classes;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    Dubbo 中代码实现套路基本差不多,先访问缓存,缓存未命中再通过loadExtensionClasses加载2扩展类,这个方法主要做两件事。

    • 通过cacheDefaultExtensionName 方法回去当前扩展接口的默认扩展对象,并且缓存
    • 同通过过loadDirectory 方法加载指定文件目录下的配置文件。
    private Map<String,Class<?>> loadExtensionclasses() {
    	cacheDefaultExtensionName();//获得当前type接口默认的扩展类
    	Map<String,Class<?>> extensionclasses = new HashMap<>();
        //解析指定路径下的文件
        loadDirectory(extensionclasses, DUBBO_INTERNAL_DIRECTORY, type.getName(), true);
        loadDirectory(extensionClasses, DUBBO_INTERNAL_DIRECTORY, type.getName().replace("org.apache", "com.alibaba"),true);
        loadDirectory(extensionclasses,DUBBO_DIRECTORY,type.getName());
        loadDirectory(extensionClasses, DUBBO_DIRECTORY,type.getName().replace("org.apache","com.alibaba"));
        loadDirectory(extensionclasses,SERVICES_DIRECTORY,type.getName());
        loadDirectory(extensionclasses,SERVICES_DIRECTORY,type.getName().replace("org.apache","com.alibaba"));
        return extensionClasses;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    loadDirectory 方法的逻辑比较简单,就是从指定目录下,根据传入的type全路径名找到对应的文件,解析内容后加载并保存到extensionClasses集合中。

    cacheDefaultExtensionName 方法也比较简单,但是它和业务有一定的关系。

    • 获得指定扩展接口的@SPI 注解
    • 得到@SPI 注解中的名字,保存到cachedDefaultName属性中。
    private void cacheDefaultExtensionName() {
        final SPI defaultAnnotation = type.getAnnotation(SPI.class);
        if (defaultAnnotation == null) {
            return;
        }
        // 得到注解中的value值
        String value = defaultAnnotation.value();
        if (value = value.tirm().lenth > 0) {
            String[] names = NAME_SEPARATOR.spilt(value);
            if (names.length > 1) {
                throw new IllegalStateException()
            }
            if (names.lenth == 1) {
                cachedDefault = names[0];
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    以 Dubbo 中的org.apache.dubbo.rpc.Protocol 为例,在@SPI 注解中有一个默认值dubbo,这意味着如果没有显式的指定协议类型,默认采用Dubbo协议来发布服务。

    @SPI("dubbo")
    public interface Protocol {
        //...
    }
    
    • 1
    • 2
    • 3
    • 4

    这便是dubbo中指定名称的扩展类加载流程。

    自适应扩展点

    ​ 自适应扩展点可以理解为适配器扩展点。简单来说就是能够根据上下文动态配一个扩展类。

    ExtensionLoder.getExtensionLoader(class).getAdaptiveExtension();
    
    • 1

    自适应扩展点通过@Adaptive注解来声明,它有两种使用方式

    • Adaptive 注解定义在类上面,表示当前类为自适应扩展类

      @Adaptive
      public class AdativeCompiler implents Compiler {
          // 省略
      }
      
      • 1
      • 2
      • 3
      • 4

      AdaptiveCompiler 类就是自适应扩展类,通过 ExtensionLoader.getExtensionLoader(Compiler.class).getAdaptiveExtension();可以返回AdaptiveCompiler类的实例。

    • @Adaptive 注解定义在方法层面,会通过动态代理的方式生成一个动态字节码,进行自适应匹配。、。

    public interface Protocol {
        int getDefaultPort();
        
        @Adaptive
        <T> Exporter<T> export(Invoker<T> invoker) throws RpcException;
        @Adaptive
        <T> Invoker<T> refer(Class<T> type,URL url) throws RpcException;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    Protocol 扩展类中的两个方法声明了 @Adaptive 注解,意味着这是一个自适应方法。在Dubbo 源码中有很多地方通过下面这行代码来获得一个自适应扩展点

    Protocol protocol = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();
    
    • 1

    接下来,基于Protocol 的自适应扩展点方法 ExtensionLoader.getExtensinoLoader(Protocol.class).getAdaptiveExtensino() 来分析它的源码实现。

    从源码来看,getAdaptiveExtension 方法非常简单,只做了两件事:

    • 从缓存中获取自适应扩展点实例。
    • 如果缓存未命中,则通过createAdaptiveExtension 创建自适应扩展点。
    public T getAdaptiveExtension() {
        Object instance = this.cachedAdaptiveInstance.get();
        if (instance == null) {
            if (this.createAdaptiveInstanceError != null) {
                throw new IllegalStateException();
            }
            // 创建自适应扩展点实例,并放置到缓存中
            synchronized(this.cachedAdativeInstance) {
                instance = this.cachedAdaptiveInstance.get();
                if (instance == null) {
                    try {
                        instance = this.createAdaptiveExtension();
                        this.cachedAdaptiveInstance.set(instance);
                    }
                    }catch(Throwable var5) {
                    this.createAdaptiveInstanceError = var5;
                    throw new IllegalstateException("Failed to create adaptive instance:+var5.toString(), var5);
    
                }
                
            }
        }
        return instance;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24

    按照之前对于自适应扩展点的分析,可以基本猜测出createAdaptiveExtension 方法的实现机制,、

    • getAdaptiveExtensionClasses 获取一个自适应扩展类的实例
    • injectExtension完成依赖注入
    private T createAdaptiveExtension( {
        try {
        return this.injectExtension(this.getAdaptiveExtensionClass().newInstance());}catch (Exception var2){
        throw new IllegalStateException("can't create adaptive extension " + this.type +", cause:" +var2.getMessage(), var2);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    injectExtension后面再分析,先看getAdaptiveExtensinoClass.

    • 通过 getExtensionClasses 方法加载当前春如类型的所有扩展点,缓存到一个集合里面
    • 如果 cachedAdaptiveClass为空,则调用createAdaptiveExtensionCalss 进行创建
    private Class<?> getAdaptiveExtensionClass() {
        this.getExtensionClasses();
        return this.cachedAdaptiveClass != null ? this.cachedAdaptiveClass : (this.cachedAdaptiveClass = this.createAdaptiveExtensionClass());
    }
    
    • 1
    • 2
    • 3
    • 4

    getExtensionClasses方法之前讲过,直接看createAdaptiveExtensionClass 方法,它涉及动态字节码的生成·和加载。

    • code 是一个动态拼接的类。
    • 通过Compiler 进行·动态编译。
    private Class<?> createAdaptiveExtensionClass() {
        String code = (new AdaptiveClassCodeGenerator(this.type,this.cachedDefaultName)).generate();
        ClassLoader classLoader = findClassLoader();
        Compiler compiler = (Compiler)getExtensionLoader(Compiler.class).getAdaptiveExtension();
        return compiler.compile(code,classLoader);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    在基于 Protocol接口的自适应扩展点加载中,此时code拼接的字符串如下(为了排版美观,去掉了一些无用的代码))。

    public class Protocol$Adaptive implements Protocol {
        //省略部分代码
        public Exporter export(Invoker arg0) throws org.apache. dubbo.rpc.RpcException {
        if (arge == null) throw new IllegalArgumentException("Invoker argument == null");
        if (arg0.getUrl()== nul1)
        	throw new IllegalArgumentException("Invoker argument getUrl() - null");
        URL url = arg0.getUr1();
        String extName = (url.getProtocol() == null ?"dubbo":url.getProtoco1());\
        if (extName == null)
        	throw new IllegalstateException("Failed to get extension (Protocol) name from”+url.toString() +")use keys([protocol])");
        //根据名称获得指定扩展点
        Protocol extension = ExtensionLoader.getExtensionLoader(Protocol.class).getExtension(extName);
        return extension. export(arg0);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    public Invoker refer(Class arg0,URL arg1) throws RpcException {
        if (arg1 ==null) throw new IllegalArgumentException("url == null");
        URL url = arg1;
       	String extName =(url.getProtocol()== null ? "dubbo":url.getProtocol());
        if (extName == null) throw new IllegalstateException("Failed to get extension (Protocol) name from ur1("+ url.toString() +") use keys([protocol])");
        Protocol extension = ExtensionLoader.getExtensionLoader(Protocol.class).getExtension(extName):
        return extension.refer(arg0,arg1);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    Protocol$Adaptive 是一个动态生成的自适应扩展类,可以按照下面这种方式使用:

    Protocol protocol=ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();
    protocol.export( ...);
    
    • 1
    • 2

    当调用 protocol.export() 时,实际上会调用Protocol$Adaptive类中的export方法。而这个方法,无非就是根据Dubbo服务配置的协议名称,通过getExtension获得相应的扩展类。

    Dubbo 中的 IOC 和 AOP

    IOC 和 AOP 我们并不陌生,他是Spring Framework中的核心功能,实际上Dubbo中也用到了这两种机制。下面从源码层面逐个分析这两种机制的体现。

    IOC

    ​ IoC 中一个非常重要的思想是,系统运行时,动态地向某个对象提供它需要的其他对象,这种机制是通过DI(依赖注入)实现的。

    在分析Dubbo SPI机制时,createExtension方法中有一段代码如下:

    private T createExtension(String name) {
        try {
            T instance = (T) EXTENSION_INSTANCES.get(clazz);
            if (instance == null) {
                EXTENSION_INSTANCES.putIfAbsent(clazz, clazz.newInstance());
                instance = (T)EXTENSION_INSTANCES.get(clazz);
            }
    		injectExtension(instance);
            //省略部分代码
            return instance
        }catch(Throwable t) {
            //省略部分代码
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    injectExtension就是依赖注入的实现,整体逻辑比较简单。

    • 遍历被加载的扩展类中所有set方法
    • 得到set方法中的参数类型,如果参数类型是对象类型,则获得这个set方法中的属性名称。
    • 使用自适应扩展点加载该属性名对应的扩展类。
    • 调用set方法完成赋值。
    private T injectExtension(T instance) {
        if (objectFactory == null) {
            return instance;
        }
       	try {
            for (Method method : instance.getClass().getMethods()) {
                if (!isSetter(method)) {
                    continue;
                }
                if (method.getAnnotation(DisableInject.class) != null) {
                    continue;
                }
                // 获得扩展类中方法的参数类型
                Class<?> pt = method.getParameterTypes()[0];
                // 如果不是对象类型,跳过
                if (ReflectUtils.isPrimitives(pt)) {
                    continue;
                }
                try {
                    // 获取方法对应的属性名称
                    String property = getSetterProperty(method);
                    // 根据class及name,使用自适应扩展点加载并且通过set方法进行赋值
                    Object object = objectFactory.getExtension(pt,property);
                    if (object != null) {
                        method.invoke(instance,object);
                    }
                }catch(Exception e) {
                   logger.error("Failed to inject via method " + method.getName()+"of interface " + type.getName() +":" +e.getMessage(),e);
            	} 
            }catch(Exception e) {
                    logger.error(e.getMessage,e);
            }
            return instance;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35

    总结一下,injectExtension主要功能是,如果当前加载的扩展类中存在一个需要注入的对象,(该对象必须提供setter方法)那么就会通过自适应扩展点加载并赋值。

    以org.apache.dubbo.registry.integration.RegistryProtocol为例,它里面就有一个Protocol成员对象,并且为它提供了setProtocol方法,那么当RegistryProtocol扩展类被加载时,就会自动注入protocol成员属性的实例。

    public class RegistryProtocol implements Protocol {
        //省略部分代码
        private Protocol protocol;
    	public void setProtocol(Protocol protocol) {
            this.protocol = protocol;
        }
        //省略部分代码
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    AOP

    ​ AOP全称为Aspect Oriented Programming,意思是面向切面编程,它是一种思想或者编程范式。它的主要意图是把业务逻辑和功能逻辑分离,然后再运行期间或者类加载期间进行织入。这样做的好处是,可以降低代码的复杂性,提高重用性。

    ​ 在Dubbo API机制中,同样在ExtensionLoader类中的createExtension 方法中体现了AOP的设计思想。

    private T createExtension(String name){
        //..
        try {
            //...
            Set<class<?>> wrapperClasses = cachedwrapperClasses;if (collectionutils.isNotEmpty(wrapperclasses)){
            for (class<?> wrapperClass : wrapperClasses){
                instance = injectExtension((T) wrapperclass.getConstructor(type).newInstance(instance));
                }
            }
            initExtension( instance);
            return instance;
        }catch (Throwable t){
            //...
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    这段代码再前面的章节中讲过,仔细分析下下面这行代码

    instance = injectExtension((T) wrapperClass.getConstructor(type).newInstance(instance));
    
    • 1

    其中分别用了依赖注入和AOP思想,AOP 思想的体现是基于 Wrapper 装饰器类实现对原有的扩展类instance 进行包装。

  • 相关阅读:
    java 业务幂等解决方案
    视频讲解vue2基础之style样式class类名绑定
    面试必问JVM篇
    关于 “原型” 的那些事你真的理解了吗?【上篇】
    Postman连接MySQL数据库并操作
    【ROS进阶篇】第六讲 ROS中的录制与回放(rosbag)
    【计算机视觉】3D视觉
    【原创教程】埃斯顿机器人:弯管机推力解决方式(上)
    从源码看vue(v2.7.10)中的v-model(双向绑定)的原理
    经典算法试题(二)
  • 原文地址:https://blog.csdn.net/qq_45473439/article/details/125377403