• Dubbo-聊聊Dubbo协议


    前言

    Dubbo源码阅读分享系列文章,欢迎大家关注点赞

    SPI实现部分

    1. Dubbo-SPI机制

    2. Dubbo-Adaptive实现原理

    3. Dubbo-Activate实现原理

    4. Dubbo SPI-Wrapper

    注册中心

    1. Dubbo-聊聊注册中心的设计

    2. Dubbo-时间轮设计

    通信

    1. Dubbo-聊聊通信模块设计

    什么是协议

    在网络交互中是以字节流的形式传递的,对于字节流都是二进制格式,这样我们就面临一个问题就是如何转化为我们可以识别的字符,协议就是来解决这个问题的,协议用通俗易懂地解释就是通信双方需要遵循的约定。 在日常开发中,我们常见的网络传输协议有TCP、UDP、HTTP。常用的中间件也会定义对应的协议,如Redis、Mysql、Zookeeper等都有自己约定的协议,同样Dubbo的通信也采用一种协议,这些都是应用层协议,都是基于TCP或者UDP设计的。

    如何定义协议

    应用层协议一般的形式有三种:定长协议、特殊结束符和变长协议,聊到这里就可以抛出来一个常见的面试题,如何解决网络通信粘包和拆包的问题?该问题的解决方案也就是通过约定协议,下面我们就来聊聊这三种模式优缺点以及使用场景。

    定长协议

    定长的协议是指协议内容的长度是固定的,比如协议byte长度是50,当从网络上读取50个byte后,就进行decode解码操作。

    优点

    定长协议在读取或者写入时,效率比较高,因为数据大小都是确定的。

    缺点

    定长协议的缺点在于适应性不足,网络传输中传输的内容的大小不可能都是相同的,因此对于一些长度不够的消息,明显过于的浪费带宽。

    特殊结束符

    特殊结束符就是在每次传输结束的时候使用一个特殊的结束符,在Redis中的协议采用了特殊结束符,客户端和服务器发送的命令一律使用\r\n(CRLF)结尾。

    优点

    与定长协议一样读取或者写入时,效率比较高,同时解决定长协议的尴尬。

    缺点

    特殊结束符方式的问题是必须要有一个完整的消息体才能进行传输,除此之外必须要防止用户传输的数据不能同结束符相同,否则就会出现紊乱。

    变长协议

    变长协议由定长以及不定长两部分组成,定长部分一般是协议头,此部分会包含变长部分的描述,变长协议我们经常使用的HTTP协议采用变长协议,HTTP请求报文格式是由三部分组成:

    1. 请求行:包括Url、Version等,由空格分隔,\r\n结尾;

    2. 请求头:多行,每行是key:value的格式,以\r\n结尾;

    3. 请求体:请求头与请求体直接由一个空白行分隔,请求体的长度在请求头中由content-length给出;

    优点

    灵活性比较高,解决了定长协议以及特殊结束符的所有缺点。

    缺点

    复杂性比较高,需要自定义一套标准,所有消息都需要按照该格式发送以及解析。

    Dubbo协议

    Dubbo框架支持很多协议,默认采用Dubbo协议,Dubbo协议采用的是变长协议的设计,整体的格式如下:

    1. 0~7位和8~15位分别是Magic High和Magic Low,是固定魔数值(0xdabb),我们可以通过这两个Byte,判断是否为Dubbo协议;

    2. 16位是Req/Res标识,用于标识当前消息是请求还是响应;

    3. 17位是2Way标识,用于标识当前消息是单向还是双向,如果需要来自服务器的返回值,则设置为1;

    4. 18位是Event标识,用于标识当前消息是否为事件消息;

    5. 19~23位是序列化类型的标志,用于标识当前消息使用哪一种序列化算法;

    6. 24~31位是Status状态,用于记录响应的状态,当Req/Res为0时才有用;

    7. 32~95位是Request ID,用于记录请求的唯一标识;

    8. 96~127位是序列化后的内容长度,该值是按字节计算;

    9. 128位之后是可变的数据,被特定的序列化类型序列化后,每个部分都是一个 byte [] 或者byte,如果是请求包,则每个部分依次为:Dubbo version、Service name、Service version、Method name、Method parameter types、Method arguments 和 Attachments。如果是响应包,则每个部分依次为:返回值类型、返回值;

    image.png

    优点

    Dubbo协议整体设计比较简洁,能采用1个bit表示的,不会用一个byte来表示;此外请求头和响应头一致,整体采用一套解析标准就可以,代码实现起来相对简单。

    缺点

    由于整体的设计相对简洁,导致扩展性不够;

    Dubbo协议是如何解析的

    在通信篇中我们讲过Codec2该接口,该接口提供了encode和decode个方法来实现消息与字节流之间的相互转换,关于该接口的实现我们没有讲解,这里我们来看看此部分和Dubbo协议有什么关系。

    AbstractCodec抽象类没有实现Codec2中定义的接口方法,而是提供了几个给子类用的基础方法。

    1. getSerialization方法:通过SPI获取当前使用的序列化方式;

    2. checkPayload方法:检查编解码数据的长度,如果数据超长,会抛出异常;

    3. isClientSide、isServerSide方法:判断当前是Client端还是Server端;

    接下来我们就来聊聊子类如何被解析的,我们可以看到四个子类的继承关系,重点介绍的是ExchangeCodec和DubboCodec,其他就是做一下简单介绍。 TransportCodec该类已经被标注为弃用,该类内部也就是根据getSerialization方法选择的序列化方法,对传入消息或ChannelBuffer进行序列化或反序列化。 TelnetCodec继承了TransportCodec的能力,该类主要是提供了对Telnet命令处理的能力,该功能主要是对服务进行治理的功能,这里后续我们画一点时间来进行介绍。

    ExchangeCodec

    ExchangeCodec继承了TelnetCodec,在该类基础上增加Dubbo协议头的处理能力,接下来我们首先来看下其核心字段,

    1. //协议头长度
    2. protected static final int HEADER_LENGTH = 16;
    3. //魔数 判断是否是Dubbo协议
    4. protected static final short MAGIC = (short) 0xdabb;
    5. protected static final byte MAGIC_HIGH = Bytes.short2bytes(MAGIC)[0];
    6. protected static final byte MAGIC_LOW = Bytes.short2bytes(MAGIC)[1];
    7. //设置请求响应标志位
    8. protected static final byte FLAG_REQUEST = (byte) 0x80;
    9. //单向还是双向标志位
    10. protected static final byte FLAG_TWOWAY = (byte) 0x40;
    11. //是否事件消息标志位
    12. protected static final byte FLAG_EVENT = (byte) 0x20;
    13. //序列化协议标志位
    14. protected static final int SERIALIZATION_MASK = 0x1f;

    通过核心字段我们可以发现其实和我们介绍的Dubbo的协议是一致的,因此接下来的encode和decode就是对Dubbo协议头的解密和编码,我们来下看encode方法,在encode方法中会根据需要编码的消息类型进行分类, 分为三类:Request、Response、telenet,encodeRequest方法专门对Request对象进行编码,encodeResponse方法对Response对象进行编码。

    1. @Override
    2.   public void encode(Channel channel, ChannelBuffer buffer, Object msg) throws IOException {
    3.     //Request
    4.     if (msg instanceof Request) {
    5.       encodeRequest(channel, buffer, (Request) msg);
    6.       //Response
    7.     } else if (msg instanceof Response) {
    8.       encodeResponse(channel, buffer, (Response) msg);
    9.     } else {
    10.       //telenet
    11.       super.encode(channel, buffer, msg);
    12.     }
    13.   }
    14. protected void encodeRequest(Channel channel, ChannelBuffer buffer, Request req) throws IOException {
    15.   Serialization serialization = getSerialization(channel, req);
    16.   //存储协议头
    17.   byte[] header = new byte[HEADER_LENGTH];
    18.   // set magic number.
    19.   Bytes.short2bytes(MAGIC, header);
    20.   //设置协议头标志位
    21.   header[2= (byte) (FLAG_REQUEST | serialization.getContentTypeId());
    22.   if (req.isTwoWay()) {
    23.     header[2] |= FLAG_TWOWAY;
    24.   }
    25.   if (req.isEvent()) {
    26.     header[2] |= FLAG_EVENT;
    27.   }
    28.   //记录请求ID
    29.   Bytes.long2bytes(req.getId(), header, 4);
    30.   //序列化请求 并统计序列化以后字节数
    31.   int savedWriteIndex = buffer.writerIndex();
    32.   //将写入位置后移16
    33.   buffer.writerIndex(savedWriteIndex + HEADER_LENGTH);
    34.   //请求序列化
    35.   ChannelBufferOutputStream bos = new ChannelBufferOutputStream(buffer);
    36.   //是否心跳检查 为空就是心跳检查
    37.   if (req.isHeartbeat()) {
    38.     // heartbeat request data is always null
    39.     bos.write(CodecSupport.getNullBytesOf(serialization));
    40.   } else {
    41.     ObjectOutput out = serialization.serialize(channel.getUrl(), bos);
    42.     //事件序列化
    43.     if (req.isEvent()) {
    44.       //事件序列化
    45.       encodeEventData(channel, out, req.getData());
    46.     } else {
    47.       //Dubbo请求序列化
    48.       encodeRequestData(channel, out, req.getData(), req.getVersion());
    49.     }
    50.     out.flushBuffer();
    51.     if (out instanceof Cleanable) {
    52.       ((Cleanable) out).cleanup();
    53.     }
    54.   }
    55.   bos.flush();
    56.   bos.close();
    57.   //获取字节数
    58.   int len = bos.writtenBytes();
    59.   //检查字节长度
    60.   checkPayload(channel, len);
    61.   //将字节数写入header数组中
    62.   Bytes.int2bytes(len, header, 12);
    63.   //重置写入位置
    64.   buffer.writerIndex(savedWriteIndex);
    65.   //写入消息头
    66.   buffer.writeBytes(header);
    67.   //buffer写出去的位置从writeIndex开始 加上header长度 数据长度
    68.   buffer.writerIndex(savedWriteIndex + HEADER_LENGTH + len);
    69. }
    70. protected void encodeResponse(Channel channel, ChannelBuffer buffer, Response res) throws IOException {
    71.   int savedWriteIndex = buffer.writerIndex();
    72.   try {
    73.     //序列化
    74.     Serialization serialization = getSerialization(channel, res);
    75.     //协议头  长度为16字节
    76.     byte[] header = new byte[HEADER_LENGTH];
    77.     //魔数
    78.     Bytes.short2bytes(MAGIC, header);
    79.     //序列化方式
    80.     header[2= serialization.getContentTypeId();
    81.     //心跳还是正常消息
    82.     if (res.isHeartbeat()) {
    83.       header[2] |= FLAG_EVENT;
    84.     }
    85.     //响应状态
    86.     byte status = res.getStatus();
    87.     header[3= status;
    88.     //设置请求ID
    89.     Bytes.long2bytes(res.getId(), header, 4);
    90.     //写入时候真需要加上协议头长度
    91.     buffer.writerIndex(savedWriteIndex + HEADER_LENGTH);
    92.     ChannelBufferOutputStream bos = new ChannelBufferOutputStream(buffer);
    93.     //对响应信息进行编码
    94.     if (status == Response.OK) {
    95.       if(res.isHeartbeat()){
    96.         //心跳
    97.         bos.write(CodecSupport.getNullBytesOf(serialization));
    98.       }else {
    99.         //正常响应
    100.         ObjectOutput out = serialization.serialize(channel.getUrl(), bos);
    101.         if (res.isEvent()) {
    102.           encodeEventData(channel, out, res.getResult());
    103.         } else {
    104.           encodeResponseData(channel, out, res.getResult(), res.getVersion());
    105.         }
    106.         out.flushBuffer();
    107.         if (out instanceof Cleanable) {
    108.           ((Cleanable) out).cleanup();
    109.         }
    110.       }
    111.     } else {
    112.       //错误消息
    113.       ObjectOutput out = serialization.serialize(channel.getUrl(), bos);
    114.       out.writeUTF(res.getErrorMessage());
    115.       out.flushBuffer();
    116.       if (out instanceof Cleanable) {
    117.         ((Cleanable) out).cleanup();
    118.       }
    119.     }
    120.     bos.flush();
    121.     bos.close();
    122.     //写入的长度
    123.     int len = bos.writtenBytes();
    124.     //检查消息长度
    125.     checkPayload(channel, len);
    126.     Bytes.int2bytes(len, header, 12);
    127.     //重置写入位置
    128.     buffer.writerIndex(savedWriteIndex);
    129.     //写入消息头
    130.     buffer.writeBytes(header);
    131.     //buffer写出去的位置从writeIndex开始 加上header长度 数据长度
    132.     buffer.writerIndex(savedWriteIndex + HEADER_LENGTH + len);
    133.   } catch (Throwable t) {
    134.     // clear buffer
    135.     buffer.writerIndex(savedWriteIndex);
    136.     // send error message to Consumer, otherwise, Consumer will wait till timeout.
    137.     if (!res.isEvent() && res.getStatus() != Response.BAD_RESPONSE) {
    138.       Response r = new Response(res.getId(), res.getVersion());
    139.       r.setStatus(Response.BAD_RESPONSE);
    140.       if (t instanceof ExceedPayloadLimitException) {
    141.         logger.warn(t.getMessage(), t);
    142.         try {
    143.           r.setErrorMessage(t.getMessage());
    144.           channel.send(r);
    145.           return;
    146.         } catch (RemotingException e) {
    147.           logger.warn("Failed to send bad_response info back: " + t.getMessage() + ", cause: " + e.getMessage(), e);
    148.         }
    149.       } else {
    150.         // FIXME log error message in Codec and handle in caught() of IoHanndler?
    151.         logger.warn("Fail to encode response: " + res + ", send bad_response info instead, cause: " + t.getMessage(), t);
    152.         try {
    153.           r.setErrorMessage("Failed to send response: " + res + ", cause: " + StringUtils.toString(t));
    154.           channel.send(r);
    155.           return;
    156.         } catch (RemotingException e) {
    157.           logger.warn("Failed to send bad_response info back: " + res + ", cause: " + e.getMessage(), e);
    158.         }
    159.       }
    160.     }
    161.     // Rethrow exception
    162.     if (t instanceof IOException) {
    163.       throw (IOException) t;
    164.     } else if (t instanceof RuntimeException) {
    165.       throw (RuntimeException) t;
    166.     } else if (t instanceof Error) {
    167.       throw (Error) t;
    168.     } else {
    169.       throw new RuntimeException(t.getMessage(), t);
    170.     }
    171.   }
    172. }

    ExchangeCodec的decode方法是encode方法的逆过程,会先检查魔数,然后读取协议头和后续消息的长度,最后根据协议头中的各个标志位构造相应的对象,以及反序列化数据。

    DubboCodec

    在ExchangeCodecencode的encode方法中,不论是encodeRequest还是encodeResponse都调用encodeRequestData方法,该方法会对Boby内容进行编码,该方法实现是在DubboCodec中,因此DubboCodec是对消息体的编解码,接下来我们来看下encodeRequestData和encodeResponseData方法的实现,

    1. protected void encodeRequestData(Channel channel, ObjectOutput out, Object dataString version) throws IOException {
    2.   RpcInvocation inv = (RpcInvocation) data;
    3.   //dubbo服务版本
    4.   out.writeUTF(version);
    5.   // https://github.com/apache/dubbo/issues/6138
    6.   String serviceName = inv.getAttachment(INTERFACE_KEY);
    7.   if (serviceName == null) {
    8.     //服务path
    9.     serviceName = inv.getAttachment(PATH_KEY);
    10.   }
    11.   //服务名
    12.   out.writeUTF(serviceName);
    13.   //版本号
    14.   out.writeUTF(inv.getAttachment(VERSION_KEY));
    15.   //方法名
    16.   out.writeUTF(inv.getMethodName());
    17.   //方法类型描述
    18.   out.writeUTF(inv.getParameterTypesDesc());
    19.   Object[] args = inv.getArguments();
    20.   if (args != null) {
    21.     for (int i = 0; i < args.length; i++) {
    22.       //参数值
    23.       out.writeObject(encodeInvocationArgument(channel, inv, i));
    24.     }
    25.   }
    26.   //附加属性
    27.   out.writeAttachments(inv.getObjectAttachments());
    28. }
    29. @Override
    30.   protected void encodeResponseData(Channel channel, ObjectOutput out, Object dataString version) throws IOException {
    31.   Result result = (Result) data;
    32.   //检验版本
    33.   boolean attach = Version.isSupportResponseAttachment(version);
    34.   Throwable th = result.getException();
    35.   if (th == null) {
    36.     Object ret = result.getValue();
    37.     if (ret == null) {
    38.       //空结果
    39.       out.writeByte(attach ? RESPONSE_NULL_VALUE_WITH_ATTACHMENTS : RESPONSE_NULL_VALUE);
    40.     } else {
    41.       //正常写入
    42.       out.writeByte(attach ? RESPONSE_VALUE_WITH_ATTACHMENTS : RESPONSE_VALUE);
    43.       out.writeObject(ret);
    44.     }
    45.   } else {
    46.     //异常
    47.     out.writeByte(attach ? RESPONSE_WITH_EXCEPTION_WITH_ATTACHMENTS : RESPONSE_WITH_EXCEPTION);
    48.     out.writeThrowable(th);
    49.   }
    50.   if (attach) {
    51.     //Dubbo版本号
    52.     result.getObjectAttachments().put(DUBBO_VERSION_KEY, Version.getProtocolVersion());
    53.     out.writeAttachments(result.getObjectAttachments());
    54.   }
    55. }

    结束

    欢迎大家点点关注,点点赞!

  • 相关阅读:
    关于#单片机#的问题:第七行打印完他就会获取到温湿度,单片机的连接也正常,reset也没用单片机中烧写的完整代码如下图
    软件过程与管理学习之:项目计划(Project Schedule)
    一个非常简单的变分量子分类器 (VQC)
    C++之STL容器(2/3)
    shiro学习之HashedCredentialsMatcher密码匹配过程
    分享一个在linux中运行通义千问的方法
    Java 入门指南:JVM(Java虚拟机)——类的生命周期与加载过程
    微服务使用SockJs+Stomp实现Websocket 前后端实例 | Vuex形式断开重连、跨域等等问题踩坑(一)
    小程序源码:网课查题微信小程序源码下载,题库资源丰富自动采集,支持语音拍照识别
    从Oracle迁移数据到Hadoop
  • 原文地址:https://blog.csdn.net/weixin_38592881/article/details/127953674