• Dubbo-聊聊通信模块设计


    前言

    Dubbo源码阅读分享系列文章,欢迎大家关注点赞

    SPI实现部分

    1. Dubbo-SPI机制

    2. Dubbo-Adaptive实现原理

    3. Dubbo-Activate实现原理

    4. Dubbo SPI-Wrapper

    注册中心

    1. Dubbo-聊聊注册中心的设计

    通信模块介绍

    Dubbo通信模块主要的目的就是解决客户端以服务端通信的问题,核心代码都在dubbo-remoting模块,该模块提供了多种客户端和服务端通信的功能。Dubbo的通信主要包括是三部分:Exchange、Transport和Serialize,对于序列化部分的设计在单独的模块中,我们再单独聊,这篇文章主要聊Exchange、Transport设计。

    对于Dubbo来说没有自己的网络框架,使用现有第三方类库,因此需要设计一套标准API来兼容多种不同的通信框架,dubbo-remoting 模块的结构就是目前Dubbo兼容的所有的通信框架。 在整体模块设计上,dubbo-remoting-api是其他模块上层抽象,其他子模块都是依赖第三方NIO库实现 dubbo-remoting-api模块的。因此我们想要了解清楚dubbo-remoting设计必须要理解dubbo-remoting-api的设计。 对于dubbo-remoting-api大致可以分为四类,

    1. 核心API设计,主要是包括端口、编码、解码等等核心接口的抽象;

    2. buffer,主要是定义缓冲区相关的接口、抽象类以及实现类;

    3. exchange,抽Request和Response概念抽象以及扩展;

    4. transport,网络传输层的抽象,但它只负责消息的传输;

    源码分析

    核心API设计

    Endpoint

    Endpoint被翻译端点,这里可以理解为通信中对IP和Port的抽象,Client和Server端共同的抽象,两个端通过Endpoint建立TCP连接,进行通信。

    对于该Endpoint接口定义了三类方法:

    1. get类方法,主要获取Endpoint的本地地址、关联的URL信息以及底层Channel关联的ChannelHandle,也就是获取建立连接需要的属性;

    2. send方法主要负责发送数据;

    3. close类方法,主要是用来关闭连接;

    Channel

    Channel可以理解为Client和Server端连接的通道,是NIO框架设计中不可缺少的概念,Channel继承Endpoint,因此拥有Endpoint的能力,对于Channel来说,可以给自身设计一些额外属性。

    ChannelHandler

    ChannelHandler可以理解为Channel的处理器,ChannelHandler 可以处理Channel的连接建立以及连接断开事件,还可以处理读取到的数据、发送的数据以及捕获到的异常。

    Codec2

    Codec2实现编码和解码,实现字节与消息体之间的转换,类似Netty中编码和解码。此外,Codec2接口被@SPI 接口修饰了,说明该接口是一个扩展接口,同时encode方法和 decode方法都被@Adaptive注解修饰,因此也会生成适配器类,可以根据URL中的codec值确定具体的扩展实现类,这里就体现SPI和URL灵活配置的特性。

    1. @SPI
    2. public interface Codec2 {
    3.     @Adaptive({Constants.CODEC_KEY})
    4.     void encode(Channel channel, ChannelBuffer buffer, Object message) throws IOException;
    5.     @Adaptive({Constants.CODEC_KEY})
    6.     Object decode(Channel channel, ChannelBuffer buffer) throws IOException;
    7.     enum DecodeResult {
    8.         NEED_MORE_INPUT, SKIP_SOME_INPUT
    9.     }
    10. }

    此外还存在DecodeResult的枚举,该枚举是处理粘包和拆包使用的。

    Client

    Client继承了Endpoint、Channel等相关的接口,因此对于Client也具备收发消息能力,Client只可以关联一个 Channel。

    RemotingServer

    Server与Client不太一样地方就是可以接收多个Client发起的Channel连接,因此RemotingServer接口中存在获取多个Channel列表的接口。

    Transporter

    Transporter接口是Dubbo在Client和Server上又封装的一层,我们可以看到改接口被@SPI以及@Adaptive注解修饰,因此这个是个可扩展的接口,默认使用Netty的扩展,@Adaptive表示可以动态生成该适配的类,根据设置的值确定具体实现的类。

    1. @SPI("netty")
    2. public interface Transporter {
    3.     @Adaptive({Constants.SERVER_KEY, Constants.TRANSPORTER_KEY})
    4.     RemotingServer bind(URL url, ChannelHandler handler) throws RemotingException;
    5.     @Adaptive({Constants.CLIENT_KEY, Constants.TRANSPORTER_KEY})
    6.     Client connect(URL url, ChannelHandler handler) throws RemotingException;
    7. }

    Transporter的实现类有主要有以下几种,每个对应的具体的NIO的实现都在其各自的包中,这样可以通过灵活配置来进行切换不同的实现。

    为了验证是否正确,我们简单再来看一下RemotingServer的实现,RemotingServer的实现中,包含每个具体NIO框架的实现,因此这里更加印证Transporter的的抽象,让我们可以通过Dubbo SPI修改具体Transporter扩展实现,从而切换到不同的Client和 RemotingServer实现,从而达到NIO库切换,这里我们无需修改任何代码,真正的做到开放-闭合的原则。

    Transporters

    Transporters该类是一种门面模式的设计,主要是解决和多个不同子模块直接进行交互的问题,通过该类设计,将公共的行为Transporter对象的创建以及ChannelHandler的处理,大家可以直接依赖Transporters类,这部分调用是在Dubbo协议初始化时候发起的,这部分我们到时候在细讲,这个章节暂时先不讲解。

    但是这里需要在这个看一下关于ChannelHandler的处理,此处传入了多个ChannelHandler,将多个ChannelHandler包装成为ChannelHandlerDispatcher,ChannelHandlerDispatcher实现ChannelHandler,内部维护了一个 CopyOnWriteArraySet,对外提供操作ChannelHandler方法,此处主要是为了引出后续Handler的处理流程,后续一层处理模型的源头都在这里。

    到这里我们大概对Dubbo的通讯模型有了一个轮廓,我们来进行一个简单的总结,可以参考下图:

    1. 上层通过会Transporters获取到具体的Transporter扩展实现,然后通过Transporter获取Client和 RemotingServer实现;

    2. Client与RemotingServer都是通过Channel进行交互,Channel使用ChannelHandler进行数据传输,此外通过Codec2进行编解码;

    Buffer设计

    image.png

    接口设计

    ChannelBuffer的设计类似于Netty的Buffer的设计,大致可以分为五类,对于具体的实现我们在后面AbstractChannelBuffer等实现类里面进行讲解。

    接下来我们来看一下ChannelBufferFactory,该接口都是用来创建ChannelBuffer的,并且每个具体的实现都是单例的,可以理解为一个简单工厂的设计,可以有不同类型的ChannelBuffer的实现。

    AbstractChannelBuffer

    AbstractChannelBuffer维护两类索引,一类用于读写,另外一类用于读写标记;

    关于读写类索引就是记录当前读到什么位置以及写到什么位置了,标记类索引就是为了做数据备份和回滚使用,为了对缓冲区重复利用。该类的方法都主要是利用四个属性来操作,用来检测是否有数据可读或者还是否有空间可写等方法,做一些前置条件的校验以及索引的设置,具体的实现都是需要子类来实现。

    1.     @Override
    2.     public void readBytes(byte[] dst, int dstIndex, int length) {
    3.         //检查位置是否足够
    4.         checkReadableBytes(length);
    5.         //此处可以理解为将readerIndex后移length个字节读取到dst数组中
    6.         //也就是数组dst的dstIndex~dstIndex+length位置
    7.         getBytes(readerIndex, dst, dstIndex, length);
    8.         //readerIndex后移length个字节
    9.         readerIndex += length;
    10.     }
    11.     @Override
    12.     public void readBytes(byte[] dst, int dstIndex, int length) {
    13.         //检查位置是否足够
    14.         checkReadableBytes(length);
    15.         //此处可以理解为将readerIndex后移length个字节读取到dst数组中
    16.         //也就是数组dst的dstIndex~dstIndex+length位置
    17.         getBytes(readerIndex, dst, dstIndex, length);
    18.         //readerIndex后移length个字节
    19.         readerIndex += length;
    20.     }
    21.     @Override
    22.     public void writeBytes(byte[] src, int srcIndex, int length) {
    23.         //将src数组中srcIndex~srcIndex+length位置的数据写到当前的buffer中
    24.         setBytes(writerIndex, src, srcIndex, length);
    25.         //将当前的writerIndex后移length
    26.         writerIndex += length;
    27.     }

    HeapChannelBuffer

    HeapChannelBuffer是ChannelBuffer的一种具体的实现,该类是基于字节数组的ChannelBuffer实现,通过byte[]数组来进行数据的存储,setBytes和getBytes通过System.arraycopy来进行对数组的操作。

    1.     //此缓冲区包装的基础堆字节数组
    2.     protected final byte[] array;
    3.     
    4.     @Override
    5.     public void getBytes(int index, byte[] dst, int dstIndex, int length) {
    6.         System.arraycopy(array, index, dst, dstIndex, length);
    7.     }
    8.     @Override
    9.     public void setBytes(int index, byte[] src, int srcIndex, int length) {
    10.         System.arraycopy(src, srcIndex, array, index, length);
    11.     }

    对于HeapChannelBuffer的具体的工厂的实现是HeapChannelBufferFactory,该工厂是一个单例模式,HeapChannelBufferFactory通过ChannelBuffers工具类创建固定容量的HeapChannelBuffer,此外也可以通过拷贝的形式创建HeapChannelBuffer。

    1.     @Override
    2.     public ChannelBufferFactory factory() {
    3.         return HeapChannelBufferFactory.getInstance();
    4.     }

    DynamicChannelBuffer

    DynamicChannelBuffer可以理解为一个扩展类,也就是对装饰者模式,就是对ChannelBuffer的增加强,增加动态扩容的能力,关于该类默认的实现HeapChannelBufferFactory,我可以通过指定HeapChannelBufferFactory为对应的实现添加动态扩容的能力。

    1.     //具体的ChannelBufferFactory的实现
    2.     private final ChannelBufferFactory factory;
    3.     //需要扩容的buffer
    4.     private ChannelBuffer buffer;
    5.     public DynamicChannelBuffer(int estimatedLength) {
    6.         //默认实现
    7.         this(estimatedLength, HeapChannelBufferFactory.getInstance());
    8.     }
    9.     //指定具体的实现
    10.     public DynamicChannelBuffer(int estimatedLength, ChannelBufferFactory factory) {
    11.         if (estimatedLength < 0) {
    12.             throw new IllegalArgumentException("estimatedLength: " + estimatedLength);
    13.         }
    14.         if (factory == null) {
    15.             throw new NullPointerException("factory");
    16.         }
    17.         this.factory = factory;
    18.         buffer = factory.getBuffer(estimatedLength);
    19.     }

    关于如何实现ChannelBuffer的动态扩容,看懂Java ArryList扩容的,我相信一定能理解,也就是我们要控制写入时候的判断写入的空间是否足够就可以了。DynamicChannelBuffer通过ensureWritableBytes方法来实现扩容,我们来看下他是如何做的:

    1.     @Override
    2.     public void ensureWritableBytes(int minWritableBytes) {
    3.         //如果写入字节数小于等于可写的字节数
    4.         if (minWritableBytes <= writableBytes()) {
    5.             return;
    6.         }
    7.         //新增容量
    8.         int newCapacity;
    9.         //缓存区字节数为0
    10.         if (capacity() == 0) {
    11.             //设置为1
    12.             newCapacity = 1;
    13.         } else {
    14.             //新增容量为缓冲区字节数
    15.             newCapacity = capacity();
    16.         }
    17.         //最小新增容量 = 当前写入字节数的索引+最小写入的字节数
    18.         int minNewCapacity = writerIndex() + minWritableBytes;
    19.         //如果新增容量小于最小新增容量
    20.         while (newCapacity < minNewCapacity) {
    21.             //新增容量左移1位,加倍
    22.             newCapacity <<= 1;
    23.         }
    24.         //通过工厂类创建该容量
    25.         ChannelBuffer newBuffer = factory().getBuffer(newCapacity);
    26.         //从buffer中读取数据到newBuffer中
    27.         newBuffer.writeBytes(buffer, 0, writerIndex());
    28.         //替换原来的缓存区
    29.         buffer = newBuffer;
    30.     }

    ByteBufferBackedChannelBuffer

    ByteBufferBackedChannelBuffer该类是基于Java NIO的ByteBuffer实现的ChannelBuffer,都是通过操作ByteBuffer的API进行实现,这里我们就不展开了。

    1.     //NIO ByteBuffer
    2.     private final ByteBuffer buffer;
    3.     //初始化容量
    4.     private final int capacity;
    5.     public ByteBufferBackedChannelBuffer(ByteBuffer buffer) {
    6.         if (buffer == null) {
    7.             throw new NullPointerException("buffer");
    8.         }
    9.         this.buffer = buffer.slice();
    10.         capacity = buffer.remaining();
    11.         writerIndex(capacity);
    12.     }
    13.     public ByteBufferBackedChannelBuffer(ByteBufferBackedChannelBuffer buffer) {
    14.         this.buffer = buffer.buffer;
    15.         capacity = buffer.capacity;
    16.         setIndex(buffer.readerIndex(), buffer.writerIndex());
    17.     }

    ChannelBufferInputStream

    ChannelBufferInputStream该类实现InputStream输入流的的方法,内部维护了ChannelBuffer、startIndex以及endIndex,该方法内部都是读取ChannelBuffer中的数据,startIndex和endIndex控制读取数据位置,这样就完成 InputStream的扩展实现。

    1.     //ChannelBuffer
    2.     private final ChannelBuffer buffer;
    3.     //开始位置
    4.     private final int startIndex;
    5.     //结束位置
    6.     private final int endIndex;
    7.     @Override
    8.     public int read() throws IOException {
    9.         if (!buffer.readable()) {
    10.             return -1;
    11.         }
    12.         return buffer.readByte() & 0xff;
    13.     }

    ChannelBufferOutputStream

    ChannelBufferOutputStream该类实现OutputStream输出流,内部维护了ChannelBuffer、startIndex,该方法内部都是写入到ChannelBuffer中,startIndex是标记开始写入位置。 Buffer的整体的设计到此就介绍完成,通过ChannelBufferOutputStream、ChannelBufferInputStream控制数据的输入输出,内部通过ChannelBuffer存储数据,ChannelBuffer可以根据需要进行不同的实现。

    Transport设计

    Transport在核心API中介绍上层访问都是通过该接口访问的,接下来我们就来探秘下Transport层都做了哪些事情。

    AbstractPeer

    AbstractPeer该抽象类可以理解为服务器概念,继承了Endpoint、ChannelHandler接口,内部有四个核心的属性,URL代表自身服务的地址,closing、closed表示当前服务器状态,handler就是ChannelHandler,AbstractPeer内部实现了都是委托给ChannelHandler,这是一种典型的装饰器设计模式。

    1.     //ChannelHandler
    2.     private final ChannelHandler handler;
    3.     //自身地址
    4.     private volatile URL url;
    5.     //服务器状态
    6.     private volatile boolean closing;
    7.     private volatile boolean closed;
    8.     public AbstractPeer(URL url, ChannelHandler handler) {
    9.         if (url == null) {
    10.             throw new IllegalArgumentException("url == null");
    11.         }
    12.         if (handler == null) {
    13.             throw new IllegalArgumentException("handler == null");
    14.         }
    15.         this.url = url;
    16.         this.handler = handler;
    17.     }

    AbstractEndpoint

    AbstractEndpoint继承AbstractPeer,可以理解为端口的抽象,内部增加Codec2和connectTimeout两个属性,在AbstractEndpoint在初始化的时候会将这两个字段初始化。

    1.     private Codec2 codec;
    2.     private int connectTimeout;
    3.     public AbstractEndpoint(URL url, ChannelHandler handler) {
    4.         //调用父类
    5.         super(url, handler);
    6.         //根据URL中的codec参数值,确定此处具体的Codec2实现类
    7.         this.codec = getChannelCodec(url);
    8.         //设置connectTimeout
    9.         this.connectTimeout = url.getPositiveParameter(Constants.CONNECT_TIMEOUT_KEY, Constants.DEFAULT_CONNECT_TIMEOUT);
    10.     }
    11.     protected static Codec2 getChannelCodec(URL url) {
    12.         //获取URL协议
    13.         String codecName = url.getProtocol();
    14.         //判断有没有该扩展名
    15.         if (ExtensionLoader.getExtensionLoader(Codec2.class).hasExtension(codecName)) {
    16.             //通过ExtensionLoader加载具体实现类
    17.             return ExtensionLoader.getExtensionLoader(Codec2.class).getExtension(codecName);
    18.         } else {
    19.             //没有匹配到从扩展类进行加载
    20.             return new CodecAdapter(ExtensionLoader.getExtensionLoader(Codec.class)
    21.                     .getExtension(codecName));
    22.         }
    23.     }

    此外该接口实现Resetable接口,该接口内部只有一个reset方法,该方法通过获取URL参数信息,重置了connectTimeout的信息以及Codec2的信息。

    AbstractServer

    AbstractServer是对服务端的抽象,该抽象类实现AbstractEndpoint和RemotingServer,该抽象类内部有五个核心属性,localAddress、bindAddress这两个属性都是在URL参数中获取,表示Server本地的地址以及绑定的地址,默认两个值是一致的,accepts表示是Server最大的连接次数,默认是0,表述没有限制,executorRepository、executor线程池相关的属性,executorRepository负责管理线程池,executor表示当前服务管理的线程池。

    1.     //当前服务关联的线程池
    2.     ExecutorService executor;
    3.     //本机地址
    4.     private InetSocketAddress localAddress;
    5.     //绑定地址
    6.     private InetSocketAddress bindAddress;
    7.     //最大连接数
    8.     private int accepts;
    9.     //管理线程池
    10.     private ExecutorRepository executorRepository = ExtensionLoader.getExtensionLoader(ExecutorRepository.class).getDefaultExtension();

    AbstractServer初始化也就是在构造函数中完成初始化的,然后通过调用其抽象方法doOpen实现启动服务器。

    1.     public AbstractServer(URL url, ChannelHandler handler) throws RemotingException {
    2.         //调用父类
    3.         super(url, handler);
    4.         //从URL获取本地地址
    5.         localAddress = getUrl().toInetSocketAddress();
    6.         String bindIp = getUrl().getParameter(Constants.BIND_IP_KEY, getUrl().getHost());
    7.         int bindPort = getUrl().getParameter(Constants.BIND_PORT_KEY, getUrl().getPort());
    8.         if (url.getParameter(ANYHOST_KEYfalse) || NetUtils.isInvalidLocalHost(bindIp)) {
    9.             bindIp = ANYHOST_VALUE;
    10.         }
    11.         //绑定地址
    12.         bindAddress = new InetSocketAddress(bindIp, bindPort);
    13.         //连接数
    14.         this.accepts = url.getParameter(ACCEPTS_KEYDEFAULT_ACCEPTS);
    15.         try {
    16.             //调用该抽象方法启动服务
    17.             doOpen();
    18.             if (logger.isInfoEnabled()) {
    19.                 logger.info("Start " + getClass().getSimpleName() + " bind " + getBindAddress() + ", export " + getLocalAddress());
    20.             }
    21.         } catch (Throwable t) {
    22.             throw new RemotingException(url.toInetSocketAddress(), null"Failed to bind " + getClass().getSimpleName()
    23.                     + " on " + getLocalAddress() + ", cause: " + t.getMessage(), t);
    24.         }
    25.         //创建该服务对应的线程池
    26.         executor = executorRepository.createExecutorIfAbsent(url);
    27.     }

    AbstractClient

    AbstractClient是对客户端的抽象,同样它的继承和AbstractServer也一样,只是在实现不同而已,接下来我们来看看AbstractClient的实现,该类内部有4个关键的字段,对于executor和executorRepository这两个字段与AbstractServer功能类似,这里重点来介绍connectLock和needReconnect,connectLock是当客户端进行连接、断开、重连等操作时,需要获取该锁进行同步操作,needReconnect 在客户端发送数据之前,会检查客户端的连接是否断开,如果断开了,则会根据needReconnect字段,决定是否重连。

    AbstractClient整体的初始化是在构造函数实现的,我们可以看到AbstractClient 定义了 doOpen、doClose、doConnect和doDisConnect四个抽象方法给子类实现,整体的设计与AbstractServer类似。

    1.     public AbstractClient(URL url, ChannelHandler handler) throws RemotingException {
    2.         //调用父类构造方法
    3.         super(url, handler);
    4.         //从URL获取是否重连字段 默认是
    5.         needReconnect = url.getParameter(Constants.SEND_RECONNECT_KEYtrue);
    6.         //初始化Executor
    7.         initExecutor(url);
    8.         try {
    9.             //初始化具体的底层实现client
    10.             doOpen();
    11.         } catch (Throwable t) {
    12.             //关闭
    13.             close();
    14.             throw new RemotingException(url.toInetSocketAddress(), null,
    15.                     "Failed to start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress()
    16.                             + " connect to the server " + getRemoteAddress() + ", cause: " + t.getMessage(), t);
    17.         }
    18.         try {
    19.             //创建连接
    20.             connect();
    21.             if (logger.isInfoEnabled()) {
    22.                 logger.info("Start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress() + " connect to the server " + getRemoteAddress());
    23.             }
    24.         } catch (RemotingException t) {
    25.             if (url.getParameter(Constants.CHECK_KEYtrue)) {
    26.                 close();
    27.                 throw t;
    28.             } else {
    29.                 logger.warn("Failed to start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress()
    30.                         + " connect to the server " + getRemoteAddress() + " (check == false, ignore and retry later!), cause: " + t.getMessage(), t);
    31.             }
    32.         } catch (Throwable t) {
    33.             close();
    34.             throw new RemotingException(url.toInetSocketAddress(), null,
    35.                     "Failed to start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress()
    36.                             + " connect to the server " + getRemoteAddress() + ", cause: " + t.getMessage(), t);
    37.         }
    38.     }

    AbstractChannel

    AbstractChannel的设计也是类似模板类的设计,对于不同的NIO框架来说有不同的Channel的实现,因此对于Dubbo来说也必须去抽象该实现,具体的不同交由子类进行实现,子类做映射。该类内部只有有一个Send方法,为了判断当前的连接是否还在,没有实现具体的发送消息。

    Netty4

    NettyTransporter

    NettyTransporter实现Transporter,当SPI机制触发的时候会自动加载实现NettyServer、NettyClient初始化创建。

    NettyServer

    接下来我们来看下Netty4中关于doOpen方法的实现,此处就是Netty Server启动的核心,也是Dubbo网络通信的服务端能力的提供者,就是Dubbo和Netty结合的核心。

    1.     protected void doOpen() throws Throwable {
    2.         //创建ServerBootstrap
    3.         bootstrap = new ServerBootstrap();
    4.         //创建boss EventLoopGroup
    5.         bossGroup = NettyEventLoopFactory.eventLoopGroup(1"NettyServerBoss");
    6.         //创建worker EventLoopGroup
    7.         workerGroup = NettyEventLoopFactory.eventLoopGroup(
    8.                 getUrl().getPositiveParameter(IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS),
    9.                 "NettyServerWorker");
    10.         //创建一个Netty的ChannelHandler
    11.         final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);
    12.         //此处的Channel是Dubbo的Channel
    13.         channels = nettyServerHandler.getChannels();
    14.         //会话保持
    15.         boolean keepalive = getUrl().getParameter(KEEP_ALIVE_KEYBoolean.FALSE);
    16.         bootstrap.group(bossGroup, workerGroup)
    17.                 .channel(NettyEventLoopFactory.serverSocketChannelClass())
    18.                 .option(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
    19.                 .childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
    20.                 .childOption(ChannelOption.SO_KEEPALIVE, keepalive)
    21.                 .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
    22.                 .childHandler(new ChannelInitializer<SocketChannel>() {
    23.                     @Override
    24.                     protected void initChannel(SocketChannel ch) throws Exception {
    25.                         // FIXME: should we use getTimeout()?
    26.                         //连接空闲超时时间
    27.                         int idleTimeout = UrlUtils.getIdleTimeout(getUrl());
    28.                         //创建Netty实现的decoder和encoder
    29.                         NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
    30.                         if (getUrl().getParameter(SSL_ENABLED_KEYfalse)) {
    31.                             //如果配置HTTPS 要实现SslHandler
    32.                             ch.pipeline().addLast("negotiation",
    33.                                     SslHandlerInitializer.sslServerHandler(getUrl(), nettyServerHandler));
    34.                         }
    35.                         ch.pipeline()
    36.                                 .addLast("decoder", adapter.getDecoder())
    37.                                 .addLast("encoder", adapter.getEncoder())
    38.                                 //心跳检查
    39.                                 .addLast("server-idle-handler", new IdleStateHandler(00, idleTimeout, MILLISECONDS))
    40.                                 //注册nettyServerHandler
    41.                                 .addLast("handler", nettyServerHandler);
    42.                     }
    43.                 });
    44.         // bind
    45.         ChannelFuture channelFuture = bootstrap.bind(getBindAddress());
    46.         //等待绑定完成
    47.         channelFuture.syncUninterruptibly();
    48.         channel = channelFuture.channel();
    49.     }

    此处与Netty启动不同的地方在于替换了Channel的实现为Dubbo实现,然后通过doOpen完成Server启动,大家也可以借助下图来进行理解:

    NettyCodecAdapter

    NettyCodecAdapter该类是对编解码的实现,主要是将Netty规则替换为为Dubbo的规则,该类内部有5个核心的属性,其中encoder和decoder是NettyCodecAdapter内部类,

    1.     //Netty Channel 编码
    2.     private final ChannelHandler encoder = new InternalEncoder();
    3.     //Netty Channel 解码
    4.     private final ChannelHandler decoder = new InternalDecoder();
    5.     //Dubbo 的编解码
    6.     private final Codec2 codec;
    7.     //URL参数
    8.     private final URL url;
    9.     //Dubbo ChannelHandler
    10.     private final org.apache.dubbo.remoting.ChannelHandler handler;

    encoder和decoder是对Netty中的ByteToMessageDecoder和MessageToByteEncoder的实现,也正是此处的实现将真正的编码委托给Codec2进行实现,

    1.     private class InternalEncoder extends MessageToByteEncoder {
    2.         @Override
    3.         protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception {
    4.             //Netty对ChannelBuffer的实现 操作字节数组
    5.             //将Netty  ByteBuf 包装为 Dubbo  ChannelBuffer
    6.             ChannelBuffer buffer = new NettyBackedChannelBuffer(out);
    7.             //获取关联的Channel
    8.             Channel ch = ctx.channel();
    9.             NettyChannel channel = NettyChannel.getOrAddChannel(ch, url, handler);
    10.             //codec进行编码
    11.             codec.encode(channel, buffer, msg);
    12.         }
    13.     }
    14.     private class InternalDecoder extends ByteToMessageDecoder {
    15.         @Override
    16.         protected void decode(ChannelHandlerContext ctx, ByteBuf input, List<Object> out) throws Exception {
    17.             //将Netty  ByteBuf 包装为 Dubbo  ChannelBuffer
    18.             ChannelBuffer message = new NettyBackedChannelBuffer(input);
    19.             //获取关联的Channel
    20.             NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);
    21.             // decode object.
    22.             do {
    23.                 //记录当前读到的位置
    24.                 int saveReaderIndex = message.readerIndex();
    25.                 //codec进行解码
    26.                 Object msg = codec.decode(channel, message);
    27.                 //判断消息长度是否足够
    28.                 if (msg == Codec2.DecodeResult.NEED_MORE_INPUT) {
    29.                     //重置读取的位置
    30.                     message.readerIndex(saveReaderIndex);
    31.                     break;
    32.                 } else {
    33.                     //边界值判断
    34.                     if (saveReaderIndex == message.readerIndex()) {
    35.                         throw new IOException("Decode without read data.");
    36.                     }
    37.                     //将消息传递给其他Handler
    38.                     if (msg != null) {
    39.                         out.add(msg);
    40.                     }
    41.                 }
    42.             } while (message.readable());
    43.         }
    44.     }

    NettyServerHandler

    NettyServerHandler该类继承了Netty的ChannelDuplexHandler,该类具备同时处理Inbound和Outbound的能力,我们来看下整体的继承结构,整体的继承结构确实也是一样的。

    该类内部主要有3个核心字段,这里相对比较重要的是channels和handler, channels字段缓存当前所有Server创建的Channel,所有的创建、断开连接的时候都会操作channels该对象,handler在内部所有的实现都是通过Dubbo ChannelHandler,这样就完成对Netty的替换;代码如下:

    1.     @Override
    2.     public void channelActive(ChannelHandlerContext ctx) throws Exception {
    3.         NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);
    4.         if (channel != null) {
    5.             //新建的链接 增加缓存
    6.             channels.put(NetUtils.toAddressString((InetSocketAddress) ctx.channel().remoteAddress()), channel);
    7.         }
    8.         //使用Dubbo ChannelHandler建立连接
    9.         handler.connected(channel);
    10.         if (logger.isInfoEnabled()) {
    11.             logger.info("The connection of " + channel.getRemoteAddress() + " -> " + channel.getLocalAddress() + " is established.");
    12.         }
    13.     }
    14.     @Override
    15.     public void channelInactive(ChannelHandlerContext ctx) throws Exception {
    16.         NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);
    17.         try {
    18.             //关闭连接 移除缓存
    19.             channels.remove(NetUtils.toAddressString((InetSocketAddress) ctx.channel().remoteAddress()));
    20.             //关闭释放Dubbo ChannelHandler
    21.             handler.disconnected(channel);
    22.         } finally {
    23.             //NettyChannel也同时移除
    24.             NettyChannel.removeChannel(ctx.channel());
    25.         }
    26.         if (logger.isInfoEnabled()) {
    27.             logger.info("The connection of " + channel.getRemoteAddress() + " -> " + channel.getLocalAddress() + " is disconnected.");
    28.         }
    29.     }

    在NettyServer创建的时候,有下图代码,这里的this指的就是NettyServer,在NettyServerHandler里面第二个参数是ChannelHandler,同时NettyServer又继承了实现ChannelHandler的AbstractPeer,因此NettyServerHandler在创建的时候就会将所有数据委托给ChannelHandler进行处理,此处体现多态的魅力。

    到此相信你也对Netty Server以及Dubbo Server设计有了一个深入的了解,可以参考下图,上层是对Client、Channel等能力的抽象,这些抽象能力抽象接口实现,这样子该抽象方法子类又可以有不同的实现,这样子就完成上层能力的建设,下层又可以根据自身特点完成自己编解码以及服务的实现,做到了灵活多变,自由扩展。

    NettyClient

    NettyClient实现与NettyServer类似,都是初始化自身服务,这里我们来看下实现;

    1.   @Override
    2.     protected void doOpen() throws Throwable {
    3.         //创建NettyClientHandler 做法与Server类似
    4.         final NettyClientHandler nettyClientHandler = new NettyClientHandler(getUrl(), this);
    5.         bootstrap = new Bootstrap();
    6.         bootstrap.group(EVENT_LOOP_GROUP)
    7.                 .option(ChannelOption.SO_KEEPALIVE, true)
    8.                 .option(ChannelOption.TCP_NODELAY, true)
    9.                 .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
    10.                 //.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, getTimeout())
    11.                 .channel(socketChannelClass());
    12.         //设置超时时间
    13.         bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, Math.max(DEFAULT_CONNECT_TIMEOUT, getConnectTimeout()));
    14.         bootstrap.handler(new ChannelInitializer<SocketChannel>() {
    15.             @Override
    16.             protected void initChannel(SocketChannel ch) throws Exception {
    17.                 //设置心跳的间隔
    18.                 int heartbeatInterval = UrlUtils.getHeartbeat(getUrl());
    19.                 if (getUrl().getParameter(SSL_ENABLED_KEYfalse)) {
    20.                     ch.pipeline().addLast("negotiation", SslHandlerInitializer.sslClientHandler(getUrl(), nettyClientHandler));
    21.                 }
    22.                 NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyClient.this);
    23.                 ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug
    24.                         //解密编码
    25.                         .addLast("decoder", adapter.getDecoder())
    26.                         .addLast("encoder", adapter.getEncoder())
    27.                         //设置心跳
    28.                         .addLast("client-idle-handler", new IdleStateHandler(heartbeatInterval, 00, MILLISECONDS))
    29.                         //注册nettyClientHandler
    30.                         .addLast("handler", nettyClientHandler);
    31.                 //如果需要Socks5Proxy,需要添加Socks5ProxyHandler(略
    32.                 String socksProxyHost = ConfigUtils.getProperty(SOCKS_PROXY_HOST);
    33.                 if(socksProxyHost != null) {
    34.                     int socksProxyPort = Integer.parseInt(ConfigUtils.getProperty(SOCKS_PROXY_PORT, DEFAULT_SOCKS_PROXY_PORT));
    35.                     Socks5ProxyHandler socks5ProxyHandler = new Socks5ProxyHandler(new InetSocketAddress(socksProxyHost, socksProxyPort));
    36.                     ch.pipeline().addFirst(socks5ProxyHandler);
    37.                 }
    38.             }
    39.         });
    40.     }

    形成通信的通道的图也是类似:

    对于NettyClientHandler实现整体上与NettyServerHandler的设计思路类似,这里就不进行介绍了,

    NettyChannel

    NettyChannel是对AbstractChannel一种实现,有四个字段,

    1.     //缓存Netty Channel 和 Dubbo Channel的对应关系
    2.     private static final ConcurrentMap<Channel, NettyChannel> CHANNEL_MAP = new ConcurrentHashMap<Channel, NettyChannel>();
    3.     //Netty Channel
    4.     private final Channel channel;
    5.     //Channnel附加的属性缓存
    6.     private final Map<StringObject> attributes = new ConcurrentHashMap<StringObject>();
    7.     //标识当前channel是否可用
    8.     private final AtomicBoolean active = new AtomicBoolean(false);
    9.     //炒作Channel也会操作缓存的内容
    10.     static NettyChannel getOrAddChannel(Channel ch, URL url, ChannelHandler handler) {
    11.         if (ch == null) {
    12.             return null;
    13.         }
    14.         NettyChannel ret = CHANNEL_MAP.get(ch);
    15.         if (ret == null) {
    16.             NettyChannel nettyChannel = new NettyChannel(ch, url, handler);
    17.             if (ch.isActive()) {
    18.                 nettyChannel.markActive(true);
    19.                 ret = CHANNEL_MAP.putIfAbsent(ch, nettyChannel);
    20.             }
    21.             if (ret == null) {
    22.                 ret = nettyChannel;
    23.             }
    24.         }
    25.         return ret;
    26.     }
    27.     /**
    28.      * Remove the inactive channel.
    29.      *
    30.      * @param ch netty channel
    31.      */
    32.     static void removeChannelIfDisconnected(Channel ch) {
    33.         if (ch != null && !ch.isActive()) {
    34.             NettyChannel nettyChannel = CHANNEL_MAP.remove(ch);
    35.             if (nettyChannel != null) {
    36.                 nettyChannel.markActive(false);
    37.             }
    38.         }
    39.     }

    接下来就是核心send方法的实现,此处会关联Netty Channel,将数据发送出去,此处就是子类具体的实现。

    1.     public void send(Object message, boolean sent) throws RemotingException {
    2.         //调用父类 判断连接是否可用
    3.         super.send(message, sent);
    4.         boolean success = true;
    5.         int timeout = 0;
    6.         try {
    7.             //netty channel 发送数据
    8.             ChannelFuture future = channel.writeAndFlush(message);
    9.             if (sent) {
    10.                 //等待发送结束
    11.                 timeout = getUrl().getPositiveParameter(TIMEOUT_KEYDEFAULT_TIMEOUT);
    12.                 success = future.await(timeout);
    13.             }
    14.             //判断是否异常
    15.             Throwable cause = future.cause();
    16.             if (cause != null) {
    17.                 throw cause;
    18.             }
    19.         } catch (Throwable e) {
    20.             //异常断开netty连接 移除缓存关系
    21.             removeChannelIfDisconnected(channel);
    22.             throw new RemotingException(this, "Failed to send message " + PayloadDropper.getRequestWithoutData(message) + " to " + getRemoteAddress() + ", cause: " + e.getMessage(), e);
    23.         }
    24.         if (!success) {
    25.             throw new RemotingException(this, "Failed to send message " + PayloadDropper.getRequestWithoutData(message) + " to " + getRemoteAddress()
    26.                     + "in timeout(" + timeout + "ms) limit");
    27.         }
    28.     }

    未完待续

    欢迎大家点点关注,点点赞!

  • 相关阅读:
    R语言ggplot2可视化:使用ggpubr包的ggdensity函数可视化密度图、使用fill参数自定义密度图的填充色
    ElementUI之CUD+表单验证
    上周热点回顾(3.6-3.12)
    什么???CSS也能原子化!
    为何在中国 Navicat 远比 DBeaver 流行
    Python实现FPGA板卡仿真验证方法
    【云原生】Docker小工具:runlike与whaler(打印容器的启动命令与导出镜像的dockerfile)
    全志R128芯片应用开发案例——ADC驱动烟雾传感器
    python+vue驾校驾驶理论考试模拟系统
    B083-SpringCloud-eureka ribbon feign hystrix
  • 原文地址:https://blog.csdn.net/weixin_38592881/article/details/127759408