• Dubbo-RPC核心接口介绍


    前言

    Dubbo源码阅读分享系列文章,欢迎大家关注点赞

    SPI实现部分

    1. Dubbo-SPI机制

    2. Dubbo-Adaptive实现原理

    3. Dubbo-Activate实现原理

    4. Dubbo SPI-Wrapper

    注册中心

    1. Dubbo-聊聊注册中心的设计

    2. Dubbo-时间轮设计

    通信

    1. Dubbo-聊聊通信模块设计

    RPC

    1. 聊聊Dubbo协议

    整体介绍

    Dubbo的RPC其实是对Protocol的封装,整体的结构与Remoting类似,dubbo-rpc-api是对具体协议、服务暴露、服务引用、代理等的抽象,是整个RPC中的核心,其他模块是对该层具体的实现,每个模块都是Dubbo具体支持的协议。

    dubbo-rpc-api

    dubbo-rpc-api整体模块如图所示,整体接口包括了filter、listener、protocol、proxy、support以及核心API,接下来我们先来看下核心的接口介绍。

    核心接口

    开始之前我们来先来回顾一下之前介绍RPC请求的过程,

    1. Rpc Client通过传入的IP、端口号、调用类以及方法的参数,通过动态代理找到具体的调用类的方法,将请求的类、方法序列化,传输到服务端;

    2. 当Rpc Service收到请求以后,将传入类和方法反序列化,通过反射找到对应的类的方法进行调用,最后将返回结果进行序列化,返回客户端;

    3. Rpc Client收到返回值以后,进行反序列化,最后将结果展示;

    这里为什么要回顾整个过程,这样后面介绍抽象的接口的时候大家会更更容易理解为什么这么抽象。

    Invoker

    Invoker接口内部有三个方法,分别是getInterface、invoke、destroyAll,getInterface该方法主要是获取服务接口相关的信息,invoke主要是发起一次调用以及相应信息,destroyAll主要用于销毁调用请求。

    1. public interface Invoker<T> extends Node {
    2.     //获取服务接口
    3.     Class<T> getInterface();
    4.     //发起调用
    5.     Result invoke(Invocation invocation) throws RpcException;
    6.     //销毁调用连接
    7.     default void destroyAll() {
    8.         destroy();
    9.     }
    10. }

    Invocation

    Invocation是invoke的参数,内部抽象了RPC调用的目标服务、方法信息、相关参数信息、具体的参数值以及一些附加信息。

    1. public interface Invocation {
    2.     //调用Service的唯一标识
    3.     String getTargetServiceUniqueName();
    4.     
    5.     String getProtocolServiceKey();
    6.     //调用的方法名称
    7.     String getMethodName();
    8.     //服务名称
    9.     String getServiceName();
    10.     //参数类型集合
    11.     Class<?>[] getParameterTypes();
    12.     //参数签名集合
    13.     default String[] getCompatibleParamSignatures() {
    14.         return Stream.of(getParameterTypes())
    15.                 .map(Class::getName)
    16.                 .toArray(String[]::new);
    17.     }
    18.     //调用具体的参数值
    19.     Object[] getArguments();
    20.     //调用关联的Invoker对象
    21.     Map<StringString> getAttachments();
    22.     @Experimental("Experiment api for supporting Object transmission")
    23.     Map<StringObject> getObjectAttachments();
    24.     void setAttachment(String keyString value);
    25.     @Experimental("Experiment api for supporting Object transmission")
    26.     void setAttachment(String keyObject value);
    27.     @Experimental("Experiment api for supporting Object transmission")
    28.     void setObjectAttachment(String keyObject value);
    29.     void setAttachmentIfAbsent(String keyString value);
    30.     @Experimental("Experiment api for supporting Object transmission")
    31.     void setAttachmentIfAbsent(String keyObject value);
    32.     @Experimental("Experiment api for supporting Object transmission")
    33.     void setObjectAttachmentIfAbsent(String keyObject value);
    34.     /**
    35.      * get attachment by key.
    36.      *
    37.      * @return attachment value.
    38.      * @serial
    39.      */
    40.     String getAttachment(String key);
    41.     @Experimental("Experiment api for supporting Object transmission")
    42.     Object getObjectAttachment(String key);
    43.     /**
    44.      * get attachment by key with default value.
    45.      *
    46.      * @return attachment value.
    47.      * @serial
    48.      */
    49.     String getAttachment(String keyString defaultValue);
    50.     @Experimental("Experiment api for supporting Object transmission")
    51.     Object getObjectAttachment(String keyObject defaultValue);
    52.     /**
    53.      * get the invoker in current context.
    54.      *
    55.      * @return invoker.
    56.      * @transient
    57.      */
    58.     Invoker<?> getInvoker();
    59.     //Invoker对象可以设置一些KV属性,这些属性并不会传递给Provider
    60.     Object put(Object keyObject value);
    61.     Object get(Object key);
    62.     Map<ObjectObject> getAttributes();
    63. }

    Result

    Result接口是Invoker.invoke方法的返回值,该返回值包含了被调用方返回值(或是异常)以及附加信息,我们也可以添加回调方法,在 RPC 调用方法结束时会触发这些回调。

    1. public interface Result extends Serializable {
    2.     //调用的返回值
    3.     Object getValue();
    4.     void setValue(Object value);
    5.     //异常处理方法
    6.     Throwable getException();
    7.     void setException(Throwable t);
    8.     boolean hasException();
    9.     //复合操作,如果调用发生异常,则直接抛出异常,如果没有异常,则返回结果
    10.     Object recreate() throws Throwable;
    11.     //携带附加信息
    12.     Map<StringString> getAttachments();
    13.     @Experimental("Experiment api for supporting Object transmission")
    14.     Map<StringObject> getObjectAttachments();
    15.     void addAttachments(Map<StringString> map);
    16.     @Experimental("Experiment api for supporting Object transmission")
    17.     void addObjectAttachments(Map<StringObject> map);
    18.     void setAttachments(Map<StringString> map);
    19.     @Experimental("Experiment api for supporting Object transmission")
    20.     void setObjectAttachments(Map<StringObject> map);
    21.     String getAttachment(String key);
    22.     @Experimental("Experiment api for supporting Object transmission")
    23.     Object getObjectAttachment(String key);
    24.     String getAttachment(String keyString defaultValue);
    25.     @Experimental("Experiment api for supporting Object transmission")
    26.     Object getObjectAttachment(String keyObject defaultValue);
    27.     void setAttachment(String keyString value);
    28.     @Experimental("Experiment api for supporting Object transmission")
    29.     void setAttachment(String keyObject value);
    30.     @Experimental("Experiment api for supporting Object transmission")
    31.     void setObjectAttachment(String keyObject value);
    32.     //添加回调 当RPC调用完成时,会触发回调
    33.     Result whenCompleteWithContext(BiConsumer<Result, Throwable> fn);
    34.     <U> CompletableFuture<U> thenApply(Function<Result, ? extends U> fn);
    35.     //阻塞线程,等待此次RPC调用完成
    36.     Result get() throws InterruptedException, ExecutionException;
    37.     Result get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
    38. }

    Exporter

    Exporter暴露Invoker的实现,就是让Provider能够根据请求的各种信息,找到对应的Invoker的实现。

    1. public interface Exporter<T> {
    2.     //获取Invoker对象
    3.     Invoker getInvoker();
    4.     //取消Invoker对象
    5.     void unexport();
    6. }

    Protocol

    Protocol接口主要有三个核心方法export、refer以及destroy,export主要是将Invoker服务暴露出去,refer引用一个服务将Invoker对象返回,destroy主要是销毁Invoker,释放Protocol对底层的占用。Protocol接口的实现中,export方法并不是简单地将Invoker对象包装成Exporter对象返回,其中还涉及代理对象的创建、底层Server的启动等操作;refer方法除了根据传入的type类型以及URL参数查询Invoker之外,还涉及相关Client的创建等操作。 此外该接口被SPI修饰,export和refer被Adaptive修饰,因此对于Protocol可以动态选择实现,此外Dubbo也提供多种Protocol实现。

    1. @SPI("dubbo")
    2. public interface Protocol {
    3.     //默认端口
    4.     int getDefaultPort();
    5.     //将一个Invoker暴露,该方法必须是幂等的
    6.     @Adaptive
    7.     <T> Exporter<T> export(Invoker<T> invoker) throws RpcException;
    8.     //引用一个Invoker,返回一个Invoker对象
    9.     @Adaptive
    10.     <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException;
    11.     //销毁export方法以及refer方法使用到的Invoker对象,释放当前Protocol对象底层占用的资源
    12.     void destroy();
    13.     //返回当前Protocol底层的全部ProtocolServer
    14.     default List<ProtocolServer> getServers() {
    15.         return Collections.emptyList();
    16.     }
    17. }

    Filter

    Filter接口用来拦截Dubbo请求,定义了一个invoke方法将请求传递给后续的Invoker进行处理。

    1. @SPI
    2. public interface Filter {
    3.     //将请求传给后续的Invoker处理
    4.     Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException;
    5.     //监听响应以及异常
    6.     interface Listener {
    7.         void onResponse(Result appResponse, Invoker<?> invoker, Invocation invocation);
    8.         void onError(Throwable t, Invoker<?> invoker, Invocation invocation);
    9.     }
    10. }

    ProxyFactory

    ProxyFactory接口主要的功能是用来创建代理对象,此外ProxyFactory也是一个扩展接口,getProxy方法为Invoker创建代理对象,getInvoker方法将代理对象转为Invoker对象,默认采用javassist生成代理对象,Dubbo还提供很多种实现,可以通过SPI配置进行自定义。

    1. @SPI("javassist")
    2. public interface ProxyFactory {
    3.     //将Invoker对象转为代理对象
    4.     @Adaptive({PROXY_KEY})
    5.     <T> T getProxy(Invoker<T> invoker) throws RpcException;
    6.     @Adaptive({PROXY_KEY})
    7.     <T> T getProxy(Invoker<T> invoker, boolean generic) throws RpcException;
    8.     //将proxy对象转为Invoker
    9.     @Adaptive({PROXY_KEY})
    10.     <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) throws RpcException;
    11. }

    结束

    欢迎大家点点关注,点点赞!

  • 相关阅读:
    s28.CentOS、Ubuntu、Rocky Linux系统初始化脚本v6版本
    王道p18 第12题假设 A中的 n个元素保存在一个一维数组中,请设计一个尽可能高效的算法,找出A的主元素。若存在主元素,则输出该元素:否则输出-1
    Vlookup 查找基本用法
    大数据毕业设计选题推荐-无线网络大数据平台-Hadoop-Spark-Hive
    【GIT版本控制管理】第4章 基本的Git概念
    linux驱动之设备树查找节点(3)
    【论文阅读|深读】DNGR:Deep Neural Networks for Learning Graph Representations
    S-MBRec学习笔记-徐老师重新整理
    直击产业落地 | 飞桨重磅推出业界首个模型选型工具
    AIR-CAP2702I-H-K9/AIR-CAP3602I-H-K9刷固件讲解
  • 原文地址:https://blog.csdn.net/weixin_38592881/article/details/128177447