• Dubbo: RPC Invocation on the Consumer Side


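    The consumer invokes the services exposed by the provider through a shared interface, so the consumer side contains no implementation class of that service interface.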

    @RestController
    @RequestMapping("/consumer")
    public class CountryController {

        // Only the shared interface is referenced; there is no local implementation
        @DubboReference(version = "2.0", lazy = true)
        CountryService countryService;

        @GetMapping("/getCountry")
        public JSONObject getCountry() {
            JSONObject rtn = new JSONObject();
            // The call goes through the injected proxy to the provider
            rtn.put("msg", countryService.getCountry());
            return rtn;
        }
    }

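    1. The @DubboReference annotation declares the target interface CountryService as a field of CountryController. When CountryController is processed, all of its fields are collected, and the fields carrying @DubboReference are handled separately.
    2. Step 1 yields the type of the shared service interface. When the RootBeanDefinition is built, its class attribute is set to ReferenceBean, and the definition for the shared interface CountryService is then registered in the IoC container.
    3. JdkDynamicAopProxy generates the proxy for the shared service interface.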
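
The three steps above can be sketched with plain JDK reflection. This is only an illustration of the idea (find annotated interface fields, then inject a proxy), not Dubbo's or Spring's implementation. `DemoReference`, `ReferenceInjector`, and the stub handler are hypothetical names introduced for the sketch, and the handler returns a canned string where a real consumer would issue a remote call.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Field;
import java.lang.reflect.Proxy;

// Hypothetical stand-in for @DubboReference; this is NOT the Dubbo annotation.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.FIELD)
@interface DemoReference {
    String version() default "";
}

// Stand-in for the shared service interface.
interface CountryService {
    String getCountry();
}

// Stand-in for the controller: it holds the interface only, never an implementation.
class CountryController {
    @DemoReference(version = "2.0")
    CountryService countryService;

    String unrelated; // not annotated, so the injector skips it
}

class ReferenceInjector {
    /** Scans the bean's fields and injects a JDK proxy into each annotated interface field. */
    static int inject(Object bean) {
        int injected = 0;
        for (Field field : bean.getClass().getDeclaredFields()) {
            DemoReference ref = field.getAnnotation(DemoReference.class);
            if (ref == null || !field.getType().isInterface()) {
                continue;
            }
            Class<?> type = field.getType();
            Object proxy = Proxy.newProxyInstance(
                    type.getClassLoader(),
                    new Class<?>[] {type},
                    (p, method, args) -> {
                        // The stub only supports String-returning methods.
                        if (method.getReturnType() != String.class) {
                            throw new UnsupportedOperationException(method.getName());
                        }
                        return "stub:" + type.getSimpleName() + "." + method.getName()
                                + "@" + ref.version();
                    });
            try {
                field.setAccessible(true);
                field.set(bean, proxy);
            } catch (IllegalAccessException e) {
                throw new IllegalStateException(e);
            }
            injected++;
        }
        return injected;
    }
}
```

Calling `ReferenceInjector.inject(new CountryController())` fills only the annotated field; the controller then calls `countryService.getCountry()` without ever seeing an implementation class.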

    1.ReferenceAnnotationBeanPostProcessor

    ReferenceAnnotationBeanPostProcessor resets the class attribute of the target service interface CountryService in the IoC registry to ReferenceBean.

    public class ReferenceBean<T> implements FactoryBean<T> {

        @Override
        public T getObject() {
            if (lazyProxy == null) {
                // Create the proxy for the target interface
                createLazyProxy();
            }
            return (T) lazyProxy;
        }

        private void createLazyProxy() {
            ProxyFactory proxyFactory = new ProxyFactory();
            proxyFactory.setTargetSource(new DubboReferenceLazyInitTargetSource());
            proxyFactory.addInterface(interfaceClass);
            Class<?>[] internalInterfaces = AbstractProxyFactory.getInternalInterfaces();
            for (Class<?> anInterface : internalInterfaces) {
                proxyFactory.addInterface(anInterface);
            }
            if (!StringUtils.isEquals(interfaceClass.getName(), interfaceName)) {
                Class<?> serviceInterface = ClassUtils.forName(interfaceName, beanClassLoader);
                proxyFactory.addInterface(serviceInterface);
            }
            // Interface-based JDK proxy
            this.lazyProxy = proxyFactory.getProxy(this.beanClassLoader);
        }

        private Object getCallProxy() throws Exception {
            // get reference proxy
            return referenceConfig.get();
        }

        private class DubboReferenceLazyInitTargetSource extends AbstractLazyCreationTargetSource {

            @Override
            protected Object createObject() throws Exception {
                return getCallProxy();
            }

            @Override
            public synchronized Class<?> getTargetClass() {
                return getInterfaceClass();
            }
        }
    }

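    While the JDK dynamic proxy for the target interface is being generated, pay attention to the lazyTarget field of DubboReferenceLazyInitTargetSource, specifically when it is assigned and when it is used.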

    DubboReferenceLazyInitTargetSource is a TargetSource, and the initialization of its lazyTarget field is actually performed by the abstract class AbstractLazyCreationTargetSource.

    public abstract class AbstractLazyCreationTargetSource implements TargetSource {

        /**
         * Returns the lazy-initialized target object,
         * creating it on-the-fly if it doesn't exist already.
         * @see #createObject()
         */
        @Override
        public synchronized Object getTarget() throws Exception {
            if (this.lazyTarget == null) {
                // Triggers creation of the CountryService proxy
                this.lazyTarget = createObject();
            }
            return this.lazyTarget;
        }
    }

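    According to the Javadoc above, lazyTarget is assigned lazily: the assignment happens only when the target is first requested. In practice, however, the assignment was observed to take place already while the proxy for the target interface was being created. The reason is not yet clear.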
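
One way to narrow down the open question is to reproduce the lazy pattern with a plain JDK proxy. The sketch below is not Spring's or Dubbo's code; `LazyTargetSource` and `LazyProxyDemo` are hypothetical stand-ins. It shows that in such a design any call routed through the handler, including `toString()`, creates the target. If the Spring proxy also routes `toString()` to the target (an assumption not verified here), then a debugger or a log statement that renders the proxy shortly after its creation could explain the early assignment.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Stand-in for the shared service interface.
interface CountryService {
    String getCountry();
}

// Simplified stand-in for AbstractLazyCreationTargetSource: creates the target on first access.
class LazyTargetSource<T> {
    private final Supplier<T> factory;
    private T lazyTarget;
    final AtomicInteger creations = new AtomicInteger();

    LazyTargetSource(Supplier<T> factory) {
        this.factory = factory;
    }

    synchronized T getTarget() {
        if (lazyTarget == null) {
            lazyTarget = factory.get();
            creations.incrementAndGet();
        }
        return lazyTarget;
    }

    synchronized boolean isInitialized() {
        return lazyTarget != null;
    }
}

class LazyProxyDemo {
    static CountryService lazyProxy(LazyTargetSource<CountryService> source) {
        // Every call, including toString(), resolves the target before delegating to it.
        InvocationHandler handler =
                (proxy, method, args) -> method.invoke(source.getTarget(), args);
        return (CountryService) Proxy.newProxyInstance(
                CountryService.class.getClassLoader(),
                new Class<?>[] {CountryService.class},
                handler);
    }
}
```

Creating the proxy leaves `isInitialized()` false. The first `toString()` or `getCountry()` call flips it to true, and later calls reuse the same target.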

    class JdkDynamicAopProxy implements AopProxy, InvocationHandler, Serializable {

        // Proxies the target method of the target class
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
            Object oldProxy = null;
            boolean setProxyContext = false;
            // The TargetSource configured through ProxyFactory, here DubboReferenceLazyInitTargetSource
            TargetSource targetSource = this.advised.targetSource;
            Object target = null;
            ...
            Object retVal;
            // Obtains the proxy of the target interface CountryService, i.e. the value of lazyTarget
            target = targetSource.getTarget();
            Class<?> targetClass = (target != null ? target.getClass() : null);
            // Get the interception chain for this method.
            List<Object> chain = this.advised.getInterceptorsAndDynamicInterceptionAdvice(method, targetClass);
            if (chain.isEmpty()) { // true by default
                Object[] argsToUse = AopProxyUtils.adaptArgumentsIfNecessary(method, args);
                retVal = AopUtils.invokeJoinpointUsingReflection(target, method, argsToUse);
            }
            else {
                ...
            }
            ...
            return retVal;
        }
    }

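    In summary, generating the proxy for the target interface also involves assigning lazyTarget. Both the proxy of the target interface and the object held in lazyTarget show the same name, CountryServiceDubboProxy0. The difference is that the former holds an InvocationHandler of type JdkDynamicAopProxy, while the latter holds one of type InvokerInvocationHandler.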
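
The two-layer structure summarized above can be imitated with two plain JDK proxies. This is a sketch under stated assumptions: `RemoteCallHandler` stands in for InvokerInvocationHandler and `DelegatingHandler` for JdkDynamicAopProxy, and both names are hypothetical. It shows how two proxy objects for the same interface can look alike from the outside while holding different handler types.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

// Stand-in for the shared service interface.
interface CountryService {
    String getCountry();
}

// Stand-in for InvokerInvocationHandler: the place where a remote call would be issued.
class RemoteCallHandler implements InvocationHandler {
    @Override
    public Object invoke(Object proxy, Method method, Object[] args) {
        return "remote:" + method.getName();
    }
}

// Stand-in for JdkDynamicAopProxy: forwards each call to the inner proxy.
class DelegatingHandler implements InvocationHandler {
    private final Object target;

    DelegatingHandler(Object target) {
        this.target = target;
    }

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        return method.invoke(target, args);
    }
}

class TwoLayerProxyDemo {
    static CountryService wrap(InvocationHandler handler) {
        return (CountryService) Proxy.newProxyInstance(
                CountryService.class.getClassLoader(),
                new Class<?>[] {CountryService.class},
                handler);
    }
}
```

With `inner = wrap(new RemoteCallHandler())` and `outer = wrap(new DelegatingHandler(inner))`, a call on `outer` ends up in `RemoteCallHandler`, and `Proxy.getInvocationHandler(...)` is the way to tell the two layers apart.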


      2.DubboDeployApplicationListener

      The Protocol interface is declared as follows (the original post shows its extension classes in a figure):

      public interface Protocol {

          // The provider exports (registers) the target interface
          @Adaptive
          <T> Exporter<T> export(Invoker<T> invoker) throws RpcException;

          // The consumer refers to the target interface declared with @DubboReference
          @Adaptive
          <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException;

          default List<ProtocolServer> getServers() {
              return Collections.emptyList();
          }
      }
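
As a rough illustration of what an adaptive method does, the sketch below selects an implementation by the protocol part of the URL at call time. It is hand-written and simplified. `DemoProtocol` and `AdaptiveDemoProtocol` are hypothetical names, `java.net.URI` replaces Dubbo's URL class, and the return type is a plain string instead of an Invoker.

```java
import java.net.URI;
import java.util.Map;

// Hypothetical, simplified counterpart of Protocol.refer.
interface DemoProtocol {
    String refer(Class<?> type, URI url);
}

// Chooses the concrete extension by the URL's protocol for every call.
class AdaptiveDemoProtocol implements DemoProtocol {
    private final Map<String, DemoProtocol> extensions;

    AdaptiveDemoProtocol(Map<String, DemoProtocol> extensions) {
        this.extensions = extensions;
    }

    @Override
    public String refer(Class<?> type, URI url) {
        String name = url.getScheme();
        DemoProtocol target = extensions.get(name);
        if (target == null) {
            throw new IllegalStateException("No extension for protocol: " + name);
        }
        return target.refer(type, url);
    }
}
```

A URL starting with `registry://` and one starting with `dubbo://` therefore reach different implementations through the same `refer` call.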

      ProtocolSerializationWrapper


      3.ServiceDiscoveryRegistry

      public class ServiceDiscoveryRegistry extends FailbackRegistry {

          private final AbstractServiceNameMapping serviceNameMapping;

          @Override
          public void doSubscribe(URL url, NotifyListener listener) {
              url = addRegistryClusterKey(url);
              serviceDiscovery.subscribe(url, listener);
              boolean check = url.getParameter(CHECK_KEY, false);
              String key = ServiceNameMapping.buildMappingKey(url);
              Lock mappingLock = serviceNameMapping.getMappingLock(key);
              try {
                  mappingLock.lock();
                  Set<String> subscribedServices = serviceNameMapping.getCachedMapping(url);
                  MappingListener mappingListener = new DefaultMappingListener(url, subscribedServices, listener);
                  subscribedServices = serviceNameMapping.getAndListen(this.getUrl(), url, mappingListener);
                  mappingListeners.put(url.getProtocolServiceKey(), mappingListener);
                  subscribeURLs(url, listener, subscribedServices);
              } finally {
                  // Release the mapping lock even if subscription fails
                  mappingLock.unlock();
              }
          }

          protected void subscribeURLs(URL url, NotifyListener listener, Set<String> serviceNames) {
              serviceNames = toTreeSet(serviceNames);
              String serviceNamesKey = toStringKeys(serviceNames);
              String protocolServiceKey = url.getProtocolServiceKey();
              logger.info(String.format("Trying to subscribe from apps %s for service key %s, ", serviceNamesKey, protocolServiceKey));
              // register ServiceInstancesChangedListener
              Lock appSubscriptionLock = getAppSubscription(serviceNamesKey);
              try {
                  appSubscriptionLock.lock();
                  ServiceInstancesChangedListener serviceInstancesChangedListener = serviceListeners.get(serviceNamesKey);
                  if (serviceInstancesChangedListener == null) {
                      serviceInstancesChangedListener = serviceDiscovery.createListener(serviceNames);
                      serviceInstancesChangedListener.setUrl(url);
                      for (String serviceName : serviceNames) {
                          // Fetch the provider instances (IP address, port, and so on)
                          List<ServiceInstance> serviceInstances = serviceDiscovery.getInstances(serviceName);
                          if (CollectionUtils.isNotEmpty(serviceInstances)) {
                              serviceInstancesChangedListener.onEvent(new ServiceInstancesChangedEvent(serviceName, serviceInstances));
                          }
                      }
                      serviceListeners.put(serviceNamesKey, serviceInstancesChangedListener);
                  }
                  if (!serviceInstancesChangedListener.isDestroyed()) {
                      serviceInstancesChangedListener.setUrl(url);
                      listener.addServiceListener(serviceInstancesChangedListener);
                      serviceInstancesChangedListener.addListenerAndNotify(protocolServiceKey, listener);
                      serviceDiscovery.addServiceInstancesChangedListener(serviceInstancesChangedListener);
                  } else {
                      logger.info(String.format("Listener of %s has been destroyed by another thread.", serviceNamesKey));
                      serviceListeners.remove(serviceNamesKey);
                  }
              } finally {
                  appSubscriptionLock.unlock();
              }
          }
      }

      public abstract class AbstractServiceNameMapping implements ServiceNameMapping, ScopeModelAware {

          @Override
          public Set<String> getAndListen(URL registryURL, URL subscribedURL, MappingListener listener) {
              String key = ServiceNameMapping.buildMappingKey(subscribedURL);
              // use previously cached services.
              Set<String> mappingServices = this.getCachedMapping(key);
              // Asynchronously register listener in case previous cache does not exist or cache expired.
              if (CollectionUtils.isEmpty(mappingServices)) {
                  ...
              } else {
                  ExecutorService executorService = applicationModel.getFrameworkModel().getBeanFactory()
                      .getBean(FrameworkExecutorRepository.class).getMappingRefreshingExecutor();
                  executorService.submit(new AsyncMappingTask(listener, subscribedURL, true));
              }
              return mappingServices;
          }

          private class AsyncMappingTask implements Callable<Set<String>> {

              private final MappingListener listener;
              private final URL subscribedURL;
              private final boolean notifyAtFirstTime;

              public AsyncMappingTask(MappingListener listener, URL subscribedURL, boolean notifyAtFirstTime) {
                  this.listener = listener;
                  this.subscribedURL = subscribedURL;
                  this.notifyAtFirstTime = notifyAtFirstTime;
              }

              @Override
              public Set<String> call() throws Exception {
                  synchronized (mappingListeners) {
                      Set<String> mappedServices = emptySet();
                      try {
                          String mappingKey = ServiceNameMapping.buildMappingKey(subscribedURL);
                          if (listener != null) {
                              // Fetch the set of provider service names through the ZooKeeper client
                              mappedServices = toTreeSet(getAndListen(subscribedURL, listener));
                              Set<MappingListener> listeners = mappingListeners.computeIfAbsent(mappingKey, _k -> new HashSet<>());
                              listeners.add(listener);
                              if (CollectionUtils.isNotEmpty(mappedServices)) {
                                  if (notifyAtFirstTime) {
                                      // Add the provider service names to the local cache in serviceNameMapping.
                                      // DefaultMappingListener keeps the provider service names consistent
                                      // between the local cache and the ZooKeeper server.
                                      listener.onEvent(new MappingChangedEvent(mappingKey, mappedServices));
                                  }
                              }
                          } else {
                              mappedServices = get(subscribedURL);
                              if (CollectionUtils.isNotEmpty(mappedServices)) {
                                  AbstractServiceNameMapping.this.putCachedMapping(mappingKey, mappedServices);
                              }
                          }
                      } catch (Exception e) {
                          logger.error("Failed getting mapping info from remote center. ", e);
                      }
                      return mappedServices;
                  }
              }
          }
      }

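
The interplay of cache, remote lookup, and listener in the mapping code above can be reduced to a few lines. This sketch is synchronous and leaves out locks and executors. `DemoServiceNameMapping`, `DemoMappingListener`, and `refresh` are hypothetical names, and `remoteLookup` stands in for the query against the metadata center.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical listener: notified with the provider names found for a mapping key.
interface DemoMappingListener {
    void onEvent(String mappingKey, Set<String> serviceNames);
}

class DemoServiceNameMapping {
    private final Map<String, Set<String>> cache = new ConcurrentHashMap<>();
    private final Map<String, Set<DemoMappingListener>> listeners = new ConcurrentHashMap<>();
    // Stands in for the remote query (for example through a ZooKeeper client).
    private final Function<String, Set<String>> remoteLookup;

    DemoServiceNameMapping(Function<String, Set<String>> remoteLookup) {
        this.remoteLookup = remoteLookup;
    }

    Set<String> getCachedMapping(String mappingKey) {
        return cache.getOrDefault(mappingKey, Collections.emptySet());
    }

    void putCachedMapping(String mappingKey, Set<String> serviceNames) {
        cache.put(mappingKey, new TreeSet<>(serviceNames));
    }

    /** Queries the remote side, registers the listener, and notifies it when names were found. */
    Set<String> refresh(String mappingKey, DemoMappingListener listener) {
        Set<String> mapped = new TreeSet<>(remoteLookup.apply(mappingKey));
        listeners.computeIfAbsent(mappingKey, k -> ConcurrentHashMap.newKeySet()).add(listener);
        if (!mapped.isEmpty()) {
            listener.onEvent(mappingKey, mapped);
        }
        return mapped;
    }
}
```

A listener that calls `putCachedMapping` plays the role the comments above assign to DefaultMappingListener: after `refresh`, the local cache matches what the remote side returned.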
      NettyChannel: where the data is actually sent through Netty.
      CodecSupport: handles the protocol encoding and decoding used with Netty.
    Original post: https://blog.csdn.net/qq_36851469/article/details/136261218