Dubbo服务发布端和消费端启动过程如下所示:
(1)Dubbo服务发布端启动过程
(2)Dubbo服务消费端启动过程
服务发布端启动入口为 org.apache.dubbo.config.ServiceConfigBase#export() 方法,其中ServiceConfigBase为ServiceConfig的父类。
- /**
- * export service and auto start application instance
- */
- public final void export() {
- export(RegisterTypeEnum.AUTO_REGISTER);
- }
其中项目启动时,服务被注册到注册中心的方式有以下四种:
- public enum RegisterTypeEnum {
-
- /**
- * 不注册-Never register. Cannot be registered by any command(like QoS-online).
- */
- NEVER_REGISTER,
-
- /**
- * 手动注册-Manual register. Can be registered by command(like QoS-online), but not register by default.
- */
- MANUAL_REGISTER,
-
- /**
- * 自动注册(某个服务的所有的服务提供者都启动之后才注册)
- * (INTERNAL) Auto register by deployer. Will be registered after deployer started.
- * (Delay publish when starting. Prevent service from being invoked before all services are started)
- */
- AUTO_REGISTER_BY_DEPLOYER,
-
- /**
- * 自动注册(某个服务的其中一个服务提供者启动之后就注册)
- * Auto register. Will be registered when one service is exported.
- */
- AUTO_REGISTER;
- }
export方法中调用了doExport方法(org.apache.dubbo.config.ServiceConfig#doExport),内部实现如下所示。
- protected synchronized void doExport(RegisterTypeEnum registerType) {
- // ...
-
- // 服务注册
- doExportUrls(registerType);
- exported();
- }
-
-
- private void doExportUrls(RegisterTypeEnum registerType) {
- // ...
-
- // 加载注册中心信息
- List
registryURLs = ConfigValidationUtils.loadRegistries(this, true); -
- for (ProtocolConfig protocolConfig : protocols) {
- // ...
-
- // 注册服务
- doExportUrlsFor1Protocol(protocolConfig, registryURLs, registerType);
- }
-
- providerModel.setServiceUrls(urls);
- }
-
-
- private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List
registryURLs, RegisterTypeEnum registerType) { - Map
map = buildAttributes(protocolConfig); -
- // remove null key and null value
- map.keySet().removeIf(key -> StringUtils.isEmpty(key) || StringUtils.isEmpty(map.get(key)));
- // init serviceMetadata attachments
- serviceMetadata.getAttachments().putAll(map);
-
- URL url = buildUrl(protocolConfig, map);
-
- processServiceExecutor(url);
-
- // 注册服务
- exportUrl(url, registryURLs, registerType);
-
- initServiceMethodMetrics(url);
- }
-
-
- private void exportUrl(URL url, List
registryURLs, RegisterTypeEnum registerType) { - String scope = url.getParameter(SCOPE_KEY);
- // don't export when none is configured
- if (!SCOPE_NONE.equalsIgnoreCase(scope)) {
-
- // export to local if the config is not remote (export to remote only when config is remote)
- if (!SCOPE_REMOTE.equalsIgnoreCase(scope)) {
- // 注册本地服务
- exportLocal(url);
- }
-
- // export to remote if the config is not local (export to local only when config is local)
- if (!SCOPE_LOCAL.equalsIgnoreCase(scope)) {
- // ...
-
- // 注册远程服务
- url = exportRemote(url, registryURLs, registerType);
-
- // ...
- }
- }
- this.urls.add(url);
- }
ServiceConfig通过ProxyFactory将服务提供类(ref)转换成invoker,然后通过Protocol将invoker转为Exporter。主要的实现方法为doExportUrl()方法。
其中注册本地服务和注册远程服务最终均调用了doExportUrl()方法(org.apache.dubbo.config.ServiceConfig#doExportUrl)。注册本地服务时,withMetaData为false;注册远程服务时,withMetaData为true。doExportUrl方法的具体实现如下所示。
- private void doExportUrl(URL url, boolean withMetaData, RegisterTypeEnum registerType) {
- if (!url.getParameter(REGISTER_KEY, true)) {
- registerType = RegisterTypeEnum.MANUAL_REGISTER;
- }
- if (registerType == RegisterTypeEnum.NEVER_REGISTER ||
- registerType == RegisterTypeEnum.MANUAL_REGISTER ||
- registerType == RegisterTypeEnum.AUTO_REGISTER_BY_DEPLOYER) {
- url = url.addParameter(REGISTER_KEY, false);
- }
-
- // 1、ServiceConfig通过ProxyFactory将服务提供类(ref)转换成invoker
- Invoker> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url);
- if (withMetaData) {
- // 转为invoker的包装类
- // wrap the invoker and all the metadata (ServiceConfig)
- invoker = new DelegateProviderMetaDataInvoker(invoker, this);
- }
-
- // 2、通过Protocol将invoker转为Exporter
- Exporter> exporter = protocolSPI.export(invoker);
- exporters.computeIfAbsent(registerType, k -> new CopyOnWriteArrayList<>()).add(exporter);
- }
(1)在创建Exporter的同时,会启动NettyServer监听服务连接,然后将服务注册到服务注册中心。
如果为远程服务暴露,则其内部根据URL中Protocol的类型为registry,会选择RegistryProtocol。如果为本地服务暴露,则其内部根据URL中Protocol的类型为injvm,会选InjvmProtocol。
(2)以RegistryProtocol为例的export方法如下所示。其中通过doLocalExport将Invoker转为Exporter,同时启动NettyServer。通过getRegistry获取注册中心,以及通过register将当前服务注册到服务注册中心。
(3)Dubbo服务发布端的NettyServer主要负责监听客户端连接以及读写IO操作。
Dubbo默认的底层网络通信使用的是Netty,服务提供方NettyServer使用两级线程池,其中boss线程池主要用来接受客户端的链接请求,并把完成Tcp三次握手的连接分发给worker来处理。
- public
Exporter export(final Invoker originInvoker) throws RpcException { - URL registryUrl = getRegistryUrl(originInvoker);
- // url to export locally
- URL providerUrl = getProviderUrl(originInvoker);
-
- // Subscribe the override data
- final URL overrideSubscribeUrl = getSubscribedOverrideUrl(providerUrl);
- final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
- Map
> overrideListeners = getProviderConfigurationListener(overrideSubscribeUrl).getOverrideListeners(); - overrideListeners.computeIfAbsent(overrideSubscribeUrl, k -> new ConcurrentHashSet<>())
- .add(overrideSubscribeListener);
-
- providerUrl = overrideUrlWithConfig(providerUrl, overrideSubscribeListener);
- //export invoker
- // 1、启动NettyServer
- final ExporterChangeableWrapper
exporter = doLocalExport(originInvoker, providerUrl); -
- // url to registry
- // 2、获取注册中心
- final Registry registry = getRegistry(registryUrl);
- final URL registeredProviderUrl = getUrlToRegistry(providerUrl, registryUrl);
-
- // decide if we need to delay publish (provider itself and registry should both need to register)
- boolean register = providerUrl.getParameter(REGISTER_KEY, true) && registryUrl.getParameter(REGISTER_KEY, true);
- if (register) {
- // 3、将当前服务注册到服务注册中心
- register(registry, registeredProviderUrl);
- }
-
- // register stated url on provider model
- registerStatedUrl(registryUrl, registeredProviderUrl, register);
-
-
- exporter.setRegisterUrl(registeredProviderUrl);
- exporter.setSubscribeUrl(overrideSubscribeUrl);
- exporter.setNotifyListener(overrideSubscribeListener);
- exporter.setRegistered(register);
-
- ApplicationModel applicationModel = getApplicationModel(providerUrl.getScopeModel());
- if (applicationModel.modelEnvironment().getConfiguration().convert(Boolean.class, ENABLE_26X_CONFIGURATION_LISTEN, true)) {
- if (!registry.isServiceDiscovery()) {
- // Deprecated! Subscribe to override rules in 2.6.x or before.
- registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
- }
- }
-
- notifyExport(exporter);
- //Ensure that a new exporter instance is returned every time export
- return new DestroyableExporter<>(exporter);
- }
创建和启动NettyServer的入口:在doLocalExport中最终会调用DubboProtocol中的export方法,export方法之后的主要调用链路为export()->openServer()->createServer(),具体实现如下。
- public
Exporter export(Invoker invoker) throws RpcException { - checkDestroyed();
- URL url = invoker.getUrl();
-
- // 1、创建Exporter。export service.
- String key = serviceKey(url);
- DubboExporter
exporter = new DubboExporter(invoker, key, exporterMap); -
- //export a stub service for dispatching event
- // ...
-
- // 2、创建并启动NettyServer
- openServer(url);
- optimizeSerialization(url);
-
- return exporter;
- }
-
-
- private void openServer(URL url) {
- checkDestroyed();
- // find server.
- // 服务提供者机器的ip和端口
- String key = url.getAddress();
- // client can export a service which only for server to invoke
- // 只有服务提供者才会启动NettyServer监听
- boolean isServer = url.getParameter(IS_SERVER_KEY, true);
-
- if (isServer) {
- ProtocolServer server = serverMap.get(key);
- if (server == null) {
- synchronized (this) {
- server = serverMap.get(key);
- if (server == null) {
- // 该服务对应的NettyServer还没启动过,则创建并启动NettyServer
- // 同一个机器的不同服务发布只会开启一个NettyServer
- serverMap.put(key, createServer(url));
- return;
- }
- }
- }
-
- // server supports reset, use together with override
- server.reset(url);
- }
- }
-
- private ProtocolServer createServer(URL url) {
- url = URLBuilder.from(url)
- // send readonly event when server closes, it's enabled by default
- .addParameterIfAbsent(CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString())
- // enable heartbeat by default
- .addParameterIfAbsent(HEARTBEAT_KEY, String.valueOf(DEFAULT_HEARTBEAT))
- .addParameter(CODEC_KEY, DubboCodec.NAME)
- .build();
-
- String transporter = url.getParameter(SERVER_KEY, DEFAULT_REMOTING_SERVER);
- if (StringUtils.isNotEmpty(transporter) && !url.getOrDefaultFrameworkModel().getExtensionLoader(Transporter.class).hasExtension(transporter)) {
- throw new RpcException("Unsupported server type: " + transporter + ", url: " + url);
- }
-
- ExchangeServer server;
- try {
- // 启动NettyServer
- server = Exchangers.bind(url, requestHandler);
- } catch (RemotingException e) {
- throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
- }
-
- transporter = url.getParameter(CLIENT_KEY);
- if (StringUtils.isNotEmpty(transporter) && !url.getOrDefaultFrameworkModel().getExtensionLoader(Transporter.class).hasExtension(transporter)) {
- throw new RpcException("Unsupported client type: " + transporter);
- }
-
- DubboProtocolServer protocolServer = new DubboProtocolServer(server);
- loadServerProperties(protocolServer);
- return protocolServer;
- }
-
-
DubboProtocol的createServer()之后的调用链为:
createServer()->Exchangers的bind()->HeaderExchanger的bind()->Transporters的bind()->NettyTransporter的bind()。具体实现如下。
- // Exchangers的bind方法
- public static ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
- if (url == null) {
- throw new IllegalArgumentException("url == null");
- }
- if (handler == null) {
- throw new IllegalArgumentException("handler == null");
- }
- url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
- return getExchanger(url).bind(url, handler);
- }
-
- // HeaderExchanger的bind方法
- public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
- ExchangeServer server;
- boolean isPuServerKey = url.getParameter(IS_PU_SERVER_KEY, false);
- if(isPuServerKey) {
- server = new HeaderExchangeServer(PortUnificationExchanger.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
- }else {
- server = new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
- }
- return server;
- }
-
- // Transporters的bind方法
- public static RemotingServer bind(URL url, ChannelHandler... handlers) throws RemotingException {
- if (url == null) {
- throw new IllegalArgumentException("url == null");
- }
- if (handlers == null || handlers.length == 0) {
- throw new IllegalArgumentException("handlers == null");
- }
- ChannelHandler handler;
- if (handlers.length == 1) {
- handler = handlers[0];
- } else {
- handler = new ChannelHandlerDispatcher(handlers);
- }
- return getTransporter(url).bind(url, handler);
- }
-
- // NettyTransporter的bind方法
- public RemotingServer bind(URL url, ChannelHandler handler) throws RemotingException {
- return new NettyServer(url, handler);
- }
以 org.apache.dubbo.remoting.transport.netty4.NettyTransporter 为例,可以看到,在NettyTransporter的bind方法中,新建了一个NettyServer。
在NettyServer的构造函数中,调用了其父类AbstractServer的构造函数,其中的核心方法为doOpen()-初始化和启动 netty server。具体如下。
- public NettyServer(URL url, ChannelHandler handler) throws RemotingException {
- // you can customize name and type of client thread pool by THREAD_NAME_KEY and THREAD_POOL_KEY in CommonConstants.
- // the handler will be wrapped: MultiMessageHandler->HeartbeatHandler->handler
- // 父类的构造函数
- super(url, ChannelHandlers.wrap(handler, url));
-
- // read config before destroy
- serverShutdownTimeoutMills = ConfigurationUtils.getServerShutdownTimeout(getUrl().getOrDefaultModuleModel());
- }
-
- public AbstractServer(URL url, ChannelHandler handler) throws RemotingException {
- super(url, handler);
- executorRepository = ExecutorRepository.getInstance(url.getOrDefaultApplicationModel());
- localAddress = getUrl().toInetSocketAddress();
-
- String bindIp = getUrl().getParameter(Constants.BIND_IP_KEY, getUrl().getHost());
- int bindPort = getUrl().getParameter(Constants.BIND_PORT_KEY, getUrl().getPort());
- if (url.getParameter(ANYHOST_KEY, false) || NetUtils.isInvalidLocalHost(bindIp)) {
- bindIp = ANYHOST_VALUE;
- }
- bindAddress = new InetSocketAddress(bindIp, bindPort);
- this.accepts = url.getParameter(ACCEPTS_KEY, DEFAULT_ACCEPTS);
- try {
- // 初始化和启动 netty server
- doOpen();
- if (logger.isInfoEnabled()) {
- logger.info("Start " + getClass().getSimpleName() + " bind " + getBindAddress() + ", export " + getLocalAddress());
- }
- } catch (Throwable t) {
- throw new RemotingException(url.toInetSocketAddress(), null, "Failed to bind " + getClass().getSimpleName()
- + " on " + bindAddress + ", cause: " + t.getMessage(), t);
- }
- executors.add(executorRepository.createExecutorIfAbsent(ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));
- }
以 org.apache.dubbo.remoting.transport.netty4.NettyServer.doOpen 为例,doOpen()具体实现如下所示。
-
- /**
- * Init and start netty server
- *
- * @throws Throwable
- */
- @Override
- protected void doOpen() throws Throwable {
- // 创建ServerBootstrap
- bootstrap = new ServerBootstrap();
-
- // 创建Netty的boss线程池和worker线程池
- bossGroup = createBossGroup();
- workerGroup = createWorkerGroup();
-
- // 配置NettyServer,添加handler到管线
- final NettyServerHandler nettyServerHandler = createNettyServerHandler();
- channels = nettyServerHandler.getChannels();
-
- initServerBootstrap(nettyServerHandler);
-
- // bind
- try {
- // 绑定本地端口,并启动监听服务
- ChannelFuture channelFuture = bootstrap.bind(getBindAddress());
- channelFuture.syncUninterruptibly();
- channel = channelFuture.channel();
- } catch (Throwable t) {
- closeBootstrap();
- throw t;
- }
- }
由上可知,以RegistryProtocol为例,在export方法中,通过doLocalExport将Invoker转为Exporter,同时启动NettyServer,监听服务连接。然后通过getRegistry方法获取注册中心,并通过register方法将当前服务注册到服务注册中心。
通过getRegistry方法获取注册中心,最终是通过RegistryFactory子类AbstractRegistryFactory中的createRegistry方法创建注册中心。比如创建一个ZookeeperRegistry作为Zookeeper注册中心。具体实现如下所示。
- /**
- * Get an instance of registry based on the address of invoker
- *
- * @param registryUrl
- * @return
- */
- protected Registry getRegistry(final URL registryUrl) {
- RegistryFactory registryFactory = ScopeModelUtil.getExtensionLoader(RegistryFactory.class, registryUrl.getScopeModel()).getAdaptiveExtension();
- return registryFactory.getRegistry(registryUrl);
- }
-
-
- // AbstractRegistryFactory 的 getRegistry
- @Override
- public Registry getRegistry(URL url) {
- if (registryManager == null) {
- throw new IllegalStateException("Unable to fetch RegistryManager from ApplicationModel BeanFactory. " +
- "Please check if `setApplicationModel` has been override.");
- }
-
- Registry defaultNopRegistry = registryManager.getDefaultNopRegistryIfDestroyed();
- if (null != defaultNopRegistry) {
- return defaultNopRegistry;
- }
-
- url = URLBuilder.from(url)
- .setPath(RegistryService.class.getName())
- .addParameter(INTERFACE_KEY, RegistryService.class.getName())
- .removeParameter(TIMESTAMP_KEY)
- .removeAttribute(EXPORT_KEY)
- .removeAttribute(REFER_KEY)
- .build();
-
- String key = createRegistryCacheKey(url);
- Registry registry = null;
- boolean check = url.getParameter(CHECK_KEY, true) && url.getPort() != 0;
-
- // Lock the registry access process to ensure a single instance of the registry
- // 加锁,确保注册中心是单例对象
- registryManager.getRegistryLock().lock();
- try {
- // double check
- // fix https://github.com/apache/dubbo/issues/7265.
- defaultNopRegistry = registryManager.getDefaultNopRegistryIfDestroyed();
- if (null != defaultNopRegistry) {
- return defaultNopRegistry;
- }
-
- // 从本地缓存中获取注册中心对象
- registry = registryManager.getRegistry(key);
- if (registry != null) {
- return registry;
- }
-
- // create registry by spi/ioc
- // 创建服务注册中心实例
- registry = createRegistry(url);
- if (check && registry == null) {
- throw new IllegalStateException("Can not create registry " + url);
- }
-
- if (registry != null) {
- // 将注册中心对象存入本地缓存中
- registryManager.putRegistry(key, registry);
- }
- } catch (Exception e) {
- if (check) {
- throw new RuntimeException("Can not create registry " + url, e);
- } else {
- // 1-11 Failed to obtain or create registry (service) object.
- LOGGER.warn(REGISTRY_FAILED_CREATE_INSTANCE, "", "",
- "Failed to obtain or create registry ", e);
- }
- } finally {
- // Release the lock
- registryManager.getRegistryLock().unlock();
- }
-
- return registry;
- }
-
-
- /**
- * ZookeeperRegistryFactory 的 createRegistry
- * 其中 ZookeeperRegistryFactory 为 AbstractRegistryFactory 的子类
- */
- @Override
- public Registry createRegistry(URL url) {
- return new ZookeeperRegistry(url, zookeeperTransporter);
- }
-
通过register方法将当前服务注册到服务注册中心。org.apache.dubbo.registry.integration.RegistryProtocol#register 实现如下所示。
- private static void register(Registry registry, URL registeredProviderUrl) {
- ApplicationDeployer deployer = registeredProviderUrl.getOrDefaultApplicationModel().getDeployer();
- try {
- deployer.increaseServiceRefreshCount();
- String registryName = Optional.ofNullable(registry.getUrl())
- .map(u -> u.getParameter(RegistryConstants.REGISTRY_CLUSTER_KEY,
- UrlUtils.isServiceDiscoveryURL(u) ? u.getParameter(REGISTRY_KEY) : u.getProtocol()))
- .filter(StringUtils::isNotEmpty)
- .orElse("unknown");
- MetricsEventBus.post(RegistryEvent.toRsEvent(registeredProviderUrl.getApplicationModel(), registeredProviderUrl.getServiceKey(), 1, Collections.singletonList(registryName)),
- () -> {
- // 注册服务
- registry.register(registeredProviderUrl);
- return null;
- });
- } finally {
- deployer.decreaseServiceRefreshCount();
- }
- }
以ZooKeeper注册中心为例。将会调用ZookeeperRegistry的父类的方法
org.apache.dubbo.registry.support.FailbackRegistry#register,在该方法中将调用ZookeeperRegistry的doRegister()方法,最终将调用ZookeeperClient的create()方法创建Zookeeper节点,将服务注册到Zookeeper中。具体实现如下。
- public void register(URL url) {
- if (!acceptable(url)) {
- logger.info("URL " + url + " will not be registered to Registry. Registry " + this.getUrl() + " does not accept service of this protocol type.");
- return;
- }
- super.register(url);
- removeFailedRegistered(url);
- removeFailedUnregistered(url);
- try {
- // 注册服务
- // Sending a registration request to the server side
- doRegister(url);
- } catch (Exception e) {
- Throwable t = e;
-
- // If the startup detection is opened, the Exception is thrown directly.
- boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
- && url.getParameter(Constants.CHECK_KEY, true)
- && (url.getPort() != 0);
- boolean skipFailback = t instanceof SkipFailbackWrapperException;
- if (check || skipFailback) {
- if (skipFailback) {
- t = t.getCause();
- }
- throw new IllegalStateException("Failed to register " + url + " to registry " + getUrl().getAddress() + ", cause: " + t.getMessage(), t);
- } else {
- logger.error(INTERNAL_ERROR, "unknown error in registry module", "", "Failed to register " + url + ", waiting for retry, cause: " + t.getMessage(), t);
- }
-
- // Record a failed registration request to a failed list, retry regularly
- addFailedRegistered(url);
- }
- }
-
-
- public void doRegister(URL url) {
- try {
- checkDestroyed();
- zkClient.create(toUrlPath(url), url.getParameter(DYNAMIC_KEY, true), true);
- } catch (Throwable e) {
- throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
- }
- }
其中ZookeeperClient的create()方法的代码如下所示。
- /**
- * Create ZNode in Zookeeper.
- *
- * @param path path to ZNode
- * @param ephemeral specify create mode of ZNode creation. true - EPHEMERAL, false - PERSISTENT.
- * @param faultTolerant specify fault tolerance of ZNode creation.
- * true - ignore exception and recreate if is ephemeral, false - throw exception.
- */
- void create(String path, boolean ephemeral, boolean faultTolerant);
-
-
- public void create(String path, boolean ephemeral, boolean faultTolerant) {
- if (!ephemeral) {
- if (persistentExistNodePath.contains(path)) {
- return;
- }
- if (checkExists(path)) {
- persistentExistNodePath.add(path);
- return;
- }
- }
- int i = path.lastIndexOf('/');
- if (i > 0) {
- create(path.substring(0, i), false, true);
- }
- if (ephemeral) {
- createEphemeral(path, faultTolerant);
- } else {
- createPersistent(path, faultTolerant);
- persistentExistNodePath.add(path);
- }
- }
create()方法的伪代码和打印结果如下所示。
- public static void main(String[] args) {
- // 原始url
- String path = "/dubbo/com.hn.TestService/providers/dubbo://192.168.0.1:20880/com.hn.TestService?anyhost=true&default.dynamic=false&dubbo=3.2";
- // toUrlPath() 转化后
- path = "/dubbo/com.hn.TestService/providers/dubbo%3A%2F%2F192.168.0.1%3A20880%2Fcom.hn.TestService%3Fanyhost%3Dtrue%26default.dynamic%3Dfalse%26dubbo%3D3.2";
- create(path, true, true);
- }
-
- public static void create(String path, boolean ephemeral, boolean faultTolerant) {
- int i = path.lastIndexOf('/');
- if (i > 0) {
- create(path.substring(0, i), false, true);
- }
- if (ephemeral) {
- createEphemeral(path, faultTolerant);
- } else {
- createPersistent(path, faultTolerant);
- }
- }
-
- private static void createEphemeral(String path, boolean faultTolerant) {
- System.out.println("createEphemeral:" + path);
- }
-
- private static void createPersistent(String path, boolean faultTolerant) {
- System.out.println("createPersistent:" + path);
- }
打印结果如下所示。
- createPersistent:/dubbo
- createPersistent:/dubbo/com.hn.TestService
- createPersistent:/dubbo/com.hn.TestService/providers
- createEphemeral:/dubbo/com.hn.TestService/providers/dubbo%3A%2F%2F192.168.0.1%3A20880%2Fcom.hn.TestService%3Fanyhost%3Dtrue%26default.dynamic%3Dfalse%26dubbo%3D3.2
create()方法首先执行递归部分,调用createPersistent()方法依次创建ZooKeeper节点 /dubbo、/dubbo/com.hn.TestService 和 /dubbo/com.hn.TestService/providers;
然后执行递归之后的部分,调用createEphemeral() 方法创建
/dubbo/com.hn.TestService/providers/dubbo%3A%2F%2F192.168.0.1%3A20880%2Fcom.hn.TestService%3Fanyhost%3Dtrue%26default.dynamic%3Dfalse%26dubbo%3D3.2。
服务注册到ZooKeeper之后,ZooKeeper服务端的树状结构图如下所示。
服务消费端启动入口为org.apache.dubbo.config.ReferenceConfigBase#get()方法,其中ReferenceConfigBase为ReferenceConfig的父类,通过此方法创建一个对服务提供方的远程调用代理。代码如下所示。
- @Transient
- public final T get() {
- return get(true);
- }
-
-
- public T get(boolean check) {
- if (destroyed) {
- throw new IllegalStateException("The invoker of ReferenceConfig(" + url + ") has already destroyed!");
- }
-
- if (ref == null) {
- if (getScopeModel().isLifeCycleManagedExternally()) {
- // prepare model for reference
- getScopeModel().getDeployer().prepare();
- } else {
- // ensure start module, compatible with old api usage
- getScopeModel().getDeployer().start();
- }
-
- init(check);
- }
-
- return ref;
- }
上述get()方法将调用init()方法,然后调用createProxy() 方法来创建远程服务代理类,将远程服务转换为客户端需要的接口。
具体而言,首先通过Protocol将远程服务转换为Invoker;然后通过ProxyFactory将Invoker转换为客户端需要的接口。具体代码如下所示。
- protected synchronized void init(boolean check) {
- // ...
-
- Map
referenceParameters = appendConfig(); -
- ModuleServiceRepository repository = getScopeModel().getServiceRepository();
- ServiceDescriptor serviceDescriptor;
- if (CommonConstants.NATIVE_STUB.equals(getProxy())) {
- serviceDescriptor = StubSuppliers.getServiceDescriptor(interfaceName);
- repository.registerService(serviceDescriptor);
- } else {
- serviceDescriptor = repository.registerService(interfaceClass);
- }
- consumerModel = new ConsumerModel(serviceMetadata.getServiceKey(), proxy, serviceDescriptor,
- getScopeModel(), serviceMetadata, createAsyncMethodInfo(), interfaceClassLoader);
-
- // Compatible with dependencies on ServiceModel#getReferenceConfig() , and will be removed in a future version.
- consumerModel.setConfig(this);
-
- repository.registerConsumer(consumerModel);
-
- serviceMetadata.getAttachments().putAll(referenceParameters);
-
- // 创建远程服务代理类
- ref = createProxy(referenceParameters);
-
- // ...
- }
-
-
- private T createProxy(Map
referenceParameters) { - urls.clear();
-
- meshModeHandleUrl(referenceParameters);
-
- if (StringUtils.isNotEmpty(url)) {
- // user specified URL, could be peer-to-peer address, or register center's address.
- parseUrl(referenceParameters);
- } else {
- // if protocols not in jvm checkRegistry
- aggregateUrlFromRegistry(referenceParameters);
- }
-
- // 1、通过Protocol将远程服务转换为Invoker
- createInvoker();
-
- if (logger.isInfoEnabled()) {
- logger.info("Referred dubbo service: [" + referenceParameters.get(INTERFACE_KEY) + "]." +
- (ProtocolUtils.isGeneric(referenceParameters.get(GENERIC_KEY)) ?
- " it's GenericService reference" : " it's not GenericService reference"));
- }
-
- URL consumerUrl = new ServiceConfigURL(CONSUMER_PROTOCOL, referenceParameters.get(REGISTER_IP_KEY), 0,
- referenceParameters.get(INTERFACE_KEY), referenceParameters);
- consumerUrl = consumerUrl.setScopeModel(getScopeModel());
- consumerUrl = consumerUrl.setServiceModel(consumerModel);
- MetadataUtils.publishServiceDefinition(consumerUrl, consumerModel.getServiceModel(), getApplicationModel());
-
- // 通过ProxyFactory将Invoker转换为客户端需要的接口
- // create service proxy
- return (T) proxyFactory.getProxy(invoker, ProtocolUtils.isGeneric(generic));
- }
-
在createInvoker() 方法中,因为服务注册和发现使用是register协议,因此将调用 RegistryProtocol#refer方法创建invoker。代码如下所示。
- private void createInvoker() {
- if (urls.size() == 1) {
- URL curUrl = urls.get(0);
- // urls - 消费端准备消费的远程服务
- // 将将远程服务转换为Invoker
- invoker = protocolSPI.refer(interfaceClass, curUrl);
- // registry url, mesh-enable and unloadClusterRelated is true, not need Cluster.
- if (!UrlUtils.isRegistry(curUrl) &&
- !curUrl.getParameter(UNLOAD_CLUSTER_RELATED, false)) {
- List
> invokers = new ArrayList<>(); - invokers.add(invoker);
- invoker = Cluster.getCluster(getScopeModel(), Cluster.DEFAULT).join(new StaticDirectory(curUrl, invokers), true);
- }
- } else {
- List
> invokers = new ArrayList<>(); - URL registryUrl = null;
- for (URL url : urls) {
- // For multi-registry scenarios, it is not checked whether each referInvoker is available.
- // Because this invoker may become available later.
- invokers.add(protocolSPI.refer(interfaceClass, url));
-
- if (UrlUtils.isRegistry(url)) {
- // use last registry url
- registryUrl = url;
- }
- }
-
- if (registryUrl != null) {
- // registry url is available
- // for multi-subscription scenario, use 'zone-aware' policy by default
- String cluster = registryUrl.getParameter(CLUSTER_KEY, ZoneAwareCluster.NAME);
- // The invoker wrap sequence would be: ZoneAwareClusterInvoker(StaticDirectory) -> FailoverClusterInvoker
- // (RegistryDirectory, routing happens here) -> Invoker
- invoker = Cluster.getCluster(registryUrl.getScopeModel(), cluster, false).join(new StaticDirectory(registryUrl, invokers), false);
- } else {
- // not a registry url, must be direct invoke.
- if (CollectionUtils.isEmpty(invokers)) {
- throw new IllegalArgumentException("invokers == null");
- }
- URL curUrl = invokers.get(0).getUrl();
- String cluster = curUrl.getParameter(CLUSTER_KEY, Cluster.DEFAULT);
- invoker = Cluster.getCluster(getScopeModel(), cluster).join(new StaticDirectory(curUrl, invokers), true);
- }
- }
- }
-
-
- public
Invoker refer(Class type, URL url) throws RpcException { - url = getRegistryUrl(url);
- Registry registry = getRegistry(url);
- if (RegistryService.class.equals(type)) {
- return proxyFactory.getInvoker((T) registry, type, url);
- }
-
- // group="a,b" or group="*"
- Map
qs = (Map) url.getAttribute(REFER_KEY); - String group = qs.get(GROUP_KEY);
- if (StringUtils.isNotEmpty(group)) {
- if ((COMMA_SPLIT_PATTERN.split(group)).length > 1 || "*".equals(group)) {
- return doRefer(Cluster.getCluster(url.getScopeModel(), MergeableCluster.NAME), registry, type, url, qs);
- }
- }
-
- Cluster cluster = Cluster.getCluster(url.getScopeModel(), qs.get(CLUSTER_KEY));
-
- // 创建invoker
- return doRefer(cluster, registry, type, url, qs);
- }
-
-
- protected
Invoker doRefer(Cluster cluster, Registry registry, Class type, URL url, Map parameters) { - // ...
-
- // 创建invoker
- ClusterInvoker
migrationInvoker = getMigrationInvoker(this, cluster, registry, type, url, consumerUrl); - return interceptInvoker(migrationInvoker, url, consumerUrl);
- }
-
因为默认的rpc协议为dubbo协议,通过interceptInvoker()方法,最终将调用DubboProtocol的方法创建invoker。创建代码如下所示。
- // InterfaceCompatibleRegistryProtocol.getInvoker
- public
ClusterInvoker getInvoker(Cluster cluster, Registry registry, Class type, URL url) { - DynamicDirectory
directory = new RegistryDirectory<>(type, url); - return doCreateInvoker(directory, cluster, registry, type);
- }
-
- // RegistryProtocol#doCreateInvoker
- protected
ClusterInvoker doCreateInvoker(DynamicDirectory directory, Cluster cluster, Registry registry, Class type) { - directory.setRegistry(registry);
- directory.setProtocol(protocol);
- // all attributes of REFER_KEY
- Map
parameters = new HashMap(directory.getConsumerUrl().getParameters()); - URL urlToRegistry = new URL(CONSUMER_PROTOCOL, parameters.remove(REGISTER_IP_KEY), 0, type.getName(), parameters);
- if (directory.isShouldRegister()) {
- directory.setRegisteredConsumerUrl(urlToRegistry);
- registry.register(directory.getRegisteredConsumerUrl());
- }
-
- // 1、建立路由规则链
- directory.buildRouterChain(urlToRegistry);
-
- // 2、订阅服务提供者地址,生成invoker
- directory.subscribe(toSubscribeUrl(urlToRegistry));
-
- // 3、包装机器容错策略到invoker
- return (ClusterInvoker
) cluster.join(directory); - }
-
- // org.apache.dubbo.rpc.protocol.AbstractProtocol#refer
- public
Invoker refer(Class type, URL url) throws RpcException { - return new AsyncToSyncInvoker<>(protocolBindingRefer(type, url));
- }
-
- // org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol#protocolBindingRefer
- public
Invoker protocolBindingRefer(Class serviceType, URL url) throws RpcException { - optimizeSerialization(url);
-
- // create rpc invoker.
- DubboInvoker
invoker = new DubboInvoker(serviceType, url, getClients(url), invokers); - invokers.add(invoker);
-
- return invoker;
- }
在创建Invoker的同时,通过getClients() 方法创建并启动服务消费端的NettyClient对象,与服务提供端建立TCP连接。
(1)每个服务消费端与服务提供端集群中的所有机器都建立了连接,因为服务消费端会将所有服务提供者的URL转化为Invoker。
(2)默认情况下,针对一个服务提供者机器只创建一个NettyClient对象。即服务消费端引用同一个服务提供者机器上的多个服务时,这些服务复用同一个Netty连接。代码如下所示。
- private ClientsProvider getClients(URL url) {
- int connections = url.getParameter(CONNECTIONS_KEY, 0);
- // whether to share connection
- // if not configured, connection is shared, otherwise, one connection for one service
- // 默认情况下,针对一个服务提供者机器只创建一个NettyClient对象,
- // 即服务消费端引用同一个服务提供者机器上的多个服务时,这些服务复用同一个Netty连接
- if (connections == 0) {
- /*
- * The xml configuration should have a higher priority than properties.
- */
- String shareConnectionsStr = StringUtils.isBlank(url.getParameter(SHARE_CONNECTIONS_KEY, (String) null))
- ? ConfigurationUtils.getProperty(url.getOrDefaultApplicationModel(), SHARE_CONNECTIONS_KEY, DEFAULT_SHARE_CONNECTIONS)
- : url.getParameter(SHARE_CONNECTIONS_KEY, (String) null);
- connections = Integer.parseInt(shareConnectionsStr);
-
- return getSharedClient(url, connections);
- }
-
- List
clients = IntStream.range(0, connections) - .mapToObj((i) -> initClient(url))
- .collect(Collectors.toList());
- return new ExclusiveClientsProvider(clients);
- }
-
-
- private SharedClientsProvider getSharedClient(URL url, int connectNum) {
- String key = url.getAddress();
-
- // connectNum must be greater than or equal to 1
- int expectedConnectNum = Math.max(connectNum, 1);
- return referenceClientMap.compute(key, (originKey, originValue) -> {
- if (originValue != null && originValue.increaseCount()) {
- return originValue;
- } else {
- return new SharedClientsProvider(this, originKey, buildReferenceCountExchangeClientList(url, expectedConnectNum));
- }
- });
- }
-
- private List
buildReferenceCountExchangeClientList(URL url, int connectNum) { - List
clients = new ArrayList<>(); -
- for (int i = 0; i < connectNum; i++) {
- clients.add(buildReferenceCountExchangeClient(url));
- }
-
- return clients;
- }
-
-
- /**
- * Build a single client
- *
- * @param url
- * @return
- */
- private ReferenceCountExchangeClient buildReferenceCountExchangeClient(URL url) {
- ExchangeClient exchangeClient = initClient(url);
- ReferenceCountExchangeClient client = new ReferenceCountExchangeClient(exchangeClient, DubboCodec.NAME);
- // read configs
- int shutdownTimeout = ConfigurationUtils.getServerShutdownTimeout(url.getScopeModel());
- client.setShutdownWaitTime(shutdownTimeout);
- return client;
- }
其中创建NettyClient的关键方法为initClient()方法。具体如下所示。
- /**
- * Create new connection
- *
- * @param url
- */
- private ExchangeClient initClient(URL url) {
- /*
- * Instance of url is InstanceAddressURL, so addParameter actually adds parameters into ServiceInstance,
- * which means params are shared among different services. Since client is shared among services this is currently not a problem.
- */
- String str = url.getParameter(CLIENT_KEY, url.getParameter(SERVER_KEY, DEFAULT_REMOTING_CLIENT));
-
- // BIO is not allowed since it has severe performance issue.
- if (StringUtils.isNotEmpty(str) && !url.getOrDefaultFrameworkModel().getExtensionLoader(Transporter.class).hasExtension(str)) {
- throw new RpcException("Unsupported client type: " + str + "," +
- " supported client type is " + StringUtils.join(url.getOrDefaultFrameworkModel().getExtensionLoader(Transporter.class).getSupportedExtensions(), " "));
- }
-
- try {
- ScopeModel scopeModel = url.getScopeModel();
- int heartbeat = UrlUtils.getHeartbeat(url);
- // Replace InstanceAddressURL with ServiceConfigURL.
- url = new ServiceConfigURL(DubboCodec.NAME, url.getUsername(), url.getPassword(), url.getHost(), url.getPort(), url.getPath(), url.getAllParameters());
- url = url.addParameter(CODEC_KEY, DubboCodec.NAME);
- // enable heartbeat by default
- url = url.addParameterIfAbsent(HEARTBEAT_KEY, Integer.toString(heartbeat));
- url = url.setScopeModel(scopeModel);
-
- // connection should be lazy
- return url.getParameter(LAZY_CONNECT_KEY, false)
- ? new LazyConnectExchangeClient(url, requestHandler)
- : Exchangers.connect(url, requestHandler);
- } catch (RemotingException e) {
- throw new RpcException("Fail to create remoting client for service(" + url + "): " + e.getMessage(), e);
- }
- }
(3)当消费端启动时就与服务提供端建立了TCP连接。
由initClient()方法实现可知,默认情况下,LAZY_CONNECT_KEY为false,即此处是创建及时连接 —— Exchangers.connect(url, requestHandler)。
Exchangers.connect() 之后的主要调用链路为:Exchangers的connect()->HeaderExchanger的connect()->Transporters的connect()->NettyTransporter的connect(),最终通过NettyTransporter的connect()创建NettyClient。具体代码如下所示。
- public static ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
- if (url == null) {
- throw new IllegalArgumentException("url == null");
- }
- if (handler == null) {
- throw new IllegalArgumentException("handler == null");
- }
- return getExchanger(url).connect(url, handler);
- }
-
-
- public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
- return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true);
- }
-
- public static Client connect(URL url, ChannelHandler... handlers) throws RemotingException {
- if (url == null) {
- throw new IllegalArgumentException("url == null");
- }
- ChannelHandler handler;
- if (handlers == null || handlers.length == 0) {
- handler = new ChannelHandlerAdapter();
- } else if (handlers.length == 1) {
- handler = handlers[0];
- } else {
- handler = new ChannelHandlerDispatcher(handlers);
- }
- return getTransporter(url).connect(url, handler);
- }
-
- public Client connect(URL url, ChannelHandler handler) throws RemotingException {
- return new NettyClient(url, handler);
- }
-
-
(4)创建并启动服务消费端的NettyClient对象,并与服务提供端建立TCP连接
调用父类 AbstractClient 的构造函数,创建并启动服务消费端的NettyClient对象,并与服务提供端建立TCP连接。
- public NettyClient(final URL url, final ChannelHandler handler) throws RemotingException {
- super(url, wrapChannelHandler(url, handler));
- }
-
- public AbstractClient(URL url, ChannelHandler handler) throws RemotingException {
- super(url, handler);
- // set default needReconnect true when channel is not connected
- needReconnect = url.getParameter(Constants.SEND_RECONNECT_KEY, true);
-
- initExecutor(url);
-
- try {
- // 1、启动 NettyClient
- doOpen();
- } catch (Throwable t) {
- close();
- throw new RemotingException(url.toInetSocketAddress(), null,
- "Failed to start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress()
- + " connect to the server " + getRemoteAddress() + ", cause: " + t.getMessage(), t);
- }
-
- try {
- // connect.
- // 2、发起远程连接,与服务提供者建立TCP连接
- connect();
- if (logger.isInfoEnabled()) {
- logger.info("Start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress() + " connect to the server " + getRemoteAddress());
- }
- } catch (RemotingException t) {
- // If lazy connect client fails to establish a connection, the client instance will still be created,
- // and the reconnection will be initiated by ReconnectTask, so there is no need to throw an exception
- if (url.getParameter(LAZY_CONNECT_KEY, false)) {
- logger.warn(TRANSPORT_FAILED_CONNECT_PROVIDER, "", "", "Failed to start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress() +
- " connect to the server " + getRemoteAddress() +
- " (the connection request is initiated by lazy connect client, ignore and retry later!), cause: " +
- t.getMessage(), t);
- return;
- }
-
- if (url.getParameter(Constants.CHECK_KEY, true)) {
- close();
- throw t;
- } else {
- logger.warn(TRANSPORT_FAILED_CONNECT_PROVIDER, "", "", "Failed to start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress()
- + " connect to the server " + getRemoteAddress() + " (check == false, ignore and retry later!), cause: " + t.getMessage(), t);
- }
- } catch (Throwable t) {
- close();
- throw new RemotingException(url.toInetSocketAddress(), null,
- "Failed to start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress()
- + " connect to the server " + getRemoteAddress() + ", cause: " + t.getMessage(), t);
- }
- }
-
-
- /**
- * Init bootstrap
- *
- * @throws Throwable
- */
- @Override
- protected void doOpen() throws Throwable {
- // 创建业务Handler
- final NettyClientHandler nettyClientHandler = createNettyClientHandler();
- // 创建启动器并配置
- bootstrap = new Bootstrap();
- initBootstrap(nettyClientHandler);
- }
-
-
- protected void connect() throws RemotingException {
- connectLock.lock();
-
- try {
- // ...
- doConnect();
- // ...
- } catch (RemotingException e) {
- throw e;
-
- } catch (Throwable e) {
- throw new RemotingException(this, "Failed to connect to server " + getRemoteAddress() + " from " + getClass().getSimpleName() + " "
- + NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion()
- + ", cause: " + e.getMessage(), e);
-
- } finally {
- connectLock.unlock();
- }
- }
-
- private void doConnect(InetSocketAddress serverAddress) throws RemotingException {
- long start = System.currentTimeMillis();
- // 发起链接
- ChannelFuture future = bootstrap.connect(serverAddress);
- // ...
- }
通过ProxyFactory的getProxy()方法创建代理类,将Invoker转换为客户端需要的接口。以JavassistProxyFactory为例,当使用JavassistProxyFactory创建失败时,会使用JdkProxyFactory重试。代码如下所示。
- public
T getProxy(Invoker invoker, Class>[] interfaces) { - try {
- return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
- } catch (Throwable fromJavassist) {
- // try fall back to JDK proxy factory
- try {
- T proxy = jdkProxyFactory.getProxy(invoker, interfaces);
- logger.error(PROXY_FAILED, "", "", "Failed to generate proxy by Javassist failed. Fallback to use JDK proxy success. " +
- "Interfaces: " + Arrays.toString(interfaces), fromJavassist);
- return proxy;
- } catch (Throwable fromJdk) {
- logger.error(PROXY_FAILED, "", "", "Failed to generate proxy by Javassist failed. Fallback to use JDK proxy is also failed. " +
- "Interfaces: " + Arrays.toString(interfaces) + " Javassist Error.", fromJavassist);
- logger.error(PROXY_FAILED, "", "", "Failed to generate proxy by Javassist failed. Fallback to use JDK proxy is also failed. " +
- "Interfaces: " + Arrays.toString(interfaces) + " JDK Error.", fromJdk);
- throw fromJavassist;
- }
- }
- }