• dubbo源码解析之服务调用(通信)流程


    文章系列

    一、dubbo源码解析之框架粗谈
    二、dubbo源码解析之dubbo配置解析
    三、dubbo源码解析之服务发布与注册
    四、dubbo源码解析之服务发现
    五、dubbo源码解析之服务调用(通信)流程
    六、dubbo获取服务提供者IP列表

    一、服务发布

    dubbo源码解析之服务发布与注册 一文中,存在步骤 4.4.3 服务发布,通过 DubboProtocol.export() 暴露一个本地端口,用于监听并处理客户端连接请求。

    public class DubboProtocol extends AbstractProtocol {
        /**
         * Exports the service behind {@code invoker}: registers an exporter under the service key,
         * warns about a stub-event misconfiguration, opens the server for the URL and applies the
         * serialization optimization.
         *
         * @param invoker the invoker to expose; its URL drives every step below
         * @return the exporter that was registered in {@code exporterMap}
         * @throws RpcException declared by the signature for export failures
         */
        @Override
        public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
            final URL url = invoker.getUrl();

            // Register the exporter under the service key. Per the original note the key is
            // serviceClassName + ":" + port, e.g. com.example.demo.provider.DemoProvider:20880
            final String key = serviceKey(url);
            final DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
            exporterMap.addExportMap(key, exporter);

            // Stub service for dispatching events: this excerpt only logs a warning when event
            // support is requested for a non-callback service but no stub methods are configured.
            Boolean stubEventEnabled = url.getParameter(STUB_EVENT_KEY, DEFAULT_STUB_EVENT);
            Boolean callbackService = url.getParameter(IS_CALLBACK_SERVICE, false);
            if (stubEventEnabled && !callbackService) {
                String stubMethods = url.getParameter(STUB_EVENT_METHODS_KEY);
                boolean stubMethodsMissing = stubMethods == null || stubMethods.length() == 0;
                if (stubMethodsMissing && logger.isWarnEnabled()) {
                    logger.warn(new IllegalStateException("consumer [" + url.getParameter(INTERFACE_KEY) +
                            "], has set stubproxy support event ,but no stub methods founded."));
                }
            }

            // Open the server: expose a local port that listens for and handles client connections.
            openServer(url);
            // Apply the preferred serialization optimization.
            optimizeSerialization(url);

            return exporter;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34

    如果采用 Netty 进行远程通信,最终会创建一个 NettyServer 对象。

    1.1 NettyServer

    /**
     * Netty-based remoting server. {@link #doOpen()} builds the server bootstrap, installs the
     * per-connection handler pipeline and binds the listening address.
     */
    public class NettyServer extends AbstractServer implements RemotingServer {
    
        private static final Logger logger = LoggerFactory.getLogger(NettyServer.class);
        /**
         * the cache for alive worker channel.
         */
        private Map<String, Channel> channels;
        /**
         * netty server bootstrap.
         */
        private ServerBootstrap bootstrap;
        /**
         * the boss channel that receive connections and dispatch these to worker channel.
         */
    	private io.netty.channel.Channel channel;
    
        // Event loop groups created in doOpen(): one for the boss, one for the workers.
        private EventLoopGroup bossGroup;
        private EventLoopGroup workerGroup;
    
        /**
         * Creates the server with a thread-named URL and the wrapped handler chain.
         *
         * @param url     server URL; passed through ExecutorUtil.setThreadName before reaching the parent
         * @param handler business handler; wrapped by ChannelHandlers.wrap before reaching the parent
         * @throws RemotingException declared by the signature
         */
        public NettyServer(URL url, ChannelHandler handler) throws RemotingException {
            // Per the original note: the parent constructor calls doOpen(), which initializes
            // and starts the netty server.
            super(ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME), ChannelHandlers.wrap(handler, url));
        }
    
        /**
         * Initializes and starts the netty server.
         */
        @Override
        protected void doOpen() throws Throwable {
            bootstrap = new ServerBootstrap();
    
            // Boss group is created with 1 thread; the worker group size comes from IO_THREADS_KEY
            // on the URL, falling back to Constants.DEFAULT_IO_THREADS.
            bossGroup = NettyEventLoopFactory.eventLoopGroup(1, "NettyServerBoss");
            workerGroup = NettyEventLoopFactory.eventLoopGroup(
                    getUrl().getPositiveParameter(IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS),
                    "NettyServerWorker");
    
            // A single handler instance is shared by every child channel; its channel map becomes
            // this server's channel cache.
            final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);
            channels = nettyServerHandler.getChannels();
    
            // SO_KEEPALIVE is off unless KEEP_ALIVE_KEY is set on the URL.
            boolean keepalive = getUrl().getParameter(KEEP_ALIVE_KEY, Boolean.FALSE);
    
            bootstrap.group(bossGroup, workerGroup)
                    .channel(NettyEventLoopFactory.serverSocketChannelClass())
                    .option(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
                    .childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
                    .childOption(ChannelOption.SO_KEEPALIVE, keepalive)
                    .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            // FIXME: should we use getTimeout()?
                            int idleTimeout = UrlUtils.getIdleTimeout(getUrl());
                            NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
                            // The SSL negotiation handler is added first, and only when SSL is enabled on the URL.
                            if (getUrl().getParameter(SSL_ENABLED_KEY, false)) {
                                ch.pipeline().addLast("negotiation",
                                        SslHandlerInitializer.sslServerHandler(getUrl(), nettyServerHandler));
                            }
                            // Pipeline order: decoder -> encoder -> idle-state detection -> server handler.
                            // The idle handler is built as (0, 0, idleTimeout, MILLISECONDS); in Netty's
                            // IdleStateHandler the third argument is the all-idle time.
                            ch.pipeline()
                                    .addLast("decoder", adapter.getDecoder())
                                    .addLast("encoder", adapter.getEncoder())
                                    .addLast("server-idle-handler", new IdleStateHandler(0, 0, idleTimeout, MILLISECONDS))
                                    .addLast("handler", nettyServerHandler);
                        }
                    });
            // bind: wait for the bind to finish, then keep the resulting server channel
            ChannelFuture channelFuture = bootstrap.bind(getBindAddress());
            channelFuture.syncUninterruptibly();
            channel = channelFuture.channel();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71

    NettyServer 中,主要是初始化并启动了一个 netty 服务器,然后构造了一个 Handler 处理链。

    1.2 ChannelHandler 处理链

    NettyServer 构造函数中,存在代码 ChannelHandlers.wrap(handler, url),返回一个 ChannelHandler 处理链。

    public class ChannelHandlers {

        /** Shared instance; replaceable through {@link #setTestingChannelHandlers(ChannelHandlers)}. */
        private static ChannelHandlers INSTANCE = new ChannelHandlers();

        protected ChannelHandlers() {
        }

        /**
         * Wraps {@code handler} into the handler chain built by the current instance.
         *
         * @param handler the handler to wrap
         * @param url     URL handed to the dispatcher
         * @return the outermost handler of the chain
         */
        public static ChannelHandler wrap(ChannelHandler handler, URL url) {
            ChannelHandlers wrapper = getInstance();
            return wrapper.wrapInternal(handler, url);
        }

        protected static ChannelHandlers getInstance() {
            return INSTANCE;
        }

        static void setTestingChannelHandlers(ChannelHandlers instance) {
            INSTANCE = instance;
        }

        /**
         * Builds the chain MultiMessageHandler -> HeartbeatHandler -> (handler returned by the
         * adaptive Dispatcher extension for {@code handler} and {@code url}).
         */
        protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) {
            Dispatcher dispatcher = ExtensionLoader.getExtensionLoader(Dispatcher.class).getAdaptiveExtension();
            ChannelHandler dispatched = dispatcher.dispatch(handler, url);
            ChannelHandler heartbeatAware = new HeartbeatHandler(dispatched);
            return new MultiMessageHandler(heartbeatAware);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26

    最终,ChannelHandler 链路如下:

    MultiMessageHandler -> HeartbeatHandler -> AllChannelHandler -> DecodeHandler -> handler

    在这里插入图片描述
    其中 handler 为 DubboProtocol 中的成员变量。
    在这里插入图片描述

    二、服务发现

    dubbo源码解析之服务发现 一文中,通过步骤 3.3 Reference.createProxy() 创建并返回了一个代理对象(最终创建的 Invoker 对象为 DubboInvoker)。

    最终,创建的代理类结构如下:

    在这里插入图片描述

    三、服务调用(通信)流程

    3.1 客户端代理类执行链路

    在 DubboInvoker 打一个断点,通过对其执行链路分析,最终调用链路如下:

    // 代理对象Invoker方法
    1. org.apache.dubbo.rpc.proxy.InvokerInvocationHandler#invoke(Object proxy, Method method, Object[] args)
    
    // 调用
    2. result = invoker.invoke(rpcInvocation)
    	// 迁移 Invoker:在接口级地址与实例级(应用级)地址之间切换
    	2.1 MigrationInvoker#invoke(Invocation invocation)
    	// Mock
    	2.2 MockClusterInvoker#invoke(Invocation invocation)
    	// 集群拦截器:包含一些前置、后置拦截器
    	2.3 AbstractCluster.InterceptorInvokerNode#invoke(Invocation invocation)
    		// 执行拦截方法
    		2.3.1 ClusterInterceptor#intercept(AbstractClusterInvoker<?> clusterInvoker, Invocation invocation)
    		2.3.2 AbstractClusterInvoker#invoke(final Invocation invocation)
    		// 集群容错模式
    		2.3.3 FailoverClusterInvoker#doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance)
    		// 包装
    		2.3.4 RegistryDirectory.InvokerDelegate#invoke(Invocation invocation)
    		// 过滤器
    		2.3.5 FilterNode#invoke(Invocation invocation)
    			  2.3.5.1 ConsumerContextFilter
    			  2.3.5.2 FutureFilter
    			  2.3.5.3 MonitorFilter
    		// 添加监听
    		2.3.6 ListenerInvokerWrapper
    		// 异步、同步处理器
    		2.3.7 AsyncToSyncInvoker
    	// dubbo调用
    	2.4 DubboInvoker
    
    // 获取结果集
    3. result.recreate()
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32

    1.InvokerInvocationHandler#invoke

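    通过 JavassistProxyFactory 生成:这里的 Proxy 是 dubbo 自己的 org.apache.dubbo.common.bytecode.Proxy(基于 Javassist 生成字节码),并非 JDK 的 java.lang.reflect.Proxy,只是回调接口复用了 JDK 的 InvocationHandler。生成过程如下: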

    public class JavassistProxyFactory extends AbstractProxyFactory {
        /**
         * Creates a proxy implementing {@code interfaces} whose calls are routed to an
         * {@code InvokerInvocationHandler} wrapping {@code invoker}.
         *
         * @param invoker    the invoker that ultimately serves the proxied calls
         * @param interfaces the interfaces the proxy must implement
         * @return the proxy instance, cast to {@code T}
         */
        @Override
        @SuppressWarnings("unchecked")
        public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
            // Resolve the proxy factory first, then bind the invocation handler to a new instance.
            Proxy proxyFactory = Proxy.getProxy(interfaces);
            Object proxyInstance = proxyFactory.newInstance(new InvokerInvocationHandler(invoker));
            return (T) proxyInstance;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    可见,InvokerInvocationHandler 一定实现了 InvocationHandler 接口,最终,执行对象方法,都会进入到其 Object invoke(Object proxy, Method method, Object[] args) 中。

    /**
     * Invocation handler behind the consumer-side proxy: turns a reflective method call into an
     * {@code RpcInvocation} and hands it to the wrapped invoker.
     * (Excerpt: the fields invoker, protocolServiceKey and consumerModel are declared outside this snippet.)
     */
    public class InvokerInvocationHandler implements InvocationHandler {
    	@Override
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
            // Methods declared by Object itself are invoked reflectively on the invoker.
            if (method.getDeclaringClass() == Object.class) {
                return method.invoke(invoker, args);
            }
            String methodName = method.getName();
            Class<?>[] parameterTypes = method.getParameterTypes();
            // toString / $destroy / hashCode (no args) and equals (one arg) are answered locally
            // by the invoker and never go through the RPC path below.
            if (parameterTypes.length == 0) {
                if ("toString".equals(methodName)) {
                    return invoker.toString();
                } else if ("$destroy".equals(methodName)) {
                    invoker.destroy();
                    return null;
                } else if ("hashCode".equals(methodName)) {
                    return invoker.hashCode();
                }
            } else if (parameterTypes.length == 1 && "equals".equals(methodName)) {
                return invoker.equals(args[0]);
            }
    
            /*
             * Build the RPC invocation object: the core object carried along during the RPC call.
             * method: the method to call
             * invoker.getInterface().getName(): class name of the interface declaring the method,
             *                                   e.g. com.example.demo.provider.TestProvider
             * protocolServiceKey: e.g. com.example.demo.provider.TestProvider:dubbo
             * args: the actual method arguments
             */
            RpcInvocation rpcInvocation = new RpcInvocation(method, invoker.getInterface().getName(), protocolServiceKey, args);
            
            // Target service name, e.g. com.example.demo.provider.TestProvider
            String serviceKey = invoker.getUrl().getServiceKey();
            rpcInvocation.setTargetServiceUniqueName(serviceKey);
    
            // invoker.getUrl() returns consumer url.
            // Attach the URL to the current context,
            // e.g. dubbo://127.0.0.1/com.example.demo.provider.TestProvider?...other parameters omitted
            RpcContext.setRpcContext(invoker.getUrl());
    
            if (consumerModel != null) {
                // Attach the consumer model and the model of the invoked method as invocation attributes.
                rpcInvocation.put(Constants.CONSUMER_MODEL, consumerModel);
                rpcInvocation.put(Constants.METHOD_MODEL, consumerModel.getMethodModel(method));
            }
    
            // Invoke: invoker.invoke(rpcInvocation)
            // Obtain the result: result.recreate()
            return invoker.invoke(rpcInvocation).recreate();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51

    2. result = invoker.invoke(rpcInvocation)

    调用。

    2.1 MigrationInvoker#invoke(Invocation invocation)

    迁移 Invoker:负责在接口级地址(invoker)与实例级/应用级地址(serviceDiscoveryInvoker)之间切换,而不是集群容错意义上的"故障转移"。通过在 RegistryProtocol.doRefer() 进行创建,代码如下:

    public class RegistryProtocol implements Protocol {
        /**
         * Builds the consumer URL from {@code parameters}, creates the migration invoker for it and
         * passes that invoker through {@code interceptInvoker}.
         *
         * @param parameters reference parameters; the REGISTER_IP_KEY entry is removed from this map
         *                   and used as the consumer URL host
         * @return the invoker returned by {@code interceptInvoker}
         */
        protected <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url, Map<String, String> parameters) {
            // remove() both yields the register IP and strips it from the map passed to the URL.
            String registerIp = parameters.remove(REGISTER_IP_KEY);
            URL consumerUrl = new URL(CONSUMER_PROTOCOL, registerIp, 0, type.getName(), parameters);

            // Create and obtain a MigrationInvoker.
            ClusterInvoker<T> migrating = getMigrationInvoker(this, cluster, registry, type, url, consumerUrl);
            return interceptInvoker(migrating, url, consumerUrl);
        }

        /**
         * Factory hook for the migration invoker; this implementation returns a
         * {@code ServiceDiscoveryMigrationInvoker}.
         */
        protected <T> ClusterInvoker<T> getMigrationInvoker(RegistryProtocol registryProtocol, Cluster cluster, Registry registry,
                                                            Class<T> type, URL url, URL consumerUrl) {
            ClusterInvoker<T> created =
                    new ServiceDiscoveryMigrationInvoker<T>(registryProtocol, cluster, registry, type, url, consumerUrl);
            return created;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    MigrationInvoker

    /**
     * Chooses between the interface-address invoker ({@code invoker}) and the instance-address
     * invoker ({@code serviceDiscoveryInvoker}) for each invocation.
     * (Excerpt: the fields used here are declared outside this snippet.)
     */
    public class MigrationInvoker<T> implements MigrationClusterInvoker<T> {
    	@Override
        public Result invoke(Invocation invocation) throws RpcException {
            // If serviceDiscoveryInvoker is not available, use the interface-address invoker.
            // Per the original note it is not available initially.
            if (!checkInvokerAvailable(serviceDiscoveryInvoker)) {
                if (logger.isDebugEnabled()) {
                    logger.debug("Using interface addresses to handle invocation, interface " + type.getName() + ", total address size " + (invoker.getDirectory().getAllInvokers() == null ? "is null" : invoker.getDirectory().getAllInvokers().size()));
                }
                // This is the branch taken in the article's walkthrough.
                return invoker.invoke(invocation);
            }
    
            // serviceDiscoveryInvoker is available; if the interface-address invoker is not,
            // use the instance-address invoker.
            if (!checkInvokerAvailable(invoker)) {
                if (logger.isDebugEnabled()) {
                    logger.debug("Using instance addresses to handle invocation, interface " + type.getName() + ", total address size " + (serviceDiscoveryInvoker.getDirectory().getAllInvokers() == null ? " is null " : serviceDiscoveryInvoker.getDirectory().getAllInvokers().size()));
                }
                return serviceDiscoveryInvoker.invoke(invocation);
            }
    
            // Both are available: delegate to whichever invoker is currently selected.
            return currentAvailableInvoker.invoke(invocation);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    2.2 MockClusterInvoker#invoke(Invocation invocation)

    执行 Mock 逻辑,在 RegistryProtocol.doCreateInvoker() 中进行创建,代码如下:

    public class RegistryProtocol implements Protocol {
        /**
         * Wires the directory to the registry, optionally registers the consumer URL, builds the
         * router chain, subscribes, and finally joins the directory into a cluster invoker.
         *
         * @param directory dynamic directory to configure and join
         * @param cluster   cluster used to create the invoker via {@code join}
         * @param registry  registry set on the directory and used for registration
         * @param type      service interface; its name becomes the path of the consumer URL
         * @return the invoker produced by {@code cluster.join(directory)}
         */
    	protected <T> ClusterInvoker<T> doCreateInvoker(DynamicDirectory<T> directory, Cluster cluster, Registry registry, Class<T> type) {
            directory.setRegistry(registry);
            directory.setProtocol(protocol);
            // all attributes of REFER_KEY
            Map<String, String> parameters = new HashMap<String, String>(directory.getConsumerUrl().getParameters());
            // REGISTER_IP_KEY is removed from the copied map and used as the URL host.
            URL urlToRegistry = new URL(CONSUMER_PROTOCOL, parameters.remove(REGISTER_IP_KEY), 0, type.getName(), parameters);
            // Register the consumer URL only when the directory asks for it.
            if (directory.isShouldRegister()) {
                directory.setRegisteredConsumerUrl(urlToRegistry);
                registry.register(directory.getRegisteredConsumerUrl());
            }
            directory.buildRouterChain(urlToRegistry);
            directory.subscribe(toSubscribeUrl(urlToRegistry));
    		
            // Create and return the Invoker.
            // Per the original note: cluster is the object obtained from
            // ExtensionLoader.getExtensionLoader(Cluster.class).getExtension(name, wrap),
            // where by default name=Cluster.DEFAULT=failover and wrap=true.
            return (ClusterInvoker<T>) cluster.join(directory);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20

    所以,cluster 对象为 MockClusterWrapper(FailoverCluster),当执行 cluster.join(directory) 逻辑时,最终会先进入 MockClusterWrapper#join() 方法中,进行包装增强。

    /**
     * Cluster wrapper that decorates the invoker produced by the wrapped cluster with a
     * {@code MockClusterInvoker}.
     */
    public class MockClusterWrapper implements Cluster {
    
        // The wrapped cluster that join() delegates to.
        private Cluster cluster;
    
        /**
         * @param cluster the cluster to wrap
         */
        public MockClusterWrapper(Cluster cluster) {
            this.cluster = cluster;
        }
    
        /**
         * Joins the directory through the wrapped cluster and wraps the result.
         *
         * @param directory the directory to join
         * @return a MockClusterInvoker around the wrapped cluster's invoker
         */
        @Override
        public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
            // Create a MockClusterInvoker around the inner cluster's invoker.
            return new MockClusterInvoker<T>(directory,
                    this.cluster.join(directory));
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    MockClusterWrapper 中,会创建一个 MockClusterInvoker

    /**
     * Cluster invoker that applies the mock setting of the invoked method: no mock, forced mock,
     * or mock on failure.
     * (Excerpt: the invoker and logger fields and doMockInvoke are declared outside this snippet.)
     */
    public class MockClusterInvoker<T> implements ClusterInvoker<T> {
    	@Override
        public Result invoke(Invocation invocation) throws RpcException {
            Result result = null;
    
            // Read the mock parameter for this method from the URL; it defaults to "false".
            String value = getUrl().getMethodParameter(invocation.getMethodName(), MOCK_KEY, Boolean.FALSE.toString()).trim();
            if (value.length() == 0 || "false".equalsIgnoreCase(value)) {
                // no mock
                result = this.invoker.invoke(invocation);
            } else if (value.startsWith("force")) {
                if (logger.isWarnEnabled()) {
                    logger.warn("force-mock: " + invocation.getMethodName() + " force-mock enabled , url : " + getUrl());
                }
                //force:direct mock
                // Force mode: return the mock data directly without calling the wrapped invoker.
                result = doMockInvoke(invocation, null);
            } else {
                //fail-mock
                // Run the mock logic only when the real call fails.
                try {
                    result = this.invoker.invoke(invocation);
    
                    //fix:#4585
                    // An RpcException carried inside the result is handled like a thrown one:
                    // business exceptions are rethrown, anything else falls back to the mock.
                    if(result.getException() != null && result.getException() instanceof RpcException){
                        RpcException rpcException= (RpcException)result.getException();
                        if(rpcException.isBiz()){
                            throw  rpcException;
                        }else {
                            result = doMockInvoke(invocation, rpcException);
                        }
                    }
    
                } catch (RpcException e) {
                    // Business exceptions are never mocked.
                    if (e.isBiz()) {
                        throw e;
                    }
    
                    if (logger.isWarnEnabled()) {
                        logger.warn("fail-mock: " + invocation.getMethodName() + " fail-mock enabled , url : " + getUrl(), e);
                    }
                    result = doMockInvoke(invocation, e);
                }
            }
            return result;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    2.3 AbstractCluster.InterceptorInvokerNode#invoke(Invocation invocation)

    集群拦截器:包含一些前置、后置拦截器

    在 MockClusterWrapper#join() 方法中创建 MockClusterInvoker 时,会通过 this.cluster.join(directory) 创建一个 invoker 对象返回,如下:

    new MockClusterInvoker<T>(directory, this.cluster.join(directory));
    
    • 1

    其中,this.cluster 为 dubbo SPI 注入的一个 cluster 对象,默认为 FailoverCluster

    public class FailoverCluster extends AbstractCluster {

        /** Name of this cluster strategy. */
        public static final String NAME = "failover";

        /**
         * Creates the cluster invoker for {@code directory}.
         *
         * @param directory the directory the invoker works on
         * @return a new {@code FailoverClusterInvoker}
         */
        @Override
        public <T> AbstractClusterInvoker<T> doJoin(Directory<T> directory) throws RpcException {
            AbstractClusterInvoker<T> failoverInvoker = new FailoverClusterInvoker<T>(directory);
            return failoverInvoker;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    当调用其 join(directory) 方法时,会进入 AbstractCluster#join(directory) 中,如下:

    public abstract class AbstractCluster implements Cluster {
        /**
         * Joins the directory via the subclass's {@code doJoin} and wraps the result with the
         * cluster interceptors activated for the URL's REFERENCE_INTERCEPTOR_KEY parameter.
         */
        @Override
        public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
            // Template method: the subclass supplies the concrete cluster invoker.
            AbstractClusterInvoker<T> clusterInvoker = doJoin(directory);
            String interceptorKey = directory.getUrl().getParameter(REFERENCE_INTERCEPTOR_KEY);
            return buildClusterInterceptors(clusterInvoker, interceptorKey);
        }

        /**
         * Builds the interceptor chain around {@code clusterInvoker}.
         *
         * @param clusterInvoker the invoker returned by {@code doJoin}; also the innermost chain element
         * @param key            value of the URL's REFERENCE_INTERCEPTOR_KEY parameter
         * @return the head of the chain, or {@code clusterInvoker} itself when no interceptor is activated
         */
        private <T> Invoker<T> buildClusterInterceptors(AbstractClusterInvoker<T> clusterInvoker, String key) {
            // SPI lookup of the interceptors activated for this URL and key.
            List<ClusterInterceptor> activated = ExtensionLoader.getExtensionLoader(ClusterInterceptor.class)
                    .getActivateExtension(clusterInvoker.getUrl(), key);

            // Chain of responsibility: walk backwards so that the first activated interceptor
            // ends up as the outermost InterceptorInvokerNode.
            AbstractClusterInvoker<T> head = clusterInvoker;
            for (int idx = activated.size() - 1; idx >= 0; idx--) {
                head = new InterceptorInvokerNode<>(clusterInvoker, activated.get(idx), head);
            }
            return head;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28

    InterceptorInvokerNode 为 AbstractCluster 中的一个内部类,所以最终执行 invoke() 方法时,进入 AbstractCluster.InterceptorInvokerNode#invoke(Invocation invocation) 中,如下:

    public abstract class AbstractCluster implements Cluster {
        /**
         * One node of the interceptor chain: runs a single interceptor around the next invoker.
         * (Excerpt: the fields interceptor, next and clusterInvoker are declared outside this snippet.)
         */
    	protected class InterceptorInvokerNode<T> extends AbstractClusterInvoker<T> {
    		@Override
            public Result invoke(Invocation invocation) throws RpcException {
                Result asyncResult;
                try {
                    // Pre-invocation hook.
                    interceptor.before(next, invocation);
                    // Run the intercepted call; it returns an asynchronous result.
                    asyncResult = interceptor.intercept(next, invocation);
                } catch (Exception e) {
                    // onError callback
                    if (interceptor instanceof ClusterInterceptor.Listener) {
                        ClusterInterceptor.Listener listener = (ClusterInterceptor.Listener) interceptor;
                        listener.onError(e, clusterInvoker, invocation);
                    }
                    throw e;
                } finally {
                    // Post-invocation hook; being in finally, it runs whether or not the call threw.
                    interceptor.after(next, invocation);
                }
                // Asynchronous callback, fired once the result completes.
                return asyncResult.whenCompleteWithContext((r, t) -> {
                    // onResponse callback
                    if (interceptor instanceof ClusterInterceptor.Listener) {
                        ClusterInterceptor.Listener listener = (ClusterInterceptor.Listener) interceptor;
                        if (t == null) {
                            listener.onMessage(r, clusterInvoker, invocation);
                        } else {
                            listener.onError(t, clusterInvoker, invocation);
                        }
                    }
                });
            }
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    2.3.1 ClusterInterceptor#intercept(AbstractClusterInvoker<?> clusterInvoker, Invocation invocation)

    执行拦截方法,返回一个异步结果。

    /**
     * SPI extension point for intercepting cluster invocations.
     */
    @SPI
    public interface ClusterInterceptor {
    
        /**
         * Hook run before the intercepted invocation.
         *
         * @param clusterInvoker the invoker about to be called
         * @param invocation     the invocation being processed
         */
        void before(AbstractClusterInvoker<?> clusterInvoker, Invocation invocation);
    
        /**
         * Hook run after the intercepted invocation.
         *
         * @param clusterInvoker the invoker that was called
         * @param invocation     the invocation being processed
         */
        void after(AbstractClusterInvoker<?> clusterInvoker, Invocation invocation);
    
        /**
         * Performs the intercepted call; the default simply delegates to the invoker.
         *
         * @return the result of {@code clusterInvoker.invoke(invocation)}
         */
        default Result intercept(AbstractClusterInvoker<?> clusterInvoker, Invocation invocation) throws RpcException {
            // Call invoke, which returns an asynchronous result.
            return clusterInvoker.invoke(invocation);
        }
    
        /**
         * Optional callback interface for interceptors that want the outcome of an invocation.
         */
        interface Listener {
    
            /** Called with the response of the invocation. */
            void onMessage(Result appResponse, AbstractClusterInvoker<?> clusterInvoker, Invocation invocation);
    
            /** Called with the error raised by the invocation. */
            void onError(Throwable t, AbstractClusterInvoker<?> clusterInvoker, Invocation invocation);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    2.3.2 AbstractClusterInvoker#invoke(final Invocation invocation)

    AbstractClusterInvoker#invoke(final Invocation invocation) 方法中存在几个重要步骤:

    1. 获取所有 invoker
    2. 初始化负载均衡算法
    3. 执行invoke
    public abstract class AbstractClusterInvoker<T> implements ClusterInvoker<T> {
        /**
         * Entry point of every cluster invoker: copies context attachments onto the invocation,
         * lists the candidate invokers, initializes the load balancer and delegates to {@code doInvoke}.
         *
         * @param invocation the invocation to route
         * @return the result produced by the subclass's {@code doInvoke}
         */
        @Override
        public Result invoke(final Invocation invocation) throws RpcException {
            checkWhetherDestroyed();

            // Bind the attachments of the current RPC context into the invocation.
            Map<String, Object> attachments = RpcContext.getContext().getObjectAttachments();
            boolean hasAttachments = attachments != null && attachments.size() != 0;
            if (hasAttachments) {
                ((RpcInvocation) invocation).addObjectAttachments(attachments);
            }

            // step1: list all candidate invokers
            List<Invoker<T>> candidates = list(invocation);

            // step2: initialize the load-balancing strategy
            LoadBalance balancer = initLoadBalance(candidates, invocation);
            RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);

            // step3: perform the invocation
            return doInvoke(invocation, candidates, balancer);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23

    step1:获取所有 invoker

    从 directory 中获取所有可调用的 Invoker 对象

    • DynamicDirectory:动态
    • StaticDirectory:静态

    step2:初始化负载均衡算法

    /**
     * Resolves the LoadBalance extension for an invocation.
     *
     * @param invokers   candidate invokers; when non-empty, the first invoker's URL supplies the
     *                   method-level LOADBALANCE_KEY setting
     * @param invocation the invocation whose method name selects the method-level parameter
     * @return the configured extension, or the DEFAULT_LOADBALANCE extension
     *         (random, per the original note) when nothing is configured or there are no invokers
     */
    protected LoadBalance initLoadBalance(List<Invoker<T>> invokers, Invocation invocation) {
        ExtensionLoader<LoadBalance> loader = ExtensionLoader.getExtensionLoader(LoadBalance.class);
        if (!CollectionUtils.isNotEmpty(invokers)) {
            // No candidates: fall back to the default strategy.
            return loader.getExtension(DEFAULT_LOADBALANCE);
        }
        // Method-level setting from the first invoker's URL, defaulting to DEFAULT_LOADBALANCE.
        String strategyName = invokers.get(0).getUrl()
                .getMethodParameter(RpcUtils.getMethodName(invocation), LOADBALANCE_KEY, DEFAULT_LOADBALANCE);
        return loader.getExtension(strategyName);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    dubbo 内置负载均衡算法如下:

    • Random LoadBalance:随机,按权重设置随机概率(默认随机)。
    • RoundRobin LoadBalance:轮询,按公约后的权重设置轮询比率。
    • LeastActive LoadBalance:最少活跃调用数,相同活跃数的随机,活跃数指调用前后计数差。
    • ConsistentHash LoadBalance:一致性 Hash,相同参数的请求总是发到同一提供者。

    step3:执行invoke

    AbstractClusterInvoker 采用了抽象模版方法设计模式,实现类有如下(对应dubbo集群容错模式):

    • FailoverClusterInvoker:失败自动切换,当出现失败,重试其它服务器。
    • FailfastClusterInvoker:快速失败,只发起一次调用,失败立即报错。通常用于非幂等性的写操作,比如新增记录。
    • FailsafeClusterInvoker:失败安全,出现异常时,直接忽略。通常用于写入审计日志等操作。
    • FailbackClusterInvoker:失败自动恢复,后台记录失败请求,定时重发。通常用于消息通知操作。
    • ForkingClusterInvoker:并行调用多个服务器,只要一个成功即返回。通常用于实时性要求较高的读操作,但需要浪费更多服务资源。可通过 forks="2" 来设置最大并行数。
    • BroadcastClusterInvoker:广播调用所有提供者,逐个调用,任意一台报错则报错。通常用于通知所有提供者更新缓存或日志等本地资源信息。

    默认采用 FailoverClusterInvoker 失败自动切换,当出现失败,重试其它服务器。

    2.3.3 FailoverClusterInvoker#doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance)

    FailoverClusterInvoker 失败自动切换,当出现失败,重试其它服务器

    public class FailoverClusterInvoker<T> extends AbstractClusterInvoker<T> {
        /**
         * Invokes with failover: when an attempt fails with a non-business error, another invoker
         * is selected and tried, up to the calculated number of attempts.
         *
         * @param invocation  metadata of the method call
         * @param invokers    candidate invokers for the current call
         * @param loadbalance load-balancing strategy used to select an invoker
         * @return the result of the first attempt that does not throw
         * @throws RpcException a business exception as thrown, or a summary wrapping the last
         *                      failure once every attempt has failed
         */
    	@Override
        @SuppressWarnings({"unchecked", "rawtypes"})
        public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
            List<Invoker<T>> copyInvokers = invokers;
            checkInvokers(copyInvokers, invocation);
            String methodName = RpcUtils.getMethodName(invocation);
            int len = calculateInvokeTimes(methodName);
            // retry loop.
            RpcException le = null; // last exception.
            List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyInvokers.size()); // invoked invokers.
            // Addresses of the providers tried so far; used in the log and error messages.
            Set<String> providers = new HashSet<String>(len);
    	
            // Failover: on failure, retry against another server.
            // Per the original note, len defaults to 3: one initial call plus at most two retries.
            for (int i = 0; i < len; i++) {
                //Reselect before retry to avoid a change of candidate `invokers`.
                //NOTE: if `invokers` changed, then `invoked` also lose accuracy.
                if (i > 0) {
                    checkWhetherDestroyed();
                    copyInvokers = list(invocation);
                    // check again
                    checkInvokers(copyInvokers, invocation);
                }
                
                // Select one invoker from the candidates via the load balancer; the invokers
                // already tried are passed along as `invoked`.
                Invoker<T> invoker = select(loadbalance, invocation, copyInvokers, invoked);
                invoked.add(invoker);
                RpcContext.getContext().setInvokers((List) invoked);
                try {
                
                    // Perform the actual invocation.
                    Result result = invoker.invoke(invocation);
                    // Succeeded after an earlier failure: log which providers had failed.
                    if (le != null && logger.isWarnEnabled()) {
                        logger.warn("Although retry the method " + methodName
                                + " in the service " + getInterface().getName()
                                + " was successful by the provider " + invoker.getUrl().getAddress()
                                + ", but there have been failed providers " + providers
                                + " (" + providers.size() + "/" + copyInvokers.size()
                                + ") from the registry " + directory.getUrl().getAddress()
                                + " on the consumer " + NetUtils.getLocalHost()
                                + " using the dubbo version " + Version.getVersion() + ". Last error is: "
                                + le.getMessage(), le);
                    }
                    return result;
                } catch (RpcException e) {
                    if (e.isBiz()) { // biz exception.
                        throw e;
                    }
                    le = e;
                } catch (Throwable e) {
                    // Any other failure is wrapped so it can be reported as the last exception.
                    le = new RpcException(e.getMessage(), e);
                } finally {
                    // Record the provider address after every attempt, successful or not.
                    providers.add(invoker.getUrl().getAddress());
                }
            }
            // Every attempt failed: report the last error, preferring its cause when present.
            throw new RpcException(le.getCode(), "Failed to invoke the method "
                    + methodName + " in the service " + getInterface().getName()
                    + ". Tried " + len + " times of the providers " + providers
                    + " (" + providers.size() + "/" + copyInvokers.size()
                    + ") from the registry " + directory.getUrl().getAddress()
                    + " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version "
                    + Version.getVersion() + ". Last error is: "
                    + le.getMessage(), le.getCause() != null ? le.getCause() : le);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72

    在 FailoverClusterInvoker 的 doInvoke() 方法中,会根据负载均衡算法,从 invokers 列表中选出一个可用的 invoker 对象,进而执行其 invoke() 方法。

    其中,invoker 对象为 RegistryDirectory.InvokerDelegate(FilterNode(ListenerInvokerWrapper(AsyncToSyncInvoker(DubboInvoker)))),创建过程如下:

    public class RegistryDirectory<T> extends DynamicDirectory<T> {
    	// Excerpt of toInvokers(): only the statement that creates an invoker is shown.
    	private Map<URL, Invoker<T>> toInvokers(List<URL> urls) {
    		// other code omitted .....
    	
    		// Wrap the invoker returned by protocol.refer(...) in an InvokerDelegate,
    		// handing it both url and providerUrl.
    		invoker = new InvokerDelegate<>(protocol.refer(serviceType, url), url, providerUrl);
    		
    		// other code omitted .....
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    2.3.4 RegistryDirectory.InvokerDelegate#invoke(Invocation invocation)

    包装。

    public class RegistryDirectory<T> extends DynamicDirectory<T> {
        /**
         * Invoker wrapper that additionally remembers the provider URL it was created from.
         * Invocation itself is inherited from {@code InvokerWrapper}.
         */
        private static class InvokerDelegate<T> extends InvokerWrapper<T> {
            // Assigned once in the constructor and never changed afterwards.
            private final URL providerUrl;

            /**
             * @param invoker     the invoker to wrap
             * @param url         the URL passed on to the parent wrapper
             * @param providerUrl the provider URL kept by this delegate
             */
            public InvokerDelegate(Invoker<T> invoker, URL url, URL providerUrl) {
                super(invoker, url);
                this.providerUrl = providerUrl;
            }

            /**
             * @return the provider URL given at construction time
             */
            public URL getProviderUrl() {
                return providerUrl;
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    InvokerDelegate 为 RegistryDirectory 中的一个私有内部类,执行其 invoke() 方法,最终会调用其父类的 invoke() 方法,如下:

    public class InvokerWrapper<T> implements Invoker<T> {
    	/**
    	 * Forwards the invocation unchanged to the wrapped invoker and returns its result.
    	 */
    	@Override
        public Result invoke(Invocation invocation) throws RpcException {
        	// In the chain described by this article the wrapped invoker is a FilterNode.
            return invoker.invoke(invocation);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    2.3.5 FilterNode#invoke(Invocation invocation)

    过滤器节点。

    class FilterNode<T> implements Invoker<T>{
    	@Override
        public Result invoke(Invocation invocation) throws RpcException {
            Result asyncResult;
            try {
            	// 执行其过滤链
                asyncResult = filter.invoke(next, invocation);
            } catch (Exception e) {
                if (filter instanceof ListenableFilter) {
                    ListenableFilter listenableFilter = ((ListenableFilter) filter);
                    try {
                        Filter.Listener listener = listenableFilter.listener(invocation);
                        if (listener != null) {
                            listener.onError(e, invoker, invocation);
                        }
                    } finally {
                        listenableFilter.removeListener(invocation);
                    }
                } else if (filter instanceof Filter.Listener) {
                    Filter.Listener listener = (Filter.Listener) filter;
                    listener.onError(e, invoker, invocation);
                }
                throw e;
            } finally {
    
            }
            return asyncResult.whenCompleteWithContext((r, t) -> {
                if (filter instanceof ListenableFilter) {
                    ListenableFilter listenableFilter = ((ListenableFilter) filter);
                    Filter.Listener listener = listenableFilter.listener(invocation);
                    try {
                        if (listener != null) {
                            if (t == null) {
                                listener.onResponse(r, invoker, invocation);
                            } else {
                                listener.onError(t, invoker, invocation);
                            }
                        }
                    } finally {
                        listenableFilter.removeListener(invocation);
                    }
                } else if (filter instanceof Filter.Listener) {
                    Filter.Listener listener = (Filter.Listener) filter;
                    if (t == null) {
                        listener.onResponse(r, invoker, invocation);
                    } else {
                        listener.onError(t, invoker, invocation);
                    }
                }
            });
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52

    其中 filter 对象为 dubbo SPI 扩展点(在 ProtocolFilterWrapper#buildInvokerChain() 方法中进行获取构建。)。

    List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);
    
    • 1
    2.3.5.1 ConsumerContextFilter

    消费者上下文过滤器。

    @Activate(group = CONSUMER, order = -10000)
    public class ConsumerContextFilter implements Filter {

        /**
         * Populates the current RpcContext with details of the outgoing call, then
         * either continues down the chain or short-circuits when the timeout
         * countdown stored in the context has already expired.
         *
         * @param invoker    the next invoker in the chain
         * @param invocation the call being performed
         * @return the downstream result, or a default async result carrying a
         *         TIMEOUT_TERMINATE RpcException when no time is left
         */
        @Override
        public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
            RpcContext context = RpcContext.getContext();
            context.setInvoker(invoker)
                    .setInvocation(invocation)
                    .setLocalAddress(NetUtils.getLocalHost(), 0)
                    .setRemoteAddress(invoker.getUrl().getHost(), invoker.getUrl().getPort())
                    .setRemoteApplicationName(invoker.getUrl().getParameter(REMOTE_APPLICATION_KEY))
                    .setAttachment(REMOTE_APPLICATION_KEY, invoker.getUrl().getParameter(APPLICATION_KEY));

            if (invocation instanceof RpcInvocation) {
                RpcInvocation rpcInvocation = (RpcInvocation) invocation;
                rpcInvocation.setInvoker(invoker);
            }

            // pass default timeout set by end user (ReferenceConfig)
            Object countDown = context.get(TIME_COUNTDOWN_KEY);
            boolean outOfTime = countDown != null && ((TimeoutCountDown) countDown).isExpired();
            if (!outOfTime) {
                return invoker.invoke(invocation);
            }

            // The countdown has run out: answer locally instead of calling the provider.
            String reason = "No time left for making the following call: " + invocation.getServiceName() + "."
                    + invocation.getMethodName() + ", terminate directly.";
            return AsyncRpcResult.newDefaultAsyncResult(
                    new RpcException(RpcException.TIMEOUT_TERMINATE, reason), invocation);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    2.3.5.2 FutureFilter

    异步结果(Future)处理 Filter:在发起调用前先触发调用回调(fireInvokeCallback)。

    @Activate(group = CommonConstants.CONSUMER)
    public class FutureFilter implements Filter, Filter.Listener {
    	/**
    	 * Fires the invoke callback for this call and then passes the invocation on
    	 * to the next invoker.
    	 */
    	@Override
        public Result invoke(final Invoker<?> invoker, final Invocation invocation) throws RpcException {
        	// Fire the "on invoke" callback before the call is made.
            fireInvokeCallback(invoker, invocation);
            // Whether the call has a return value must be configured before the invocation,
            // so that the invoker can decide whether a future needs to be returned.
            return invoker.invoke(invocation);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    2.3.5.3 MonitorFilter

    监控。

    @Activate(group = {PROVIDER, CONSUMER})
    public class MonitorFilter implements Filter, Filter.Listener {
    	/**
    	 * When the invoker URL carries the MONITOR_KEY parameter, records the start
    	 * time and remote host on the invocation and bumps the concurrency counter,
    	 * then continues the invocation chain.
    	 */
    	@Override
        public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
            if (invoker.getUrl().hasParameter(MONITOR_KEY)) {
                // Stash the values on the invocation itself so they travel with the call.
                invocation.put(MONITOR_FILTER_START_TIME, System.currentTimeMillis());
                invocation.put(MONITOR_REMOTE_HOST_STORE, RpcContext.getContext().getRemoteHost());
                getConcurrent(invoker, invocation).incrementAndGet(); // count up
            }
            return invoker.invoke(invocation); // proceed invocation chain
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    2.3.6 ListenerInvokerWrapper
    public class ListenerInvokerWrapper<T> implements Invoker<T> {
    	/**
    	 * Forwards the invocation unchanged to the wrapped invoker.
    	 */
    	@Override
        public Result invoke(Invocation invocation) throws RpcException {
            return invoker.invoke(invocation);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    2.3.7 AsyncToSyncInvoker

    异步转同步处理器:当调用模式为同步(InvokeMode.SYNC)时,阻塞等待异步结果返回。

    public class AsyncToSyncInvoker<T> implements Invoker<T> {
        /**
         * Invokes the wrapped invoker and, when the invocation is in SYNC mode,
         * blocks until the asynchronous result has completed.
         *
         * @param invocation the call to perform; cast to RpcInvocation to read its invoke mode
         * @return the result object returned by the wrapped invoker
         * @throws RpcException if the wait is interrupted, the result completes
         *                      exceptionally, or any other throwable is raised while waiting
         */
        @Override
        public Result invoke(Invocation invocation) throws RpcException {
            // Delegate to the wrapped invoker.
            Result asyncResult = invoker.invoke(invocation);

            try {
                if (InvokeMode.SYNC == ((RpcInvocation) invocation).getInvokeMode()) {
                    /**
                     * NOTICE!
                     * must call {@link java.util.concurrent.CompletableFuture#get(long, TimeUnit)} because
                     * {@link java.util.concurrent.CompletableFuture#get()} was proved to have serious performance drop.
                     */
                    asyncResult.get(Integer.MAX_VALUE, TimeUnit.MILLISECONDS);
                }
            } catch (InterruptedException e) {
                // Restore the interrupt flag before converting the exception, so code
                // further up the stack can still observe that this thread was interrupted.
                Thread.currentThread().interrupt();
                throw new RpcException("Interrupted unexpectedly while waiting for remote result to return!  method: " +
                        invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
            } catch (ExecutionException e) {
                // Map the underlying cause onto the matching RpcException code.
                Throwable t = e.getCause();
                if (t instanceof TimeoutException) {
                    throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " +
                            invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
                } else if (t instanceof RemotingException) {
                    throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " +
                            invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
                } else {
                    throw new RpcException(RpcException.UNKNOWN_EXCEPTION, "Fail to invoke remote method: " +
                            invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
                }
            } catch (Throwable e) {
                throw new RpcException(e.getMessage(), e);
            }
            return asyncResult;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    2.4 DubboInvoker

    dubbo调用。

    public class DubboInvoker<T> extends AbstractInvoker<T> {
        /**
         * Sends the invocation to the provider over one of this invoker's exchange clients.
         *
         * <p>One-way calls are sent without waiting for a reply and yield a default
         * async result. Two-way calls issue a request and return an AsyncRpcResult
         * backed by the response future.
         *
         * @param invocation the call to perform; must be an RpcInvocation
         * @return the asynchronous result of the call
         * @throws RpcException with TIMEOUT_EXCEPTION or NETWORK_EXCEPTION when the
         *                      remoting layer reports a timeout or another remoting failure
         */
        @Override
        protected Result doInvoke(final Invocation invocation) throws Throwable {
            RpcInvocation inv = (RpcInvocation) invocation;
            final String methodName = RpcUtils.getMethodName(invocation);
            inv.setAttachment(PATH_KEY, getUrl().getPath());
            inv.setAttachment(VERSION_KEY, version);

            // Pick a client connection, rotating through them when there are several.
            ExchangeClient currentClient;
            if (clients.length == 1) {
                currentClient = clients[0];
            } else {
                // floorMod keeps the slot non-negative after the int counter overflows;
                // a plain % would then yield a negative index and throw
                // ArrayIndexOutOfBoundsException.
                currentClient = clients[Math.floorMod(index.getAndIncrement(), clients.length)];
            }
            try {
                // Whether this is a one-way call, i.e. no reply is expected.
                boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
                // Timeout for this call.
                int timeout = calculateTimeout(invocation, methodName);
                invocation.put(TIMEOUT_KEY, timeout);
                if (isOneway) { // no reply expected
                    boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
                    // Send over the client connection without waiting for a response.
                    currentClient.send(inv, isSent);
                    return AsyncRpcResult.newDefaultAsyncResult(invocation);
                } else { // reply expected
                    ExecutorService executor = getCallbackExecutor(getUrl(), inv);
                    // Issue the request over the client connection.
                    CompletableFuture<AppResponse> appResponseFuture =
                            currentClient.request(inv, timeout, executor).thenApply(obj -> (AppResponse) obj);
                    // save for 2.6.x compatibility, for example, TraceFilter in Zipkin uses com.alibaba.xxx.FutureAdapter

                    // Store the response future in the current FutureContext.
                    FutureContext.getContext().setCompatibleFuture(appResponseFuture);
                    AsyncRpcResult result = new AsyncRpcResult(appResponseFuture, inv);
                    result.setExecutor(executor);
                    // Hand the asynchronous result back to the caller.
                    return result;
                }
            } catch (TimeoutException e) {
                throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
            } catch (RemotingException e) {
                throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47

    3. result.recreate()

    获取结果集。

    result 为 AsyncRpcResult 对象。

    public class AsyncRpcResult implements Result {
        /**
         * Produces the value handed back to the caller: the future stored in the
         * RpcContext for FUTURE-mode invocations, otherwise whatever the application
         * response recreates.
         *
         * @return the future or the recreated response value
         * @throws Throwable whatever the application response's recreate() throws
         */
        @Override
        public Object recreate() throws Throwable {
            InvokeMode mode = ((RpcInvocation) invocation).getInvokeMode();
            if (InvokeMode.FUTURE != mode) {
                return getAppResponse().recreate();
            }
            return RpcContext.getContext().getFuture();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    3.2 服务端 ChannelHandler 处理链路

    通过对 dubbo 服务发布的分析可知,服务端收到客户端请求后,ChannelHandler 的处理链路如下:

    1. MultiMessageHandler
    2. HeartbeatHandler
    3. AllChannelHandler
    4. DecodeHandler
    5. DubboProtocol.handler
    
    • 1
    • 2
    • 3
    • 4
    • 5

    3.2.1 MultiMessageHandler

    多消息处理Handler。

    public class MultiMessageHandler extends AbstractChannelHandlerDelegate {

        protected static final Logger logger = LoggerFactory.getLogger(MultiMessageHandler.class);

        public MultiMessageHandler(ChannelHandler handler) {
            super(handler);
        }

        /**
         * Passes a received message to the wrapped handler. A MultiMessage is
         * unpacked and each element is delivered on its own; any other message is
         * delivered as-is.
         *
         * @param channel the channel the message arrived on
         * @param message a single message or a MultiMessage batch
         * @throws RemotingException propagated from the wrapped handler
         */
        @SuppressWarnings("unchecked")
        @Override
        public void received(Channel channel, Object message) throws RemotingException {
            // Anything that is not a batch goes straight through.
            if (!(message instanceof MultiMessage)) {
                handler.received(channel, message);
                return;
            }

            MultiMessage batch = (MultiMessage) message;
            for (Object element : batch) {
                try {
                    handler.received(channel, element);
                } catch (ExecutionException e) {
                    // Log and report the failure, then carry on with the remaining elements.
                    logger.error("MultiMessageHandler received fail.", e);
                    handler.caught(channel, e);
                }
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28

    3.2.2 HeartbeatHandler

    心跳处理Handler。

    public class HeartbeatHandler extends AbstractChannelHandlerDelegate {
        /**
         * Handles heartbeat traffic itself and passes every other message on to the
         * wrapped handler.
         *
         * <p>A two-way heartbeat request is answered with a heartbeat response.
         * Heartbeat requests and responses are never forwarded.
         *
         * @param channel the channel the message arrived on
         * @param message the received message
         * @throws RemotingException propagated from sending the response or from the wrapped handler
         */
        @Override
        public void received(Channel channel, Object message) throws RemotingException {
            setReadTimestamp(channel);

            // Heartbeat request: reply if the peer expects an answer, then stop here.
            if (isHeartbeatRequest(message)) {
                Request req = (Request) message;
                if (req.isTwoWay()) {
                    Response res = new Response(req.getId(), req.getVersion());
                    res.setEvent(HEARTBEAT_EVENT);
                    channel.send(res);
                    // The heartbeat period is only needed for the debug message, so it is
                    // read under the debug guard alone.
                    if (logger.isDebugEnabled()) {
                        int heartbeat = channel.getUrl().getParameter(Constants.HEARTBEAT_KEY, 0);
                        logger.debug("Received heartbeat from remote channel " + channel.getRemoteAddress()
                                + ", cause: The channel has no data-transmission exceeds a heartbeat period"
                                + (heartbeat > 0 ? ": " + heartbeat + "ms" : ""));
                    }
                }
                return;
            }
            // Heartbeat response: nothing to do beyond optional logging.
            if (isHeartbeatResponse(message)) {
                if (logger.isDebugEnabled()) {
                    logger.debug("Receive heartbeat response in thread " + Thread.currentThread().getName());
                }
                return;
            }
            handler.received(channel, message);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32

    3.2.3 AllChannelHandler

    处理客户端调用请求:将收到的消息封装为 ChannelEventRunnable,交给线程池异步执行。

    public class AllChannelHandler extends WrappedChannelHandler {
    	@Override
        public void received(Channel channel, Object message) throws RemotingException {
            ExecutorService executor = getPreferredExecutorService(message);
            try {
            	// 异步执行一个 ChannelEventRunnable
                executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
            } catch (Throwable t) {
            	if(message instanceof Request && t instanceof RejectedExecutionException){
                    sendFeedback(channel, (Request) message, t);
                    return;
            	}
                throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    ChannelEventRunnable
    public class ChannelEventRunnable implements Runnable {
    @Override
        public void run() {
        	// 处理客户端请求
            if (state == ChannelState.RECEIVED) {
                try {
                    handler.received(channel, message);
                } catch (Exception e) {
                    logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel
                            + ", message is " + message, e);
                }
            } else {
                switch (state) {
                case CONNECTED: // 连接
                    try {
                        handler.connected(channel);
                    } catch (Exception e) {
                        logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel, e);
                    }
                    break;
                case DISCONNECTED: // 断开连接
                    try {
                        handler.disconnected(channel);
                    } catch (Exception e) {
                        logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel, e);
                    }
                    break;
                case SENT: // 发送
                    try {
                        handler.sent(channel, message);
                    } catch (Exception e) {
                        logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel
                                + ", message is " + message, e);
                    }
                    break;
                case CAUGHT:
                    try {
                        handler.caught(channel, exception);
                    } catch (Exception e) {
                        logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel
                                + ", message is: " + message + ", exception is " + exception, e);
                    }
                    break;
                default:
                    logger.warn("unknown state: " + state + ", message is " + message);
                }
            }
    
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50

    3.2.4 DecodeHandler

    解码Handler。

    public class DecodeHandler extends AbstractChannelHandlerDelegate {

        private static final Logger log = LoggerFactory.getLogger(DecodeHandler.class);

        public DecodeHandler(ChannelHandler handler) {
            super(handler);
        }

        /**
         * Decodes the message, and the payload of a Request or Response, where they
         * are Decodeable, then passes the message on to the wrapped handler.
         *
         * @param channel the channel the message arrived on
         * @param message the received message
         * @throws RemotingException propagated from the wrapped handler
         */
        @Override
        public void received(Channel channel, Object message) throws RemotingException {
            // The three checks are independent: each one applies whenever its type matches.
            if (message instanceof Decodeable) {
                decode(message);
            }

            if (message instanceof Request) {
                Request request = (Request) message;
                decode(request.getData());
            }

            if (message instanceof Response) {
                Response response = (Response) message;
                decode(response.getResult());
            }

            handler.received(channel, message);
        }

        /**
         * Decodes {@code message} if it is Decodeable. A decoding failure is logged
         * as a warning and otherwise ignored.
         */
        private void decode(Object message) {
            if (!(message instanceof Decodeable)) {
                return;
            }
            try {
                ((Decodeable) message).decode();
                if (log.isDebugEnabled()) {
                    log.debug("Decode decodeable message " + message.getClass().getName());
                }
            } catch (Throwable failure) {
                if (log.isWarnEnabled()) {
                    log.warn("Call Decodeable.decode failed: " + failure.getMessage(), failure);
                }
            }
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41

    3.2.5 DubboProtocol.handler

    真正调用服务端方法的Handler。

    public class DubboProtocol extends AbstractProtocol {
    	// Handler at the end of the server-side chain: it resolves the target invoker
    	// and executes the requested method.
    	private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() {
    
    		/**
    		 * Handles a method-call request from a client: looks up the invoker for the
    		 * invocation, invokes it and returns the result as a future.
    		 *
    		 * Returns null without invoking when a callback invocation names a method
    		 * that is not listed in the invoker URL's methods parameter.
    		 *
    		 * @throws RemotingException if the message is not an Invocation
    		 */
            @Override
            public CompletableFuture<Object> reply(ExchangeChannel channel, Object message) throws RemotingException {
    
                // Only Invocation messages are supported here.
                if (!(message instanceof Invocation)) {
                    throw new RemotingException(channel, "Unsupported request: "
                            + (message == null ? null : (message.getClass().getName() + ": " + message))
                            + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress());
                }
    
                Invocation inv = (Invocation) message;
                Invoker<?> invoker = getInvoker(channel, inv);
                // need to consider backward-compatibility if it's a callback
                if (Boolean.TRUE.toString().equals(inv.getObjectAttachments().get(IS_CALLBACK_SERVICE_INVOKE))) {
                    String methodsStr = invoker.getUrl().getParameters().get(METHODS_KEY);
                    boolean hasMethod = false;
                    // A value without the separator is treated as a single method name
                    // (a null value simply never matches).
                    if (methodsStr == null || !methodsStr.contains(COMMA_SEPARATOR)) {
                        hasMethod = inv.getMethodName().equals(methodsStr);
                    } else {
                        String[] methods = methodsStr.split(COMMA_SEPARATOR);
                        for (String method : methods) {
                            if (inv.getMethodName().equals(method)) {
                                hasMethod = true;
                                break;
                            }
                        }
                    }
                    if (!hasMethod) {
                        // Unknown callback method: warn and ignore the invocation.
                        logger.warn(new IllegalStateException("The methodName " + inv.getMethodName()
                                + " not found in callback service interface ,invoke will be ignored."
                                + " please update the api interface. url is:"
                                + invoker.getUrl()) + " ,invocation is :" + inv);
                        return null;
                    }
                }
                RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
                // Execute the local (provider-side) method.
                Result result = invoker.invoke(inv);
                return result.thenApply(Function.identity());
            }
    
            /**
             * Routes Invocation messages to reply(), discarding its return value;
             * everything else goes to the parent implementation.
             */
            @Override
            public void received(Channel channel, Object message) throws RemotingException {
                if (message instanceof Invocation) {
                    reply((ExchangeChannel) channel, message);
    
                } else {
                    super.received(channel, message);
                }
            }
    
            /** Invokes the method configured under ON_CONNECT_KEY, if any. */
            @Override
            public void connected(Channel channel) throws RemotingException {
                invoke(channel, ON_CONNECT_KEY);
            }
    
            /** Invokes the method configured under ON_DISCONNECT_KEY, if any. */
            @Override
            public void disconnected(Channel channel) throws RemotingException {
                if (logger.isDebugEnabled()) {
                    logger.debug("disconnected from " + channel.getRemoteAddress() + ",url:" + channel.getUrl());
                }
                invoke(channel, ON_DISCONNECT_KEY);
            }
    
            /**
             * Builds an invocation for the event method named by methodKey in the channel
             * URL and feeds it through received(). Does nothing when no such method is
             * configured; failures are logged as warnings and not propagated.
             */
            private void invoke(Channel channel, String methodKey) {
                Invocation invocation = createInvocation(channel, channel.getUrl(), methodKey);
                if (invocation != null) {
                    try {
                        received(channel, invocation);
                    } catch (Throwable t) {
                        logger.warn("Failed to invoke event method " + invocation.getMethodName() + "(), cause: " + t.getMessage(), t);
                    }
                }
            }
    
            /**
             * Creates a no-argument RpcInvocation for the method named by the URL
             * parameter methodKey, copying path, group, interface and version from the
             * URL into its attachments.
             *
             * @return the invocation, or null when the parameter is missing or empty
             */
            private Invocation createInvocation(Channel channel, URL url, String methodKey) {
                String method = url.getParameter(methodKey);
                if (method == null || method.length() == 0) {
                    return null;
                }
    
                RpcInvocation invocation = new RpcInvocation(method, url.getParameter(INTERFACE_KEY), "", new Class<?>[0], new Object[0]);
                invocation.setAttachment(PATH_KEY, url.getPath());
                invocation.setAttachment(GROUP_KEY, url.getParameter(GROUP_KEY));
                invocation.setAttachment(INTERFACE_KEY, url.getParameter(INTERFACE_KEY));
                invocation.setAttachment(VERSION_KEY, url.getParameter(VERSION_KEY));
                // Mark the invocation as a stub event only when the URL enables it.
                if (url.getParameter(STUB_EVENT_KEY, false)) {
                    invocation.setAttachment(STUB_EVENT_KEY, Boolean.TRUE.toString());
                }
    
                return invocation;
            }
        };
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99

    最终会执行 JavassistProxyFactory 生成的 AbstractProxyInvoker 对象中的方法。

    3.3 客户端收到服务端执行结果后处理

    在进行 NettyClient 连接创建时,会传入一个 Handler,如下:

    public class NettyClient extends AbstractClient {
    	/**
    	 * Configures the Netty bootstrap for this client: socket options, connect
    	 * timeout and the channel pipeline (codec, idle-state handler and the
    	 * NettyClientHandler that receives inbound events).
    	 */
    	@Override
        protected void doOpen() throws Throwable {
        	// Handler that processes events for this client; installed last in the pipeline below.
            final NettyClientHandler nettyClientHandler = new NettyClientHandler(getUrl(), this);
            bootstrap = new Bootstrap();
            bootstrap.group(EVENT_LOOP_GROUP)
                    .option(ChannelOption.SO_KEEPALIVE, true)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
                    //.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, getTimeout())
                    .channel(socketChannelClass());
    
            // Use the configured connect timeout, but never less than DEFAULT_CONNECT_TIMEOUT.
            bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, Math.max(DEFAULT_CONNECT_TIMEOUT, getConnectTimeout()));
            bootstrap.handler(new ChannelInitializer<SocketChannel>() {
    
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    int heartbeatInterval = UrlUtils.getHeartbeat(getUrl());
    
                    // Add the SSL negotiation handler only when SSL is enabled on the URL.
                    if (getUrl().getParameter(SSL_ENABLED_KEY, false)) {
                        ch.pipeline().addLast("negotiation", SslHandlerInitializer.sslClientHandler(getUrl(), nettyClientHandler));
                    }
    
                    NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyClient.this);
                    ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug
                            .addLast("decoder", adapter.getDecoder())
                            .addLast("encoder", adapter.getEncoder())
                            .addLast("client-idle-handler", new IdleStateHandler(heartbeatInterval, 0, 0, MILLISECONDS))
                            .addLast("handler", nettyClientHandler);
    
                    // Optional SOCKS5 proxy, placed at the head of the pipeline when a proxy host is configured.
                    String socksProxyHost = ConfigUtils.getProperty(SOCKS_PROXY_HOST);
                    if(socksProxyHost != null) {
                        int socksProxyPort = Integer.parseInt(ConfigUtils.getProperty(SOCKS_PROXY_PORT, DEFAULT_SOCKS_PROXY_PORT));
                        Socks5ProxyHandler socks5ProxyHandler = new Socks5ProxyHandler(new InetSocketAddress(socksProxyHost, socksProxyPort));
                        ch.pipeline().addFirst(socks5ProxyHandler);
                    }
                }
            });
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41

    所以,客户端收到服务端的响应结果后,会先执行 NettyClientHandler#channelRead() 方法进行处理,最终进入 HeaderExchangeHandler#received() 方法。

    3.3.1 HeaderExchangeHandler

    /**
     * Excerpt of the exchange-layer handler. {@code received} dispatches an
     * incoming message by its runtime type; {@code handleResponse} hands
     * responses over to {@code DefaultFuture}.
     */
    public class HeaderExchangeHandler implements ChannelHandlerDelegate {
        /**
         * Dispatches an incoming message according to its type
         * (Request, Response, String, or anything else).
         *
         * @param channel the channel the message arrived on
         * @param message the decoded message
         * @throws RemotingException propagated from the invoked handlers
         */
    	@Override
        public void received(Channel channel, Object message) throws RemotingException {
            final ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
            if (message instanceof Request) {
                // handle request.
                Request request = (Request) message;
                if (request.isEvent()) {
                    // Event requests go to the event path.
                    handlerEvent(channel, request);
                } else {
                    if (request.isTwoWay()) {
                        // Two-way request: handled via handleRequest.
                        handleRequest(exchangeChannel, request);
                    } else {
                        // Not two-way: only the request payload is passed to the wrapped handler.
                        handler.received(exchangeChannel, request.getData());
                    }
                }
            } else if (message instanceof Response) {
            	// Handle the response sent back by the server
                handleResponse(channel, (Response) message);
            } else if (message instanceof String) {
                if (isClientSide(channel)) {
                    // A String message on the client side is only logged as an error.
                    Exception e = new Exception("Dubbo client can not supported string message: " + message + " in channel: " + channel.getUrl());
                    logger.error(e.getMessage(), e);
                } else {
                    // Otherwise the String is treated as a telnet command; a non-empty reply is sent back.
                    String echo = handler.telnet(channel, (String) message);
                    if (echo != null && echo.length() > 0) {
                        channel.send(echo);
                    }
                }
            } else {
                // Any other message type is forwarded unchanged to the wrapped handler.
                handler.received(exchangeChannel, message);
            }
        }
    
    	/**
    	 * Handles a response from the server: null and heartbeat responses are
    	 * ignored, everything else is passed to {@code DefaultFuture.received}.
    	 *
    	 * @param channel  the channel the response arrived on
    	 * @param response the response; may be null
    	 * @throws RemotingException declared by the signature
    	 */
        static void handleResponse(Channel channel, Response response) throws RemotingException {
            if (response != null && !response.isHeartbeat()) {
                DefaultFuture.received(channel, response);
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41

    3.3.2 DefaultFuture

    /**
     * Excerpt of the future used for a pending call. The static
     * {@code received} methods look up the pending future by response id and
     * complete it through {@code doReceived}.
     */
    public class DefaultFuture extends CompletableFuture<Object> {
    	/**
    	 * Handles a server response; delegates to the three-argument overload
    	 * with {@code timeout = false}.
    	 */
    	public static void received(Channel channel, Response response) {
            received(channel, response, false);
        }
    
    	/**
    	 * Completes the pending future registered under the response id, if any.
    	 *
    	 * @param channel  channel the response arrived on; only used for the warning log, may be null
    	 * @param response the response to deliver
    	 * @param timeout  when false, the future's timeout-check task is cancelled before completion
    	 */
    	public static void received(Channel channel, Response response, boolean timeout) {
            try {
                // Remove the pending future so that it is completed at most once through this path.
                DefaultFuture future = FUTURES.remove(response.getId());
                if (future != null) {
                    Timeout t = future.timeoutCheckTask;
                    if (!timeout) {
                        // decrease Time
                        t.cancel();
                    }
                    // Deliver the server response to the future
                    future.doReceived(response);
                } else {
                    // No pending future for this id; per the log text, the response arrived
                    // after the call had already timed out.
                    logger.warn("The timeout response finally returned at "
                            + (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date()))
                            + ", response status is " + response.getStatus()
                            + (channel == null ? "" : ", channel: " + channel.getLocalAddress()
                            + " -> " + channel.getRemoteAddress()) + ", please check provider side for detailed result.");
                }
            } finally {
                // Always drop the CHANNELS entry for this id, whether or not a future was found.
                CHANNELS.remove(response.getId());
            }
        }
        
        /**
         * Completes this future from the response status: normally on
         * {@code Response.OK}, exceptionally otherwise.
         *
         * @param res the response; must not be null
         * @throws IllegalStateException if {@code res} is null
         */
        private void doReceived(Response res) {
            if (res == null) {
                throw new IllegalStateException("response cannot be null");
            }
            if (res.getStatus() == Response.OK) { // call succeeded
                this.complete(res.getResult());
            } else if (res.getStatus() == Response.CLIENT_TIMEOUT || res.getStatus() == Response.SERVER_TIMEOUT) { // timed out
                this.completeExceptionally(new TimeoutException(res.getStatus() == Response.SERVER_TIMEOUT, channel, res.getErrorMessage()));
            } else { // remote service error
                this.completeExceptionally(new RemotingException(channel, res.getErrorMessage()));
            }
    
            // the result is returning, but the caller thread may still waiting
            // to avoid endless waiting for whatever reason, notify caller thread to return.
            if (executor != null && executor instanceof ThreadlessExecutor) {
                ThreadlessExecutor threadlessExecutor = (ThreadlessExecutor) executor;
                if (threadlessExecutor.isWaiting()) {
                    threadlessExecutor.notifyReturn(new IllegalStateException("The result has returned, but the biz thread is still waiting" +
                            " which is not an expected state, interrupt the thread manually by returning an exception."));
                }
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52

    四、总结

    (原文此处为一张配图,图片未能随文本保留。)

  • 相关阅读:
    如何用在线模版快速制作活动海报?
    C. Yarik and Array Codeforces Round 909 (Div. 3) 1899C
    接口 vs 抽象类:如何在Java中做出正确的选择
    Linux编译器——gcc/g++的使用
    金x软件有限公司安全测试岗位面试
    深入了解vue2没有在data中定义的属性非响应式的问题
    Linux上Meson安装及使用
    Java代码审计WebGoat—— 登录注册模块初审计
    vue全局方法plugins/utils
    Spring原理
  • 原文地址:https://blog.csdn.net/qq_33375499/article/details/126575920