• Dubbo(五):Dubbo 2.7 服务导出引入调用


    Spring 整合 Dubbo 源码: https://blog.csdn.net/menxu_work/article/details/126418487?spm=1001.2014.3001.5502

    服务导出 — ContextRefreshedEvent

    服务导出的入口为ServiceBean中的export()方法

    当Spring启动完之后,通过接收Spring的ContextRefreshedEvent事件来触发export()方法的执行。

    一个ServiceBean对象就表示一个Dubbo服务,ServiceBean对象中的参数就表示服务的参数,比如timeout,该对象的参数值来至@Service注解中所定义的。

    服务导出主要得做两件事情:

    1. 根据服务的参数信息,启动对应的网络服务器(netty、tomcat、jetty等),用来接收网络请求
    2. 将服务的信息注册到注册中心

    但是在做这两件事情之前得先把服务的参数确定好,因为一个Dubbo服务的参数,除开可以在@Service注解中去配置,还会继承Dubbo服务所属应用(Application)上的配置,还可以在配置中心或JVM环境变量中去配置某个服务的参数,所以首先要做的是确定好当前服务最终的(优先级最高)的参数值。

    确定好服务参数之后,就根据所配置的协议启动对应的网络服务器。在启动网络服务器时,并且在网络服务器接收请求的过程中,都可以从服务参数中获取信息,比如最大连接数,线程数,socket超时时间等等。

    启动完网络服务器之后,就将服务信息注册到注册中心。同时还有向注册中心注册监听器,监听Dubbo的中的动态配置信息变更。

    服务导出思路

    1. 确定服务的参数
    2. 确定服务支持的协议
    3. 构造服务最终的URL
    4. 将服务URL注册到注册中心去
    5. 根据服务支持的不同协议,启动不同的Server,用来接收和处理请求
    6. 因为Dubbo支持动态配置服务参数,所以服务导出时还需要绑定一个监听器Listener来监听服务的参数是否有修改,如果发现有修改,则需要重新进行导出

    在这里插入图片描述

    导出服务

    ServiceBean extends ApplicationListener

    ServiceBean.onApplicationEvent(ContextRefreshedEvent event)

    ServiceBean.export(); // 服务导出(服务注册)

    super.export()

    ServcieConfig.export()

    • ServcieConfig.checkAndUpdateSubConfigs(); // 准备服务参数

        1. ServiceConfig中的某些属性如果是空的,那么就从ProviderConfig、ModuleConfig、 ApplicationConfig中获取
        1. 从配置中心获取配置,包括应用配置和全局配置
        1. 从配置中心获取Provider配置
        1. 从配置中心获取Protocol配置
        1. 如果ApplicationConfig为空,则构造一个ApplicationConfig
        1. 从配置中心获取Registry配置
        1. 更新ServiceConfig中的属性为优先级最高的配置
        1. 更新MetadataReportConfig中的属性为优先级最高的配置
        1. 检查当前服务是不是一个泛化服务
      • 10.检查Stub和Local
      • 11.检查Mock
    • ServcieConfig.doExport(); // 导出服务

    • doExportUrls();

        1. 得到服务注册url list registry://zookeeper.local.com:2181/org.apache.dubbo.registry.RegistryService?application=dubbo-demo-provider1-application&dubbo=2.0.2&logger=log4j&pid=42222®istry=zookeeper&release=2.7.0×tamp=1661236388599
        1. 遍历每个协议 每种协议导出一个单独的服务,注册到各个注册中心
    • ServcieConfig.doExportUrlsFor1Protocol(protocolConfig, registryURLs); //导出一个单独的服务,注册到各个注册中心

      • 组装url的map参数, 并拼接URL
        dubbo://192.168.1.5:20881/org.apache.dubbo.demo.DemoService?anyhost=true&application=dubbo-demo-provider1-application&bean.name=ServiceBean:org.apache.dubbo.demo.DemoService&bind.ip=192.168.1.5&bind.port=20881&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=org.apache.dubbo.demo.DemoService&logger=log4j&methods=sayHello&pid=42222&release=2.7.0&side=provider×tamp=1661236769839
    • for registryURLs //遍历注册中心–将服务注册到注册中心
      registry://zookeeper.local.com:2181/org.apache.dubbo.registry.RegistryService?application=dubbo-demo-provider1-application&dubbo=2.0.2&logger=log4j&pid=42222®istry=zookeeper&release=2.7.0×tamp=1661236388599

      • Invoker invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(EXPORT_KEY, url.toFullString()));
        // 1. 生成一个当前服务接口的代理对象
        // 2. 使用代理生成一个Invoker,Invoker表示服务提供者的代理,可以使用Invoker的invoke方法执行服务
        // 3. 对应的url为registry://zookeeper.local.com:2181/org.apache.dubbo.registry.RegistryService?application=dubbo-demo-provider1-application&dubbo=2.0.2&export=dubbo%3A%2F%2F192.168.1.5%3A20881%2Forg.apache.dubbo.demo.DemoService%3Fanyhost%3Dtrue%26application%3Ddubbo-demo-provider1-application%26bean.name%3DServiceBean%3Aorg.apache.dubbo.demo.DemoService%26bind.ip%3D192.168.1.5%26bind.port%3D20881%26deprecated%3Dfalse%26dubbo%3D2.0.2%26dynamic%3Dtrue%26generic%3Dfalse%26interface%3Dorg.apache.dubbo.demo.DemoService%26logger%3Dlog4j%26methods%3DsayHello%26pid%3D42222%26release%3D2.7.0%26side%3Dprovider%26timestamp%3D1661236769839&logger=log4j&pid=42222®istry=zookeeper&release=2.7.0×tamp=1661236388599
        // 4. 这个Invoker中包括了服务的实现者、服务接口类、服务的注册地址(针对当前服务的,参数export指定了当前服务)
        // 5. 此invoker表示一个可执行的服务,调用invoker的invoke()方法即可执行服务,同时此invoker也可用来导出

      • Exporter exporter = protocol.export(wrapperInvoker);
        // 使用特定的协议来对服务进行导出,这里的协议为RegistryProtocol,导出成功后得到一个Exporter (借助SPI ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();)
        // 1. 先使用RegistryProtocol进行服务注册
        // 2. 注册完了之后,使用DubboProtocol进行导出

    • RegistryProtocol.export(wrapperInvoker) // 特定的协议来对服务进行导出

      • 得到registryUrl: zookeeper://zookeeper.local.com:2181/org.apache.dubbo.registry.RegistryService?application=dubbo-demo-provider1-application&dubbo=2.0.2&export=dubbo%3A%2F%2F192.168.1.5%3A20881%2Forg.apache.dubbo.demo.DemoService%3Fanyhost%3Dtrue%26application%3Ddubbo-demo-provider1-application%26bean.name%3DServiceBean%3Aorg.apache.dubbo.demo.DemoService%26bind.ip%3D192.168.1.5%26bind.port%3D20881%26deprecated%3Dfalse%26dubbo%3D2.0.2%26dynamic%3Dtrue%26generic%3Dfalse%26interface%3Dorg.apache.dubbo.demo.DemoService%26logger%3Dlog4j%26methods%3DsayHello%26pid%3D42222%26release%3D2.7.0%26side%3Dprovider%26timestamp%3D1661236769839&logger=log4j&pid=42222&release=2.7.0×tamp=1661236388599
      • 服务提供者 providerUrl: dubbo://192.168.1.5:20881/org.apache.dubbo.demo.DemoService?anyhost=true&application=dubbo-demo-provider1-application&bean.name=ServiceBean:org.apache.dubbo.demo.DemoService&bind.ip=192.168.1.5&bind.port=20881&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=org.apache.dubbo.demo.DemoService&logger=log4j&methods=sayHello&pid=42222&release=2.7.0&side=provider×tamp=1661236769839
    • overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);

      1. 事件监听
      2. overrideSubscribeUrl对应一个OverrideListener,用来监听变化事件
      3. providerConfigurationListener表示应用级别的动态配置监听器
      4. serviceConfigurationListener表示服务级别的动态配置监听器
    • exporter = doLocalExport(originInvoker, providerUrl); // 根据动态配置重写了providerUrl之后,就会调用DubboProtocol或HttpProtocol去进行导出服务

      • RegistryProtocol.doLocalExport(originInvoker,providerUrl)
        在这里插入图片描述
      • DubboProtocol.export
        1. exporterMap.put(“org.apache.dubbo.demo.DemoService:20881”, DubboExporter)
        2. openServer(url); // 开启NettyServer
        3. server.reset(url); // 服务重新导出时,配置有变化:关闭当前定时,新启定时任务 ==》任务超时关闭Channel channel.close();
    • registry.register(registeredProviderUrl); //注册服务,把简化后的服务提供者url注册到registryUrl中去

      1. final Registry registry = getRegistry(originInvoker); //得到注册中心-ZookeeperRegistry
      2. final URL registeredProviderUrl = getRegisteredProviderUrl(providerUrl, registryUrl); //得到存入到注册中心去的providerUrl,会对服务提供者url中的参数进行简化

    服务引入

    服务目录

    1. 查询注册中心
    2. 服务提供者URL —> 配置优先级 --> 动态配置 —> List
    3. 路由链 ---- tagRouter --> AppRouter --> ServiceRouter https://dubbo.apache.org/zh/docs/v2.7/user/examples/routing-rule/
    4. 监听

    代理对象.method() ----- invoker.invoke(Invocation)

    1. 生成Invocation
    2. 获取服务提供者列表
    3. mock MockClusterInvoker.invoke(Invocation)
    4. 路由
    5. 负载均衡
    6. 集群容错 FailoverClusterInvoker.invoke(Invocation)
    7. 构造NettyClient DubboInvoker.invoke(Invocation)
    8. 发送数据(Invocation)

    服务引入 ReferenceBean implements FactoryBean

        @Override
        public Object getObject() {
            return get();
        }
    
    	public synchronized T get() {
    		.....
            if (ref == null) {
                // 入口
                init();
            }
            return ref;  // Invoke代理
    	}
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    链路

    • ReferenceBean.getObject
    • ReferenceBean.get
    • ReferenceBean.init
    • ReferenceBean.createProxy
    • ReferenceBean --> PROXY_FACTORY.getProxy(invoker);
    • AbstractProxyFactory.getProxy(invoker);
    • AbstractProxyFactory.getProxy(invoker, interfaces)
      • JavassistProxyFactory default
      • JdkProxyFactory
    • JavassistProxyFactory.getProxy(Invoker invoker, Class[] interfaces)
        public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
            return (T) Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), interfaces, new InvokerInvocationHandler(invoker));
        }
    
    • 1
    • 2
    • 3
    • InvokerInvocationHandler.invoke //服务调用
      @Override
      public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
          String methodName = method.getName();
          Class<?>[] parameterTypes = method.getParameterTypes();
          if (method.getDeclaringClass() == Object.class) {
              return method.invoke(invoker, args);
          }
          if ("toString".equals(methodName) && parameterTypes.length == 0) {
              return invoker.toString();
          }
          if ("hashCode".equals(methodName) && parameterTypes.length == 0) {
              return invoker.hashCode();
          }
          if ("equals".equals(methodName) && parameterTypes.length == 1) {
              return invoker.equals(args[0]);
          }
      
          // 这里的recreate方法很重要,他会调用AppResponse的recreate方法,
          // 如果AppResponse对象中存在exception信息,则此方法中会throw这个异常
          return invoker.invoke(new RpcInvocation(method, args)).recreate();
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • MockClusterInvoker implements Invoker
        • no mock
        • force:direct mock
        • fail-mock
          • AbstractClusterInvoker.invoke(invocation).doInvoke
            • FailoverClusterInvoker
            • FailfastClusterInvoker

    AbstractClusterInvoker.invoke(invocation).doInvoke

        @Override
        public Result invoke(final Invocation invocation) throws RpcException {
            checkWhetherDestroyed();
    
            // binding attachments into invocation.
            Map<String, String> contextAttachments = RpcContext.getContext().getAttachments();
            if (contextAttachments != null && contextAttachments.size() != 0) {
                ((RpcInvocation) invocation).addAttachments(contextAttachments);
            }
    
            List<Invoker<T>> invokers = list(invocation); //路由
            LoadBalance loadbalance = initLoadBalance(invokers, invocation); //负载均衡
            RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
            return doInvoke(invocation, invokers, loadbalance); //服务容错
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    FailoverClusterInvoker
    在这里插入图片描述

    服务调用

    在这里插入图片描述

    在这里插入图片描述

    发送 DubboInvoker.doInvoke

    • CompletableFuture responseFuture = currentClient.request(inv, timeout);
    • HeaderExchangeClient.request
    • HeaderExchangeChannel.request
      @Override
      public CompletableFuture<Object> request(Object request, int timeout) throws RemotingException {
          if (closed) {
              throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!");
          }
          // create request.
          Request req = new Request();
          req.setVersion(Version.getProtocolVersion());
          req.setTwoWay(true);
          req.setData(request);
          //构建DefaultFuture、同时添加一个超时任务task
          DefaultFuture future = DefaultFuture.newFuture(channel, req, timeout);
          try {
              channel.send(req);
          } catch (RemotingException e) {
              future.cancel();
              throw e;
          }
          return future;
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
    • channel.send(req);
    • AbstractPeer.send
    • AbstractClient.send
    • org.apache.dubbo.remoting.transport.netty4.NettyChannel.send
    @Override
        public void send(Object message, boolean sent) throws RemotingException {
            // whether the channel is closed
            super.send(message, sent);
    
            boolean success = true;
            int timeout = 0;
            try {
                ChannelFuture future = channel.writeAndFlush(message);
                //sent=true 等待消息发出,消息发送失败抛异常
                //sent=false 不等待消息发出,将消息放入IO队列,直接返回
                if (sent) {
                    // wait timeout ms
                    timeout = getUrl().getPositiveParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT);
                    success = future.await(timeout);
                }
                Throwable cause = future.cause();
                if (cause != null) {
                    throw cause;
                }
            } catch (Throwable e) {
                throw new RemotingException(this, "Failed to send message " + message + " to " + getRemoteAddress() + ", cause: " + e.getMessage(), e);
            }
            if (!success) {
                throw new RemotingException(this, "Failed to send message " + message + " to " + getRemoteAddress()
                        + "in timeout(" + timeout + "ms) limit");
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28

    接收 NettyServerHandler.channelRead

    • HeaderExchangeHandler.received
    • HeaderExchangeHandler.handleResponse(channel, (Response) message);
    • DefaultFuture.received(channel, response);
      public static void received(Channel channel, Response response, boolean timeout) {
          try {
              // response的id,
              DefaultFuture future = FUTURES.remove(response.getId());
              if (future != null) {
                  Timeout t = future.timeoutCheckTask;
                  if (!timeout) {
                      // decrease Time
                      t.cancel();
                  }
                  future.doReceived(response);
              } else {
                  logger.warn("The timeout response finally returned at "
                          + (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date()))
                          + ", response " + response
                          + (channel == null ? "" : ", channel: " + channel.getLocalAddress()
                          + " -> " + channel.getRemoteAddress()));
              }
          } finally {
              CHANNELS.remove(response.getId());
          }
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
  • 相关阅读:
    计算机毕业设计Java校园疫情信息管理系统(源码+系统+mysql数据库+Lw文档)
    SRRC认证的必要性:保障电子产品质量安全的重要措施
    dubbo学习之事件通知实践
    内网信息收集
    【前端笔记】SCSS学习篇之一:基础入门
    fastadmin框架如何查询数据表指定时间段内的数据
    数据库的增删改(DML)
    LeetCode 374. 猜数字大小
    晚上玩电脑,删除一些补丁,竟然出现两个怪问题
    性能测试:系统架构性能优化思路
  • 原文地址:https://blog.csdn.net/menxu_work/article/details/126467202