1. 概述

本文主要分享 Eureka-Client 向 Eureka-Server 获取全量注册信息的过程

FROM 《深度剖析服务发现组件Netflix Eureka》

Eureka-Client 获取注册信息,分成全量获取增量获取。默认配置下,Eureka-Client 启动时,首先执行一次全量获取进行本地缓存注册信息,而后每 30增量获取刷新本地缓存( 非“正常”情况下会是全量获取 )。


2. Eureka-Client 发起全量获取


2.1 初始化全量获取

Eureka-Client 启动时,首先执行一次全量获取进行本地缓存注册信息,首先代码如下:

// DiscoveryClient.java
* Applications 在本地的缓存
private final AtomicReference<Applications> localRegionApps = new AtomicReference<Applications>();

DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args,
Provider<BackupRegistry> backupRegistryProvider) {

// ... 省略无关代码

// 【3.2.5】初始化应用集合在本地的缓存
localRegionApps.set(new Applications());

// ... 省略无关代码

// 【3.2.12】从 Eureka-Server 拉取注册信息
if (clientConfig.shouldFetchRegistry() && !fetchRegistry(false)) {

// ... 省略无关代码

  • com.netflix.discovery.shared.Applications,注册的应用集合。较为容易理解,点击 链接 链接查看带中文注释的类,这里就不啰嗦了。Applications 与 InstanceInfo 类关系如下:

  • 配置 eureka.shouldFetchRegistry = true,开启从 Eureka-Server 获取注册信息。默认值:true

  • 调用 #fetchRegistry(false) 方法,从 Eureka-Server 全量获取注册信息,在 「2.4 发起获取注册信息」 详细解析。

2.2 定时获取

Eureka-Client 在初始化过程中,创建获取注册信息线程,固定间隔向 Eureka-Server 发起获取注册信息( fetch ),刷新本地注册信息缓存。实现代码如下:

// DiscoveryClient.java
DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args,
Provider<BackupRegistry> backupRegistryProvider) {
// ... 省略无关代码

// 【3.2.9】初始化线程池
// default size of 2 - 1 each for heartbeat and cacheRefresh
scheduler = Executors.newScheduledThreadPool(2,
new ThreadFactoryBuilder()

cacheRefreshExecutor = new ThreadPoolExecutor(
1, clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
new ThreadFactoryBuilder()
); // use direct handoff

// ... 省略无关代码

// 【3.2.14】初始化定时任务

// ... 省略无关代码

private void initScheduledTasks() {
// 向 Eureka-Server 心跳(续租)执行器
if (clientConfig.shouldFetchRegistry()) {
// registry cache refresh timer
int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
new TimedSupervisorTask(
new CacheRefreshThread()
registryFetchIntervalSeconds, TimeUnit.SECONDS);
// ... 省略无关代码

2.3 刷新注册信息缓存

调用 #refreshRegistry(false) 方法,刷新注册信息缓存,实现代码如下:

// DiscoveryClient.java
1: void refreshRegistry() {
2: try {
3: // TODO 芋艿:TODO[0009]:RemoteRegionRegistry
4: boolean isFetchingRemoteRegionRegistries = isFetchingRemoteRegionRegistries();
6: boolean remoteRegionsModified = false;
7: // This makes sure that a dynamic change to remote regions to fetch is honored.
8: String latestRemoteRegions = clientConfig.fetchRegistryForRemoteRegions();
9: if (null != latestRemoteRegions) {
10: String currentRemoteRegions = remoteRegionsToFetch.get();
11: if (!latestRemoteRegions.equals(currentRemoteRegions)) {
12: // Both remoteRegionsToFetch and AzToRegionMapper.regionsToFetch need to be in sync
13: synchronized (instanceRegionChecker.getAzToRegionMapper()) {
14: if (remoteRegionsToFetch.compareAndSet(currentRemoteRegions, latestRemoteRegions)) {
15: String[] remoteRegions = latestRemoteRegions.split(",");
16: remoteRegionsRef.set(remoteRegions);
17: instanceRegionChecker.getAzToRegionMapper().setRegionsToFetch(remoteRegions);
18: remoteRegionsModified = true;
19: } else {
20: logger.info("Remote regions to fetch modified concurrently," +
21: " ignoring change from {} to {}", currentRemoteRegions, latestRemoteRegions);
22: }
23: }
24: } else {
25: // Just refresh mapping to reflect any DNS/Property change
26: instanceRegionChecker.getAzToRegionMapper().refreshMapping();
27: }
28: }
30: boolean success = fetchRegistry(remoteRegionsModified);
31: if (success) {
32: // 设置 注册信息的应用实例数
33: registrySize = localRegionApps.get().size();
34: // 设置 最后获取注册信息时间
35: lastSuccessfulRegistryFetchTimestamp = System.currentTimeMillis();
36: }
38: // 打印日志
39: if (logger.isDebugEnabled()) {
40: StringBuilder allAppsHashCodes = new StringBuilder();
41: allAppsHashCodes.append("Local region apps hashcode: ");
42: allAppsHashCodes.append(localRegionApps.get().getAppsHashCode());
43: allAppsHashCodes.append(", is fetching remote regions? ");
44: allAppsHashCodes.append(isFetchingRemoteRegionRegistries);
45: for (Map.Entry<String, Applications> entry : remoteRegionVsApps.entrySet()) {
46: allAppsHashCodes.append(", Remote region: ");
47: allAppsHashCodes.append(entry.getKey());
48: allAppsHashCodes.append(" , apps hashcode: ");
49: allAppsHashCodes.append(entry.getValue().getAppsHashCode());
50: }
51: logger.debug("Completed cache refresh task for discovery. All Apps hash code is {} ",
52: allAppsHashCodes.toString());
53: }
54: } catch (Throwable e) {
55: logger.error("Cannot fetch registry from server", e);
56: }
57: }

  • 第 3 至 28 行 :TODO[0009]:RemoteRegionRegistry

  • 第 30 行 :调用 #fetchRegistry(false) 方法,从 Eureka-Server 获取注册信息,在 「2.4 发起获取注册信息」 详细解析。

  • 第 31 至 36 行 :获取注册信息成功,设置注册信息的应用实例数,最后获取注册信息时间。变量代码如下:

    * 注册信息的应用实例数
    private volatile int registrySize = 0;
    * 最后成功从 Eureka-Server 拉取注册信息时间戳
    private volatile long lastSuccessfulRegistryFetchTimestamp = -1;

  • 第 38 至 53 行 :打印调试日志。

  • 第 54 至 56 行 :打印异常日志。

2.4 发起获取注册信息

调用 #fetchRegistry(false) 方法,从 Eureka-Server 获取注册信息( 根据条件判断,可能是全量,也可能是增量 ),实现代码如下:

 1: private boolean fetchRegistry(boolean forceFullRegistryFetch) {
2: Stopwatch tracer = FETCH_REGISTRY_TIMER.start();
4: try {
5: // 获取 本地缓存的注册的应用实例集合
6: // If the delta is disabled or if it is the first time, get all
7: // applications
8: Applications applications = getApplications();
10: // 全量获取
11: if (clientConfig.shouldDisableDelta() // 禁用增量获取
12: || (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress()))
13: || forceFullRegistryFetch
14: || (applications == null) // 空
15: || (applications.getRegisteredApplications().size() == 0) // 空
16: || (applications.getVersion() == -1)) //Client application does not have latest library supporting delta
17: {
18: logger.info("Disable delta property : {}", clientConfig.shouldDisableDelta());
19: logger.info("Single vip registry refresh property : {}", clientConfig.getRegistryRefreshSingleVipAddress());
20: logger.info("Force full registry fetch : {}", forceFullRegistryFetch);
21: logger.info("Application is null : {}", (applications == null));
22: logger.info("Registered Applications size is zero : {}",
23: (applications.getRegisteredApplications().size() == 0));
24: logger.info("Application version is -1: {}", (applications.getVersion() == -1));
25: // 执行 全量获取
26: getAndStoreFullRegistry();
27: } else {
28: // 执行 增量获取
29: getAndUpdateDelta(applications);
30: }
31: // 设置 应用集合 hashcode
32: applications.setAppsHashCode(applications.getReconcileHashCode());
33: // 打印 本地缓存的注册的应用实例数量
34: logTotalInstances();
35: } catch (Throwable e) {
36: logger.error(PREFIX + appPathIdentifier + " - was unable to refresh its cache! status = " + e.getMessage(), e);
37: return false;
38: } finally {
39: if (tracer != null) {
40: tracer.stop();
41: }
42: }
44: // Notify about cache refresh before updating the instance remote status
45: onCacheRefreshed();
47: // Update remote status based on refreshed data held in the cache
48: updateInstanceRemoteStatus();
50: // registry was fetched successfully, so return true
51: return true;
52: }

  • 第 5 至 8 行 :获取本地缓存的注册的应用实例集合,实现代码如下:

    public Applications getApplications() {
    return localRegionApps.get();

  • 第 10 至 26 行 :全量获取注册信息。

    • 第 11 行 :配置 eureka.disableDelta = true ,禁用增量获取注册信息。默认值:false
    • 第 12 行 :只获得一个 vipAddress 对应的应用实例们的注册信息。
    • 第 13 行 :方法参数 forceFullRegistryFetch 强制全量获取注册信息。
    • 第 14 至 15 行 :本地缓存为空。
    • 第 25 至 26 行 :调用 #getAndStoreFullRegistry() 方法,全量获取注册信息,并设置到本地缓存。下文详细解析。
  • 第 27 至 30 行 :增量获取注册信息,并刷新本地缓存,在 《Eureka 源码解析 —— 应用实例注册发现 (七)之增量获取》 详细解析。

  • 第 31 至 32 行 :计算应用集合 hashcode 。该变量用于校验增量获取的注册信息和 Eureka-Server 全量的注册信息是否一致( 完整 ),在 《Eureka 源码解析 —— 应用实例注册发现 (七)之增量获取》 详细解析。

  • 第 33 至 34 行 :打印调试日志,输出本地缓存的注册的应用实例数量。实现代码如下:

    private void logTotalInstances() {
    if (logger.isDebugEnabled()) {
    int totInstances = 0;
    for (Application application : getApplications().getRegisteredApplications()) {
    totInstances += application.getInstancesAsIsFromEureka().size();
    logger.debug("The total number of all instances in the client now is {}", totInstances);

  • 第 44 至 45 行 :触发 CacheRefreshedEvent 事件,事件监听器执行。目前 Eureka 未提供默认的该事件监听器。

    • #onCacheRefreshed() 方法,实现代码如下:

      * Eureka 事件监听器
      private final CopyOnWriteArraySet<EurekaEventListener> eventListeners = new CopyOnWriteArraySet<>();

      protected void onCacheRefreshed() {
      fireEvent(new CacheRefreshedEvent());

      protected void fireEvent(final EurekaEvent event) {
      for (EurekaEventListener listener : eventListeners) {

      • x
    • 笔者的YY :你可以实现自定义的事件监听器监听 CacheRefreshedEvent 事件,以达到持久化最新的注册信息到存储器( 例如,本地文件 ),通过这样的方式,配合实现 BackupRegistry 接口读取存储器。BackupRegistry 接口调用如下:

      // 【3.2.12】从 Eureka-Server 拉取注册信息
      if (clientConfig.shouldFetchRegistry() && !fetchRegistry(false)) {

  • 第47 至 48 行 :更新本地缓存的当前应用实例在 Eureka-Server 的状态。

     1: private volatile InstanceInfo.InstanceStatus lastRemoteInstanceStatus = InstanceInfo.InstanceStatus.UNKNOWN; 
    3: private synchronized void updateInstanceRemoteStatus() {
    4: // Determine this instance's status for this app and set to UNKNOWN if not found
    5: InstanceInfo.InstanceStatus currentRemoteInstanceStatus = null;
    6: if (instanceInfo.getAppName() != null) {
    7: Application app = getApplication(instanceInfo.getAppName());
    8: if (app != null) {
    9: InstanceInfo remoteInstanceInfo = app.getByInstanceId(instanceInfo.getId());
    10: if (remoteInstanceInfo != null) {
    11: currentRemoteInstanceStatus = remoteInstanceInfo.getStatus();
    12: }
    13: }
    14: }
    15: if (currentRemoteInstanceStatus == null) {
    16: currentRemoteInstanceStatus = InstanceInfo.InstanceStatus.UNKNOWN;
    17: }
    19: // Notify if status changed
    20: if (lastRemoteInstanceStatus != currentRemoteInstanceStatus) {
    21: onRemoteStatusChanged(lastRemoteInstanceStatus, currentRemoteInstanceStatus);
    22: lastRemoteInstanceStatus = currentRemoteInstanceStatus;
    23: }
    24: }

    • 第 4 至 14 行 :从注册信息中获取当前应用在 Eureka-Server 的状态。

    • 第 19 至 23 行 :对比本地缓存最新的的当前应用实例在 Eureka-Server 的状态,若不同,更新本地缓存( 注意,只更新该缓存变量,不更新本地当前应用实例的状态( instanceInfo.status ) ),触发 StatusChangeEvent 事件,事件监听器执行。目前 Eureka 未提供默认的该事件监听器。#onRemoteStatusChanged(...) 实现代码如下:

      protected void onRemoteStatusChanged(InstanceInfo.InstanceStatus oldStatus, InstanceInfo.InstanceStatus newStatus) {
      fireEvent(new StatusChangeEvent(oldStatus, newStatus));

2.4.1 全量获取注册信息,并设置到本地缓存

调用 #getAndStoreFullRegistry() 方法,全量获取注册信息,并设置到本地缓存。下实现代码如下:

 1: private void getAndStoreFullRegistry() throws Throwable {
2: long currentUpdateGeneration = fetchRegistryGeneration.get();
4: logger.info("Getting all instance registry info from the eureka server");
6: // 全量获取注册信息
7: Applications apps = null;
8: EurekaHttpResponse<Applications> httpResponse = clientConfig.getRegistryRefreshSingleVipAddress() == null
9: ? eurekaTransport.queryClient.getApplications(remoteRegionsRef.get())
10: : eurekaTransport.queryClient.getVip(clientConfig.getRegistryRefreshSingleVipAddress(), remoteRegionsRef.get());
11: if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
12: apps = httpResponse.getEntity();
13: }
14: logger.info("The response status is {}", httpResponse.getStatusCode());
16: // 设置到本地缓存
17: if (apps == null) {
18: logger.error("The application is null for some reason. Not storing this information");
19: } else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
20: localRegionApps.set(this.filterAndShuffle(apps));
21: logger.debug("Got full registry with apps hashcode {}", apps.getAppsHashCode());
22: } else {
23: logger.warn("Not updating applications as another thread is updating it already");
24: }
25: }

  • 第 6 至 14 行 :全量获取注册信息,实现代码如下:

    // AbstractJerseyEurekaHttpClient.java
    public EurekaHttpResponse<Applications> getApplications(String... regions) {
    return getApplicationsInternal("apps/", regions);

    private EurekaHttpResponse<Applications> getApplicationsInternal(String urlPath, String[] regions) {
    ClientResponse response = null;
    String regionsParamValue = null;
    try {
    WebResource webResource = jerseyClient.resource(serviceUrl).path(urlPath);
    if (regions != null && regions.length > 0) {
    regionsParamValue = StringUtil.join(regions);
    webResource = webResource.queryParam("regions", regionsParamValue);
    Builder requestBuilder = webResource.getRequestBuilder();
    response = requestBuilder.accept(MediaType.APPLICATION_JSON_TYPE).get(ClientResponse.class); // JSON

    Applications applications = null;
    if (response.getStatus() == Status.OK.getStatusCode() && response.hasEntity()) {
    applications = response.getEntity(Applications.class);
    return anEurekaHttpResponse(response.getStatus(), Applications.class)
    } finally {
    if (logger.isDebugEnabled()) {
    logger.debug("Jersey HTTP GET {}/{}?{}; statusCode={}",
    serviceUrl, urlPath,
    regionsParamValue == null ? "" : "regions=" + regionsParamValue,
    response == null ? "N/A" : response.getStatus()
    if (response != null) {

    • 调用 AbstractJerseyEurekaHttpClient#getApplications(...) 方法,GET 请求 Eureka-Server 的 apps/ 接口,参数为 regions ,返回格式为 JSON ,实现全量获取注册信息
  • 第 16 至 24 行 :设置到本地注册信息缓存

    • 第 19 行 :TODO[0025] :并发更新的情况???
    • 第 20 行 :调用 #filterAndShuffle(...) 方法,根据配置 eureka.shouldFilterOnlyUpInstances = true ( 默认值 :true ) 过滤只保留状态为开启( UP )的应用实例,并随机打乱应用实例顺序。打乱后,实现调用应用服务的随机性。代码比较易懂,点击链接查看方法实现。

3. Eureka-Server 接收全量获取

3.1 接收全量获取请求

com.netflix.eureka.resources.ApplicationsResource,处理所有应用的请求操作的 Resource ( Controller )。

接收全量获取请求,映射 ApplicationsResource#getContainers() 方法,实现代码如下:

 1: @GET
2: public Response getContainers(@PathParam("version") String version,
3: @HeaderParam(HEADER_ACCEPT) String acceptHeader,
4: @HeaderParam(HEADER_ACCEPT_ENCODING) String acceptEncoding,
5: @HeaderParam(EurekaAccept.HTTP_X_EUREKA_ACCEPT) String eurekaAccept,
6: @Context UriInfo uriInfo,
7: @Nullable @QueryParam("regions") String regionsStr) {
8: // TODO[0009]:RemoteRegionRegistry
9: boolean isRemoteRegionRequested = null != regionsStr && !regionsStr.isEmpty();
10: String[] regions = null;
11: if (!isRemoteRegionRequested) {
12: EurekaMonitors.GET_ALL.increment();
13: } else {
14: regions = regionsStr.toLowerCase().split(",");
15: Arrays.sort(regions); // So we don't have different caches for same regions queried in different order.
16: EurekaMonitors.GET_ALL_WITH_REMOTE_REGIONS.increment();
17: }
19: // 判断是否可以访问
20: // Check if the server allows the access to the registry. The server can
21: // restrict access if it is not
22: // ready to serve traffic depending on various reasons.
23: if (!registry.shouldAllowAccess(isRemoteRegionRequested)) {
24: return Response.status(Status.FORBIDDEN).build();
25: }
27: // API 版本
28: CurrentRequestVersion.set(Version.toEnum(version));
30: // 返回数据格式
31: KeyType keyType = Key.KeyType.JSON;
32: String returnMediaType = MediaType.APPLICATION_JSON;
33: if (acceptHeader == null || !acceptHeader.contains(HEADER_JSON_VALUE)) {
34: keyType = Key.KeyType.XML;
35: returnMediaType = MediaType.APPLICATION_XML;
36: }
38: // 响应缓存键( KEY )
39: Key cacheKey = new Key(Key.EntityType.Application,
40: ResponseCacheImpl.ALL_APPS,
41: keyType, CurrentRequestVersion.get(), EurekaAccept.fromString(eurekaAccept), regions
42: );
44: //
45: Response response;
46: if (acceptEncoding != null && acceptEncoding.contains(HEADER_GZIP_VALUE)) {
47: response = Response.ok(responseCache.getGZIP(cacheKey))
49: .header(HEADER_CONTENT_TYPE, returnMediaType)
50: .build();
51: } else {
52: response = Response.ok(responseCache.get(cacheKey))
53: .build();
54: }
55: return response;
56: }

  • 第 8 至 17 行 :TODO[0009]:RemoteRegionRegistry

  • 第 19 至 25 行 :Eureka-Server 启动完成,但是未处于就绪( Ready )状态,不接受请求全量应用注册信息的请求,例如,Eureka-Server 启动时,未能从其他 Eureka-Server 集群的节点获取到应用注册信息。

  • 第 27 至 28 行 :设置 API 版本号。默认最新 API 版本为 V2。实现代码如下:

    public enum Version {
    V1, V2;

    public static Version toEnum(String v) {
    for (Version version : Version.values()) {
    if (version.name().equalsIgnoreCase(v)) {
    return version;
    //Defaults to v2
    return V2;

  • 第 30 至 36 行 :设置返回数据格式,默认 JSON 。

  • 第 38 至 42 行 :创建响应缓存( ResponseCache ) 的键( KEY ),在 「3.2.1 缓存键」详细解析。

  • 第 44 至 55 行 :从响应缓存读取全量注册信息,在 「3.3 缓存读取」详细解析。

3.2 响应缓存 ResponseCache


public interface ResponseCache {

String get(Key key);

byte[] getGZIP(Key key);

void invalidate(String appName, @Nullable String vipAddress, @Nullable String secureVipAddress);

AtomicLong getVersionDelta();

AtomicLong getVersionDeltaWithRegions();


  • 其中,#getVersionDelta()#getVersionDeltaWithRegions() 已经废弃。这里保留的原因主要是考虑兼容性。判断依据来自如下代码:

    // Applications.java
    public void setVersion(Long version) {
    this.versionDelta = version;

    // AbstractInstanceRegistry.java
    public Applications getApplicationDeltas() {
    // ... 省略其它无关代码
    apps.setVersion(responseCache.getVersionDelta().get()); // 唯一调用到 ResponseCache#getVersionDelta() 方法的地方
    // ... 省略其它无关代码

  • #get() :获得缓存。

  • #getGZIP() :获得缓存,并 GZIP 。

  • #invalidate() :过期缓存。

3.2.1 缓存键


public class Key {

public enum KeyType {

* An enum to define the entity that is stored in this cache for this key.
public enum EntityType {
Application, VIP, SVIP

* 实体名
private final String entityName;
* TODO[0009]:RemoteRegionRegistry
private final String[] regions;
* 请求参数类型
private final KeyType requestType;
* 请求 API 版本号
private final Version requestVersion;
* hashKey
private final String hashKey;
* 实体类型
* {@link EntityType}
private final EntityType entityType;
* {@link EurekaAccept}
private final EurekaAccept eurekaAccept;

public Key(EntityType entityType, String entityName, KeyType type, Version v, EurekaAccept eurekaAccept, @Nullable String[] regions) {
this.regions = regions;
this.entityType = entityType;
this.entityName = entityName;
this.requestType = type;
this.requestVersion = v;
this.eurekaAccept = eurekaAccept;
hashKey = this.entityType + this.entityName + (null != this.regions ? Arrays.toString(this.regions) : "")
+ requestType.name() + requestVersion.name() + this.eurekaAccept.name();

public Key(EntityType entityType, String entityName, KeyType type, Version v, EurekaAccept eurekaAccept, @Nullable String[] regions) {
this.regions = regions;
this.entityType = entityType;
this.entityName = entityName;
this.requestType = type;
this.requestVersion = v;
this.eurekaAccept = eurekaAccept;
hashKey = this.entityType + this.entityName + (null != this.regions ? Arrays.toString(this.regions) : "")
+ requestType.name() + requestVersion.name() + this.eurekaAccept.name();

public int hashCode() {
String hashKey = getHashKey();
return hashKey.hashCode();

public boolean equals(Object other) {
if (other instanceof Key) {
return getHashKey().equals(((Key) other).getHashKey());
} else {
return false;


3.2.2 响应缓存实现类


在 ResponseCacheImpl 里,将缓存拆分成两层 :

  • 只读缓存( readOnlyCacheMap )
  • 固定过期 + 固定大小读写缓存( readWriteCacheMap )



  • 应用实例注册、下线、过期时,只只只过期 readWriteCacheMap
  • readWriteCacheMap 写入一段时间( 可配置 )后自动过期。
  • 定时任务对比 readWriteCacheMapreadOnlyCacheMap 的缓存值,若不一致,以前者为主。通过这样的方式,实现了 readOnlyCacheMap 的定时过期。

注意:应用实例注册、下线、过期时,不会很快刷新到 readWriteCacheMap 缓存里。默认配置下,最大延迟在 30 秒。


CAP 的选择上,Eureka 选择了 AP ,不同于 Zookeeper 选择了 CP 。


3.3 缓存读取

调用 ResponseCacheImpl#get(...) 方法( #getGzip(...) 类似 ),读取缓存,实现代码如下:

 1: private final ConcurrentMap<Key, Value> readOnlyCacheMap = new ConcurrentHashMap<Key, Value>();
3: private final LoadingCache<Key, Value> readWriteCacheMap;
5: public String get(final Key key) {
6: return get(key, shouldUseReadOnlyResponseCache);
7: }
9: String get(final Key key, boolean useReadOnlyCache) {
10: Value payload = getValue(key, useReadOnlyCache);
11: if (payload == null || payload.getPayload().equals(EMPTY_PAYLOAD)) {
12: return null;
13: } else {
14: return payload.getPayload();
15: }
16: }
18: Value getValue(final Key key, boolean useReadOnlyCache) {
19: Value payload = null;
20: try {
21: if (useReadOnlyCache) {
22: final Value currentPayload = readOnlyCacheMap.get(key);
23: if (currentPayload != null) {
24: payload = currentPayload;
25: } else {
26: payload = readWriteCacheMap.get(key);
27: readOnlyCacheMap.put(key, payload);
28: }
29: } else {
30: payload = readWriteCacheMap.get(key);
31: }
32: } catch (Throwable t) {
33: logger.error("Cannot get value for key :" + key, t);
34: }
35: return payload;
36: }

  • 第 5 至 7 行 :调用 #get(key, useReadOnlyCache) 方法,读取缓存。其中 shouldUseReadOnlyResponseCache 通过配置 eureka.shouldUseReadOnlyResponseCache = true (默认值 :true ) 开启只读缓存。如果你对数据的一致性有相对高的要求,可以关闭这个开关,当然因为少了 readOnlyCacheMap ,性能会有一定的下降。

  • 第 9 至 16 行 :调用 getValue(key, useReadOnlyCache) 方法,读取缓存。从 readOnlyCacheMapreadWriteCacheMap 变量可以看到缓存值的类为 com.netflix.eureka.registry.ResponseCacheImpl.Value ,实现代码如下:

    public class Value {

    * 原始值
    private final String payload;
    * GZIP 压缩后的值
    private byte[] gzipped;

    public Value(String payload) {
    this.payload = payload;
    if (!EMPTY_PAYLOAD.equals(payload)) {
    // ... 省略 GZIP 压缩代码
    gzipped = bos.toByteArray();
    } else {
    gzipped = null;

    public String getPayload() {
    return payload;

    public byte[] getGzipped() {
    return gzipped;


  • 第 21 至 31 行 :读取缓存。

    • 第 21 至 28 行 :先读取 readOnlyCacheMap 。读取不到,读取 readWriteCacheMap ,并设置到 readOnlyCacheMap

    • 第 29 至 31 行 :读取 readWriteCacheMap

    • readWriteCacheMap 实现代码如下:

      this.readWriteCacheMap =
      .expireAfterWrite(serverConfig.getResponseCacheAutoExpirationInSeconds(), TimeUnit.SECONDS)
      .removalListener(new RemovalListener<Key, Value>() {
      public void onRemoval(RemovalNotification<Key, Value> notification) {
      // TODO[0009]:RemoteRegionRegistry
      Key removedKey = notification.getKey();
      if (removedKey.hasRegions()) {
      Key cloneWithNoRegions = removedKey.cloneWithoutRegions();
      regionSpecificKeys.remove(cloneWithNoRegions, removedKey);
      .build(new CacheLoader<Key, Value>() {
      public Value load(Key key) throws Exception {
      // // TODO[0009]:RemoteRegionRegistry
      if (key.hasRegions()) {
      Key cloneWithNoRegions = key.cloneWithoutRegions();
      regionSpecificKeys.put(cloneWithNoRegions, key);
      Value value = generatePayload(key);
      return value;

      • readWriteCacheMap 最大缓存数量为 1000 。
      • 调用 #generatePayload(key) 方法,生成缓存值。
  • #generatePayload(key) 方法,实现代码如下:

     1: private Value generatePayload(Key key) {
    2: Stopwatch tracer = null;
    3: try {
    4: String payload;
    5: switch (key.getEntityType()) {
    6: case Application:
    7: boolean isRemoteRegionRequested = key.hasRegions();
    9: if (ALL_APPS.equals(key.getName())) {
    10: if (isRemoteRegionRequested) { // TODO[0009]:RemoteRegionRegistry
    11: tracer = serializeAllAppsWithRemoteRegionTimer.start();
    12: payload = getPayLoad(key, registry.getApplicationsFromMultipleRegions(key.getRegions()));
    13: } else {
    14: tracer = serializeAllAppsTimer.start();
    15: payload = getPayLoad(key, registry.getApplications());
    16: }
    17: } else if (ALL_APPS_DELTA.equals(key.getName())) {
    18: // ... 省略增量获取相关的代码
    19: } else {
    20: tracer = serializeOneApptimer.start();
    21: payload = getPayLoad(key, registry.getApplication(key.getName()));
    22: }
    23: break;
    24: // ... 省略部分代码
    25: }
    26: return new Value(payload);
    27: } finally {
    28: if (tracer != null) {
    29: tracer.stop();
    30: }
    31: }
    32: }

    • 第 10 至 12 行 :TODO[0009]:RemoteRegionRegistry
    • 第 13 至 16 行 :调用 AbstractInstanceRegistry#getApplications() 方法,获得注册的应用集合。后调用 #getPayLoad() 方法,将注册的应用集合转换成缓存值。🙂 这两个方法代码较多,下面详细解析。
    • 第 17 至 18 行 :获取增量注册信息的缓存值,在 《Eureka 源码解析 —— 应用实例注册发现 (七)之增量获取》 详细解析。

3.3.1 获得注册的应用集合

调用 AbstractInstanceRegistry#getApplications() 方法,获得注册的应用集合,实现代码如下:

 1: // AbstractInstanceRegistry.java
3: private static final String[] EMPTY_STR_ARRAY = new String[0];
5: public Applications getApplications() {
6: boolean disableTransparentFallback = serverConfig.disableTransparentFallbackToOtherRegion();
7: if (disableTransparentFallback) { // TODO[0009]:RemoteRegionRegistry
8: return getApplicationsFromLocalRegionOnly();
9: } else {
10: return getApplicationsFromAllRemoteRegions(); // Behavior of falling back to remote region can be disabled.
11: }
12: }
14: public Applications getApplicationsFromLocalRegionOnly() {
15: return getApplicationsFromMultipleRegions(EMPTY_STR_ARRAY);
16: }

  • 第 6 至 8 行 :TODO[0009]:RemoteRegionRegistry

  • 第 9 至 16 行 :调用 #getApplicationsFromMultipleRegions(...) 方法,获得注册的应用集合,实现代码如下:

     1: public Applications getApplicationsFromMultipleRegions(String[] remoteRegions) {
    2: // TODO[0009]:RemoteRegionRegistry
    3: boolean includeRemoteRegion = null != remoteRegions && remoteRegions.length != 0;
    4: logger.debug("Fetching applications registry with remote regions: {}, Regions argument {}",
    5: includeRemoteRegion, Arrays.toString(remoteRegions));
    6: if (includeRemoteRegion) {
    8: } else {
    9: GET_ALL_CACHE_MISS.increment();
    10: }
    11: // 获得获得注册的应用集合
    12: Applications apps = new Applications();
    13: apps.setVersion(1L);
    14: for (Entry<String, Map<String, Lease<InstanceInfo>>> entry : registry.entrySet()) {
    15: Application app = null;
    17: if (entry.getValue() != null) {
    18: for (Entry<String, Lease<InstanceInfo>> stringLeaseEntry : entry.getValue().entrySet()) {
    19: Lease<InstanceInfo> lease = stringLeaseEntry.getValue();
    20: if (app == null) {
    21: app = new Application(lease.getHolder().getAppName());
    22: }
    23: app.addInstance(decorateInstanceInfo(lease));
    24: }
    25: }
    26: if (app != null) {
    27: apps.addApplication(app);
    28: }
    29: }
    30: // TODO[0009]:RemoteRegionRegistry
    31: if (includeRemoteRegion) {
    32: for (String remoteRegion : remoteRegions) {
    33: RemoteRegionRegistry remoteRegistry = regionNameVSRemoteRegistry.get(remoteRegion);
    34: if (null != remoteRegistry) {
    35: Applications remoteApps = remoteRegistry.getApplications();
    36: for (Application application : remoteApps.getRegisteredApplications()) {
    37: if (shouldFetchFromRemoteRegistry(application.getName(), remoteRegion)) {
    38: logger.info("Application {} fetched from the remote region {}",
    39: application.getName(), remoteRegion);
    41: Application appInstanceTillNow = apps.getRegisteredApplications(application.getName());
    42: if (appInstanceTillNow == null) {
    43: appInstanceTillNow = new Application(application.getName());
    44: apps.addApplication(appInstanceTillNow);
    45: }
    46: for (InstanceInfo instanceInfo : application.getInstances()) {
    47: appInstanceTillNow.addInstance(instanceInfo);
    48: }
    49: } else {
    50: logger.debug("Application {} not fetched from the remote region {} as there exists a "
    51: + "whitelist and this app is not in the whitelist.",
    52: application.getName(), remoteRegion);
    53: }
    54: }
    55: } else {
    56: logger.warn("No remote registry available for the remote region {}", remoteRegion);
    57: }
    58: }
    59: }
    60: // 设置 应用集合 hashcode
    61: apps.setAppsHashCode(apps.getReconcileHashCode());
    62: return apps;
    63: }

    • 第 2 至 第 10 行 :TODO[0009]:RemoteRegionRegistry
    • 第 11 至 29 行 :获得获得注册的应用集合。
    • 第 30 至 59 行 :TODO[0009]:RemoteRegionRegistry
    • 第 61 行 :计算应用集合 hashcode 。该变量用于校验增量获取的注册信息和 Eureka-Server 全量的注册信息是否一致( 完整 ),在 《Eureka 源码解析 —— 应用实例注册发现 (七)之增量获取》 详细解析。

3.3.2 转换成缓存值

调用 #getPayLoad() 方法,将注册的应用集合转换成缓存值,实现代码如下:

* Generate pay load with both JSON and XML formats for all applications.
private String getPayLoad(Key key, Applications apps) {
// 获得编码器
EncoderWrapper encoderWrapper = serverCodecs.getEncoder(key.getType(), key.getEurekaAccept());
String result;
try {
// 编码
result = encoderWrapper.encode(apps);
} catch (Exception e) {
logger.error("Failed to encode the payload for all apps", e);
return "";
if(logger.isDebugEnabled()) {
logger.debug("New application cache entry {} with apps hashcode {}", key.toStringCompact(), apps.getAppsHashCode());
return result;

3.4 主动过期读写缓存

应用实例注册、下线、过期时,调用 ResponseCacheImpl#invalidate() 方法,主动过期读写缓存( readWriteCacheMap ),实现代码如下:

public void invalidate(String appName, @Nullable String vipAddress, @Nullable String secureVipAddress) {
for (Key.KeyType type : Key.KeyType.values()) {
for (Version v : Version.values()) {
new Key(Key.EntityType.Application, appName, type, v, EurekaAccept.full),
new Key(Key.EntityType.Application, appName, type, v, EurekaAccept.compact),
new Key(Key.EntityType.Application, ALL_APPS, type, v, EurekaAccept.full),
new Key(Key.EntityType.Application, ALL_APPS, type, v, EurekaAccept.compact),
new Key(Key.EntityType.Application, ALL_APPS_DELTA, type, v, EurekaAccept.full),
new Key(Key.EntityType.Application, ALL_APPS_DELTA, type, v, EurekaAccept.compact)
if (null != vipAddress) {
invalidate(new Key(Key.EntityType.VIP, vipAddress, type, v, EurekaAccept.full));
if (null != secureVipAddress) {
invalidate(new Key(Key.EntityType.SVIP, secureVipAddress, type, v, EurekaAccept.full));

  • 调用 #invalidate(keys) 方法,逐个过期每个缓存键值,实现代码如下:

    public void invalidate(Key... keys) {
    for (Key key : keys) {
    logger.debug("Invalidating the response cache key : {} {} {} {}, {}", key.getEntityType(), key.getName(), key.getVersion(), key.getType(), key.getEurekaAccept());
    // 过期读写缓存
    // TODO[0009]:RemoteRegionRegistry
    Collection<Key> keysWithRegions = regionSpecificKeys.get(key);
    if (null != keysWithRegions && !keysWithRegions.isEmpty()) {
    for (Key keysWithRegion : keysWithRegions) {
    logger.debug("Invalidating the response cache key : {} {} {} {} {}",
    key.getEntityType(), key.getName(), key.getVersion(), key.getType(), key.getEurekaAccept());

3.5 被动过期读写缓存

读写缓存( readWriteCacheMap ) 写入后,一段时间自动过期,实现代码如下:


  • 配置 eureka.responseCacheAutoExpirationInSeconds ,设置写入过期时长。默认值 :180 秒。

3.6 定时刷新只读缓存

定时任务对比 readWriteCacheMapreadOnlyCacheMap 的缓存值,若不一致,以前者为主。通过这样的方式,实现了 readOnlyCacheMap 的定时过期。实现代码如下:

 1: ResponseCacheImpl(EurekaServerConfig serverConfig, ServerCodecs serverCodecs, AbstractInstanceRegistry registry) {
2: // ... 省略无关代码
4: long responseCacheUpdateIntervalMs = serverConfig.getResponseCacheUpdateIntervalMs();
5: // ... 省略无关代码
7: if (shouldUseReadOnlyResponseCache) {
8: timer.schedule(getCacheUpdateTask(),
9: new Date(((System.currentTimeMillis() / responseCacheUpdateIntervalMs) * responseCacheUpdateIntervalMs)
10: + responseCacheUpdateIntervalMs),
11: responseCacheUpdateIntervalMs);
12: }
14: // ... 省略无关代码
15: }
17: private TimerTask getCacheUpdateTask() {
18: return new TimerTask() {
19: @Override
20: public void run() {
21: logger.debug("Updating the client cache from response cache");
22: for (Key key : readOnlyCacheMap.keySet()) { // 循环 readOnlyCacheMap 的缓存键
23: if (logger.isDebugEnabled()) {
24: Object[] args = {key.getEntityType(), key.getName(), key.getVersion(), key.getType()};
25: logger.debug("Updating the client cache from response cache for key : {} {} {} {}", args);
26: }
27: try {
28: CurrentRequestVersion.set(key.getVersion());
29: Value cacheValue = readWriteCacheMap.get(key);
30: Value currentCacheValue = readOnlyCacheMap.get(key);
31: if (cacheValue != currentCacheValue) { // 不一致时,进行替换
32: readOnlyCacheMap.put(key, cacheValue);
33: }
34: } catch (Throwable th) {
35: logger.error("Error while updating the client cache from response cache for key {}", key.toStringCompact(), th);
36: }
37: }
38: }
39: };
40: }

  • 第 7 至 12 行 :初始化定时任务。配置 eureka.responseCacheUpdateIntervalMs,设置任务执行频率,默认值 :30 * 1000 毫秒。
  • 第 17 至 39 行 :创建定时任务。
    • 第 22 行 :循环 readOnlyCacheMap 的缓存键。为什么不循环 readWriteCacheMapreadOnlyCacheMap 的缓存过期依赖 readWriteCacheMap,因此缓存键会更多。
    • 第 28 行 至 33 行 :对比 readWriteCacheMapreadOnlyCacheMap 的缓存值,若不一致,以前者为主。通过这样的方式,实现了 readOnlyCacheMap 的定时过期。

666. 彩蛋




