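• An analysis of Dubbo's DubboReference.check parameter


    Background

    While using Dubbo, I found that if the provider is not running when the consumer starts, the consumer can never call the provider's interface, even after the provider comes up later.

    The registry is Nacos.

    The Dubbo version is 3.0.4.

    Example

    Interface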

    public interface DemoService {
        String sayHello();
    }
    

    Provider

    @DubboService
    public class DemoServiceImpl implements DemoService {
        @Override
        public String sayHello() {
            return "hello";
        }
    }
    
    @EnableDubbo
    @SpringBootApplication(exclude = {DataSourceAutoConfiguration.class})
    public class ReferenceCheckProviderStarter {
        public static void main(String[] args) {
            new SpringApplicationBuilder(ReferenceCheckProviderStarter.class)
                    .web(WebApplicationType.NONE) // .REACTIVE, .SERVLET
                    .run(args);
            System.out.println("dubbo service started");
        }
    }
    

    Consumer

    @EnableDubbo
    @RestController
    @SpringBootApplication(exclude = {DataSourceAutoConfiguration.class})
    public class ReferenceCheckConsumerStarter {
    
        @DubboReference
        private DemoService demoService;
    
        @GetMapping("/dubbo/nacos/test")
        public Object test() {
            return demoService.sayHello();
        }
    
        public static void main(String[] args) {
            SpringApplication.run(ReferenceCheckConsumerStarter.class, args);
        }
    }
    

    1. Start the provider first, then the consumer

    a. Start the provider

    The provider's service appears in Nacos.

    b. Start the consumer

    The consumer's service appears in Nacos.

    Requesting http://127.0.0.1:8080/dubbo/nacos/test returns:
    
    hello
    

    c. Stop the provider

    The provider's service disappears from Nacos.

    Requesting http://127.0.0.1:8080/dubbo/nacos/test returns:
    
    No provider available from registry
    

    d. Restart the provider

    The provider's service appears in Nacos again.

    Requesting http://127.0.0.1:8080/dubbo/nacos/test returns:
    
    hello
    

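    Conclusion: when the provider starts before the consumer, everything works, including recovery after the provider restarts.

    2. Start the consumer first, then the provider

    a. Start the consumer

    The consumer's service appears in Nacos but disappears again immediately.

    b. Start the provider

    The provider's service appears in Nacos.

    Requesting http://127.0.0.1:8080/dubbo/nacos/test returns: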
    
    Directory already destroyed .
    

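    Conclusion: if the provider is not running when the consumer starts, the consumer can never reach the provider's service afterwards, even once the provider is up.

    3. Start the consumer first, then the provider (check=false)

    Change the parameter of the @DubboReference annotation: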

    @DubboReference(check = false)
    private DemoService demoService;
    

    a. Start the consumer

    The consumer's service appears in Nacos.

    Requesting http://127.0.0.1:8080/dubbo/nacos/test returns:
    No provider available from registry
    

    b. Start the provider

    The provider's service appears in Nacos.

    Requesting http://127.0.0.1:8080/dubbo/nacos/test returns:
    hello
    

    Conclusion: with check = false, even if the consumer starts first, it can reach the provider's service once the provider has started.

    About the errors

    org.apache.dubbo.rpc.RpcException: No provider available from registry

    public class RegistryDirectory<T> extends DynamicDirectory<T> {
        @Override
        public List<Invoker<T>> doList(Invocation invocation) {
            if (forbidden) {
                // 1. No service provider 2. Service providers are disabled
                throw new RpcException(RpcException.FORBIDDEN_EXCEPTION, "No provider available from registry " +
                        getUrl().getAddress() + " for service " + getConsumerUrl().getServiceKey() + " on consumer " +
                        NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() +
                        ", please check status of providers(disabled, not registered or in blacklist).");
            }
    
            // ......
        }
    }
    
    public class RegistryDirectory<T> extends DynamicDirectory<T> {
        String EMPTY_PROTOCOL = "empty";
    
        private void refreshInvoker(List<URL> invokerUrls) {
            Assert.notNull(invokerUrls, "invokerUrls should not be null");
    
            if (invokerUrls.size() == 1
                    && invokerUrls.get(0) != null
                    && EMPTY_PROTOCOL.equals(invokerUrls.get(0).getProtocol())) {
                this.forbidden = true; // Forbid to access
                this.invokers = Collections.emptyList();
                routerChain.setInvokers(this.invokers);
                destroyAllInvokers(); // Close all invokers
            } else {
                this.forbidden = false; // Allow to access
                
                if (invokerUrls == Collections.emptyList()) {
                    invokerUrls = new ArrayList<>();
                }
                if (invokerUrls.isEmpty() && this.cachedInvokerUrls != null) {
                    invokerUrls.addAll(this.cachedInvokerUrls);
                } else {
                    this.cachedInvokerUrls = new HashSet<>();
                    this.cachedInvokerUrls.addAll(invokerUrls);//Cached invoker urls, convenient for comparison
                }
                if (invokerUrls.isEmpty()) {
                    return;
                }
                
                // can't use local reference because this.urlInvokerMap might be accessed at isAvailable() by main thread concurrently.
                Map<URL, Invoker<T>> oldUrlInvokerMap = null;
                if (this.urlInvokerMap != null) {
                    // the initial capacity should be set greater than the maximum number of entries divided by the load factor to avoid resizing.
                    oldUrlInvokerMap = new LinkedHashMap<>(Math.round(1 + this.urlInvokerMap.size() / DEFAULT_HASHMAP_LOAD_FACTOR));
                    this.urlInvokerMap.forEach(oldUrlInvokerMap::put);
                }
                Map<URL, Invoker<T>> newUrlInvokerMap = toInvokers(oldUrlInvokerMap, invokerUrls);// Translate url list to Invoker map
    
                /**
                 * If the calculation is wrong, it is not processed.
                 *
                 * 1. The protocol configured by the client is inconsistent with the protocol of the server.
                 *    eg: consumer protocol = dubbo, provider only has other protocol services(rest).
                 * 2. The registration center is not robust and pushes illegal specification data.
                 *
                 */
                if (CollectionUtils.isEmptyMap(newUrlInvokerMap)) {
                    logger.error(new IllegalStateException("urls to invokers error .invokerUrls.size :" + invokerUrls.size() + ", invoker.size :0. urls :" + invokerUrls
                            .toString()));
                    return;
                }
    
                List<Invoker<T>> newInvokers = Collections.unmodifiableList(new ArrayList<>(newUrlInvokerMap.values()));
                // pre-route and build cache, notice that route cache should build on original Invoker list.
                // toMergeMethodInvokerMap() will wrap some invokers having different groups, those wrapped invokers not should be routed.
                routerChain.setInvokers(newInvokers);
                this.invokers = multiGroup ? toMergeInvokerList(newInvokers) : newInvokers;
                this.urlInvokerMap = newUrlInvokerMap;
    
                try {
                    destroyUnusedInvokers(oldUrlInvokerMap, newUrlInvokerMap); // Close the unused Invoker
                } catch (Exception e) {
                    logger.warn("destroyUnusedInvokers error. ", e);
                }
    
                // notify invokers refreshed
                this.invokersChanged();
            }
        }
    
        private synchronized void refreshOverrideAndInvoker(List<URL> urls) {
            // mock zookeeper://xxx?mock=return null
            overrideDirectoryUrl();
            refreshInvoker(urls);
        }
    
        @Override
        public synchronized void notify(List<URL> urls) {
            if (isDestroyed()) {
                return;
            }
    
            Map<String, List<URL>> categoryUrls = urls.stream()
                    .filter(Objects::nonNull)
                    .filter(this::isValidCategory)
                    .filter(this::isNotCompatibleFor26x)
                    .collect(Collectors.groupingBy(this::judgeCategory));
    
            List<URL> configuratorURLs = categoryUrls.getOrDefault(CONFIGURATORS_CATEGORY, Collections.emptyList());
            this.configurators = Configurator.toConfigurators(configuratorURLs).orElse(this.configurators);
    
            List<URL> routerURLs = categoryUrls.getOrDefault(ROUTERS_CATEGORY, Collections.emptyList());
            toRouters(routerURLs).ifPresent(this::addRouters);
    
            // providers
            List<URL> providerURLs = categoryUrls.getOrDefault(PROVIDERS_CATEGORY, Collections.emptyList());
    
            // 3.x added for extend URL address
            ExtensionLoader<AddressListener> addressListenerExtensionLoader = getUrl().getOrDefaultModuleModel().getExtensionLoader(AddressListener.class);
            List<AddressListener> supportedListeners = addressListenerExtensionLoader.getActivateExtension(getUrl(), (String[]) null);
            if (supportedListeners != null && !supportedListeners.isEmpty()) {
                for (AddressListener addressListener : supportedListeners) {
                    providerURLs = addressListener.notify(providerURLs, getConsumerUrl(),this);
                }
            }
    
            refreshOverrideAndInvoker(providerURLs); // here
        }
    
    }
    
    public abstract class AbstractRegistry implements Registry {
        /**
         * Notify changes from the Provider side.
         *
         * @param url      consumer side url
         * @param listener listener
         * @param urls     provider latest urls
         */
        protected void notify(URL url, NotifyListener listener, List<URL> urls) {
            if (url == null) {
                throw new IllegalArgumentException("notify url == null");
            }
            if (listener == null) {
                throw new IllegalArgumentException("notify listener == null");
            }
            if ((CollectionUtils.isEmpty(urls))
                && !ANY_VALUE.equals(url.getServiceInterface())) {
                logger.warn("Ignore empty notify urls for subscribe url " + url);
                return;
            }
            if (logger.isInfoEnabled()) {
                logger.info("Notify urls for subscribe url " + url + ", url size: " + urls.size());
            }
            // keep every provider's category.
            Map<String, List<URL>> result = new HashMap<>(); // here
            for (URL u : urls) {
                if (UrlUtils.isMatch(url, u)) {
                    String category = u.getCategory(DEFAULT_CATEGORY);
                    List<URL> categoryList = result.computeIfAbsent(category, k -> new ArrayList<>()); // here
                    categoryList.add(u); // here
                }
            }
            if (result.size() == 0) {
                return;
            }
            Map<String, List<URL>> categoryNotified = notified.computeIfAbsent(url, u -> new ConcurrentHashMap<>());
            for (Map.Entry<String, List<URL>> entry : result.entrySet()) {
                String category = entry.getKey();
                List<URL> categoryList = entry.getValue();
                categoryNotified.put(category, categoryList);
                listener.notify(categoryList); // here
                // We will update our cache file after each notification.
                // When our Registry has a subscribe failure due to network jitter, we can return at least the existing cache URL.
                if (localCacheEnabled) {
                    saveProperties(url);
                }
            }
        }
    }
    
    public class NacosRegistry extends FailbackRegistry {
        private void notifySubscriber(URL url, NotifyListener listener, Collection<Instance> instances) {
            List<Instance> enabledInstances = new LinkedList<>(instances);
            if (enabledInstances.size() > 0) {
                //  Instances
                filterEnabledInstances(enabledInstances);
            }
            List<URL> urls = toUrlWithEmpty(url, enabledInstances);
            NacosRegistry.this.notify(url, listener, urls); // here
        }
    
        String EMPTY_PROTOCOL = "empty";
    
        private List<URL> toUrlWithEmpty(URL consumerURL, Collection<Instance> instances) {
            List<URL> urls = buildURLs(consumerURL, instances);
            if (urls.size() == 0) { // here
                URL empty = URLBuilder.from(consumerURL)
                    .setProtocol(EMPTY_PROTOCOL)
                    .addParameter(CATEGORY_KEY, DEFAULT_CATEGORY)
                    .build();
                urls.add(empty);
            }
            return urls;
        }
    }
    

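    When no provider instance is available, instances is empty, so toUrlWithEmpty returns a single empty:// URL and refreshInvoker sets forbidden = true.

    When provider instances are available, instances is non-empty, so refreshInvoker resets forbidden to false and rebuilds the invokers.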
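
    The interplay between toUrlWithEmpty and refreshInvoker can be sketched in plain Java. This is a simplified model rather than Dubbo's code: the class EmptyMarkerSketch, its refresh and call methods, and the use of plain strings as URLs are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model (not Dubbo source): URLs are plain strings, names are hypothetical.
class EmptyMarkerSketch {
    static final String EMPTY_PROTOCOL = "empty";

    boolean forbidden = false;
    List<String> invokers = new ArrayList<>();

    // Same idea as toUrlWithEmpty: never hand over an empty list,
    // substitute a single "empty://" marker URL instead.
    static List<String> toUrlWithEmpty(String consumerAddress, List<String> providerUrls) {
        List<String> urls = new ArrayList<>(providerUrls);
        if (urls.isEmpty()) {
            urls.add(EMPTY_PROTOCOL + "://" + consumerAddress);
        }
        return urls;
    }

    // Same idea as the first branch of refreshInvoker:
    // a lone "empty://" URL forbids access, anything else allows it.
    void refresh(List<String> urls) {
        if (urls.size() == 1 && urls.get(0).startsWith(EMPTY_PROTOCOL + "://")) {
            forbidden = true;
            invokers = new ArrayList<>();
        } else {
            forbidden = false;
            invokers = new ArrayList<>(urls);
        }
    }

    // Same idea as doList: a forbidden directory rejects every call.
    String call() {
        if (forbidden) {
            throw new IllegalStateException("No provider available from registry");
        }
        return "hello from " + invokers.get(0);
    }
}
```

    Because the forbidden flag is recomputed on every refresh, a later notification that carries real provider URLs clears it again, which matches the recovery seen in scenarios 1 and 3.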

    How the notification is delivered

    public class ServiceInfoHolder implements Closeable {
        public ServiceInfo processServiceInfo(ServiceInfo serviceInfo) {
            String serviceKey = serviceInfo.getKey();
            if (serviceKey == null) {
                return null;
            }
            ServiceInfo oldService = serviceInfoMap.get(serviceInfo.getKey());
            if (isEmptyOrErrorPush(serviceInfo)) {
                //empty or error push, just ignore
                return oldService;
            }
            serviceInfoMap.put(serviceInfo.getKey(), serviceInfo);
            boolean changed = isChangedServiceInfo(oldService, serviceInfo);
            if (StringUtils.isBlank(serviceInfo.getJsonFromServer())) {
                serviceInfo.setJsonFromServer(JacksonUtils.toJson(serviceInfo));
            }
            MetricsMonitor.getServiceInfoMapSizeMonitor().set(serviceInfoMap.size());
            if (changed) { // here
                NAMING_LOGGER.info("current ips:(" + serviceInfo.ipCount() + ") service: " + serviceInfo.getKey() + " -> "
                        + JacksonUtils.toJson(serviceInfo.getHosts()));
                NotifyCenter.publishEvent(new InstancesChangeEvent(serviceInfo.getName(), serviceInfo.getGroupName(),
                        serviceInfo.getClusters(), serviceInfo.getHosts())); // here
                DiskCache.write(serviceInfo, cacheDir);
            }
            return serviceInfo;
        }
    }
    
    public class DefaultPublisher extends Thread implements EventPublisher {
        private BlockingQueue<Event> queue;
    
        @Override
        public void init(Class<? extends Event> type, int bufferSize) {
            setDaemon(true);
            setName("nacos.publisher-" + type.getName());
            this.eventType = type;
            this.queueMaxSize = bufferSize;
            this.queue = new ArrayBlockingQueue<>(bufferSize); // here
            start();
        }
    
        @Override
        public boolean publish(Event event) {
            checkIsStart();
            boolean success = this.queue.offer(event); // here
            if (!success) {
                LOGGER.warn("Unable to plug in due to interruption, synchronize sending time, event : {}", event);
                receiveEvent(event);
                return true;
            }
            return true;
        }
    
        @Override
        public void run() {
            openEventHandler();
        }
        
        void openEventHandler() {
            try {
                
                // This variable is defined to resolve the problem which message overstock in the queue.
                int waitTimes = 60;
                // To ensure that messages are not lost, enable EventHandler when
                // waiting for the first Subscriber to register
                for (; ; ) {
                    if (shutdown || hasSubscriber() || waitTimes <= 0) {
                        break;
                    }
                    ThreadUtils.sleep(1000L);
                    waitTimes--;
                }
                
                for (; ; ) {
                    if (shutdown) {
                        break;
                    }
                    final Event event = queue.take(); // here
                    receiveEvent(event);  // here
                    UPDATER.compareAndSet(this, lastEventSequence, Math.max(lastEventSequence, event.sequence()));
                }
            } catch (Throwable ex) {
                LOGGER.error("Event listener exception : ", ex);
            }
        }
    
        void receiveEvent(Event event) {
            final long currentEventSequence = event.sequence();
            
            if (!hasSubscriber()) {
                LOGGER.warn("[NotifyCenter] the {} is lost, because there is no subscriber.");
                return;
            }
            
            // Notification single event listener
            for (Subscriber subscriber : subscribers) {
                // Whether to ignore expiration events
                if (subscriber.ignoreExpireEvent() && lastEventSequence > currentEventSequence) {
                    LOGGER.debug("[NotifyCenter] the {} is unacceptable to this subscriber, because had expire",
                            event.getClass());
                    continue;
                }
                
                // Because unifying smartSubscriber and subscriber, so here need to think of compatibility.
                // Remove original judge part of codes.
                notifySubscriber(subscriber, event); // here
            }
        }
    
        @Override
        public void notifySubscriber(final Subscriber subscriber, final Event event) {
            
            LOGGER.debug("[NotifyCenter] the {} will received by {}", event, subscriber);
            
            final Runnable job = () -> subscriber.onEvent(event);
            final Executor executor = subscriber.executor(); 
            
            if (executor != null) {
                executor.execute(job); // here
            } else {
                try {
                    job.run(); // here
                } catch (Throwable e) {
                    LOGGER.error("Event callback exception: ", e);
                }
            }
        }
    }
    
    public class InstancesChangeNotifier extends Subscriber<InstancesChangeEvent> {
        @Override
        public void onEvent(InstancesChangeEvent event) {
            String key = ServiceInfo
                    .getKey(NamingUtils.getGroupedName(event.getServiceName(), event.getGroupName()), event.getClusters());
            ConcurrentHashSet<EventListener> eventListeners = listenerMap.get(key);
            if (CollectionUtils.isEmpty(eventListeners)) {
                return;
            }
            for (final EventListener listener : eventListeners) {
                final com.alibaba.nacos.api.naming.listener.Event namingEvent = transferToNamingEvent(event);
                if (listener instanceof AbstractEventListener && ((AbstractEventListener) listener).getExecutor() != null) {
                    ((AbstractEventListener) listener).getExecutor().execute(() -> listener.onEvent(namingEvent)); // here
                } else {
                    listener.onEvent(namingEvent); // here
                }
            }
        }
    }
    
    public class NacosRegistry extends FailbackRegistry {
            @Override
            public void onEvent(Event event) {
                if (event instanceof NamingEvent) {
                    NamingEvent e = (NamingEvent) event;
                    notifier.notify(e.getInstances()); // here
                }
            }
    }
    
    public abstract class RegistryNotifier {
        public synchronized void notify(Object rawAddresses) {
            this.rawAddresses = rawAddresses;
            long notifyTime = System.currentTimeMillis();
            this.lastEventTime = notifyTime;
    
            long delta = (System.currentTimeMillis() - lastExecuteTime) - delayTime;
    
            // more than 10 calls && next execute time is in the future
            boolean delay = shouldDelay.get() && delta < 0;
            if (delay) {
                scheduler.schedule(new NotificationTask(this, notifyTime), -delta, TimeUnit.MILLISECONDS); // here
            } else {
                // check if more than 10 calls
                if (!shouldDelay.get() && executeTime.incrementAndGet() > DEFAULT_DELAY_EXECUTE_TIMES) {
                    shouldDelay.set(true);
                }
                scheduler.submit(new NotificationTask(this, notifyTime)); // here
            }
        }
    
        public static class NotificationTask implements Runnable {
            private final RegistryNotifier listener;
            private final long time;
    
            public NotificationTask(RegistryNotifier listener, long time) {
                this.listener = listener;
                this.time = time;
            }
    
            @Override
            public void run() {
                try {
                    if (this.time == listener.lastEventTime) {
                        listener.doNotify(listener.rawAddresses); // here
                        listener.lastExecuteTime = System.currentTimeMillis();
                        synchronized (listener) {
                            if (this.time == listener.lastEventTime) {
                                listener.rawAddresses = null;
                            }
                        }
                    }
                } catch (Throwable t) {
                    logger.error("Error occurred when notify directory. ", t);
                }
            }
        }
    }
    
    public class NacosRegistry extends FailbackRegistry {
    
        private class RegistryChildListenerImpl implements EventListener {
            private RegistryNotifier notifier;
    
            public RegistryChildListenerImpl(String serviceName, URL consumerUrl, NotifyListener listener) {
                notifier = new RegistryNotifier(getUrl(), NacosRegistry.this.getDelay()) {
                    @Override
                    protected void doNotify(Object rawAddresses) {
                        List<Instance> instances = (List<Instance>) rawAddresses;
                        if (isServiceNamesWithCompatibleMode(consumerUrl)) {
                            /**
                             * Get all instances with corresponding serviceNames to avoid instance overwrite and but with empty instance mentioned
                             * in https://github.com/apache/dubbo/issues/5885 and https://github.com/apache/dubbo/issues/5899
                             */
                            NacosInstanceManageUtil.initOrRefreshServiceInstanceList(serviceName, instances);
                            instances = NacosInstanceManageUtil.getAllCorrespondingServiceInstanceList(serviceName);
                        }
                        NacosRegistry.this.notifySubscriber(consumerUrl, listener, instances); // here
                    }
                };
            }
        }
    }
    

    doNotify then calls the notifySubscriber method shown above.
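
    One detail of RegistryNotifier above is worth isolating: a NotificationTask only delivers its payload if its own timestamp still equals lastEventTime, so a task that has been overtaken by a newer notification does nothing. The sketch below models that rule in plain Java. It is not Dubbo's code: LatestWinsNotifier and notifyAddresses are hypothetical names, a counter replaces the millisecond timestamp, and tasks are returned to the caller instead of being handed to a scheduler.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model (not Dubbo source) of the "latest notification wins" check.
class LatestWinsNotifier {
    private Object rawAddresses;
    private long lastEventId; // stands in for lastEventTime
    final List<Object> delivered = new ArrayList<>();

    // Records the newest payload and returns a task bound to this event's id.
    synchronized Runnable notifyAddresses(Object addresses) {
        rawAddresses = addresses;
        final long id = ++lastEventId;
        return () -> {
            synchronized (this) {
                // Deliver only if no newer notification arrived in the meantime.
                if (id == lastEventId) {
                    delivered.add(rawAddresses);
                }
            }
        };
    }
}
```

    If two notifications arrive before either task runs, only the second task delivers, and it delivers the most recent address list.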

    When is the listener registered?

    public class NacosRegistry extends FailbackRegistry {
    
        private void subscribeEventListener(String serviceName, final URL url, final NotifyListener listener)
            throws NacosException {
            EventListener eventListener = new RegistryChildListenerImpl(serviceName, url, listener);  // here
            namingService.subscribe(serviceName,
                getUrl().getGroup(Constants.DEFAULT_GROUP),
                eventListener); // here
        }
    
        private void doSubscribe(final URL url, final NotifyListener listener, final Set<String> serviceNames) {
            try {
                if (isServiceNamesWithCompatibleMode(url)) {
                    List<Instance> allCorrespondingInstanceList = Lists.newArrayList();
    
                    /**
                     * Get all instances with serviceNames to avoid instance overwrite and but with empty instance mentioned
                     * in https://github.com/apache/dubbo/issues/5885 and https://github.com/apache/dubbo/issues/5899
                     *
                     * namingService.getAllInstances with {@link org.apache.dubbo.registry.support.AbstractRegistry#registryUrl}
                     * default {@link DEFAULT_GROUP}
                     *
                     * in https://github.com/apache/dubbo/issues/5978
                     */
                    for (String serviceName : serviceNames) {
                        List<Instance> instances = namingService.getAllInstances(serviceName,
                            getUrl().getGroup(Constants.DEFAULT_GROUP));
                        NacosInstanceManageUtil.initOrRefreshServiceInstanceList(serviceName, instances);
                        allCorrespondingInstanceList.addAll(instances);
                    }
                    notifySubscriber(url, listener, allCorrespondingInstanceList); 
                    for (String serviceName : serviceNames) {
                        subscribeEventListener(serviceName, url, listener); // here
                    }
                } else {
                    for (String serviceName : serviceNames) {
                        List<Instance> instances = new LinkedList<>();
                        instances.addAll(namingService.getAllInstances(serviceName
                            , getUrl().getGroup(Constants.DEFAULT_GROUP)));
                        String serviceInterface = serviceName;
                        String[] segments = serviceName.split(SERVICE_NAME_SEPARATOR, -1);
                        if (segments.length == 4) {
                            serviceInterface = segments[SERVICE_INTERFACE_INDEX];
                        }
                        URL subscriberURL = url.setPath(serviceInterface).addParameters(INTERFACE_KEY, serviceInterface,
                            CHECK_KEY, String.valueOf(false));
                        notifySubscriber(subscriberURL, listener, instances);
                        subscribeEventListener(serviceName, subscriberURL, listener);
                    }
                }
            } catch (Throwable cause) {
                throw new RpcException("Failed to subscribe " + url + " to nacos " + getUrl() + ", cause: " + cause.getMessage(), cause);
            }
        }
    }
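
    doSubscribe follows a two-step pattern: it first notifies the listener with the instances that exist right now, then registers an event listener so later changes are pushed too. A minimal sketch of that pattern, assuming a hypothetical in-memory FakeNamingService in place of the Nacos client and plain strings in place of Instance objects:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical in-memory stand-in for a naming service (not the Nacos API).
class FakeNamingService {
    private final List<String> instances = new ArrayList<>();
    private final List<Consumer<List<String>>> listeners = new ArrayList<>();

    List<String> getAllInstances() {
        return new ArrayList<>(instances);
    }

    void subscribe(Consumer<List<String>> listener) {
        listeners.add(listener);
    }

    // Simulates a provider coming online: every listener receives the new list.
    void register(String instance) {
        instances.add(instance);
        for (Consumer<List<String>> listener : listeners) {
            listener.accept(getAllInstances());
        }
    }
}

class SubscribeSketch {
    // Same two steps as doSubscribe: notify with the current snapshot,
    // then register a listener so later changes are pushed as well.
    static void doSubscribe(FakeNamingService naming, Consumer<List<String>> notifyListener) {
        notifyListener.accept(naming.getAllInstances());
        naming.subscribe(notifyListener);
    }
}
```

    When the consumer subscribes before any provider exists, the first notification carries an empty list and the second one, triggered by register, carries the new provider.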
    

    org.apache.dubbo.rpc.RpcException: Directory already destroyed

    public abstract class AbstractDirectory<T> implements Directory<T> {
        @Override
        public List<Invoker<T>> list(Invocation invocation) throws RpcException {
            if (destroyed) {
                throw new RpcException("Directory already destroyed .url: " + getUrl());
            }
    
            return doList(invocation);
        }
    
        @Override
        public void destroy() {
            destroyed = true; // here
        }
    }
    
    public class ReferenceConfig<T> extends ReferenceConfigBase<T> {
    
        private void checkInvokerAvailable() throws IllegalStateException {
            if (shouldCheck() && !invoker.isAvailable()) {
                invoker.destroy(); // here
                throw new IllegalStateException("Should has at least one way to know which services this interface belongs to," +
                    " subscription url: " + invoker.getUrl());
            }
        }
    
        protected synchronized void init() {
            // ......
    
            checkInvokerAvailable(); // here
        }
    
    }
    
    public abstract class ReferenceConfigBase<T> extends AbstractReferenceConfig {
        public boolean shouldCheck() {
            checkDefault();
            Boolean shouldCheck = isCheck(); // here
            if (shouldCheck == null && getConsumer() != null) {
                shouldCheck = getConsumer().isCheck(); 
            }
            if (shouldCheck == null) {
                // default true // here
                shouldCheck = true;
            }
            return shouldCheck;
        }
    }
    
    public class RegistryDirectory<T> extends DynamicDirectory<T> {
        @Override
        public boolean isAvailable() {
            if (isDestroyed() || this.forbidden) { // here
                return false;
            }
            Map<URL, Invoker<T>> localUrlInvokerMap = urlInvokerMap; // here
            return CollectionUtils.isNotEmptyMap(localUrlInvokerMap)
                    && localUrlInvokerMap.values().stream().anyMatch(Invoker::isAvailable);
        }
    }
    

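    If check is not set, it defaults to true, so at startup the consumer checks whether a provider is available. If none is, the invoker is destroyed, and every later call fails with "Directory already destroyed", even after the provider starts.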
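
    The whole behaviour can be condensed into a small plain-Java model. This is a sketch under stated assumptions, not Dubbo's implementation: DirectorySketch, StartupCheckSketch, init, and tryCall are hypothetical names, and the IllegalStateException that the real checkInvokerAvailable throws after destroying the invoker is left out.

```java
// Simplified model (not Dubbo source): all names below are hypothetical.
class DirectorySketch {
    boolean destroyed = false;
    boolean providerAvailable = false;

    String call() {
        if (destroyed) {
            throw new IllegalStateException("Directory already destroyed");
        }
        if (!providerAvailable) {
            throw new IllegalStateException("No provider available from registry");
        }
        return "hello";
    }
}

class StartupCheckSketch {
    // Same fallback order as shouldCheck(): reference level, consumer level, then true.
    static boolean shouldCheck(Boolean referenceCheck, Boolean consumerCheck) {
        Boolean result = referenceCheck;
        if (result == null) {
            result = consumerCheck;
        }
        return result == null ? true : result;
    }

    // Models checkInvokerAvailable(): destroy the directory when the check fails.
    // The real method also throws IllegalStateException; that is omitted here.
    static DirectorySketch init(Boolean referenceCheck, Boolean consumerCheck, boolean providerAvailable) {
        DirectorySketch dir = new DirectorySketch();
        dir.providerAvailable = providerAvailable;
        if (shouldCheck(referenceCheck, consumerCheck) && !providerAvailable) {
            dir.destroyed = true;
        }
        return dir;
    }

    // Returns the call result, or the error message if the call fails.
    static String tryCall(DirectorySketch dir) {
        try {
            return dir.call();
        } catch (IllegalStateException e) {
            return e.getMessage();
        }
    }
}
```

    With init(null, null, false) the directory stays destroyed even after providerAvailable flips to true, which is scenario 2. With init(false, null, false) the first call reports that no provider is available and later calls succeed once a provider appears, which is scenario 3.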

  • Original article: https://www.cnblogs.com/eaglelihh/p/17339915.html