整体流程如下:
/**
 * Excerpt of the Eureka client showing how the periodic registry (cache) refresh is wired up:
 * a dedicated thread pool is created in the constructor, and initScheduledTasks() wraps a
 * CacheRefreshThread in a TimedSupervisorTask and schedules it.
 * Lines consisting of "......" (and the "(...)" parameter list) mark code elided from this excerpt.
 */
public class DiscoveryClient implements EurekaClient {
// Thread pool handed to the TimedSupervisorTask below to execute the refresh work.
private final ThreadPoolExecutor cacheRefreshExecutor;
DiscoveryClient(...) {
......
// 1. Create the thread pool used to fetch the registry: core size 1, keep-alive 0 seconds,
//    a SynchronousQueue as the work queue, and daemon threads named
//    "DiscoveryClient-CacheRefreshExecutor-<n>".
cacheRefreshExecutor = new ThreadPoolExecutor(
1,
// eureka.client.cacheRefreshExecutorThreadPoolSize: maximum number of threads for registry
// refresh (default 2 according to the original note).
clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
new ThreadFactoryBuilder()
.setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d")
.setDaemon(true)
.build()
);
......
}
/**
 * Sets up the scheduled registry refresh. Nothing is scheduled here unless
 * clientConfig.shouldFetchRegistry() returns true.
 */
private void initScheduledTasks() {
if (clientConfig.shouldFetchRegistry()) {
// eureka.client.registryFetchIntervalSeconds: refresh interval (default 30 seconds according
// to the original note). It is used both as the task timeout and as the initial delay.
int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
// eureka.client.cacheRefreshExecutorExponentialBackOffBound: upper bound on the back-off
// multiplier applied after timeouts (default 10 according to the original note).
int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
// 3. Build the supervisor task that runs a CacheRefreshThread on cacheRefreshExecutor.
cacheRefreshTask = new TimedSupervisorTask(
"cacheRefresh",
scheduler,
cacheRefreshExecutor,
registryFetchIntervalSeconds,
TimeUnit.SECONDS,
expBackOffBound,
new CacheRefreshThread()
);
// 4. Start the timer: the first run happens after registryFetchIntervalSeconds seconds.
scheduler.schedule(
cacheRefreshTask,
registryFetchIntervalSeconds, TimeUnit.SECONDS);
}
......
}
// 2. The Runnable that performs the registry fetch; it simply delegates to refreshRegistry().
class CacheRefreshThread implements Runnable {
public void run() {
refreshRegistry();
}
}
}
/**
 * Excerpt of TimedSupervisorTask showing only its constructor.
 * The "......" line marks code elided from this excerpt.
 */
public class TimedSupervisorTask extends TimerTask {
/**
 * @param name            task name
 * @param scheduler       scheduler stored for later use by this task
 * @param executor        thread pool stored for executing the wrapped task
 * @param timeout         timeout value, expressed in {@code timeUnit}
 * @param timeUnit        unit of {@code timeout}; converted to milliseconds here
 * @param expBackOffBound multiplier applied to the timeout to obtain the maximum delay
 * @param task            the Runnable to execute
 */
public TimedSupervisorTask(String name, ScheduledExecutorService scheduler, ThreadPoolExecutor executor,
int timeout, TimeUnit timeUnit, int expBackOffBound, Runnable task) {
this.name = name;
this.scheduler = scheduler; // the timer / scheduler
this.executor = executor; // thread pool that executes the task
this.timeoutMillis = timeUnit.toMillis(timeout);
this.task = task; // the concrete task (a CacheRefreshThread in the registry-refresh case, per the original note)
this.delay = new AtomicLong(timeoutMillis); // current delay, initialised to the timeout
this.maxDelay = timeoutMillis * expBackOffBound; // maximum delay: timeout * expBackOffBound
......
}
}
public class TimedSupervisorTask extends TimerTask {
    /**
     * Executes the wrapped task once on the worker pool, waits for it with a timeout,
     * adjusts the delay according to the outcome, and finally re-schedules itself.
     */
    @Override
    public void run() {
        Future<?> pending = null;
        try {
            // 1. Hand the wrapped task to the thread pool and keep the Future for the result.
            pending = executor.submit(task);
            publishActiveThreadCount();

            // 2. Block until the task finishes, waiting at most timeoutMillis.
            pending.get(timeoutMillis, TimeUnit.MILLISECONDS);

            // 3. Completed in time: the next run uses the base delay again.
            delay.set(timeoutMillis);
            publishActiveThreadCount();
            successCounter.increment();
        } catch (TimeoutException timedOut) {
            timeoutCounter.increment();
            // The wait timed out: double the delay that was observed, capped at maxDelay.
            // The compare-and-set leaves the delay untouched if it changed in the meantime.
            final long observedDelay = delay.get();
            delay.compareAndSet(observedDelay, Math.min(observedDelay * 2, maxDelay));
        } catch (RejectedExecutionException rejected) {
            rejectedCounter.increment();
        } catch (Throwable unexpected) {
            throwableCounter.increment();
        } finally {
            // Cancel the submitted work, if any, interrupting it when still running.
            if (pending != null) {
                pending.cancel(true);
            }
            // 4. Schedule the next run after the current delay, unless the scheduler is shut down.
            if (!scheduler.isShutdown()) {
                scheduler.schedule(this, delay.get(), TimeUnit.MILLISECONDS);
            }
        }
    }

    /** Pushes the executor's current active-thread count into the gauge. */
    private void publishActiveThreadCount() {
        threadPoolLevelGauge.set((long) executor.getActiveCount());
    }
}
/**
 * Excerpt of the Eureka client showing the call chain executed by the refresh task:
 * CacheRefreshThread.run() -> refreshRegistry() -> fetchRegistry() -> getAndStoreFullRegistry().
 * Lines consisting of "......" mark code elided from this excerpt.
 */
public class DiscoveryClient implements EurekaClient {
// Runnable executed by the refresh task; it delegates to refreshRegistry().
class CacheRefreshThread implements Runnable {
public void run() {
refreshRegistry();
}
}
/**
 * Performs one registry refresh. Any Throwable raised along the way is logged and not rethrown.
 */
@VisibleForTesting
void refreshRegistry() {
try {
......
// 1. Read the latest remote-regions setting from the client configuration.
String latestRemoteRegions = clientConfig.fetchRegistryForRemoteRegions();
......
// 2. Fetch the latest service instances (remoteRegionsModified is defined in elided code).
boolean success = fetchRegistry(remoteRegionsModified);
if (success) {
// Record the size of the local registry and the time of this successful fetch.
registrySize = localRegionApps.get().size();
lastSuccessfulRegistryFetchTimestamp = System.currentTimeMillis();
}
} catch (Throwable e) {
logger.error("Cannot fetch registry from server", e);
}
}
/**
 * Fetches the registry; in this excerpt only the full-fetch path is shown.
 *
 * @param forceFullRegistryFetch its use is in elided code and not visible in this excerpt
 * @return true on the path shown here
 */
private boolean fetchRegistry(boolean forceFullRegistryFetch) {
......
// 3. Fetch the latest service instances and store them locally.
getAndStoreFullRegistry();
......
applications.setAppsHashCode(applications.getReconcileHashCode());
......
return true;
}
/**
 * Requests the full registry from the server and, when a response with status OK carries an
 * entity, stores the filtered result in localRegionApps.
 */
private void getAndStoreFullRegistry() throws Throwable {
// Snapshot the generation counter; the result is stored only if it is still unchanged below.
long currentUpdateGeneration = fetchRegistryGeneration.get();
Applications apps = null;
// 4. Send the HTTP request to the server: all applications when no single VIP address is
//    configured, otherwise only that VIP address.
EurekaHttpResponse<Applications> httpResponse = clientConfig.getRegistryRefreshSingleVipAddress() == null
? eurekaTransport.queryClient.getApplications(remoteRegionsRef.get())
: eurekaTransport.queryClient.getVip(clientConfig.getRegistryRefreshSingleVipAddress(), remoteRegionsRef.get());
if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
apps = httpResponse.getEntity();
}
if (apps == null) {
// No applications obtained; nothing is stored (this branch is empty in the excerpt).
} else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
// 5. Store the result of filterAndShuffle(apps) in localRegionApps, the local cache
//    (per the original note, filterAndShuffle drops instances whose status is not UP).
localRegionApps.set(this.filterAndShuffle(apps));
} else {
// The generation changed since the snapshot was taken; the fetched data is not stored
// (this branch is empty in the excerpt).
}
}
}
public class ApplicationsResource {
    /**
     * Captures the server configuration, the registry and the registry's response cache
     * from the given server context.
     */
    @Inject
    ApplicationsResource(EurekaServerContext eurekaServer) {
        this.serverConfig = eurekaServer.getServerConfig();
        this.registry = eurekaServer.getRegistry();
        // The cache comes from the registry (AbstractInstanceRegistry.responseCache per the original note).
        this.responseCache = registry.getResponseCache();
    }

    /**
     * Serves the payload for all applications out of the response cache.
     *
     * @param version        request version string, converted with Version.toEnum
     * @param acceptHeader   Accept header; JSON is used only when it contains HEADER_JSON_VALUE, XML otherwise
     * @param acceptEncoding Accept-Encoding header; the gzip payload is used when it contains HEADER_GZIP_VALUE
     * @param eurekaAccept   Eureka-specific accept header, parsed with EurekaAccept.fromString
     * @param uriInfo        request URI information (not used in this method)
     * @param regionsStr     optional comma-separated list of remote regions
     * @return the cached payload, or a FORBIDDEN response when the registry denies access
     */
    @GET
    public Response getContainers(@PathParam("version") String version,
                                  @HeaderParam(HEADER_ACCEPT) String acceptHeader,
                                  @HeaderParam(HEADER_ACCEPT_ENCODING) String acceptEncoding,
                                  @HeaderParam(EurekaAccept.HTTP_X_EUREKA_ACCEPT) String eurekaAccept,
                                  @Context UriInfo uriInfo,
                                  @Nullable @QueryParam("regions") String regionsStr) {
        final boolean remoteRegionsRequested = regionsStr != null && !regionsStr.isEmpty();

        String[] sortedRegions = null;
        if (remoteRegionsRequested) {
            // 1. Resolve the regions; sorting keeps the cache key independent of the query order.
            sortedRegions = regionsStr.toLowerCase().split(",");
            Arrays.sort(sortedRegions);
            EurekaMonitors.GET_ALL_WITH_REMOTE_REGIONS.increment();
        } else {
            EurekaMonitors.GET_ALL.increment();
        }

        if (!registry.shouldAllowAccess(remoteRegionsRequested)) {
            return Response.status(Status.FORBIDDEN).build();
        }

        // 2. Set the request version (V2 by default according to the original note).
        CurrentRequestVersion.set(Version.toEnum(version));

        // 3. Choose the key type and media type: JSON only when explicitly accepted, XML otherwise.
        final boolean jsonAccepted = acceptHeader != null && acceptHeader.contains(HEADER_JSON_VALUE);
        final KeyType keyType = jsonAccepted ? Key.KeyType.JSON : Key.KeyType.XML;
        final String returnMediaType = jsonAccepted ? MediaType.APPLICATION_JSON : MediaType.APPLICATION_XML;

        // 4. Build the cache key (key uniqueness is discussed in section 3.2.2.1 of the accompanying text).
        Key cacheKey = new Key(
                Key.EntityType.Application,
                ResponseCacheImpl.ALL_APPS,
                keyType,
                CurrentRequestVersion.get(),
                EurekaAccept.fromString(eurekaAccept),
                sortedRegions);

        // 5. Read from the response cache (clients use the gzip variant by default, per the original note).
        final boolean gzipAccepted = acceptEncoding != null && acceptEncoding.contains(HEADER_GZIP_VALUE);
        final Response response;
        if (gzipAccepted) {
            response = Response.ok(responseCache.getGZIP(cacheKey))
                    .header(HEADER_CONTENT_ENCODING, HEADER_GZIP_VALUE)
                    .header(HEADER_CONTENT_TYPE, returnMediaType)
                    .build();
        } else {
            response = Response.ok(responseCache.get(cacheKey)).build();
        }

        CurrentRequestVersion.remove();
        return response;
    }
}
public class ResponseCacheImpl implements ResponseCache {
    /**
     * Returns the gzipped payload cached for the given key.
     *
     * @param key cache key
     * @return the gzipped bytes, or null when no value could be obtained
     */
    public byte[] getGZIP(Key key) {
        // shouldUseReadOnlyResponseCache = eureka.server.useReadOnlyResponseCache
        // (default true according to the original note).
        Value cached = getValue(key, shouldUseReadOnlyResponseCache);
        return cached == null ? null : cached.getGzipped();
    }

    /**
     * Looks the key up, optionally going through the read-only cache first.
     * Any Throwable raised during the lookup is logged and the value obtained so far
     * (possibly null) is returned.
     *
     * @param key              cache key
     * @param useReadOnlyCache whether to consult readOnlyCacheMap before readWriteCacheMap
     * @return the cached value, or null
     */
    Value getValue(final Key key, boolean useReadOnlyCache) {
        Value result = null;
        try {
            // Whether the flag is true or false, the first lookup of a key ends up in
            // readWriteCacheMap.get(key). Per the accompanying text (section 3.2.2.1), that first
            // access has to load the value and therefore reaches generatePayload.
            if (!useReadOnlyCache) {
                result = readWriteCacheMap.get(key);
            } else {
                result = readOnlyCacheMap.get(key);
                if (result == null) {
                    // Miss in the read-only map: read through and remember the value there.
                    result = readWriteCacheMap.get(key);
                    readOnlyCacheMap.put(key, result);
                }
            }
        } catch (Throwable t) {
            logger.error("Cannot get value for key : {}", key, t);
        }
        return result;
    }
}
关于Eureka的分析到此告一段落,接下来开始学习LoadBalancer。