• Overview of Dubbo service publishing and consumption


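    1 Startup flow diagram of the Dubbo provider and consumer

    The startup flows of the Dubbo provider (the service publishing side) and the consumer are as follows:

    (1) Provider startup

    • ServiceConfig uses ProxyFactory to convert the service implementation (ref) into an Invoker;
    • Protocol then converts the Invoker into an Exporter. During this step, a NettyServer is first created and started to listen for connections, and the service is then registered with the registry.

    (2) Consumer startup

    • ReferenceConfig uses Protocol to convert the remote service into an Invoker. While the Invoker is being created, a NettyClient is also created and started, which establishes a TCP connection between the consumer and the provider.
    • ProxyFactory then converts the Invoker into the interface that the client (i.e. the consumer) needs.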
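The two flows above can be sketched in plain Java. This is only an illustration under stated assumptions: SimpleInvoker, GreetingService and ExportReferSketch are hypothetical names, not Dubbo types, and an in-memory map stands in for the NettyServer/NettyClient connection and the registry.

```java
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class ExportReferSketch {
    // Stands in for "Exporter + network + registry": service key -> exported invoker.
    static final Map<String, SimpleInvoker> EXPORTED = new ConcurrentHashMap<>();

    // Provider, step 1: wrap the service implementation (ref) in an invoker.
    static SimpleInvoker toInvoker(Class<?> iface, Object ref) {
        return (methodName, args) -> {
            for (Method m : iface.getMethods()) {
                if (m.getName().equals(methodName)) {
                    return m.invoke(ref, args);
                }
            }
            throw new NoSuchMethodException(methodName);
        };
    }

    // Provider, step 2: "export" the invoker so that consumers can reach it.
    static void export(String serviceKey, SimpleInvoker invoker) {
        EXPORTED.put(serviceKey, invoker);
    }

    // Consumer, step 1: turn the "remote" service into an invoker.
    static SimpleInvoker refer(String serviceKey) {
        return EXPORTED.get(serviceKey);
    }

    // Consumer, step 2: turn the invoker into the interface the caller needs.
    static <T> T toProxy(Class<T> iface, SimpleInvoker invoker) {
        Object proxy = Proxy.newProxyInstance(
                iface.getClassLoader(),
                new Class<?>[] {iface},
                (p, method, args) -> invoker.invoke(method.getName(), args));
        return iface.cast(proxy);
    }

    static String demo() {
        GreetingService impl = name -> "hello, " + name;
        export("GreetingService", toInvoker(GreetingService.class, impl));
        GreetingService client = toProxy(GreetingService.class, refer("GreetingService"));
        return client.hello("dubbo");
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints "hello, dubbo"
    }
}

// Hypothetical, heavily simplified stand-in for Dubbo's Invoker.
interface SimpleInvoker {
    Object invoke(String methodName, Object[] args) throws Exception;
}

// Hypothetical service interface used only for this sketch.
interface GreetingService {
    String hello(String name);
}
```

The point of the sketch is the symmetry: the provider goes ref -> Invoker -> Exporter, and the consumer goes remote service -> Invoker -> proxy.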

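    2 Provider startup

    2.1 Overview of provider startup

    • ServiceConfig uses ProxyFactory to convert the service implementation (ref) into an Invoker;
    • Protocol then converts the Invoker into an Exporter. During this step, a NettyServer is first started to listen for connections, and the service is then registered with the registry.

    2.2 Source code walkthrough

    2.2.1 Provider startup entry point

    The provider startup entry point is the org.apache.dubbo.config.ServiceConfigBase#export() method, where ServiceConfigBase is the parent class of ServiceConfig.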

    /**
     * export service and auto start application instance
     */
    public final void export() {
        export(RegisterTypeEnum.AUTO_REGISTER);
    }

    When the application starts, a service can be registered with the registry in one of the following four ways:

    public enum RegisterTypeEnum {
        /**
         * Never register. Cannot be registered by any command(like QoS-online).
         */
        NEVER_REGISTER,
        /**
         * Manual register. Can be registered by command(like QoS-online), but not register by default.
         */
        MANUAL_REGISTER,
        /**
         * (INTERNAL) Auto register by deployer. Will be registered after deployer started.
         * (Delay publish when starting. Prevent service from being invoked before all services are started)
         */
        AUTO_REGISTER_BY_DEPLOYER,
        /**
         * Auto register. Will be registered when one service is exported.
         */
        AUTO_REGISTER;
    }

    The export method calls doExport (org.apache.dubbo.config.ServiceConfig#doExport), which is implemented as follows.

    protected synchronized void doExport(RegisterTypeEnum registerType) {
        // ...
        // export the service URLs
        doExportUrls(registerType);
        exported();
    }

    private void doExportUrls(RegisterTypeEnum registerType) {
        // ...
        // load the registry configuration
        List<URL> registryURLs = ConfigValidationUtils.loadRegistries(this, true);
        for (ProtocolConfig protocolConfig : protocols) {
            // ...
            // export the service for this protocol
            doExportUrlsFor1Protocol(protocolConfig, registryURLs, registerType);
        }
        providerModel.setServiceUrls(urls);
    }

    private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs, RegisterTypeEnum registerType) {
        Map<String, String> map = buildAttributes(protocolConfig);
        // remove null key and null value
        map.keySet().removeIf(key -> StringUtils.isEmpty(key) || StringUtils.isEmpty(map.get(key)));
        // init serviceMetadata attachments
        serviceMetadata.getAttachments().putAll(map);
        URL url = buildUrl(protocolConfig, map);
        processServiceExecutor(url);
        // export the service
        exportUrl(url, registryURLs, registerType);
        initServiceMethodMetrics(url);
    }

    private void exportUrl(URL url, List<URL> registryURLs, RegisterTypeEnum registerType) {
        String scope = url.getParameter(SCOPE_KEY);
        // don't export when none is configured
        if (!SCOPE_NONE.equalsIgnoreCase(scope)) {
            // export to local if the config is not remote (export to remote only when config is remote)
            if (!SCOPE_REMOTE.equalsIgnoreCase(scope)) {
                // local (injvm) export
                exportLocal(url);
            }
            // export to remote if the config is not local (export to local only when config is local)
            if (!SCOPE_LOCAL.equalsIgnoreCase(scope)) {
                // ...
                // remote export
                url = exportRemote(url, registryURLs, registerType);
                // ...
            }
        }
        this.urls.add(url);
    }

    2.2.2 Converting the service implementation (ref) into an Exporter

    ServiceConfig uses ProxyFactory to convert the service implementation (ref) into an Invoker, and Protocol then converts the Invoker into an Exporter. The main implementation is the doExportUrl() method.

    Both local export and remote export end up calling doExportUrl() (org.apache.dubbo.config.ServiceConfig#doExportUrl). For local export, withMetaData is false; for remote export, withMetaData is true. doExportUrl is implemented as follows.

    private void doExportUrl(URL url, boolean withMetaData, RegisterTypeEnum registerType) {
        if (!url.getParameter(REGISTER_KEY, true)) {
            registerType = RegisterTypeEnum.MANUAL_REGISTER;
        }
        if (registerType == RegisterTypeEnum.NEVER_REGISTER ||
            registerType == RegisterTypeEnum.MANUAL_REGISTER ||
            registerType == RegisterTypeEnum.AUTO_REGISTER_BY_DEPLOYER) {
            url = url.addParameter(REGISTER_KEY, false);
        }
        // 1. ServiceConfig uses ProxyFactory to convert the service implementation (ref) into an Invoker
        Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url);
        if (withMetaData) {
            // wrap the invoker and all the metadata (ServiceConfig)
            invoker = new DelegateProviderMetaDataInvoker(invoker, this);
        }
        // 2. Protocol converts the Invoker into an Exporter
        Exporter<?> exporter = protocolSPI.export(invoker);
        exporters.computeIfAbsent(registerType, k -> new CopyOnWriteArrayList<>()).add(exporter);
    }
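The first two if blocks of doExportUrl() decide whether the exported URL still carries register=true. A minimal sketch of that decision follows; RegisterType and RegisterDecisionSketch are hypothetical names that mirror the enum above, and the sketch assumes the registry URL itself does not disable registration.

```java
class RegisterDecisionSketch {
    // Mirrors the first if block: register=false on the URL downgrades the type to MANUAL_REGISTER.
    static RegisterType effectiveType(RegisterType requested, boolean urlRegisterParam) {
        return urlRegisterParam ? requested : RegisterType.MANUAL_REGISTER;
    }

    // Mirrors the second if block: only AUTO_REGISTER keeps register=true on the exported URL,
    // so only then is the service registered as part of the export call itself.
    static boolean registersOnExport(RegisterType requested, boolean urlRegisterParam) {
        return effectiveType(requested, urlRegisterParam) == RegisterType.AUTO_REGISTER;
    }

    public static void main(String[] args) {
        for (RegisterType type : RegisterType.values()) {
            System.out.println(type + " -> registers on export: " + registersOnExport(type, true));
        }
    }
}

// Hypothetical copy of the four values of RegisterTypeEnum, so the sketch is self-contained.
enum RegisterType { NEVER_REGISTER, MANUAL_REGISTER, AUTO_REGISTER_BY_DEPLOYER, AUTO_REGISTER }
```

For the other three types the URL is exported with register=false, so registration has to be triggered later (by a command or by the deployer) or never happens.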

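    2.2.3 Creating and starting the NettyServer

    (1) While the Exporter is being created, a NettyServer is started to listen for connections, and the service is then registered with the registry.

    For remote export, the protocol of the URL is registry, so RegistryProtocol is selected. For local export, the protocol of the URL is injvm, so InjvmProtocol is selected.

    (2) Taking RegistryProtocol as an example, its export method is shown below. doLocalExport converts the Invoker into an Exporter and starts the NettyServer; getRegistry obtains the registry; register registers the current service with the registry.

    (3) The provider-side NettyServer is mainly responsible for accepting client connections and for read/write I/O.

    Dubbo uses Netty as its default underlying network transport. On the provider side, NettyServer uses two levels of thread pools: the boss pool accepts client connection requests and hands the connections that have completed the TCP three-way handshake over to the worker pool for processing.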

    public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
        URL registryUrl = getRegistryUrl(originInvoker);
        // url to export locally
        URL providerUrl = getProviderUrl(originInvoker);
        // Subscribe the override data
        final URL overrideSubscribeUrl = getSubscribedOverrideUrl(providerUrl);
        final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
        Map<URL, Set<NotifyListener>> overrideListeners = getProviderConfigurationListener(overrideSubscribeUrl).getOverrideListeners();
        overrideListeners.computeIfAbsent(overrideSubscribeUrl, k -> new ConcurrentHashSet<>())
            .add(overrideSubscribeListener);
        providerUrl = overrideUrlWithConfig(providerUrl, overrideSubscribeListener);
        // export invoker
        // 1. start the NettyServer
        final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker, providerUrl);
        // url to registry
        // 2. obtain the registry
        final Registry registry = getRegistry(registryUrl);
        final URL registeredProviderUrl = getUrlToRegistry(providerUrl, registryUrl);
        // decide if we need to delay publish (provider itself and registry should both need to register)
        boolean register = providerUrl.getParameter(REGISTER_KEY, true) && registryUrl.getParameter(REGISTER_KEY, true);
        if (register) {
            // 3. register the current service with the registry
            register(registry, registeredProviderUrl);
        }
        // register stated url on provider model
        registerStatedUrl(registryUrl, registeredProviderUrl, register);
        exporter.setRegisterUrl(registeredProviderUrl);
        exporter.setSubscribeUrl(overrideSubscribeUrl);
        exporter.setNotifyListener(overrideSubscribeListener);
        exporter.setRegistered(register);
        ApplicationModel applicationModel = getApplicationModel(providerUrl.getScopeModel());
        if (applicationModel.modelEnvironment().getConfiguration().convert(Boolean.class, ENABLE_26X_CONFIGURATION_LISTEN, true)) {
            if (!registry.isServiceDiscovery()) {
                // Deprecated! Subscribe to override rules in 2.6.x or before.
                registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
            }
        }
        notifyExport(exporter);
        // Ensure that a new exporter instance is returned every time export
        return new DestroyableExporter<>(exporter);
    }

    Entry point for creating and starting the NettyServer: doLocalExport eventually calls the export method of DubboProtocol, after which the main call chain is export() -> openServer() -> createServer(). The implementation is as follows.

    public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
        checkDestroyed();
        URL url = invoker.getUrl();
        // 1. create the Exporter (export service)
        String key = serviceKey(url);
        DubboExporter<T> exporter = new DubboExporter<>(invoker, key, exporterMap);
        // export a stub service for dispatching event
        // ...
        // 2. create and start the NettyServer
        openServer(url);
        optimizeSerialization(url);
        return exporter;
    }

    private void openServer(URL url) {
        checkDestroyed();
        // find server.
        // key is the provider's ip:port
        String key = url.getAddress();
        // client can export a service which only for server to invoke
        // only a provider starts a NettyServer to listen
        boolean isServer = url.getParameter(IS_SERVER_KEY, true);
        if (isServer) {
            ProtocolServer server = serverMap.get(key);
            if (server == null) {
                synchronized (this) {
                    server = serverMap.get(key);
                    if (server == null) {
                        // no NettyServer has been started for this address yet, so create and start one;
                        // services exported on the same address (ip:port) share a single NettyServer
                        serverMap.put(key, createServer(url));
                        return;
                    }
                }
            }
            // server supports reset, use together with override
            server.reset(url);
        }
    }

    private ProtocolServer createServer(URL url) {
        url = URLBuilder.from(url)
            // send readonly event when server closes, it's enabled by default
            .addParameterIfAbsent(CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString())
            // enable heartbeat by default
            .addParameterIfAbsent(HEARTBEAT_KEY, String.valueOf(DEFAULT_HEARTBEAT))
            .addParameter(CODEC_KEY, DubboCodec.NAME)
            .build();
        String transporter = url.getParameter(SERVER_KEY, DEFAULT_REMOTING_SERVER);
        if (StringUtils.isNotEmpty(transporter) && !url.getOrDefaultFrameworkModel().getExtensionLoader(Transporter.class).hasExtension(transporter)) {
            throw new RpcException("Unsupported server type: " + transporter + ", url: " + url);
        }
        ExchangeServer server;
        try {
            // start the NettyServer
            server = Exchangers.bind(url, requestHandler);
        } catch (RemotingException e) {
            throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
        }
        transporter = url.getParameter(CLIENT_KEY);
        if (StringUtils.isNotEmpty(transporter) && !url.getOrDefaultFrameworkModel().getExtensionLoader(Transporter.class).hasExtension(transporter)) {
            throw new RpcException("Unsupported client type: " + transporter);
        }
        DubboProtocolServer protocolServer = new DubboProtocolServer(server);
        loadServerProperties(protocolServer);
        return protocolServer;
    }
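The "one server per address" behaviour of openServer() is a double-checked lookup on a map keyed by ip:port. The sketch below isolates that pattern; ServerCacheSketch and its members are hypothetical names, and a plain String stands in for the real ProtocolServer.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

class ServerCacheSketch {
    // address (ip:port) -> server; a String stands in for the real server object.
    private final Map<String, String> serverMap = new ConcurrentHashMap<>();
    // Counts how many servers were actually created.
    final AtomicInteger created = new AtomicInteger();

    String openServer(String address) {
        String server = serverMap.get(address);
        if (server == null) {
            synchronized (this) {
                // Second check under the lock: another thread may have created it meanwhile.
                server = serverMap.get(address);
                if (server == null) {
                    server = createServer(address);
                    serverMap.put(address, server);
                }
            }
        }
        return server;
    }

    private String createServer(String address) {
        created.incrementAndGet();
        return "server@" + address;
    }

    public static void main(String[] args) {
        ServerCacheSketch protocol = new ServerCacheSketch();
        protocol.openServer("192.168.0.1:20880"); // first service on this address: server created
        protocol.openServer("192.168.0.1:20880"); // second service, same address: server reused
        protocol.openServer("192.168.0.1:20881"); // different port: a second server
        System.out.println(protocol.created.get()); // prints 2
    }
}
```

The unlocked first read keeps the common path cheap, while the second read inside the synchronized block prevents two threads from creating a server for the same address.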

    After DubboProtocol's createServer(), the call chain is:

    createServer() -> Exchangers.bind() -> HeaderExchanger.bind() -> Transporters.bind() -> NettyTransporter.bind(). The implementation is as follows.

    // Exchangers#bind
    public static ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
        if (url == null) {
            throw new IllegalArgumentException("url == null");
        }
        if (handler == null) {
            throw new IllegalArgumentException("handler == null");
        }
        url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
        return getExchanger(url).bind(url, handler);
    }

    // HeaderExchanger#bind
    public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
        ExchangeServer server;
        boolean isPuServerKey = url.getParameter(IS_PU_SERVER_KEY, false);
        if (isPuServerKey) {
            server = new HeaderExchangeServer(PortUnificationExchanger.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
        } else {
            server = new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
        }
        return server;
    }

    // Transporters#bind
    public static RemotingServer bind(URL url, ChannelHandler... handlers) throws RemotingException {
        if (url == null) {
            throw new IllegalArgumentException("url == null");
        }
        if (handlers == null || handlers.length == 0) {
            throw new IllegalArgumentException("handlers == null");
        }
        ChannelHandler handler;
        if (handlers.length == 1) {
            handler = handlers[0];
        } else {
            handler = new ChannelHandlerDispatcher(handlers);
        }
        return getTransporter(url).bind(url, handler);
    }

    // NettyTransporter#bind
    public RemotingServer bind(URL url, ChannelHandler handler) throws RemotingException {
        return new NettyServer(url, handler);
    }

    Taking org.apache.dubbo.remoting.transport.netty4.NettyTransporter as an example, its bind method simply creates a new NettyServer.

    The NettyServer constructor calls the constructor of its parent class AbstractServer, whose core step is doOpen(), which initializes and starts the Netty server. The code is as follows.

    public NettyServer(URL url, ChannelHandler handler) throws RemotingException {
        // you can customize name and type of client thread pool by THREAD_NAME_KEY and THREAD_POOL_KEY in CommonConstants.
        // the handler will be wrapped: MultiMessageHandler->HeartbeatHandler->handler
        // call the parent constructor
        super(url, ChannelHandlers.wrap(handler, url));
        // read config before destroy
        serverShutdownTimeoutMills = ConfigurationUtils.getServerShutdownTimeout(getUrl().getOrDefaultModuleModel());
    }

    public AbstractServer(URL url, ChannelHandler handler) throws RemotingException {
        super(url, handler);
        executorRepository = ExecutorRepository.getInstance(url.getOrDefaultApplicationModel());
        localAddress = getUrl().toInetSocketAddress();
        String bindIp = getUrl().getParameter(Constants.BIND_IP_KEY, getUrl().getHost());
        int bindPort = getUrl().getParameter(Constants.BIND_PORT_KEY, getUrl().getPort());
        if (url.getParameter(ANYHOST_KEY, false) || NetUtils.isInvalidLocalHost(bindIp)) {
            bindIp = ANYHOST_VALUE;
        }
        bindAddress = new InetSocketAddress(bindIp, bindPort);
        this.accepts = url.getParameter(ACCEPTS_KEY, DEFAULT_ACCEPTS);
        try {
            // initialize and start the netty server
            doOpen();
            if (logger.isInfoEnabled()) {
                logger.info("Start " + getClass().getSimpleName() + " bind " + getBindAddress() + ", export " + getLocalAddress());
            }
        } catch (Throwable t) {
            throw new RemotingException(url.toInetSocketAddress(), null, "Failed to bind " + getClass().getSimpleName()
                + " on " + bindAddress + ", cause: " + t.getMessage(), t);
        }
        executors.add(executorRepository.createExecutorIfAbsent(ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));
    }

    Taking org.apache.dubbo.remoting.transport.netty4.NettyServer#doOpen as an example, doOpen() is implemented as follows.

    /**
     * Init and start netty server
     *
     * @throws Throwable
     */
    @Override
    protected void doOpen() throws Throwable {
        // create the ServerBootstrap
        bootstrap = new ServerBootstrap();
        // create Netty's boss and worker thread pools
        bossGroup = createBossGroup();
        workerGroup = createWorkerGroup();
        // configure the NettyServer and add handlers to the pipeline
        final NettyServerHandler nettyServerHandler = createNettyServerHandler();
        channels = nettyServerHandler.getChannels();
        initServerBootstrap(nettyServerHandler);
        // bind
        try {
            // bind the local port and start listening
            ChannelFuture channelFuture = bootstrap.bind(getBindAddress());
            channelFuture.syncUninterruptibly();
            channel = channelFuture.channel();
        } catch (Throwable t) {
            closeBootstrap();
            throw t;
        }
    }

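    2.2.4 Registering the service with the registry

    As shown above, taking RegistryProtocol as an example, the export method uses doLocalExport to convert the Invoker into an Exporter and start the NettyServer, which listens for connections. It then obtains the registry through getRegistry and registers the current service through register.

    2.2.4.1 Obtaining the registry

    getRegistry obtains the registry through AbstractRegistryFactory (an implementation of RegistryFactory), which caches registry instances and delegates their creation to the createRegistry method implemented by its subclasses. For example, ZookeeperRegistryFactory creates a ZookeeperRegistry as the ZooKeeper registry. The implementation is as follows.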

    /**
     * Get an instance of registry based on the address of invoker
     *
     * @param registryUrl
     * @return
     */
    protected Registry getRegistry(final URL registryUrl) {
        RegistryFactory registryFactory = ScopeModelUtil.getExtensionLoader(RegistryFactory.class, registryUrl.getScopeModel()).getAdaptiveExtension();
        return registryFactory.getRegistry(registryUrl);
    }

    // AbstractRegistryFactory#getRegistry
    @Override
    public Registry getRegistry(URL url) {
        if (registryManager == null) {
            throw new IllegalStateException("Unable to fetch RegistryManager from ApplicationModel BeanFactory. " +
                "Please check if `setApplicationModel` has been override.");
        }
        Registry defaultNopRegistry = registryManager.getDefaultNopRegistryIfDestroyed();
        if (null != defaultNopRegistry) {
            return defaultNopRegistry;
        }
        url = URLBuilder.from(url)
            .setPath(RegistryService.class.getName())
            .addParameter(INTERFACE_KEY, RegistryService.class.getName())
            .removeParameter(TIMESTAMP_KEY)
            .removeAttribute(EXPORT_KEY)
            .removeAttribute(REFER_KEY)
            .build();
        String key = createRegistryCacheKey(url);
        Registry registry = null;
        boolean check = url.getParameter(CHECK_KEY, true) && url.getPort() != 0;
        // Lock the registry access process to ensure a single instance of the registry
        registryManager.getRegistryLock().lock();
        try {
            // double check
            // fix https://github.com/apache/dubbo/issues/7265.
            defaultNopRegistry = registryManager.getDefaultNopRegistryIfDestroyed();
            if (null != defaultNopRegistry) {
                return defaultNopRegistry;
            }
            // look up the registry in the local cache
            registry = registryManager.getRegistry(key);
            if (registry != null) {
                return registry;
            }
            // create registry by spi/ioc
            registry = createRegistry(url);
            if (check && registry == null) {
                throw new IllegalStateException("Can not create registry " + url);
            }
            if (registry != null) {
                // store the registry in the local cache
                registryManager.putRegistry(key, registry);
            }
        } catch (Exception e) {
            if (check) {
                throw new RuntimeException("Can not create registry " + url, e);
            } else {
                // 1-11 Failed to obtain or create registry (service) object.
                LOGGER.warn(REGISTRY_FAILED_CREATE_INSTANCE, "", "",
                    "Failed to obtain or create registry ", e);
            }
        } finally {
            // Release the lock
            registryManager.getRegistryLock().unlock();
        }
        return registry;
    }

    /**
     * ZookeeperRegistryFactory#createRegistry
     * ZookeeperRegistryFactory is a subclass of AbstractRegistryFactory
     */
    @Override
    public Registry createRegistry(URL url) {
        return new ZookeeperRegistry(url, zookeeperTransporter);
    }

    2.2.4.2 Registering the service

    The register method registers the current service with the registry. org.apache.dubbo.registry.integration.RegistryProtocol#register is implemented as follows.

    private static void register(Registry registry, URL registeredProviderUrl) {
        ApplicationDeployer deployer = registeredProviderUrl.getOrDefaultApplicationModel().getDeployer();
        try {
            deployer.increaseServiceRefreshCount();
            String registryName = Optional.ofNullable(registry.getUrl())
                .map(u -> u.getParameter(RegistryConstants.REGISTRY_CLUSTER_KEY,
                    UrlUtils.isServiceDiscoveryURL(u) ? u.getParameter(REGISTRY_KEY) : u.getProtocol()))
                .filter(StringUtils::isNotEmpty)
                .orElse("unknown");
            MetricsEventBus.post(RegistryEvent.toRsEvent(registeredProviderUrl.getApplicationModel(), registeredProviderUrl.getServiceKey(), 1, Collections.singletonList(registryName)),
                () -> {
                    // register the service
                    registry.register(registeredProviderUrl);
                    return null;
                });
        } finally {
            deployer.decreaseServiceRefreshCount();
        }
    }

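    Take the ZooKeeper registry as an example. The call goes to org.apache.dubbo.registry.support.FailbackRegistry#register, a method that ZookeeperRegistry inherits from its parent class. That method calls ZookeeperRegistry's doRegister(), which in turn calls ZookeeperClient's create() to create a ZooKeeper node and thereby register the service in ZooKeeper. The implementation is as follows.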

    // FailbackRegistry#register
    public void register(URL url) {
        if (!acceptable(url)) {
            logger.info("URL " + url + " will not be registered to Registry. Registry " + this.getUrl() + " does not accept service of this protocol type.");
            return;
        }
        super.register(url);
        removeFailedRegistered(url);
        removeFailedUnregistered(url);
        try {
            // Sending a registration request to the server side
            doRegister(url);
        } catch (Exception e) {
            Throwable t = e;
            // If the startup detection is opened, the Exception is thrown directly.
            boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
                && url.getParameter(Constants.CHECK_KEY, true)
                && (url.getPort() != 0);
            boolean skipFailback = t instanceof SkipFailbackWrapperException;
            if (check || skipFailback) {
                if (skipFailback) {
                    t = t.getCause();
                }
                throw new IllegalStateException("Failed to register " + url + " to registry " + getUrl().getAddress() + ", cause: " + t.getMessage(), t);
            } else {
                logger.error(INTERNAL_ERROR, "unknown error in registry module", "", "Failed to register " + url + ", waiting for retry, cause: " + t.getMessage(), t);
            }
            // Record a failed registration request to a failed list, retry regularly
            addFailedRegistered(url);
        }
    }

    // ZookeeperRegistry#doRegister
    public void doRegister(URL url) {
        try {
            checkDestroyed();
            zkClient.create(toUrlPath(url), url.getParameter(DYNAMIC_KEY, true), true);
        } catch (Throwable e) {
            throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    }
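The failback behaviour of register() (fail fast when check is on, otherwise remember the URL and retry later) can be sketched as follows. This is a simplified illustration, not Dubbo's implementation: FailbackSketch and its members are hypothetical names, a Consumer stands in for doRegister(), and retry() is invoked by hand where the real registry schedules retries on a timer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

class FailbackSketch {
    private final Set<String> failedRegistered = ConcurrentHashMap.newKeySet();
    private final Consumer<String> doRegister; // stands in for the registry-specific doRegister()
    private final boolean check;               // stands in for the "check" URL parameter

    FailbackSketch(Consumer<String> doRegister, boolean check) {
        this.doRegister = doRegister;
        this.check = check;
    }

    void register(String url) {
        failedRegistered.remove(url);
        try {
            doRegister.accept(url);
        } catch (RuntimeException e) {
            if (check) {
                // startup check enabled: surface the failure immediately
                throw new IllegalStateException("Failed to register " + url, e);
            }
            // otherwise record the failure so it can be retried later
            failedRegistered.add(url);
        }
    }

    // Retries every recorded failure once; successful ones leave the failed set.
    void retry() {
        for (String url : failedRegistered) {
            try {
                doRegister.accept(url);
                failedRegistered.remove(url);
            } catch (RuntimeException ignored) {
                // still failing, keep it for the next retry round
            }
        }
    }

    int pending() {
        return failedRegistered.size();
    }

    static boolean demo() {
        AtomicBoolean registryUp = new AtomicBoolean(false);
        List<String> registered = new ArrayList<>();
        FailbackSketch registry = new FailbackSketch(url -> {
            if (!registryUp.get()) {
                throw new IllegalStateException("registry unreachable");
            }
            registered.add(url);
        }, false);
        registry.register("dubbo://192.168.0.1:20880/com.hn.TestService");
        boolean queued = registry.pending() == 1 && registered.isEmpty();
        registryUp.set(true);
        registry.retry();
        return queued && registry.pending() == 0 && registered.size() == 1;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints true
    }
}
```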

    The code of ZookeeperClient's create() method is shown below.

    // Declaration in the ZookeeperClient interface
    /**
     * Create ZNode in Zookeeper.
     *
     * @param path path to ZNode
     * @param ephemeral specify create mode of ZNode creation. true - EPHEMERAL, false - PERSISTENT.
     * @param faultTolerant specify fault tolerance of ZNode creation.
     * true - ignore exception and recreate if is ephemeral, false - throw exception.
     */
    void create(String path, boolean ephemeral, boolean faultTolerant);

    // Implementation (AbstractZookeeperClient)
    public void create(String path, boolean ephemeral, boolean faultTolerant) {
        if (!ephemeral) {
            if (persistentExistNodePath.contains(path)) {
                return;
            }
            if (checkExists(path)) {
                persistentExistNodePath.add(path);
                return;
            }
        }
        int i = path.lastIndexOf('/');
        if (i > 0) {
            // recursively create the parent path as a persistent node first
            create(path.substring(0, i), false, true);
        }
        if (ephemeral) {
            createEphemeral(path, faultTolerant);
        } else {
            createPersistent(path, faultTolerant);
            persistentExistNodePath.add(path);
        }
    }

    A simplified, runnable version of create() and its printed output are shown below.

    // CreateDemo is a wrapper class added here so that the snippet compiles on its own.
    public class CreateDemo {
        public static void main(String[] args) {
            // original URL
            String path = "/dubbo/com.hn.TestService/providers/dubbo://192.168.0.1:20880/com.hn.TestService?anyhost=true&default.dynamic=false&dubbo=3.2";
            // after toUrlPath() (the provider URL is URL-encoded)
            path = "/dubbo/com.hn.TestService/providers/dubbo%3A%2F%2F192.168.0.1%3A20880%2Fcom.hn.TestService%3Fanyhost%3Dtrue%26default.dynamic%3Dfalse%26dubbo%3D3.2";
            create(path, true, true);
        }

        public static void create(String path, boolean ephemeral, boolean faultTolerant) {
            int i = path.lastIndexOf('/');
            if (i > 0) {
                // parents are always created as persistent nodes
                create(path.substring(0, i), false, true);
            }
            if (ephemeral) {
                createEphemeral(path, faultTolerant);
            } else {
                createPersistent(path, faultTolerant);
            }
        }

        private static void createEphemeral(String path, boolean faultTolerant) {
            System.out.println("createEphemeral:" + path);
        }

        private static void createPersistent(String path, boolean faultTolerant) {
            System.out.println("createPersistent:" + path);
        }
    }

    The printed output is as follows.

    createPersistent:/dubbo
    createPersistent:/dubbo/com.hn.TestService
    createPersistent:/dubbo/com.hn.TestService/providers
    createEphemeral:/dubbo/com.hn.TestService/providers/dubbo%3A%2F%2F192.168.0.1%3A20880%2Fcom.hn.TestService%3Fanyhost%3Dtrue%26default.dynamic%3Dfalse%26dubbo%3D3.2

    create() first runs its recursive part, calling createPersistent() to create the ZooKeeper nodes /dubbo, /dubbo/com.hn.TestService and /dubbo/com.hn.TestService/providers in that order;

    it then runs the part after the recursion, calling createEphemeral() to create /dubbo/com.hn.TestService/providers/dubbo%3A%2F%2F192.168.0.1%3A20880%2Fcom.hn.TestService%3Fanyhost%3Dtrue%26default.dynamic%3Dfalse%26dubbo%3D3.2.
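The encoded last path segment can be reproduced with the JDK's URLEncoder. The sketch below is an assumption about the shape of the result only, not the actual toUrlPath() code: UrlPathSketch and its parameters are hypothetical, and the root, interface and category are passed in explicitly.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;

class UrlPathSketch {
    // Builds <root>/<interface>/<category>/<url-encoded provider URL>.
    static String toUrlPath(String root, String serviceInterface, String category, String providerUrl) {
        try {
            // Encoding removes every '/' from the provider URL, so it becomes a single node name.
            return root + "/" + serviceInterface + "/" + category + "/"
                    + URLEncoder.encode(providerUrl, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException(e); // UTF-8 is always available
        }
    }

    public static void main(String[] args) {
        System.out.println(toUrlPath("/dubbo", "com.hn.TestService", "providers",
                "dubbo://192.168.0.1:20880/com.hn.TestService?anyhost=true&default.dynamic=false&dubbo=3.2"));
    }
}
```

Because the encoded provider URL contains no raw slash, lastIndexOf('/') in create() splits exactly between the providers node and the provider entry, which is why only the last node is created as ephemeral.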

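    2.2.4.3 Tree structure on the ZooKeeper server

    After the service has been registered, the tree structure on the ZooKeeper server looks as follows.

    3 Consumer startup

    3.1 Overview of consumer startup

    • ReferenceConfig uses Protocol to convert the remote service into an Invoker. While the Invoker is being created, a NettyClient is also created, which establishes a TCP connection between the consumer and the provider.
    • ProxyFactory then converts the Invoker into the interface that the client (i.e. the consumer) needs.

    3.2 Source code walkthrough

    3.2.1 Consumer startup entry point

    The consumer startup entry point is the org.apache.dubbo.config.ReferenceConfigBase#get() method, where ReferenceConfigBase is the parent class of ReferenceConfig. This method creates a remote-invocation proxy for the provider. The code is as follows.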

    @Transient
    public final T get() {
        return get(true);
    }

    public T get(boolean check) {
        if (destroyed) {
            throw new IllegalStateException("The invoker of ReferenceConfig(" + url + ") has already destroyed!");
        }
        if (ref == null) {
            if (getScopeModel().isLifeCycleManagedExternally()) {
                // prepare model for reference
                getScopeModel().getDeployer().prepare();
            } else {
                // ensure start module, compatible with old api usage
                getScopeModel().getDeployer().start();
            }
            init(check);
        }
        return ref;
    }

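    3.2.2 Converting the remote service into an Invoker

    The get() method above calls init(), which in turn calls createProxy() to create the proxy for the remote service, turning the remote service into the interface the client needs.

    More precisely, Protocol first converts the remote service into an Invoker, and ProxyFactory then converts the Invoker into the interface the client needs. The code is as follows.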

    protected synchronized void init(boolean check) {
        // ...
        Map<String, String> referenceParameters = appendConfig();
        ModuleServiceRepository repository = getScopeModel().getServiceRepository();
        ServiceDescriptor serviceDescriptor;
        if (CommonConstants.NATIVE_STUB.equals(getProxy())) {
            serviceDescriptor = StubSuppliers.getServiceDescriptor(interfaceName);
            repository.registerService(serviceDescriptor);
        } else {
            serviceDescriptor = repository.registerService(interfaceClass);
        }
        consumerModel = new ConsumerModel(serviceMetadata.getServiceKey(), proxy, serviceDescriptor,
            getScopeModel(), serviceMetadata, createAsyncMethodInfo(), interfaceClassLoader);
        // Compatible with dependencies on ServiceModel#getReferenceConfig() , and will be removed in a future version.
        consumerModel.setConfig(this);
        repository.registerConsumer(consumerModel);
        serviceMetadata.getAttachments().putAll(referenceParameters);
        // create the proxy for the remote service
        ref = createProxy(referenceParameters);
        // ...
    }

    private T createProxy(Map<String, String> referenceParameters) {
        urls.clear();
        meshModeHandleUrl(referenceParameters);
        if (StringUtils.isNotEmpty(url)) {
            // user specified URL, could be peer-to-peer address, or register center's address.
            parseUrl(referenceParameters);
        } else {
            // if protocols not in jvm checkRegistry
            aggregateUrlFromRegistry(referenceParameters);
        }
        // 1. Protocol converts the remote service into an Invoker
        createInvoker();
        if (logger.isInfoEnabled()) {
            logger.info("Referred dubbo service: [" + referenceParameters.get(INTERFACE_KEY) + "]." +
                (ProtocolUtils.isGeneric(referenceParameters.get(GENERIC_KEY)) ?
                    " it's GenericService reference" : " it's not GenericService reference"));
        }
        URL consumerUrl = new ServiceConfigURL(CONSUMER_PROTOCOL, referenceParameters.get(REGISTER_IP_KEY), 0,
            referenceParameters.get(INTERFACE_KEY), referenceParameters);
        consumerUrl = consumerUrl.setScopeModel(getScopeModel());
        consumerUrl = consumerUrl.setServiceModel(consumerModel);
        MetadataUtils.publishServiceDefinition(consumerUrl, consumerModel.getServiceModel(), getApplicationModel());
        // 2. ProxyFactory converts the Invoker into the interface the client needs
        // create service proxy
        return (T) proxyFactory.getProxy(invoker, ProtocolUtils.isGeneric(generic));
    }

    在createInvoker() 方法中,因为服务注册和发现使用是register协议,因此将调用 RegistryProtocol#refer方法创建invoker。代码如下所示。

    1. private void createInvoker() {
    2. if (urls.size() == 1) {
    3. URL curUrl = urls.get(0);
    4. // urls - 消费端准备消费的远程服务
    5. // 将将远程服务转换为Invoker
    6. invoker = protocolSPI.refer(interfaceClass, curUrl);
    7. // registry url, mesh-enable and unloadClusterRelated is true, not need Cluster.
    8. if (!UrlUtils.isRegistry(curUrl) &&
    9. !curUrl.getParameter(UNLOAD_CLUSTER_RELATED, false)) {
    10. List> invokers = new ArrayList<>();
    11. invokers.add(invoker);
    12. invoker = Cluster.getCluster(getScopeModel(), Cluster.DEFAULT).join(new StaticDirectory(curUrl, invokers), true);
    13. }
    14. } else {
    15. List> invokers = new ArrayList<>();
    16. URL registryUrl = null;
    17. for (URL url : urls) {
    18. // For multi-registry scenarios, it is not checked whether each referInvoker is available.
    19. // Because this invoker may become available later.
    20. invokers.add(protocolSPI.refer(interfaceClass, url));
    21. if (UrlUtils.isRegistry(url)) {
    22. // use last registry url
    23. registryUrl = url;
    24. }
    25. }
    26. if (registryUrl != null) {
    27. // registry url is available
    28. // for multi-subscription scenario, use 'zone-aware' policy by default
    29. String cluster = registryUrl.getParameter(CLUSTER_KEY, ZoneAwareCluster.NAME);
    30. // The invoker wrap sequence would be: ZoneAwareClusterInvoker(StaticDirectory) -> FailoverClusterInvoker
    31. // (RegistryDirectory, routing happens here) -> Invoker
    32. invoker = Cluster.getCluster(registryUrl.getScopeModel(), cluster, false).join(new StaticDirectory(registryUrl, invokers), false);
    33. } else {
    34. // not a registry url, must be direct invoke.
    35. if (CollectionUtils.isEmpty(invokers)) {
    36. throw new IllegalArgumentException("invokers == null");
    37. }
    38. URL curUrl = invokers.get(0).getUrl();
    39. String cluster = curUrl.getParameter(CLUSTER_KEY, Cluster.DEFAULT);
    40. invoker = Cluster.getCluster(getScopeModel(), cluster).join(new StaticDirectory(curUrl, invokers), true);
    41. }
    42. }
    43. }

    // RegistryProtocol#refer
    public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
        url = getRegistryUrl(url);
        Registry registry = getRegistry(url);
        if (RegistryService.class.equals(type)) {
            return proxyFactory.getInvoker((T) registry, type, url);
        }
        // group="a,b" or group="*"
        Map<String, String> qs = (Map<String, String>) url.getAttribute(REFER_KEY);
        String group = qs.get(GROUP_KEY);
        if (StringUtils.isNotEmpty(group)) {
            if ((COMMA_SPLIT_PATTERN.split(group)).length > 1 || "*".equals(group)) {
                return doRefer(Cluster.getCluster(url.getScopeModel(), MergeableCluster.NAME), registry, type, url, qs);
            }
        }
        Cluster cluster = Cluster.getCluster(url.getScopeModel(), qs.get(CLUSTER_KEY));
        // Create the invoker
        return doRefer(cluster, registry, type, url, qs);
    }

    // RegistryProtocol#doRefer
    protected <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url, Map<String, String> parameters) {
        // ...
        // Create the invoker
        ClusterInvoker<T> migrationInvoker = getMigrationInvoker(this, cluster, registry, type, url, consumerUrl);
        return interceptInvoker(migrationInvoker, url, consumerUrl);
    }
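
    The group check in refer() decides which cluster wraps the invokers. The sketch below isolates only that decision. ClusterSelector, selectCluster, the regular expression, and the literal names "mergeable" and "failover" are illustrative assumptions standing in for COMMA_SPLIT_PATTERN, MergeableCluster.NAME, and the configured cluster; none of them are Dubbo API.

```java
import java.util.regex.Pattern;

// Hypothetical helper, not part of Dubbo: mirrors only the group check in refer().
class ClusterSelector {
    // Assumption: behaves like COMMA_SPLIT_PATTERN (split on commas, ignoring surrounding spaces).
    private static final Pattern COMMA_SPLIT = Pattern.compile("\\s*,\\s*");

    static String selectCluster(String group, String configuredCluster) {
        if (group != null && !group.isEmpty()) {
            // group="a,b" or group="*": results from several groups have to be merged
            if (COMMA_SPLIT.split(group).length > 1 || "*".equals(group)) {
                return "mergeable";
            }
        }
        // Otherwise use the configured cluster; "failover" is the assumed fallback here.
        return configuredCluster != null ? configuredCluster : "failover";
    }
}
```

    For example, selectCluster("a,b", "failfast") selects the merging cluster, while selectCluster("a", "failfast") keeps the configured one.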

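    Because the default RPC protocol is dubbo, the call chain that starts at interceptInvoker() eventually reaches DubboProtocol, which creates the Invoker. The relevant code is shown below.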

    // InterfaceCompatibleRegistryProtocol#getInvoker
    public <T> ClusterInvoker<T> getInvoker(Cluster cluster, Registry registry, Class<T> type, URL url) {
        DynamicDirectory<T> directory = new RegistryDirectory<>(type, url);
        return doCreateInvoker(directory, cluster, registry, type);
    }

    // RegistryProtocol#doCreateInvoker
    protected <T> ClusterInvoker<T> doCreateInvoker(DynamicDirectory<T> directory, Cluster cluster, Registry registry, Class<T> type) {
        directory.setRegistry(registry);
        directory.setProtocol(protocol);
        // all attributes of REFER_KEY
        Map<String, String> parameters = new HashMap<>(directory.getConsumerUrl().getParameters());
        URL urlToRegistry = new URL(CONSUMER_PROTOCOL, parameters.remove(REGISTER_IP_KEY), 0, type.getName(), parameters);
        if (directory.isShouldRegister()) {
            directory.setRegisteredConsumerUrl(urlToRegistry);
            registry.register(directory.getRegisteredConsumerUrl());
        }
        // 1. Build the router chain
        directory.buildRouterChain(urlToRegistry);
        // 2. Subscribe to the provider addresses and generate the invokers
        directory.subscribe(toSubscribeUrl(urlToRegistry));
        // 3. Wrap the invokers with the cluster fault-tolerance strategy
        return (ClusterInvoker<T>) cluster.join(directory);
    }

    // org.apache.dubbo.rpc.protocol.AbstractProtocol#refer
    public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
        return new AsyncToSyncInvoker<>(protocolBindingRefer(type, url));
    }

    // org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol#protocolBindingRefer
    public <T> Invoker<T> protocolBindingRefer(Class<T> serviceType, URL url) throws RpcException {
        optimizeSerialization(url);
        // create rpc invoker.
        DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
        invokers.add(invoker);
        return invoker;
    }
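
    To make steps 2 and 3 of doCreateInvoker() more concrete, here is a heavily simplified model of "subscribe, then join". Every type and method name in it (SimpleDirectory, onProviderAddresses, FailoverSketch) is an illustrative assumption. It tries each provider once in list order and ignores routing, load balancing, and retry limits, so it should be read as a sketch of the idea rather than of Dubbo's actual cluster implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Simplified stand-in for a directory; an "invoker" is modelled as Function<request, response>.
class SimpleDirectory {
    private final List<Function<String, String>> invokers = new ArrayList<>();

    // Plays the role of subscribe(): every provider address becomes one invoker.
    void onProviderAddresses(List<String> addresses) {
        invokers.clear();
        for (String address : addresses) {
            invokers.add(request -> {
                // Assumption for the demo: addresses prefixed with "down:" are unreachable.
                if (address.startsWith("down:")) {
                    throw new IllegalStateException("provider unreachable: " + address);
                }
                return address + " handled " + request;
            });
        }
    }

    List<Function<String, String>> list() {
        return invokers;
    }
}

class FailoverSketch {
    // Plays the role of cluster.join(directory): one invoker that hides the provider list.
    static Function<String, String> join(SimpleDirectory directory) {
        return request -> {
            RuntimeException last = null;
            for (Function<String, String> invoker : directory.list()) {
                try {
                    return invoker.apply(request);
                } catch (RuntimeException e) {
                    last = e; // fall through to the next provider
                }
            }
            throw last != null ? last : new IllegalStateException("no provider available");
        };
    }
}
```

    The caller only ever sees the joined invoker; which provider actually answers is decided inside it.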

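    3.2.3 Creating and starting the NettyClient

    While the Invoker is being created, getClients() creates and starts the consumer-side NettyClient, which establishes a TCP connection to the provider.

    (1) Each consumer establishes connections to every machine in the provider cluster, because the consumer converts every provider URL into an Invoker.

    (2) By default, only one NettyClient is created per provider machine. In other words, when a consumer references several services hosted on the same provider machine, those services share a single Netty connection. The code is shown below.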

    private ClientsProvider getClients(URL url) {
        int connections = url.getParameter(CONNECTIONS_KEY, 0);
        // whether to share connection
        // if not configured, connection is shared, otherwise, one connection for one service
        // By default only one NettyClient is created per provider machine, so several services
        // referenced from the same provider machine reuse the same Netty connection.
        if (connections == 0) {
            /*
             * The xml configuration should have a higher priority than properties.
             */
            String shareConnectionsStr = StringUtils.isBlank(url.getParameter(SHARE_CONNECTIONS_KEY, (String) null))
                ? ConfigurationUtils.getProperty(url.getOrDefaultApplicationModel(), SHARE_CONNECTIONS_KEY, DEFAULT_SHARE_CONNECTIONS)
                : url.getParameter(SHARE_CONNECTIONS_KEY, (String) null);
            connections = Integer.parseInt(shareConnectionsStr);
            return getSharedClient(url, connections);
        }
        List<ExchangeClient> clients = IntStream.range(0, connections)
            .mapToObj((i) -> initClient(url))
            .collect(Collectors.toList());
        return new ExclusiveClientsProvider(clients);
    }

    private SharedClientsProvider getSharedClient(URL url, int connectNum) {
        String key = url.getAddress();
        // connectNum must be greater than or equal to 1
        int expectedConnectNum = Math.max(connectNum, 1);
        return referenceClientMap.compute(key, (originKey, originValue) -> {
            if (originValue != null && originValue.increaseCount()) {
                return originValue;
            } else {
                return new SharedClientsProvider(this, originKey, buildReferenceCountExchangeClientList(url, expectedConnectNum));
            }
        });
    }

    private List<ReferenceCountExchangeClient> buildReferenceCountExchangeClientList(URL url, int connectNum) {
        List<ReferenceCountExchangeClient> clients = new ArrayList<>();
        for (int i = 0; i < connectNum; i++) {
            clients.add(buildReferenceCountExchangeClient(url));
        }
        return clients;
    }

    /**
     * Build a single client
     *
     * @param url
     * @return
     */
    private ReferenceCountExchangeClient buildReferenceCountExchangeClient(URL url) {
        ExchangeClient exchangeClient = initClient(url);
        ReferenceCountExchangeClient client = new ReferenceCountExchangeClient(exchangeClient, DubboCodec.NAME);
        // read configs
        int shutdownTimeout = ConfigurationUtils.getServerShutdownTimeout(url.getScopeModel());
        client.setShutdownWaitTime(shutdownTimeout);
        return client;
    }
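
    The sharing logic in getSharedClient() comes down to a map keyed by provider address plus a reference count. The following sketch models just that idea with the JDK's ConcurrentHashMap.compute(). SharedConnection and ConnectionRegistry are hypothetical names, and the real SharedClientsProvider holds a list of clients and handles closing, which is omitted here.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative model of connection sharing; not Dubbo's SharedClientsProvider.
class SharedConnection {
    final String address;
    private final AtomicInteger refCount = new AtomicInteger(1);

    SharedConnection(String address) {
        this.address = address;
    }

    // Returns false once the connection has been fully released (count dropped to 0).
    boolean increaseCount() {
        while (true) {
            int current = refCount.get();
            if (current <= 0) {
                return false;
            }
            if (refCount.compareAndSet(current, current + 1)) {
                return true;
            }
        }
    }

    int references() {
        return refCount.get();
    }
}

class ConnectionRegistry {
    private final ConcurrentHashMap<String, SharedConnection> byAddress = new ConcurrentHashMap<>();

    // One entry per provider address (host:port), no matter which service asks for it.
    SharedConnection acquire(String address) {
        return byAddress.compute(address, (key, existing) ->
                existing != null && existing.increaseCount() ? existing : new SharedConnection(key));
    }
}
```

    Two services that resolve to the same host:port receive the same SharedConnection instance, while a different address gets its own entry.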

    The key method for creating the NettyClient is initClient(), shown below.

    /**
     * Create new connection
     *
     * @param url
     */
    private ExchangeClient initClient(URL url) {
        /*
         * Instance of url is InstanceAddressURL, so addParameter actually adds parameters into ServiceInstance,
         * which means params are shared among different services. Since client is shared among services this is currently not a problem.
         */
        String str = url.getParameter(CLIENT_KEY, url.getParameter(SERVER_KEY, DEFAULT_REMOTING_CLIENT));
        // BIO is not allowed since it has severe performance issue.
        if (StringUtils.isNotEmpty(str) && !url.getOrDefaultFrameworkModel().getExtensionLoader(Transporter.class).hasExtension(str)) {
            throw new RpcException("Unsupported client type: " + str + "," +
                " supported client type is " + StringUtils.join(url.getOrDefaultFrameworkModel().getExtensionLoader(Transporter.class).getSupportedExtensions(), " "));
        }
        try {
            ScopeModel scopeModel = url.getScopeModel();
            int heartbeat = UrlUtils.getHeartbeat(url);
            // Replace InstanceAddressURL with ServiceConfigURL.
            url = new ServiceConfigURL(DubboCodec.NAME, url.getUsername(), url.getPassword(), url.getHost(), url.getPort(), url.getPath(), url.getAllParameters());
            url = url.addParameter(CODEC_KEY, DubboCodec.NAME);
            // enable heartbeat by default
            url = url.addParameterIfAbsent(HEARTBEAT_KEY, Integer.toString(heartbeat));
            url = url.setScopeModel(scopeModel);
            // connection should be lazy
            return url.getParameter(LAZY_CONNECT_KEY, false)
                ? new LazyConnectExchangeClient(url, requestHandler)
                : Exchangers.connect(url, requestHandler);
        } catch (RemotingException e) {
            throw new RpcException("Fail to create remoting client for service(" + url + "): " + e.getMessage(), e);
        }
    }

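    (3) The consumer establishes the TCP connection to the provider at startup.

    As the implementation of initClient() shows, LAZY_CONNECT_KEY defaults to false, so an eager connection is created here by calling Exchangers.connect(url, requestHandler).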
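
    The difference between the eager and the lazy branch can be illustrated with a small stand-alone model. SketchClient and its Supplier-based connector are assumptions made for this example and do not correspond to LazyConnectExchangeClient's real implementation; the point is only when the connection attempt happens.

```java
import java.util.function.Supplier;

// Illustrative only: contrasts eager and lazy connection set-up.
class SketchClient {
    private final Supplier<String> connector; // hypothetical "dial" function returning a channel id
    private String channel;                   // null until a connection exists

    SketchClient(Supplier<String> connector, boolean lazy) {
        this.connector = connector;
        if (!lazy) {
            connect(); // eager: connect while the client is being created
        }
    }

    private synchronized void connect() {
        if (channel == null) {
            channel = connector.get();
        }
    }

    synchronized boolean isConnected() {
        return channel != null;
    }

    synchronized String request(String payload) {
        connect(); // lazy clients connect here, on first use
        return channel + " <- " + payload;
    }
}
```

    With lazy set to false the connector runs inside the constructor, which corresponds to the default behaviour described above; with lazy set to true it runs on the first request.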

    After Exchangers.connect(), the main call chain is Exchangers#connect() -> HeaderExchanger#connect() -> Transporters#connect() -> NettyTransporter#connect(). The last step, NettyTransporter#connect(), creates the NettyClient. The code is shown below.

    // Exchangers#connect
    public static ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
        if (url == null) {
            throw new IllegalArgumentException("url == null");
        }
        if (handler == null) {
            throw new IllegalArgumentException("handler == null");
        }
        return getExchanger(url).connect(url, handler);
    }

    // HeaderExchanger#connect
    public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
        return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true);
    }

    // Transporters#connect
    public static Client connect(URL url, ChannelHandler... handlers) throws RemotingException {
        if (url == null) {
            throw new IllegalArgumentException("url == null");
        }
        ChannelHandler handler;
        if (handlers == null || handlers.length == 0) {
            handler = new ChannelHandlerAdapter();
        } else if (handlers.length == 1) {
            handler = handlers[0];
        } else {
            handler = new ChannelHandlerDispatcher(handlers);
        }
        return getTransporter(url).connect(url, handler);
    }

    // NettyTransporter#connect
    public Client connect(URL url, ChannelHandler handler) throws RemotingException {
        return new NettyClient(url, handler);
    }
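
    HeaderExchanger#connect() nests the handlers as new DecodeHandler(new HeaderExchangeHandler(handler)), which is a decorator chain: each layer does its own work and then delegates inward. The sketch below shows that shape with made-up types (MessageHandler, DecodeLayer, ExchangeLayer, BusinessHandler). What each real Dubbo handler does is far more involved; "decoding" here is just stripping an assumed "raw:" prefix.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical handler interface; the trace list records the order in which layers run.
interface MessageHandler {
    void received(String message, List<String> trace);
}

class BusinessHandler implements MessageHandler {
    public void received(String message, List<String> trace) {
        trace.add("business:" + message);
    }
}

class ExchangeLayer implements MessageHandler {
    private final MessageHandler next;

    ExchangeLayer(MessageHandler next) {
        this.next = next;
    }

    public void received(String message, List<String> trace) {
        trace.add("exchange");
        next.received(message, trace);
    }
}

class DecodeLayer implements MessageHandler {
    private final MessageHandler next;

    DecodeLayer(MessageHandler next) {
        this.next = next;
    }

    public void received(String message, List<String> trace) {
        trace.add("decode");
        // Stand-in for decoding: drop the assumed "raw:" prefix before passing the message on.
        String decoded = message.startsWith("raw:") ? message.substring(4) : message;
        next.received(decoded, trace);
    }
}

class HandlerChainSketch {
    static List<String> dispatch(String message) {
        // Same nesting order as in HeaderExchanger#connect: outermost layer runs first.
        MessageHandler chain = new DecodeLayer(new ExchangeLayer(new BusinessHandler()));
        List<String> trace = new ArrayList<>();
        chain.received(message, trace);
        return trace;
    }
}
```

    Calling HandlerChainSketch.dispatch("raw:hello") records the layers in the order decode, exchange, business.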

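    (4) Create and start the consumer-side NettyClient and establish the TCP connection to the provider

    The NettyClient constructor calls the constructor of its parent class AbstractClient, which creates and starts the consumer-side NettyClient and establishes the TCP connection to the provider.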

    // NettyClient constructor
    public NettyClient(final URL url, final ChannelHandler handler) throws RemotingException {
        super(url, wrapChannelHandler(url, handler));
    }

    // AbstractClient constructor
    public AbstractClient(URL url, ChannelHandler handler) throws RemotingException {
        super(url, handler);
        // set default needReconnect true when channel is not connected
        needReconnect = url.getParameter(Constants.SEND_RECONNECT_KEY, true);
        initExecutor(url);
        try {
            // 1. Start the NettyClient
            doOpen();
        } catch (Throwable t) {
            close();
            throw new RemotingException(url.toInetSocketAddress(), null,
                "Failed to start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress()
                    + " connect to the server " + getRemoteAddress() + ", cause: " + t.getMessage(), t);
        }
        try {
            // connect.
            // 2. Initiate the remote connection and establish the TCP connection to the provider
            connect();
            if (logger.isInfoEnabled()) {
                logger.info("Start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress() + " connect to the server " + getRemoteAddress());
            }
        } catch (RemotingException t) {
            // If lazy connect client fails to establish a connection, the client instance will still be created,
            // and the reconnection will be initiated by ReconnectTask, so there is no need to throw an exception
            if (url.getParameter(LAZY_CONNECT_KEY, false)) {
                logger.warn(TRANSPORT_FAILED_CONNECT_PROVIDER, "", "", "Failed to start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress() +
                    " connect to the server " + getRemoteAddress() +
                    " (the connection request is initiated by lazy connect client, ignore and retry later!), cause: " +
                    t.getMessage(), t);
                return;
            }
            if (url.getParameter(Constants.CHECK_KEY, true)) {
                close();
                throw t;
            } else {
                logger.warn(TRANSPORT_FAILED_CONNECT_PROVIDER, "", "", "Failed to start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress()
                    + " connect to the server " + getRemoteAddress() + " (check == false, ignore and retry later!), cause: " + t.getMessage(), t);
            }
        } catch (Throwable t) {
            close();
            throw new RemotingException(url.toInetSocketAddress(), null,
                "Failed to start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress()
                    + " connect to the server " + getRemoteAddress() + ", cause: " + t.getMessage(), t);
        }
    }

    // NettyClient#doOpen
    /**
     * Init bootstrap
     *
     * @throws Throwable
     */
    @Override
    protected void doOpen() throws Throwable {
        // Create the business handler
        final NettyClientHandler nettyClientHandler = createNettyClientHandler();
        // Create and configure the bootstrap
        bootstrap = new Bootstrap();
        initBootstrap(nettyClientHandler);
    }

    // AbstractClient#connect
    protected void connect() throws RemotingException {
        connectLock.lock();
        try {
            // ...
            doConnect();
            // ...
        } catch (RemotingException e) {
            throw e;
        } catch (Throwable e) {
            throw new RemotingException(this, "Failed to connect to server " + getRemoteAddress() + " from " + getClass().getSimpleName() + " "
                + NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion()
                + ", cause: " + e.getMessage(), e);
        } finally {
            connectLock.unlock();
        }
    }

    // NettyClient#doConnect(InetSocketAddress)
    private void doConnect(InetSocketAddress serverAddress) throws RemotingException {
        long start = System.currentTimeMillis();
        // Initiate the connection
        ChannelFuture future = bootstrap.connect(serverAddress);
        // ...
    }
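
    The constructor above follows an "open, then connect" sequence in which the check parameter decides whether a failed connection aborts startup. A reduced model of that control flow is sketched below. StartupSketch, its Runnable connector, and the use of IllegalStateException in place of RemotingException are assumptions for illustration, and the lazy-connect branch is left out.

```java
// Illustrative model of the start-up control flow; not Dubbo code.
class StartupSketch {

    static final class Client {
        final boolean connected;

        Client(boolean connected) {
            this.connected = connected;
        }
    }

    // connector stands in for connect(); it throws IllegalStateException when the provider is unreachable.
    static Client start(Runnable connector, boolean check) {
        // Step 1 (doOpen in the original): local set-up only, nothing is sent over the network yet.
        // Step 2: try to reach the provider.
        try {
            connector.run();
            return new Client(true);
        } catch (IllegalStateException e) {
            if (check) {
                throw e; // check == true: fail fast so the problem surfaces at startup
            }
            // check == false: keep the client instance; a later reconnect may succeed
            return new Client(false);
        }
    }
}
```

    With check set to false the caller still receives a client object even though no connection exists yet, which matches the "ignore and retry later" warning in the original code.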

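    3.2.4 Converting the Invoker into the interface the client needs (ref)

    ProxyFactory#getProxy() creates a proxy class that converts the Invoker into the interface the client needs. Taking JavassistProxyFactory as an example: if creating the proxy with JavassistProxyFactory fails, JdkProxyFactory is tried as a fallback. The code is shown below.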

    public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
        try {
            return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
        } catch (Throwable fromJavassist) {
            // try fall back to JDK proxy factory
            try {
                T proxy = jdkProxyFactory.getProxy(invoker, interfaces);
                logger.error(PROXY_FAILED, "", "", "Failed to generate proxy by Javassist failed. Fallback to use JDK proxy success. " +
                    "Interfaces: " + Arrays.toString(interfaces), fromJavassist);
                return proxy;
            } catch (Throwable fromJdk) {
                logger.error(PROXY_FAILED, "", "", "Failed to generate proxy by Javassist failed. Fallback to use JDK proxy is also failed. " +
                    "Interfaces: " + Arrays.toString(interfaces) + " Javassist Error.", fromJavassist);
                logger.error(PROXY_FAILED, "", "", "Failed to generate proxy by Javassist failed. Fallback to use JDK proxy is also failed. " +
                    "Interfaces: " + Arrays.toString(interfaces) + " JDK Error.", fromJdk);
                throw fromJavassist;
            }
        }
    }
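
    The JDK fallback path relies on java.lang.reflect.Proxy. As a rough illustration of how an invoker can sit behind a business interface, the sketch below builds such a proxy by hand. GreetingService, SketchInvoker, and ProxySketch are hypothetical names introduced for this example; only Proxy.newProxyInstance and InvocationHandler are real JDK APIs, and Dubbo's InvokerInvocationHandler does considerably more (for instance building an RpcInvocation).

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;

// Hypothetical business interface seen by the consumer.
interface GreetingService {
    String sayHello(String name);
}

// Hypothetical, much reduced stand-in for an Invoker.
interface SketchInvoker {
    Object invoke(String methodName, Object[] args);
}

class ProxySketch {
    @SuppressWarnings("unchecked")
    static <T> T getProxy(Class<T> iface, SketchInvoker invoker) {
        InvocationHandler handler = (proxy, method, args) -> {
            // toString/hashCode/equals are answered locally instead of being sent to the provider.
            if (method.getDeclaringClass() == Object.class) {
                return method.invoke(invoker, args);
            }
            // Every interface method becomes a call on the invoker.
            return invoker.invoke(method.getName(), args);
        };
        return (T) Proxy.newProxyInstance(iface.getClassLoader(), new Class<?>[] {iface}, handler);
    }
}
```

    A consumer would then call the interface as if it were local, for example ProxySketch.getProxy(GreetingService.class, (m, a) -> m + ":" + a[0]).sayHello("dubbo"), and the call is routed through the invoker.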
  • Original article: https://blog.csdn.net/J_bean/article/details/133894304