• [Eureka] [Source Code + Diagrams] [Part 8] Service Fetching in the Eureka Client


    [Eureka] [Source Code + Diagrams] [Part 7] Eureka's Deregistration Feature

    7. Fetching Services

    The overall flow is as follows:
    [Image: flow diagram]

    7.1 Initializing the cache-refresh task

    public class DiscoveryClient implements EurekaClient {
        private final ThreadPoolExecutor cacheRefreshExecutor;
        
        DiscoveryClient(...) {
            ......
            // 1. Initialize the thread pool used for fetching the registry
            cacheRefreshExecutor = new ThreadPoolExecutor(
                        1, 
                // eureka.client.cacheRefreshExecutorThreadPoolSize: maximum number of threads for registry refresh, default 2
                clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
                        new SynchronousQueue<Runnable>(),
                        new ThreadFactoryBuilder()
                                .setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d")
                                .setDaemon(true)
                                .build()
                );
            ......
        }
        private void initScheduledTasks() {
            if (clientConfig.shouldFetchRegistry()) {
                // eureka.client.registryFetchIntervalSeconds: refresh interval, default 30 seconds
                int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
                // eureka.client.cacheRefreshExecutorExponentialBackOffBound: maximum back-off multiplier after a refresh timeout, default 10
                int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
                // 3. Initialize the task that fetches the registry
                cacheRefreshTask = new TimedSupervisorTask(
                        "cacheRefresh",
                        scheduler,
                        cacheRefreshExecutor,
                        registryFetchIntervalSeconds,
                        TimeUnit.SECONDS,
                        expBackOffBound,
                        new CacheRefreshThread()
                );
                // 4. Start the scheduled task
                scheduler.schedule(
                        cacheRefreshTask,
                        registryFetchIntervalSeconds, TimeUnit.SECONDS);
            }
            ......
        }
        
        // 2. Define the Runnable that fetches the registry
        class CacheRefreshThread implements Runnable {
            public void run() {
                refreshRegistry();
            }
        }
    }
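
    The pool above pairs a SynchronousQueue with a core size of 1 and a small maximum, so tasks are never queued: once every thread is busy, a further submission is rejected, which is the RejectedExecutionException that TimedSupervisorTask catches in section 7.3. The following is a minimal standalone sketch of that behaviour using only the JDK. The class RefreshPoolSketch and the method countRejected are hypothetical names for illustration and are not part of Eureka.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of Eureka.
final class RefreshPoolSketch {

    /** Submits {@code tasks} blocking jobs to a 1..maxThreads pool and returns how many were rejected. */
    static int countRejected(int maxThreads, int tasks) throws InterruptedException {
        // Same shape as cacheRefreshExecutor: core size 1, bounded maximum, no queueing.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, maxThreads, 0, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
        CountDownLatch release = new CountDownLatch(1);
        int rejected = 0;
        try {
            for (int i = 0; i < tasks; i++) {
                try {
                    // Each job blocks until released, so every worker thread stays busy.
                    pool.execute(() -> {
                        try {
                            release.await();
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                    });
                } catch (RejectedExecutionException e) {
                    // No idle thread and no room to grow: the submission is refused.
                    rejected++;
                }
            }
        } finally {
            release.countDown();
            pool.shutdown();
            pool.awaitTermination(5, TimeUnit.SECONDS);
        }
        return rejected;
    }

    public static void main(String[] args) throws InterruptedException {
        // With at most 2 threads, the third concurrent job is rejected.
        System.out.println(countRejected(2, 3)); // prints 1
    }
}
```

    In this sketch a rejection only happens while earlier refreshes are still running, which is why a small pool is usually enough for a task that fires once per interval.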
    

    7.2 Wrapping the task in a timed TimerTask

    public class TimedSupervisorTask extends TimerTask {
        public TimedSupervisorTask(String name, ScheduledExecutorService scheduler, ThreadPoolExecutor executor,
                                   int timeout, TimeUnit timeUnit, int expBackOffBound, Runnable task) {
            this.name = name;
            this.scheduler = scheduler; // scheduler that triggers the timer
            this.executor = executor; // thread pool that executes the task
            this.timeoutMillis = timeUnit.toMillis(timeout);
            this.task = task; // the actual task, i.e. CacheRefreshThread
            this.delay = new AtomicLong(timeoutMillis); // delay before the next run
            this.maxDelay = timeoutMillis * expBackOffBound; // upper bound for the delay
            ......
        }
    }
    

    7.3 When the timer fires, the TimerTask runs

    public class TimedSupervisorTask extends TimerTask {
        @Override
        public void run() {
            Future<?> future = null;
            try {
                // 1. Submit CacheRefreshThread to the thread pool and keep the Future for the result
                future = executor.submit(task);
                threadPoolLevelGauge.set((long) executor.getActiveCount());
                // 2. Block waiting for the result, for at most eureka.client.registryFetchIntervalSeconds
                future.get(timeoutMillis, TimeUnit.MILLISECONDS);
                // 3. Reset the delay before the next run
                delay.set(timeoutMillis);
                threadPoolLevelGauge.set((long) executor.getActiveCount());
                successCounter.increment();
            } catch (TimeoutException e) {
                timeoutCounter.increment();
                long currentDelay = delay.get();
                long newDelay = Math.min(maxDelay, currentDelay * 2);
                // Timed out waiting for the result: the next delay becomes 2 * currentDelay,
                // capped at eureka.client.registryFetchIntervalSeconds * eureka.client.cacheRefreshExecutorExponentialBackOffBound
                delay.compareAndSet(currentDelay, newDelay);
            } catch (RejectedExecutionException e) {
                rejectedCounter.increment();
            } catch (Throwable e) {
                throwableCounter.increment();
            } finally {
                if (future != null) {
                    future.cancel(true);
                }
                if (!scheduler.isShutdown()) {
                    // 4. Schedule the next run
                    scheduler.schedule(this, delay.get(), TimeUnit.MILLISECONDS);
                }
            }
        }
    }
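
    To make the back-off rule in the TimeoutException branch concrete, here is a small sketch that replays the same arithmetic (double the delay, cap it at timeoutMillis * expBackOffBound) for a run of consecutive timeouts. BackoffSketch and its method names are hypothetical and exist only for this illustration. The inputs 30 seconds and 10 are the defaults quoted in section 7.1.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical demo class, not part of Eureka.
final class BackoffSketch {

    /** Same formula as the TimeoutException branch: double the delay, but never exceed maxDelay. */
    static long nextDelayAfterTimeout(long currentDelay, long maxDelay) {
        return Math.min(maxDelay, currentDelay * 2);
    }

    /** Returns the delay chosen after each of {@code timeouts} consecutive timeouts. */
    static List<Long> delaysAfterConsecutiveTimeouts(long timeoutMillis, int expBackOffBound, int timeouts) {
        long maxDelay = timeoutMillis * expBackOffBound; // upper bound, as in the constructor
        long delay = timeoutMillis;                      // initial delay, as in the constructor
        List<Long> result = new ArrayList<>();
        for (int i = 0; i < timeouts; i++) {
            delay = nextDelayAfterTimeout(delay, maxDelay);
            result.add(delay);
        }
        return result;
    }

    public static void main(String[] args) {
        // 30 s interval, bound 10: the delay doubles until it reaches 300 s.
        System.out.println(delaysAfterConsecutiveTimeouts(30_000L, 10, 5));
        // prints [60000, 120000, 240000, 300000, 300000]
    }
}
```

    A single successful run resets the delay to timeoutMillis, so the back-off only grows while timeouts keep occurring back to back.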
    

    7.4 The task gets a thread and runs refreshRegistry()

    public class DiscoveryClient implements EurekaClient {
        class CacheRefreshThread implements Runnable {
            public void run() {
                refreshRegistry();
            }
        }
        @VisibleForTesting
        void refreshRegistry() {
            try {
                ......
                // 1. Dynamically pick up the latest remote regions
                String latestRemoteRegions = clientConfig.fetchRegistryForRemoteRegions();
                ......
                // 2. Fetch the latest service instances
                boolean success = fetchRegistry(remoteRegionsModified);
                if (success) {
                    registrySize = localRegionApps.get().size();
                    lastSuccessfulRegistryFetchTimestamp = System.currentTimeMillis();
                }
                
            } catch (Throwable e) {
                logger.error("Cannot fetch registry from server", e);
            }
        }
        private boolean fetchRegistry(boolean forceFullRegistryFetch) {
            ......
                // 3. Fetch the latest service instances and store them locally
                    getAndStoreFullRegistry();
                ......
                applications.setAppsHashCode(applications.getReconcileHashCode());
            ......
            return true;
        }
        private void getAndStoreFullRegistry() throws Throwable {
            long currentUpdateGeneration = fetchRegistryGeneration.get();
            Applications apps = null;
            // 4. Send an HTTP request to the server
            EurekaHttpResponse<Applications> httpResponse = clientConfig.getRegistryRefreshSingleVipAddress() == null
                    ? eurekaTransport.queryClient.getApplications(remoteRegionsRef.get())
                    : eurekaTransport.queryClient.getVip(clientConfig.getRegistryRefreshSingleVipAddress(), remoteRegionsRef.get());
            if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
                apps = httpResponse.getEntity();
            }
    
            if (apps == null) {
            } else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
                // 5. Filter out instances whose status is not UP and store the result in the localRegionApps cache
                localRegionApps.set(this.filterAndShuffle(apps));
            } else {
            }
        }
    }
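
    getAndStoreFullRegistry() reads fetchRegistryGeneration before the HTTP call and only stores the response if compareAndSet still sees that value afterwards, so a slow response cannot overwrite a newer one. The sketch below isolates that guard with plain JDK atomics. GenerationGuardSketch, beginFetch and store are hypothetical names, and a String stands in for the Applications payload.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo class, not part of Eureka.
final class GenerationGuardSketch {
    private final AtomicLong generation = new AtomicLong(0);
    private final AtomicReference<String> cache = new AtomicReference<>("initial");

    /** Called before the remote call: remember which generation this fetch started from. */
    long beginFetch() {
        return generation.get();
    }

    /** Stores the fetched value only if no other fetch has completed in the meantime. */
    boolean store(long observedGeneration, String fetched) {
        if (generation.compareAndSet(observedGeneration, observedGeneration + 1)) {
            cache.set(fetched);
            return true;
        }
        return false; // another fetch won the race; this result is discarded
    }

    String current() {
        return cache.get();
    }

    public static void main(String[] args) {
        GenerationGuardSketch guard = new GenerationGuardSketch();
        long slow = guard.beginFetch(); // both fetches start from generation 0
        long fast = guard.beginFetch();
        System.out.println(guard.store(fast, "fresh")); // prints true
        System.out.println(guard.store(slow, "stale")); // prints false
        System.out.println(guard.current());            // prints fresh
    }
}
```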
    

    7.5 The server receives the request

    public class ApplicationsResource {
        @Inject
        ApplicationsResource(EurekaServerContext eurekaServer) {
            this.serverConfig = eurekaServer.getServerConfig();
            this.registry = eurekaServer.getRegistry();
            // The cache is AbstractInstanceRegistry.responseCache
            this.responseCache = registry.getResponseCache();
        }
        @GET
        public Response getContainers(@PathParam("version") String version,
                                      @HeaderParam(HEADER_ACCEPT) String acceptHeader,
                                      @HeaderParam(HEADER_ACCEPT_ENCODING) String acceptEncoding,
                                      @HeaderParam(EurekaAccept.HTTP_X_EUREKA_ACCEPT) String eurekaAccept,
                                      @Context UriInfo uriInfo,
                                      @Nullable @QueryParam("regions") String regionsStr) {
            
            boolean isRemoteRegionRequested = null != regionsStr && !regionsStr.isEmpty();
            String[] regions = null;
            if (!isRemoteRegionRequested) {
                EurekaMonitors.GET_ALL.increment();
            } else {
                // 1. Set the regions
                regions = regionsStr.toLowerCase().split(",");
                Arrays.sort(regions); // So we don't have different caches for same regions queried in different order.
                EurekaMonitors.GET_ALL_WITH_REMOTE_REGIONS.increment();
            }
            if (!registry.shouldAllowAccess(isRemoteRegionRequested)) {
                return Response.status(Status.FORBIDDEN).build();
            }
            // 2. Set the version, V2 by default
            CurrentRequestVersion.set(Version.toEnum(version));
            // 3. Set the keyType
            KeyType keyType = Key.KeyType.JSON;
            String returnMediaType = MediaType.APPLICATION_JSON;
            if (acceptHeader == null || !acceptHeader.contains(HEADER_JSON_VALUE)) {
                keyType = Key.KeyType.XML;
                returnMediaType = MediaType.APPLICATION_XML;
            }
            // 4. For how the Key's uniqueness is determined, see the earlier article (Section 3.2.2.1)
            Key cacheKey = new Key(Key.EntityType.Application,
                    ResponseCacheImpl.ALL_APPS,
                    keyType, CurrentRequestVersion.get(), EurekaAccept.fromString(eurekaAccept), regions
            );
    
            Response response;
            // 5. Read the value from responseCache; the client requests GZIP by default
            if (acceptEncoding != null && acceptEncoding.contains(HEADER_GZIP_VALUE)) {
                response = Response.ok(responseCache.getGZIP(cacheKey))
                        .header(HEADER_CONTENT_ENCODING, HEADER_GZIP_VALUE)
                        .header(HEADER_CONTENT_TYPE, returnMediaType)
                        .build();
            } else {
                response = Response.ok(responseCache.get(cacheKey))
                        .build();
            }
            CurrentRequestVersion.remove();
            return response;
        }
    }
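
    Two details of getContainers() decide which cache entry a request hits: the regions are lower-cased and sorted so that the same set of regions always yields the same key, and the key type falls back to XML unless the Accept header mentions JSON. The sketch below reproduces only those two rules. CacheKeySketch and cacheKey are hypothetical, the key is flattened into a String rather than Eureka's Key class, and the literal "json" is an assumed stand-in for HEADER_JSON_VALUE.

```java
import java.util.Arrays;
import java.util.Locale;

// Hypothetical demo class, not part of Eureka.
final class CacheKeySketch {

    /** Builds a simplified "TYPE:regions" key from the Accept header and the regions query parameter. */
    static String cacheKey(String acceptHeader, String regionsStr) {
        // JSON only when the header asks for it; otherwise XML, as in getContainers().
        // "json" is an assumption standing in for HEADER_JSON_VALUE.
        String type = (acceptHeader != null && acceptHeader.contains("json")) ? "JSON" : "XML";
        if (regionsStr == null || regionsStr.isEmpty()) {
            return type + ":"; // no remote regions requested
        }
        String[] regions = regionsStr.toLowerCase(Locale.ROOT).split(",");
        Arrays.sort(regions); // same regions in a different order map to the same key
        return type + ":" + String.join(",", regions);
    }

    public static void main(String[] args) {
        System.out.println(cacheKey("application/json", "us-west-2,US-EAST-1")); // prints JSON:us-east-1,us-west-2
        System.out.println(cacheKey("application/json", "us-east-1,us-west-2")); // prints JSON:us-east-1,us-west-2
        System.out.println(cacheKey(null, null));                                // prints XML:
    }
}
```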
    

    7.6 Fetching the Applications

    public class ResponseCacheImpl implements ResponseCache {
        public byte[] getGZIP(Key key) {
            // shouldUseReadOnlyResponseCache = eureka.server.useReadOnlyResponseCache, default true
            Value payload = getValue(key, shouldUseReadOnlyResponseCache);
            if (payload == null) {
                return null;
            }
            return payload.getGzipped();
        }
        Value getValue(final Key key, boolean useReadOnlyCache) {
            Value payload = null;
            try {
                // Default is true. Either way, the first lookup goes through readWriteCacheMap.get(key).
                // The first lookup has to load the value, so it reaches generatePayload; see Section 3.2.2.1 (not repeated here)
                if (useReadOnlyCache) {
                    final Value currentPayload = readOnlyCacheMap.get(key);
                    if (currentPayload != null) {
                        payload = currentPayload;
                    } else {
                        payload = readWriteCacheMap.get(key);
                        readOnlyCacheMap.put(key, payload);
                    }
                } else {
                    payload = readWriteCacheMap.get(key);
                }
            } catch (Throwable t) {
                logger.error("Cannot get value for key : {}", key, t);
            }
            return payload;
        }
    }
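
    The lookup order in getValue() is a two-level read-through: check the read-only map first, fall back to the read-write cache (which loads on a miss), then copy the value into the read-only map. The sketch below shows that pattern with JDK maps only. TwoLevelCacheSketch is a hypothetical class, computeIfAbsent is a stand-in for the loading behaviour of readWriteCacheMap, and the loader function plays the role of generatePayload.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

// Hypothetical demo class, not part of Eureka.
final class TwoLevelCacheSketch {
    private final Map<String, String> readOnly = new ConcurrentHashMap<>();
    private final Map<String, String> readWrite = new ConcurrentHashMap<>();
    private final Function<String, String> loader; // must not return null
    final AtomicInteger loads = new AtomicInteger(); // counts how often the loader actually ran

    TwoLevelCacheSketch(Function<String, String> loader) {
        this.loader = loader;
    }

    String get(String key, boolean useReadOnlyCache) {
        if (!useReadOnlyCache) {
            return loadThrough(key);
        }
        String cached = readOnly.get(key);
        if (cached != null) {
            return cached; // served from the first level
        }
        String loaded = loadThrough(key); // miss: go to the second level
        readOnly.put(key, loaded);        // and remember the value in the first level
        return loaded;
    }

    // Second level: loads the value on first access, then serves it from memory.
    private String loadThrough(String key) {
        return readWrite.computeIfAbsent(key, k -> {
            loads.incrementAndGet();
            return loader.apply(k);
        });
    }

    public static void main(String[] args) {
        TwoLevelCacheSketch cache = new TwoLevelCacheSketch(k -> "payload-for-" + k);
        System.out.println(cache.get("ALL_APPS", true)); // prints payload-for-ALL_APPS
        System.out.println(cache.get("ALL_APPS", true)); // prints payload-for-ALL_APPS
        System.out.println(cache.loads.get());           // prints 1
    }
}
```

    The sketch leaves out invalidation and the periodic sync between the two levels, so it only illustrates the read path shown in this section.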
    

    That wraps up the analysis of Eureka. Next, we move on to LoadBalancer.

  • Original article: https://blog.csdn.net/Lanna_w/article/details/128153994