• Dubbo-服务暴露


    前言

    Dubbo源码阅读分享系列文章,欢迎大家关注点赞

    SPI实现部分

    1. Dubbo-SPI机制

    2. Dubbo-Adaptive实现原理

    3. Dubbo-Activate实现原理

    4. Dubbo SPI-Wrapper

    注册中心

    1. Dubbo-聊聊注册中心的设计

    2. Dubbo-时间轮设计

    通信

    1. Dubbo-聊聊通信模块设计

    RPC

    1. 聊聊Dubbo协议

    AbstractProtocol

    在介绍RPC核心接口的时候我们说过Protocol核心作用是将Invoker服务暴露出去以及引用服务将Invoker对象返回,因此我们就从Protocol开始说起。下图是Protocol的整个继承结构,从前面我们介绍的一些经验来看,我们先来看一下AbstractProtocol这个抽象接口。

    关于AbstractProtocol该接口没有直接实现export和refer方法,该接口主要实现destroy方法以及提供一些公共字段以及公共能力,首先我们看下核心字段,核心字段主要有三个exporterMap、serverMap以及invokers,exporterMap存储服务集合,serverMap存储ProtocolServer实例,invokers存储引用服务的集合。

    1. //存储暴露除去的服务
    2. protected final DelegateExporterMap exporterMap = new DelegateExporterMap();
    3. //ProtocolServer所有实例
    4. protected final Map<String, ProtocolServer> serverMap = new ConcurrentHashMap<>();
    5. //服务引用的集合
    6. protected final Set<Invoker<?>> invokers = new ConcurrentHashSet>();

    这里和介绍一下exporterMap结构,exporterMap是一个Map结构,Key是通过ProtocolUtils.serviceKey方法构建的唯一key, Exporter也就是我们需要暴露除去服务。关于Key构建是可以理解为一个四层Map,第一层按照group分组,group就是URL中配置的内容,通常可以理解为机房、区域等等;剩下的层在GroupServiceKeyCache中,分别按照 serviceName、serviceVersion、port 进行分组,key最终的结构是serviceGroup/serviceName:serviceVersion:port

    1. private String createServiceKey(String serviceName, String serviceVersion, int port) {
    2.   StringBuilder buf = new StringBuilder();
    3.   if (StringUtils.isNotEmpty(serviceGroup)) {
    4.     buf.append(serviceGroup).append('/');
    5.   }
    6.   buf.append(serviceName);
    7.   if (StringUtils.isNotEmpty(serviceVersion) && !"0.0.0".equals(serviceVersion) && !"*".equals(serviceVersion)) {
    8.     buf.append(':').append(serviceVersion);
    9.   }
    10.   buf.append(':').append(port);
    11.   return buf.toString();
    12. }

    serverMap存储所有的ProtocolServer,也就是服务端,Key是host和port组成的字符串,从URL中获取,ProtocolServer就是对RemotingServer的简单封装,serverMap的填充发生在具体的实现。

    1. private void openServer(URL url) {
    2.   // find server.
    3.   String key = url.getAddress();
    4. //client can export a service which's only for server to invoke
    5. boolean isServer = url.getParameter(IS_SERVER_KEY, true);
    6. if (isServer) {
    7.   ProtocolServer server = serverMap.get(key);
    8.   //双重锁定
    9.   if (server == null) {
    10.     synchronized (this) {
    11.       server = serverMap.get(key);
    12.       if (server == null) {
    13.         serverMap.put(key, createServer(url));
    14.       }
    15.     }
    16.   } else {
    17.     // server supports reset, use together with override
    18.     server.reset(url);
    19.   }
    20. }
    21. }

    invokers主要用于存储被引用的集合,

    1. public <T> Invoker<T> protocolBindingRefer(Class<T> serviceType, URL url) throws RpcException {
    2.   optimizeSerialization(url);
    3. // create rpc invoker.
    4. DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
    5. invokers.add(invoker);
    6. return invoker;
    7. }

    AbstractProtocol唯一实现的方法就是destory方法,首先会遍历Invokers集合,销毁全部的服务引用,然后遍历全部的exporterMap集合,销毁发布出去的服务。

    1. public void destroy() {
    2.   for (Invoker invoker : invokers) {
    3.     if (invoker != null) {
    4.       //移除所有的引用
    5.       invokers.remove(invoker);
    6.       try {
    7.         if (logger.isInfoEnabled()) {
    8.           logger.info("Destroy reference: " + invoker.getUrl());
    9.         }
    10.         invoker.destroy();
    11.       } catch (Throwable t) {
    12.         logger.warn(t.getMessage(), t);
    13.       }
    14.     }
    15.   }
    16.   for (Map.Entry> item : exporterMap.getExporterMap().entrySet()) {
    17.     //销毁发布出去的服务
    18.     if (exporterMap.removeExportMap(item.getKey(), item.getValue())) {
    19.       try {
    20.         if (logger.isInfoEnabled()) {
    21.           logger.info("Unexport service: " + item.getValue().getInvoker().getUrl());
    22.         }
    23.         item.getValue().unexport();
    24.       } catch (Throwable t) {
    25.         logger.warn(t.getMessage(), t);
    26.       }
    27.     }
    28.   }
    29. }

    DubboProtocol

    再开始介绍DubboProtocol之前我们来聊下看源码的另外一个方式,该方式也就是通过单元测试,对于像Dubbo这种优秀的框架,自身的单元测试的覆盖率是比较高的,此外在一些我们疑惑的地方,我们就可以使用单元测试来解决下疑惑,该种方式非常便捷,接下来我们会使用下该方法。 首先我们来看下export方法实现,该方法核心主要就是2个方法:

    1. 将invoker转化为DubboExporter,放入exporterMap缓存;

    2. 启动ProtocolServer;

    1. public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
    2.   URL url = invoker.getUrl();
    3.   //创建Service key
    4.   String key = serviceKey(url);
    5.   //将invoker转化为DubboExporter
    6.   DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
    7.   //记录到exporterMap
    8.   exporterMap.addExportMap(key, exporter);
    9.   //export an stub service for dispatching event
    10.   Boolean isStubSupportEvent = url.getParameter(STUB_EVENT_KEYDEFAULT_STUB_EVENT);
    11.   Boolean isCallbackservice = url.getParameter(IS_CALLBACK_SERVICE, false);
    12.   if (isStubSupportEvent && !isCallbackservice) {
    13.     String stubServiceMethods = url.getParameter(STUB_EVENT_METHODS_KEY);
    14.     if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
    15.       if (logger.isWarnEnabled()) {
    16.         logger.warn(new IllegalStateException("consumer [" + url.getParameter(INTERFACE_KEY+
    17.                                               "], has set stubproxy support event ,but no stub methods founded."));
    18.       }
    19.     }
    20.   }
    21.   //启动ProtocolServer
    22.   openServer(url);
    23.   //序列化优化处理  该方法就是提前将被序列化的类加载到Dubbo中
    24.   optimizeSerialization(url);
    25.   return exporter;
    26. }

    DubboExporter

    DubboExporter该类会将invoker进行封装,首先我们来看一下Exporter整体的继承结构,如下图:

    在DubboExporter创建时候调用父类AbstractExporter的构造函数,

    1. public DubboExporter(Invoker<T> invoker, String key, DelegateExporterMap delegateExporterMap) {
    2.   super(invoker);
    3.   this.key = key;
    4.   this.delegateExporterMap = delegateExporterMap;
    5. }

    在AbstractExporter中存在两个字段invoker和unexported,unexported表示服务是否被销毁,此外该类也对Exporter接口进行实现,在销毁Invoker对象的时候会判断服务的状态,然后在调用destroy进行销毁,afterUnExport方法会执行子类具体的实现,在DubboExporter是移除exporterMap中的缓存的对象。

    1. private final Invoker invoker;
    2. private volatile boolean unexported = false;
    3. @Override
    4. public Invoker getInvoker() {
    5.   return invoker;
    6. }
    7. @Override
    8. final public void unexport() {
    9.   if (unexported) {
    10.     return;
    11.   }
    12.   unexported = true;
    13.   getInvoker().destroy();
    14.   afterUnExport();
    15. }

    服务端初始化

    openServer方法是我们关键方法,该方法会将下层的Exchange、Transport层的方法进行调用,并最终创建NettyServer,此处我们也会使用调试的方式来搞清楚整个调用过程,openServer方法首先判断是否是服务端,然后判断服务是否创建,没有则创建ProtocolServer,否则进行服务重置更新。createServer的时候通过Exchangers门面模式创建,最终封装成为DubboProtocolServer。

    1. private void openServer(URL url) {
    2.   // find server.
    3.   String key = url.getAddress();
    4. //判断是否为服务端
    5. boolean isServer = url.getParameter(IS_SERVER_KEYtrue);
    6. if (isServer) {
    7.   ProtocolServer server = serverMap.get(key);
    8.   //双重锁定
    9.   if (server == null) {
    10.     synchronized (this) {
    11.       server = serverMap.get(key);
    12.       if (server == null) {
    13.         serverMap.put(key, createServer(url));
    14.       }
    15.     }
    16.   } else {
    17.     // server supports resetuse together with override
    18.     server.reset(url);
    19.   }
    20. }
    21. }
    22. private ProtocolServer createServer(URL url) {
    23.   url = URLBuilder.from(url)
    24.     //ReadOnly请求是否阻塞等待
    25.     .addParameterIfAbsent(CHANNEL_READONLYEVENT_SENT_KEYBoolean.TRUE.toString())
    26.     //心跳间隔
    27.     .addParameterIfAbsent(HEARTBEAT_KEYString.valueOf(DEFAULT_HEARTBEAT))
    28.     //Codec2扩展实现
    29.     .addParameter(CODEC_KEY, DubboCodec.NAME)
    30.     .build();
    31.   //获取服务端实现  默认是netty
    32.   String str = url.getParameter(SERVER_KEYDEFAULT_REMOTING_SERVER);
    33.   //检查服务端扩展实现是否支持
    34.   if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {
    35.     throw new RpcException("Unsupported server type: " + str + ", url: " + url);
    36.   }
    37.   ExchangeServer server;
    38.   try {
    39.     //通过Exchangers门面类创建ExchangeServer
    40.     server = Exchangers.bind(url, requestHandler);
    41.   } catch (RemotingException e) {
    42.     throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
    43.   }
    44.   //检测客户端服务实现是否支持
    45.   str = url.getParameter(CLIENT_KEY);
    46.   if (str != null && str.length() > 0) {
    47.     Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
    48.     if (!supportedTypes.contains(str)) {
    49.       throw new RpcException("Unsupported client type: " + str);
    50.     }
    51.   }
    52.   //将ExchangeServer包装为DubboProtocolServer
    53.   return new DubboProtocolServer(server);
    54. }

    在前面我们讲过Transport的设计,对于Exchange是Transport的上层,也就是和Protocol进行交互的,今天我们就从这里来分析Exchange以及Transport调用的整个过程,这样大家就更加理解了Dubbo服务暴露的整个过程,

    接下来调用链比较长,我们直接通过单元测试来梳理清楚整个调用链,我们先来查看下export被调用的地方,如下图,我们可以看到该方法被很多地方调用,应为我们是在DubboProtocol类下的方法,因此我们直接使用DubboProtocolTest类下的单元测试就可以。 DubboProtocolTest类下面有很多单测的方法如下图,从名字我们我就可以看出和我们相关应该就是testDemoProtocol和testGetDubboProtocol,这两个方法我们看断言上面来说的话testGetDubboProtocol方法最符合我们的使用,因此我们使用该单元测试。

    1. @Test
    2. public void testGetDubboProtocol(){
    3.   DemoService service = new DemoServiceImpl();
    4.   int port = NetUtils.getAvailablePort();
    5.   protocol.export(proxy.getInvoker(service, DemoService.class, URL.valueOf("dubbo://127.0.0.1:" + port + "/" + DemoService.class.getName())));
    6.                                    Assertions.assertTrue(DubboProtocol.getDubboProtocol().getServers().size() > 0);
    7. }

    我们直接将断点放到createServer方法内部,我们可以看到构建URL为,Transporter使用的NettyTransporter,编解码器默认采用DubboCodec。

    接下来我们断点放入到Exchangers类的bind方法中,该类采用SPI加载Exchanger,通过调试我们可以发现,最终是采用的是HeaderExchanger, 在HeaderExchanger类中创建HeaderExchangeServer,HeaderExchangeServer该类会创建心跳检测服务,服务端初始化核心的代码在Transporters中,getTransporter方法采用SPI的自适应拓展类,在运行时动态选择NettyTransporter作为实现,

    1. public static RemotingServer bind(URL url, ChannelHandler... handlers) throws RemotingException {
    2.   if (url == null) {
    3.     throw new IllegalArgumentException("url == null");
    4.   }
    5.   if (handlers == null || handlers.length == 0) {
    6.     throw new IllegalArgumentException("handlers == null");
    7.   }
    8.   ChannelHandler handler;
    9.   if (handlers.length == 1) {
    10.     handler = handlers[0];
    11.   } else {
    12.     handler = new ChannelHandlerDispatcher(handlers);
    13.   }
    14.   return getTransporter().bind(url, handler);
    15. }
    16. public static Transporter getTransporter() {
    17.   return ExtensionLoader.getExtensionLoader(Transporter.class).getAdaptiveExtension();
    18. }

    接下来我们看一下NettyTransporter类,在该类中直接创建NettyServer;

    1. public class NettyTransporter implements Transporter {
    2.     public static final String NAME = "netty";
    3.     @Override
    4.     public RemotingServer bind(URL url, ChannelHandler handler) throws RemotingException {
    5.         return new NettyServer(url, handler);
    6.     }
    7.     @Override
    8.     public Client connect(URL url, ChannelHandler handler) throws RemotingException {
    9.         return new NettyClient(url, handler);
    10.     }
    11. }

    在NettyServer调用父类的AbstractServer,这部分内容我们在通信模块中已经讲过,这里我们就是要将这部分调用的串联起来;

    1. public NettyServer(URL url, ChannelHandler handler) throws RemotingException {
    2.   // you can customize name and type of client thread pool by THREAD_NAME_KEY and THREADPOOL_KEY in CommonConstants.
    3.   // the handler will be wrapped: MultiMessageHandler->HeartbeatHandler->handler
    4.   super(ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME), ChannelHandlers.wrap(handler, url));
    5. }

    在AbstractServer中,会调用NettyServer的doOpen方法,用来完成NettyServer的启动;

    1. public AbstractServer(URL url, ChannelHandler handler) throws RemotingException {
    2.   //调用父类
    3.   super(url, handler);
    4.   //从URL获取本地地址
    5.   localAddress = getUrl().toInetSocketAddress();
    6.   String bindIp = getUrl().getParameter(Constants.BIND_IP_KEY, getUrl().getHost());
    7.   int bindPort = getUrl().getParameter(Constants.BIND_PORT_KEY, getUrl().getPort());
    8.   if (url.getParameter(ANYHOST_KEYfalse) || NetUtils.isInvalidLocalHost(bindIp)) {
    9.     bindIp = ANYHOST_VALUE;
    10.   }
    11.   //绑定地址
    12.   bindAddress = new InetSocketAddress(bindIp, bindPort);
    13.   //连接数
    14.   this.accepts = url.getParameter(ACCEPTS_KEYDEFAULT_ACCEPTS);
    15.   try {
    16.     doOpen();
    17.     if (logger.isInfoEnabled()) {
    18.       logger.info("Start " + getClass().getSimpleName() + " bind " + getBindAddress() + ", export " + getLocalAddress());
    19.     }
    20.   } catch (Throwable t) {
    21.     throw new RemotingException(url.toInetSocketAddress(), null"Failed to bind " + getClass().getSimpleName()
    22.                                 + " on " + getLocalAddress() + ", cause: " + t.getMessage(), t);
    23.   }
    24.   //创建该服务对应的线程池
    25.   executor = executorRepository.createExecutorIfAbsent(url);
    26. }

    NettyServer的启动就是Netty的常规的使用,启动过程中要注意下NettyServerHandler,关于该Handler作用就是当服务消费者调用服务提供者的服务时,提供者用来处理各个消息事件,在整一套的调用链上会形成下图的结构,关于这部分内容我们使用一个章节来详细介绍一下,至此就完成整个服务端的启动,最后就会包装成为DubboProtocolServer。

    1. protected void doOpen() throws Throwable {
    2.   //创建ServerBootstrap
    3.   bootstrap = new ServerBootstrap();
    4. //创建boss EventLoopGroup
    5. bossGroup = NettyEventLoopFactory.eventLoopGroup(1"NettyServerBoss");
    6. //创建worker EventLoopGroup
    7. workerGroup = NettyEventLoopFactory.eventLoopGroup(
    8.   getUrl().getPositiveParameter(IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS),
    9.   "NettyServerWorker");
    10. //创建一个Netty的ChannelHandler
    11. final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);
    12. //此处的Channel是Dubbo的Channel
    13. channels = nettyServerHandler.getChannels();
    14. //会话保持
    15. boolean keepalive = getUrl().getParameter(KEEP_ALIVE_KEYBoolean.FALSE);
    16. bootstrap.group(bossGroup, workerGroup)
    17.   .channel(NettyEventLoopFactory.serverSocketChannelClass())
    18.   .option(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
    19.   .childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
    20.   .childOption(ChannelOption.SO_KEEPALIVE, keepalive)
    21.   .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
    22.   .childHandler(new ChannelInitializer<SocketChannel>() {
    23.     @Override
    24.     protected void initChannel(SocketChannel ch) throws Exception {
    25.       // FIXME: should we use getTimeout()?
    26.       //连接空闲超时时间
    27.       int idleTimeout = UrlUtils.getIdleTimeout(getUrl());
    28.       //创建Netty实现的decoder和encoder
    29.       NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
    30.       if (getUrl().getParameter(SSL_ENABLED_KEYfalse)) {
    31.         //如果配置HTTPS 要实现SslHandler
    32.         ch.pipeline().addLast("negotiation",
    33.                               SslHandlerInitializer.sslServerHandler(getUrl(), nettyServerHandler));
    34.       }
    35.       ch.pipeline()
    36.         .addLast("decoder", adapter.getDecoder())
    37.         .addLast("encoder", adapter.getEncoder())
    38.         //心跳检查
    39.         .addLast("server-idle-handler", new IdleStateHandler(00, idleTimeout, MILLISECONDS))
    40.         //注册nettyServerHandler
    41.         .addLast("handler", nettyServerHandler);
    42.     }
    43.   });
    44. // bind
    45. ChannelFuture channelFuture = bootstrap.bind(getBindAddress());
    46. //等待绑定完成
    47. channelFuture.syncUninterruptibly();
    48. channel = channelFuture.channel();
    49. }

    image.png

    结束

    欢迎大家点点关注,点点赞!

  • 相关阅读:
    java 工程管理系统源码+项目说明+功能描述+前后端分离 + 二次开发
    安卓RadioButton设置图片大小
    112.HBase Endpoint类型的Coprocessor开发与部署
    SpringBoot集成Swagger的使用
    The platform “win32“ is incompatible with this module.
    《轻购优品》新零售玩法:消费积分认购+众筹新玩法
    Docker-compose安装mysql
    goland报错:“package command-line-arguments is not a main package”解决方案
    XCTF1-web Robots
    root权限发现没Java等环境:sudo su与sudo su -
  • 原文地址:https://blog.csdn.net/weixin_38592881/article/details/128177496