超详细的Eureka源码解析

Eureka简介

Eureka是什么?

Eureka是基于REST(Representational State Transfer)服务,主要以AWS云服务为支撑,提供服务发现并实现负载均衡和故障转移。我们称此服务为Eureka服务。Eureka提供了Java客户端组件,Eureka Client,方便与服务端的交互。客户端内置了基于round-robin实现的简单负载均衡。在Netflix,为Eureka提供更为复杂的负载均衡方案进行封装,以实现高可用,它包括基于流量、资源利用率以及请求返回状态的加权负载均衡。

Eureka架构

Eureka架构从CAP理论看,Eureka是一个AP系统,优先保证可用性(A)和分区容错性(P),Eureka里使用了大量的缓存。

Eureka中的一些概念

  • Register :服务注册

Eureka客户端向Eureka Server注册时,它提供自身的元数据,比如IP地址、端口等

  • Renew:服务续约

Eureka客户端会每隔30秒发送一次心跳来续约。通过续约来告知Eureka Server该客户端仍然存在。

  • Fetch Registries:获取注册列表信息

Eureka客户端从服务器获取注册表信息,将其缓存到本地。客户端会使用该信息查找其他服务,从而进行远程调用。该注册列表信息定期(每30秒)更新一次。

  • Cancel:服务下线

Eureka客户端在程序关闭时向Eureka服务器发送取消请求。

  • Eviction:服务剔除

在默认情况下,当Eureka客户端90秒没有向Eureka服务器发送续约,Eureka服务器就会将该服务实例从服务注册列表删除。

除了以上的特性外,Eureka的缓存机制也非常经典,下面详细介绍一下。

Eureka缓存

Eureka Server里存在三个变量(registry、readWriteCacheMap、readOnlyCacheMap)保存服务注册信息。

Eureka客户端向服务端注册之后,数据会立即同步到readWriteCacheMap和registry。

Eureka客户端想查看注册信息,每隔30秒从readOnlyCacheMap拉取。

readOnlyCacheMap会通过定时器每30秒从readWriteCacheMap拉取。

还有一个线程每隔60会将90秒都没有续约的服务剔除出去。

变量 类型 说明
registry ConcurrentHashMap 实时更新,类AbstractInstanceRegistry成员变量,UI端请求的是这里的服务注册信息
readWriteCacheMap Guava Cache 实时更新,类ResponseCacheImpl成员变量,缓存时间180秒
readOnlyCacheMap ConcurrentHashMap 周期更新,类ResponseCacheImpl成员变量,默认每30s从readWriteCacheMap更新,Eureka client默认从这里更新服务注册信息,可配置直接从readWriteCacheMap更新

Eureka Client

本文使用的是2.0.2.RELEASE版本

接下来开始分析Eureka Client的源码。引入spring-cloud-starter-netflix-eureka-client后,Eureka Client会自动启用。EurekaClientAutoConfiguration配置类生效,会注入Bean CloudEurekaClient,然后调用父类DiscoveryClient的构造方法。

@Inject
    DiscoveryClient(ApplicationInfoManager applicationInfoManager,EurekaClientConfig config,AbstractDiscoveryClientOptionalArgs args,Provider<BackupRegistry> backupRegistryProvider) {
   //省略部分代码
   //如果配置不用注册到Eureka && 配置不用从注册中心获取配置,则不用初始化相关组件
        if (!config.shouldRegisterWithEureka() && !config.shouldFetchRegistry()) {
            logger.info("Client configured to neither register nor query for data.");
            scheduler = null;
            heartbeatExecutor = null;
            cacheRefreshExecutor = null;
            eurekaTransport = null;
            instanceRegionChecker = new InstanceRegionChecker(new PropertyBasedAzToRegionMapper(config),clientConfig.getRegion());

            // This is a bit of hack to allow for existing code using DiscoveryManager.getInstance()
            // to work with DI'd DiscoveryClient
            DiscoveryManager.getInstance().setDiscoveryClient(this);
            DiscoveryManager.getInstance().setEurekaClientConfig(config);

            initTimestampMs = System.currentTimeMillis();
            logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}",initTimestampMs,this.getApplications().size());

            return;  // no need to setup up an network tasks and we are done
        }

        try {
            // default size of 2 - 1 each for heartbeat and cacheRefresh
            //初始化定时调度器
            scheduler = Executors.newScheduledThreadPool(2,new ThreadFactoryBuilder()
                            .setNameFormat("DiscoveryClient-%d")
                            .setDaemon(true)
                            .build());
            //发送心跳的线程池
            heartbeatExecutor = new ThreadPoolExecutor(
                    1,clientConfig.getHeartbeatExecutorThreadPoolSize(),TimeUnit.SECONDS,new SynchronousQueue<Runnable>(),new ThreadFactoryBuilder()
                            .setNameFormat("DiscoveryClient-HeartbeatExecutor-%d")
                            .setDaemon(true)
                            .build()
            );  // use direct handoff
            //注册信息缓存刷新的线程池
            cacheRefreshExecutor = new ThreadPoolExecutor(
                    1,clientConfig.getCacheRefreshExecutorThreadPoolSize(),new ThreadFactoryBuilder()
                            .setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d")
                            .setDaemon(true)
                            .build()
            );  // use direct handoff

            eurekaTransport = new EurekaTransport();
            scheduleServerEndpointTask(eurekaTransport,args);

            AzToRegionMapper azToRegionMapper;
            if (clientConfig.shouldUseDnsForFetchingServiceUrls()) {
                azToRegionMapper = new DNSBasedAzToRegionMapper(clientConfig);
            } else {
                azToRegionMapper = new PropertyBasedAzToRegionMapper(clientConfig);
            }
            if (null != remoteRegionsToFetch.get()) {
                azToRegionMapper.setRegionsToFetch(remoteRegionsToFetch.get().split(","));
            }
            instanceRegionChecker = new InstanceRegionChecker(azToRegionMapper,clientConfig.getRegion());
        } catch (Throwable e) {
            throw new RuntimeException("Failed to initialize DiscoveryClient!",e);
        }

        if (clientConfig.shouldFetchRegistry() && !fetchRegistry(false)) {
            fetchRegistryFromBackup();
        }

        // call and execute the pre registration handler before all background tasks (inc registration) is started
        if (this.preRegistrationHandler != null) {
            this.preRegistrationHandler.beforeRegistration();
        }

        if (clientConfig.shouldRegisterWithEureka() && clientConfig.shouldEnforceRegistrationAtInit()) {
            try {
                if (!register() ) {
                    throw new IllegalStateException("Registration error at startup. Invalid server response.");
                }
            } catch (Throwable th) {
                logger.error("Registration error at startup: {}",th.getMessage());
                throw new IllegalStateException(th);
            }
        }

        // finally,init the schedule tasks (e.g. cluster resolvers,heartbeat,instanceInfo replicator,fetch
        //初始化定时任务,服务心跳、服务注册、服务列表获取等功能在此处完成
        initScheduledTasks();

        try {
            Monitors.registerObject(this);
        } catch (Throwable e) {
            logger.warn("Cannot register timers",e);
        }

        // This is a bit of hack to allow for existing code using DiscoveryManager.getInstance()
        // to work with DI'd DiscoveryClient
        DiscoveryManager.getInstance().setDiscoveryClient(this);
        DiscoveryManager.getInstance().setEurekaClientConfig(config);

        initTimestampMs = System.currentTimeMillis();
        logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}",this.getApplications().size());
    }

接下来看initScheduledTasks方法


    private void initScheduledTasks() {
        if (clientConfig.shouldFetchRegistry()) {
            // registry cache refresh timer
            //默认30s
            int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
            int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
            //缓存刷新定时任务
            scheduler.schedule(
                    new TimedSupervisorTask(
                            "cacheRefresh",scheduler,cacheRefreshExecutor,registryFetchIntervalSeconds,expBackOffBound,//1. 缓存刷新具体逻辑
                            new CacheRefreshThread()
                    ),TimeUnit.SECONDS);
        }

        if (clientConfig.shouldRegisterWithEureka()) {
            int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
            int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
            logger.info("Starting heartbeat executor: " + "renew interval is: {}",renewalIntervalInSecs);

            // Heartbeat timer
            scheduler.schedule(
                    new TimedSupervisorTask(
                            "heartbeat",heartbeatExecutor,renewalIntervalInSecs,//2. 心跳具体逻辑
                            new HeartbeatThread()
                    ),TimeUnit.SECONDS);

            // InstanceInfo replicator
            instanceInfoReplicator = new InstanceInfoReplicator(
                    this,instanceInfo,clientConfig.getInstanceInfoReplicationIntervalSeconds(),2); // burstSize
            
            //状态改变监听
            statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
                @Override
                public String getId() {
                    return "statusChangeListener";
                }

                @Override
                public void notify(StatusChangeEvent statusChangeEvent) {
                    if (InstanceStatus.DOWN == statusChangeEvent.getStatus() ||
                            InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) {
                        // log at warn level if DOWN was involved
                        logger.warn("Saw local status change event {}",statusChangeEvent);
                    } else {
                        logger.info("Saw local status change event {}",statusChangeEvent);
                    }
                    instanceInfoReplicator.onDemandUpdate();
                }
            };

            if (clientConfig.shouldOnDemandUpdateStatusChange()) {
                applicationInfoManager.registerStatusChangeListener(statusChangeListener);
            }

           //3. clientConfig.getInitialInstanceInfoReplicationIntervalSeconds()默认40s,服务注册就在这个方法里完成。
           instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
        } else {
            logger.info("Not registering with Eureka server per configuration");
        }
    }

再来细看上面3个主要方法的具体逻辑。

1. 缓存刷新

 class CacheRefreshThread implements Runnable {
        public void run() {
            refreshRegistry();
        }
    }
 void refreshRegistry() {
       //省略部分代码
       //获取服务列表信息
        boolean success = fetchRegistry(remoteRegionsModified);
       //省略部分代码

private boolean fetchRegistry(boolean forceFullRegistryFetch) {
        Stopwatch tracer = FETCH_REGISTRY_TIMER.start();

        try {
            // If the delta is disabled or if it is the first time,get all
            // applications
            Applications applications = getApplications();

            if (clientConfig.shouldDisableDelta()
                    || (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress()))
                    || forceFullRegistryFetch
                    || (applications == null)
                    || (applications.getRegisteredApplications().size() == 0)
                    || (applications.getVersion() == -1)) //Client application does not have latest library supporting delta
            {
                //全量获取服务列表缓存在本地
                getAndStoreFullRegistry();
            } else {
               //更新服务列表 getAndUpdateDelta(applications);
            }
            applications.setAppsHashCode(applications.getReconcileHashCode());
            logTotalInstances();
        } catch (Throwable e) {
            logger.error(PREFIX + "{} - was unable to refresh its cache! status = {}",appPathIdentifier,e.getMessage(),e);
            return false;
        } finally {
            if (tracer != null) {
                tracer.stop();
            }
        }

        // Notify about cache refresh before updating the instance remote status
        onCacheRefreshed();

        // Update remote status based on refreshed data held in the cache
        updateInstanceRemoteStatus();

        // registry was fetched successfully,so return true
        return true;
    }

然后调用EurekaHttpClient接口的方法去获取服务列表。请求发送通过jersey

2. 服务心跳

继续跟踪HeartbeatThread方法

  private class HeartbeatThread implements Runnable {

        public void run() {
            if (renew()) {
                lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();
            }
        }
    }
boolean renew() {
        EurekaHttpResponse<InstanceInfo> httpResponse;
        try {
            httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(),instanceInfo.getId(),null);
            logger.debug(PREFIX + "{} - Heartbeat status: {}",httpResponse.getStatusCode());
            if (httpResponse.getStatusCode() == 404) {
                REREGISTER_COUNTER.increment();
                logger.info(PREFIX + "{} - Re-registering apps/{}",instanceInfo.getAppName());
                long timestamp = instanceInfo.setIsDirtyWithTime();
                //调用注册方法
                boolean success = register();
                if (success) {
                    instanceInfo.unsetIsDirty(timestamp);
                }
                return success;
            }
            return httpResponse.getStatusCode() == 200;
        } catch (Throwable e) {
            logger.error(PREFIX + "{} - was unable to send heartbeat!",e);
            return false;
        }
    }

3. 服务注册

跟踪instanceInfoReplicator.start方法

 public void start(int initialDelayMs) {
        if (started.compareAndSet(false,true)) {
            //设置标识,为了启动时进行服务注册
            instanceInfo.setIsDirty();  // for initial register
            
            //延迟40s执行,执行的是this对象的run方法
            Future next = scheduler.schedule(this,initialDelayMs,TimeUnit.SECONDS);
            scheduledPeriodicRef.set(next);
        }
    }
public void run() {
        try {
            discoveryClient.refreshInstanceInfo();
            //刚才start方法中,设置了标识,所以此处dirtyTimestamp不为空
            Long dirtyTimestamp = instanceInfo.isDirtyWithTime();
            if (dirtyTimestamp != null) {
               //服务注册
                discoveryClient.register();
                instanceInfo.unsetIsDirty(dirtyTimestamp);
            }
        } catch (Throwable t) {
            logger.warn("There was a problem with the instance info replicator",t);
        } finally {
            //递归性的延迟30s执行当前run方法
            Future next = scheduler.schedule(this,replicationIntervalSeconds,TimeUnit.SECONDS);
            scheduledPeriodicRef.set(next);
        }
    }

4. 服务关闭

服务关闭之后,会回调到EurekaAutoServiceRegistration类的stop方法,回调的方法是:

@EventListener(ContextClosedEvent.class)
	public void onApplicationEvent(ContextClosedEvent event) {
		if( event.getApplicationContext() == context ) {
		    //服务关闭
			stop();
		}
	}

SmartLifecycle接口也有这个作用,不过我本地使用了一下,是通过ContextClosedEvent来回调的。

public void stop() {
		this.serviceRegistry.deregister(this.registration);
		this.running.set(false);
	}
	@Override
	public void deregister(EurekaRegistration reg) {
		if (reg.getApplicationInfoManager().getInfo() != null) {

			if (log.isInfoEnabled()) {
				log.info("Unregistering application " + reg.getApplicationInfoManager().getInfo().getAppName()
						+ " with eureka with status DOWN");
			}
            //状态改为DOWN
			reg.getApplicationInfoManager().setInstanceStatus(InstanceInfo.InstanceStatus.DOWN);

			//shutdown of eureka client should happen with EurekaRegistration.close()
			//auto registration will create a bean which will be properly disposed
			//manual registrations will need to call close()
		}
	}

从上文分析得知Eureka Client调取服务端的接口都是通过EurekaHttpClient接口,而最终发送请求的httpClient是jersey里面的ApacheHttpClient4。

public interface EurekaHttpClient {

    EurekaHttpResponse<Void> register(InstanceInfo info);

    EurekaHttpResponse<Void> cancel(String appName,String id);

    EurekaHttpResponse<InstanceInfo> sendHeartBeat(String appName,String id,InstanceInfo info,InstanceStatus overriddenStatus);

    EurekaHttpResponse<Void> statusUpdate(String appName,InstanceStatus newStatus,InstanceInfo info);

    EurekaHttpResponse<Void> deleteStatusOverride(String appName,InstanceInfo info);

    EurekaHttpResponse<Applications> getApplications(String... regions);

    EurekaHttpResponse<Applications> getDelta(String... regions);

    EurekaHttpResponse<Applications> getVip(String vipAddress,String... regions);

    EurekaHttpResponse<Applications> getSecureVip(String secureVipAddress,String... regions);

    EurekaHttpResponse<Application> getApplication(String appName);

    EurekaHttpResponse<InstanceInfo> getInstance(String appName,String id);

    EurekaHttpResponse<InstanceInfo> getInstance(String id);

    void shutdown();
}

Eureka Server

Eureka Server需要做的事有:

  • 维护服务注册信息列表
  • 接收客户端的register、renew、cancel等请求
  • Eureka Server多节点之间的数据复制同步

项目启动时,EurekaServerAutoConfiguration会被自动注入到容器中。

@Configuration
@Import(EurekaServerInitializerConfiguration.class)
@ConditionalOnBean(EurekaServerMarkerConfiguration.Marker.class)
@EnableConfigurationProperties({ EurekaDashboardProperties.class,InstanceRegistryProperties.class })
@PropertySource("classpath:/eureka/server.properties")
public class EurekaServerAutoConfiguration extends WebMvcConfigurerAdapter {
    //省略部分代码

	@Configuration
	protected static class EurekaServerConfigBeanConfiguration {
		@Bean
		@ConditionalOnMissingBean
		public EurekaServerConfig eurekaServerConfig(EurekaClientConfig clientConfig) {
			EurekaServerConfigBean server = new EurekaServerConfigBean();
			if (clientConfig.shouldRegisterWithEureka()) {
				// Set a sensible default if we are supposed to replicate
				server.setRegistrySyncRetries(5);
			}
			return server;
		}
	}
    
    //Eureka管理页面的Controller
	@Bean
	@ConditionalOnProperty(prefix = "eureka.dashboard",name = "enabled",matchIfMissing = true)
	public EurekaController eurekaController() {
		return new EurekaController(this.applicationInfoManager);
	}

	static {
		CodecWrappers.registerWrapper(JACKSON_JSON);
		EurekaJacksonCodec.setInstance(JACKSON_JSON.getCodec());
	}

	@Bean
	public ServerCodecs serverCodecs() {
		return new CloudServerCodecs(this.eurekaServerConfig);
	}

	private static CodecWrapper getFullJson(EurekaServerConfig serverConfig) {
		CodecWrapper codec = CodecWrappers.getCodec(serverConfig.getJsonCodecName());
		return codec == null ? CodecWrappers.getCodec(JACKSON_JSON.codecName()) : codec;
	}

	private static CodecWrapper getFullXml(EurekaServerConfig serverConfig) {
		CodecWrapper codec = CodecWrappers.getCodec(serverConfig.getXmlCodecName());
		return codec == null ? CodecWrappers.getCodec(CodecWrappers.XStreamXml.class)
				: codec;
	}

	class CloudServerCodecs extends DefaultServerCodecs {

		public CloudServerCodecs(EurekaServerConfig serverConfig) {
			super(getFullJson(serverConfig),CodecWrappers.getCodec(CodecWrappers.JacksonJsonMini.class),getFullXml(serverConfig),CodecWrappers.getCodec(CodecWrappers.JacksonXmlMini.class));
		}
	}

    //处理Eureka Client的register、renew、cancel等请求
	@Bean
	public PeerAwareInstanceRegistry peerAwareInstanceRegistry(
			ServerCodecs serverCodecs) {
		this.eurekaClient.getApplications(); // force initialization
		return new InstanceRegistry(this.eurekaServerConfig,this.eurekaClientConfig,serverCodecs,this.eurekaClient,this.instanceRegistryProperties.getExpectedNumberOfRenewsPerMin(),this.instanceRegistryProperties.getDefaultOpenForTrafficCount());
	}

    //处理Eureka Server多节点同步
	@Bean
	@ConditionalOnMissingBean
	public PeerEurekaNodes peerEurekaNodes(PeerAwareInstanceRegistry registry,ServerCodecs serverCodecs) {
		return new RefreshablePeerEurekaNodes(registry,this.eurekaServerConfig,this.applicationInfoManager);
	}
	
	//省略部分代码

1. 请求接受处理

InstanceResource类主要用于接受请求,收到请求后调用InstanceRegistry类的方法进行处理。以renew为例:

 @PUT
    public Response renewLease(
            @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication,@QueryParam("overriddenstatus") String overriddenStatus,@QueryParam("status") String status,@QueryParam("lastDirtyTimestamp") String lastDirtyTimestamp) {
        boolean isFromReplicaNode = "true".equals(isReplication);
        boolean isSuccess = registry.renew(app.getName(),id,isFromReplicaNode);
        //...

2. 服务剔除

EurekaServerAutoConfiguration类引入了EurekaServerInitializerConfiguration类。容器初始化会触发start方法,start方法如下:

@Override
	public void start() {
		new Thread(new Runnable() {
			@Override
			public void run() {
				try {
					//TODO: is this class even needed now?
					//初始化方法
					eurekaServerBootstrap.contextInitialized(EurekaServerInitializerConfiguration.this.servletContext);
					log.info("Started Eureka Server");

					publish(new EurekaRegistryAvailableEvent(getEurekaServerConfig()));
					EurekaServerInitializerConfiguration.this.running = true;
					publish(new EurekaServerStartedEvent(getEurekaServerConfig()));
				}
				catch (Exception ex) {
					// Help!
					log.error("Could not initialize Eureka servlet context",ex);
				}
			}
		}).start();
	}
eurekaServerBootstrap.contextInitialized()
-》this.registry.openForTraffic(this.applicationInfoManager,registryCount);
    -》 super.postInit();

postInit代码如下:

 protected void postInit() {
        renewsLastMin.start();
        if (evictionTaskRef.get() != null) {
            evictionTaskRef.get().cancel();
        }
        //服务剔除定时任务
        evictionTaskRef.set(new EvictionTask());
        evictionTimer.schedule(evictionTaskRef.get(),//延迟60s,每60执行一次
                serverConfig.getEvictionIntervalTimerInMs(),serverConfig.getEvictionIntervalTimerInMs());
    }

3. readOnlyCacheMap缓存周期更新

DefaultEurekaServerContext类的initialize方法上加了@PostConstruct注解,会在bean构造后被执行:

  @PostConstruct
    @Override
    public void initialize() {
        logger.info("Initializing ...");
        peerEurekaNodes.start();
        try {
            registry.init(peerEurekaNodes);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
        logger.info("Initialized");
    }

init()-》 initializedResponseCache()-》new ResponseCacheImpl

ResponseCacheImpl方法如下:

ResponseCacheImpl(EurekaServerConfig serverConfig,ServerCodecs serverCodecs,AbstractInstanceRegistry registry) {
        this.serverConfig = serverConfig;
        this.serverCodecs = serverCodecs;
        this.shouldUseReadOnlyResponseCache = serverConfig.shouldUseReadOnlyResponseCache();
        this.registry = registry;

        long responseCacheUpdateIntervalMs = serverConfig.getResponseCacheUpdateIntervalMs();
        //readWriteCacheMap是guava缓存,缓存加载是用的load方法里的实现
        this.readWriteCacheMap =
                CacheBuilder.newBuilder().initialCapacity(1000)
                        .expireAfterWrite(serverConfig.getResponseCacheAutoExpirationInSeconds(),TimeUnit.SECONDS)
                        .removalListener(new RemovalListener<Key,Value>() {
                            @Override
                            public void onRemoval(RemovalNotification<Key,Value> notification) {
                                Key removedKey = notification.getKey();
                                if (removedKey.hasRegions()) {
                                    Key cloneWithNoRegions = removedKey.cloneWithoutRegions();
                                    regionSpecificKeys.remove(cloneWithNoRegions,removedKey);
                                }
                            }
                        })
                        .build(new CacheLoader<Key,Value>() {
                            @Override
                            public Value load(Key key) throws Exception {
                                if (key.hasRegions()) {
                                    Key cloneWithNoRegions = key.cloneWithoutRegions();
                                    regionSpecificKeys.put(cloneWithNoRegions,key);
                                }
                                Value value = generatePayload(key);
                                return value;
                            }
                        });
        
        if (shouldUseReadOnlyResponseCache) {
        //定时30s刷新缓存,具体逻辑在getCacheUpdateTask
            timer.schedule(getCacheUpdateTask(),new Date(((System.currentTimeMillis() / responseCacheUpdateIntervalMs) * responseCacheUpdateIntervalMs)
                            + responseCacheUpdateIntervalMs),responseCacheUpdateIntervalMs);
        }

        try {
            Monitors.registerObject(this);
        } catch (Throwable e) {
            logger.warn("Cannot register the JMX monitor for the InstanceRegistry",e);
        }
    }
private TimerTask getCacheUpdateTask() {
        return new TimerTask() {
            @Override
            public void run() {
                logger.debug("Updating the client cache from response cache");
                for (Key key : readOnlyCacheMap.keySet()) {
                    if (logger.isDebugEnabled()) {
                        logger.debug("Updating the client cache from response cache for key : {} {} {} {}",key.getEntityType(),key.getName(),key.getVersion(),key.getType());
                    }
                    try {
                        CurrentRequestVersion.set(key.getVersion());
                        Value cacheValue = readWriteCacheMap.get(key);
                        Value currentCacheValue = readOnlyCacheMap.get(key);
                        //对比值,不同的话readOnlyCacheMap取readWriteCacheMap里的值放入。
                        if (cacheValue != currentCacheValue) {
                            readOnlyCacheMap.put(key,cacheValue);
                        }
                    } catch (Throwable th) {
                        logger.error("Error while updating the client cache from response cache for key {}",key.toStringCompact(),th);
                    }
                }
            }
        };
    }

番外:源码里有一个这个东西,存最近的数据,如果有相同需求可以借鉴

 private class CircularQueue<E> extends ConcurrentLinkedQueue<E> {
        private int size = 0;

        public CircularQueue(int size) {
            this.size = size;
        }

        @Override
        public boolean add(E e) {
            this.makeSpaceIfNotAvailable();
            return super.add(e);

        }

        private void makeSpaceIfNotAvailable() {
            if (this.size() == size) {
                this.remove();
            }
        }

        public boolean offer(E e) {
            this.makeSpaceIfNotAvailable();
            return super.offer(e);
        }
    }

原文地址:https://www.cnblogs.com/javammc

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


这篇文章主要介绍了spring的事务传播属性REQUIRED_NESTED的原理介绍,具有一定借鉴价值,需要的朋友可以参考下。下面就和我一起来看看吧。传统事务中回滚点的使...
今天小编给大家分享的是一文解析spring中事务的传播机制,相信很多人都不太了解,为了让大家更加了解,所以给大家总结了以下内容,一起往下看吧。一定会有所收获...
这篇文章主要介绍了SpringCloudAlibaba和SpringCloud有什么区别,具有一定借鉴价值,需要的朋友可以参考下。下面就和我一起来看看吧。Spring Cloud Netfli...
本篇文章和大家了解一下SpringCloud整合XXL-Job的几个步骤。有一定的参考价值,有需要的朋友可以参考一下,希望对大家有所帮助。第一步:整合pom文件,在S...
本篇文章和大家了解一下Spring延迟初始化会遇到什么问题。有一定的参考价值,有需要的朋友可以参考一下,希望对大家有所帮助。List 坑列表 = new ArrayList(2);...
这篇文章主要介绍了怎么使用Spring提供的不同缓存注解实现缓存的相关知识,内容详细易懂,操作简单快捷,具有一定借鉴价值,相信大家阅读完这篇...
本篇内容主要讲解“Spring中的@Autowired和@Resource注解怎么使用”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学...
今天小编给大家分享一下SpringSecurity怎么定义多个过滤器链的相关知识点,内容详细,逻辑清晰,相信大部分人都还太了解这方面的知识,所以分享这篇文章给大家
这篇文章主要介绍“Spring的@Conditional注解怎么使用”的相关知识,小编通过实际案例向大家展示操作过程,操作方法简单快捷,实用性强,希望这篇“Spring的@Con...
这篇文章主要介绍了SpringCloudGateway的熔断限流怎么配置的相关知识,内容详细易懂,操作简单快捷,具有一定借鉴价值,相信大家阅读完这篇SpringCloud&nb...
今天小编给大家分享一下怎么使用Spring解决循环依赖问题的相关知识点,内容详细,逻辑清晰,相信大部分人都还太了解这方面的知识,所以分享这篇文章给大家参考
这篇文章主要介绍“Spring事务及传播机制的原理及应用方法是什么”的相关知识,小编通过实际案例向大家展示操作过程,操作方法简单快捷,实用性强,希望这篇“Sp...
这篇“SpringCloudAlibaba框架实例应用分析”文章的知识点大部分人都不太理解,所以小编给大家总结了以下内容,内容详细,步骤清晰,具有一定的借鉴价
本篇内容主要讲解“SpringBoot中怎么使用SpringMVC”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习...
这篇文章主要介绍“SpringMVC适配器模式作用范围是什么”的相关知识,小编通过实际案例向大家展示操作过程,操作方法简单快捷,实用性强,希望这篇“SpringMVC
这篇“导入SpringCloud依赖失败如何解决”文章的知识点大部分人都不太理解,所以小编给大家总结了以下内容,内容详细,步骤清晰,具有一定的借鉴价值,希望大家...
这篇文章主要讲解了“SpringMVC核心DispatcherServlet处理流程是什么”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来
今天小编给大家分享一下SpringMVCHttpMessageConverter消息转换器怎么使用的相关知识点,内容详细,逻辑清晰,相信大部分人都还太了解这方面的知识,所以...
这篇文章主要介绍“Spring框架实现依赖注入的原理是什么”的相关知识,小编通过实际案例向大家展示操作过程,操作方法简单快捷,实用性强,希望这篇“Spring框架...
本篇内容介绍了“Spring单元测试控制Bean注入的方法是什么”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下