1.简单工厂设计模式
389 2023-04-03 02:41:18
package org.springframework.cloud.client.discovery;

/**
 * Spring Cloud's discovery abstraction as quoted in this article: a description
 * plus two lookup operations (instances by service id, and the list of services).
 */
public interface DiscoveryClient {

    /**
     * @return a textual description of this client (content is implementation-defined)
     */
    String description();

    /**
     * @param serviceId identifier of the service to look up
     * @return the {@code ServiceInstance} entries for {@code serviceId}
     */
    List<ServiceInstance> getInstances(String serviceId);

    /**
     * @return the service identifiers exposed by this client
     */
    List<String> getServices();
}
package org.springframework.cloud.netflix.eureka;

/**
 * Eureka-specific implementation of the {@code DiscoveryClient} contract.
 * The body is intentionally elided in this excerpt; only the type relationship is shown.
 */
public class EurekaDiscoveryClient implements DiscoveryClient {
}
package com.netflix.discovery;

/**
 * Netflix's own {@code DiscoveryClient}: the concrete {@code EurekaClient}
 * implementation. Declared as a singleton; the body is elided in this excerpt.
 */
@Singleton
public class DiscoveryClient implements EurekaClient {
}
LookupService ---> EurekaClient ---> DiscoveryClient
package com.netflix.discovery.shared;

/**
 * Lookup operations over the registry as quoted in this article.
 *
 * @param <T> type parameter declared by the interface (not used by the methods shown here)
 */
public interface LookupService<T> {

    /**
     * @param appName name of the application to look up
     * @return the {@link Application} for {@code appName}
     */
    Application getApplication(String appName);

    /**
     * @return the {@code Applications} container
     */
    Applications getApplications();

    /**
     * @param id instance identifier
     * @return the {@code InstanceInfo} entries matching {@code id}
     */
    List<InstanceInfo> getInstancesById(String id);

    /**
     * @param virtualHostname virtual host name to resolve
     * @param secure          whether the secure variant is requested
     * @return the instance selected for {@code virtualHostname}
     */
    InstanceInfo getNextServerFromEureka(String virtualHostname, boolean secure);
}
package com.netflix.discovery.shared;

/**
 * Registry entry for one application and its instances.
 * Only the field declarations are quoted in this excerpt; constructors and
 * methods (which initialise the {@code final} fields) are not shown.
 */
@Serializer("com.netflix.discovery.converters.EntityBodyConverter")
@XStreamAlias("application")
@JsonRootName("application")
public class Application {

    /** Shared random source (name suggests it is used when shuffling instances). */
    private static Random shuffleRandom = new Random();

    /** Application name. */
    private String name;

    /** Dirty flag; volatile so that updates are visible across threads. */
    private volatile boolean isDirty = false;

    /** Instances belonging to this application. */
    private final Set<InstanceInfo> instances;

    /** Atomically replaceable list of instances in shuffled order. */
    private final AtomicReference<List<InstanceInfo>> shuffledInstances;

    /** Instances indexed by a String key. */
    private final Map<String, InstanceInfo> instancesMap;
}
/**
 * Removes an instance from this application.
 *
 * @param i           the instance to remove; its id is the key into {@code instancesMap}
 * @param markAsDirty when {@code true}, the {@code isDirty} flag is raised after removal
 */
private void removeInstance(InstanceInfo i, boolean markAsDirty) {
    // Drop the id -> instance mapping first (done outside the monitor).
    instancesMap.remove(i.getId());

    // The instance set and the dirty flag are updated under the set's monitor.
    synchronized (instances) {
        instances.remove(i);
        if (markAsDirty) {
            isDirty = true;
        }
    }
}
//为Eureka Client注册健康检查处理器 public void registerHealthCheck(HealthCheckHandler healthCheckHandler); //监听Client服务实例信息的更新 public void registerEventListener(EurekaEventListener eventListener);
/**
 * Callback that maps an instance's current status to the status it should report.
 */
public interface HealthCheckHandler {

    /**
     * @param currentStatus the status the instance currently has
     * @return the status determined by the health check
     */
    InstanceInfo.InstanceStatus getStatus(InstanceInfo.InstanceStatus currentStatus);
}
@Inject DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args, Provider<BackupRegistry> backupRegistryProvider)
参数说明
@Inject DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args, Provider<BackupRegistry> backupRegistryProvider) { //1、参数说明 //ApplicationInfoManager:应用信息管理器 //EurekaClientConfig:Client与Server交互信息 //AbstractDiscoveryClientOptionalArgs:可选参数 //Provider<BackupRegistry>:注册中心备份 //2、初始化信息 if (args != null) { this.healthCheckHandlerProvider = args.healthCheckHandlerProvider; this.healthCheckCallbackProvider = args.healthCheckCallbackProvider; this.eventListeners.addAll(args.getEventListeners()); this.preRegistrationHandler = args.preRegistrationHandler; } else { this.healthCheckCallbackProvider = null; this.healthCheckHandlerProvider = null; this.preRegistrationHandler = null; } this.applicationInfoManager = applicationInfoManager; InstanceInfo myInfo = applicationInfoManager.getInfo(); clientConfig = config; staticClientConfig = clientConfig; transportConfig = config.getTransportConfig(); instanceInfo = myInfo; if (myInfo != null) { appPathIdentifier = instanceInfo.getAppName() + "/" + instanceInfo.getId(); } else { logger.warn("Setting instanceInfo to a passed in null value"); } this.backupRegistryProvider = backupRegistryProvider; this.urlRandomizer = new EndpointUtils.InstanceInfoBasedUrlRandomizer(instanceInfo); localRegionApps.set(new Applications()); fetchRegistryGeneration = new AtomicLong(0); remoteRegionsToFetch = new AtomicReference<String>(clientConfig.fetchRegistryForRemoteRegions()); remoteRegionsRef = new AtomicReference<>(remoteRegionsToFetch.get() == null ? 
null : remoteRegionsToFetch.get().split(",")); //3、配置信息读取初始化 //shouldFetchRegistry对应配置:eureka.client.fetch-register=true/false(是否从Server中拉取注册表信息) //shouldRegisterWithEureka对应配置:eureka.client.register-with-eureka=true/false(是否将自身信息注册到Server) if (config.shouldFetchRegistry()) { this.registryStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRY_PREFIX + "lastUpdateSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L}); } else { this.registryStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC; } if (config.shouldRegisterWithEureka()) { this.heartbeatStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRATION_PREFIX + "lastHeartbeatSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L}); } else { this.heartbeatStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC; } logger.info("Initializing Eureka in region {}", clientConfig.getRegion()); if (!config.shouldRegisterWithEureka() && !config.shouldFetchRegistry()) { logger.info("Client configured to neither register nor query for data."); scheduler = null; heartbeatExecutor = null; cacheRefreshExecutor = null; eurekaTransport = null; instanceRegionChecker = new InstanceRegionChecker(new PropertyBasedAzToRegionMapper(config), clientConfig.getRegion()); // This is a bit of hack to allow for existing code using DiscoveryManager.getInstance() // to work with DI'd DiscoveryClient DiscoveryManager.getInstance().setDiscoveryClient(this); DiscoveryManager.getInstance().setEurekaClientConfig(config); initTimestampMs = System.currentTimeMillis(); logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}", initTimestampMs, this.getApplications().size()); return; // no need to setup up an network tasks and we are done } //4、线程池定义 //scheduler:线程池大小为2 //heartbeatExecutor:心跳发送线程 //cacheRefreshExecutor:缓存刷新线程 try { // default size of 2 - 1 each for heartbeat and cacheRefresh scheduler = Executors.newScheduledThreadPool(2, new ThreadFactoryBuilder() .setNameFormat("DiscoveryClient-%d") 
.setDaemon(true) .build()); heartbeatExecutor = new ThreadPoolExecutor( 1, clientConfig.getHeartbeatExecutorThreadPoolSize(), 0, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new ThreadFactoryBuilder() .setNameFormat("DiscoveryClient-HeartbeatExecutor-%d") .setDaemon(true) .build() ); // use direct handoff cacheRefreshExecutor = new ThreadPoolExecutor( 1, clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new ThreadFactoryBuilder() .setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d") .setDaemon(true) .build() ); // use direct handoff //5、构建EurekaTransport //EurekaTransport是DiscoverClient内部类,封装了Client与Server进行http调用的Jersey客户端 eurekaTransport = new EurekaTransport(); scheduleServerEndpointTask(eurekaTransport, args); AzToRegionMapper azToRegionMapper; if (clientConfig.shouldUseDnsForFetchingServiceUrls()) { azToRegionMapper = new DNSBasedAzToRegionMapper(clientConfig); } else { azToRegionMapper = new PropertyBasedAzToRegionMapper(clientConfig); } if (null != remoteRegionsToFetch.get()) { azToRegionMapper.setRegionsToFetch(remoteRegionsToFetch.get().split(",")); } instanceRegionChecker = new InstanceRegionChecker(azToRegionMapper, clientConfig.getRegion()); } catch (Throwable e) { throw new RuntimeException("Failed to initialize DiscoveryClient!", e); } //6、拉取注册表信息:先拉取Server注册表信息,并缓存到本地,减少与Server端通讯 //shouldFetchRegistry()=true,并且没有注册过 if (clientConfig.shouldFetchRegistry() && !fetchRegistry(false)) { fetchRegistryFromBackup(); } //7、注册服务:注册之前先调用预注册功能 // call and execute the pre registration handler before all background tasks (inc registration) is started if (this.preRegistrationHandler != null) { this.preRegistrationHandler.beforeRegistration(); } //8、开始注册 //开始注册:register() //初始化定时任务:initScheduledTasks(); if (clientConfig.shouldRegisterWithEureka() && clientConfig.shouldEnforceRegistrationAtInit()) { try { if (!register() ) { throw new IllegalStateException("Registration error at startup. 
Invalid server response."); } } catch (Throwable th) { logger.error("Registration error at startup: {}", th.getMessage()); throw new IllegalStateException(th); } } // finally, init the schedule tasks (e.g. cluster resolvers, heartbeat, instanceInfo replicator, fetch initScheduledTasks(); try { Monitors.registerObject(this); } catch (Throwable e) { logger.warn("Cannot register timers", e); } // This is a bit of hack to allow for existing code using DiscoveryManager.getInstance() // to work with DI'd DiscoveryClient DiscoveryManager.getInstance().setDiscoveryClient(this); DiscoveryManager.getInstance().setEurekaClientConfig(config); initTimestampMs = System.currentTimeMillis(); logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}", initTimestampMs, this.getApplications().size()); }
private boolean fetchRegistry(boolean forceFullRegistryFetch) { Stopwatch tracer = FETCH_REGISTRY_TIMER.start(); try { // If the delta is disabled or if it is the first time, get all // applications Applications applications = getApplications(); //1、判断拉取方式 //如果增量方式被禁止,或Application为空,采用全量拉取方式 //getAndUpdateDelta:全量拉取 //getAndUpdateDelta:增量拉取 if (clientConfig.shouldDisableDelta() || (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress())) || forceFullRegistryFetch || (applications == null) || (applications.getRegisteredApplications().size() == 0) || (applications.getVersion() == -1)) //Client application does not have latest library supporting delta { logger.info("Disable delta property : {}", clientConfig.shouldDisableDelta()); logger.info("Single vip registry refresh property : {}", clientConfig.getRegistryRefreshSingleVipAddress()); logger.info("Force full registry fetch : {}", forceFullRegistryFetch); logger.info("Application is null : {}", (applications == null)); logger.info("Registered Applications size is zero : {}", (applications.getRegisteredApplications().size() == 0)); logger.info("Application version is -1: {}", (applications.getVersion() == -1)); getAndStoreFullRegistry(); } else { getAndUpdateDelta(applications); } //2、计算集合一致性哈希码 applications.setAppsHashCode(applications.getReconcileHashCode()); logTotalInstances(); } catch (Throwable e) { logger.error(PREFIX + "{} - was unable to refresh its cache! status = {}", appPathIdentifier, e.getMessage(), e); return false; } finally { if (tracer != null) { tracer.stop(); } } //3、更新远程实例数据 // onCacheRefreshed:推送缓存刷新事件 //updateInstanceRemoteStatus:缓存中被刷新数据更新远程实例数据 // Notify about cache refresh before updating the instance remote status onCacheRefreshed(); // Update remote status based on refreshed data held in the cache updateInstanceRemoteStatus(); // registry was fetched successfully, so return true return true; }
private void getAndStoreFullRegistry() throws Throwable { //1、获取注册表的版本号,防止版本落后(由线程引起) long currentUpdateGeneration = fetchRegistryGeneration.get(); logger.info("Getting all instance registry info from the eureka server"); Applications apps = null; EurekaHttpResponse<Applications> httpResponse = clientConfig.getRegistryRefreshSingleVipAddress() == null ? eurekaTransport.queryClient.getApplications(remoteRegionsRef.get()) : eurekaTransport.queryClient.getVip(clientConfig.getRegistryRefreshSingleVipAddress(), remoteRegionsRef.get()); //2、信息获取成功 if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) { apps = httpResponse.getEntity(); } logger.info("The response status is {}", httpResponse.getStatusCode()); //3、判断信息 //检查fetchRegistryGeneration的版本更新是否有改变,无改变说明是最新数据 //有个数据更新:从app中选出状态为UP的实例,同时打乱实例顺序,防止同一个服务不同的实例在启动时接受流量 if (apps == null) { logger.error("The application is null for some reason. Not storing this information"); } else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) { localRegionApps.set(this.filterAndShuffle(apps)); logger.debug("Got full registry with apps hashcode {}", apps.getAppsHashCode()); } else { logger.warn("Not updating applications as another thread is updating it already"); } }
private void getAndUpdateDelta(Applications applications) throws Throwable { //1、获取版本号 long currentUpdateGeneration = fetchRegistryGeneration.get(); Applications delta = null; EurekaHttpResponse<Applications> httpResponse = eurekaTransport.queryClient.getDelta(remoteRegionsRef.get()); if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) { delta = httpResponse.getEntity(); } //2、数据获取 //数据获取失败:调用全量拉取方法拉取数据 //拉取成功:更新本地缓存,计算集合一致性哈希码,如果哈希码不一致认为本次数据为脏数据,继续采用全量拉取信息 if (delta == null) { logger.warn("The server does not allow the delta revision to be applied because it is not safe. " + "Hence got the full registry."); getAndStoreFullRegistry(); } else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) { logger.debug("Got delta update with apps hashcode {}", delta.getAppsHashCode()); String reconcileHashCode = ""; if (fetchRegistryUpdateLock.tryLock()) { try { //更新缓存数据 updateDelta(delta); reconcileHashCode = getReconcileHashCode(applications); } finally { fetchRegistryUpdateLock.unlock(); } } else { logger.warn("Cannot acquire update lock, aborting getAndUpdateDelta"); } // There is a diff in number of instances for some reason if (!reconcileHashCode.equals(delta.getAppsHashCode()) || clientConfig.shouldLogDeltaDiff()) { reconcileAndLogDifference(delta, reconcileHashCode); // this makes a remoteCall } } else { logger.warn("Not updating application delta as another thread is updating it already"); logger.debug("Ignoring delta update with apps hashcode {}, as another thread is updating it already", delta.getAppsHashCode()); } }
/**
 * Kind of change an instance underwent in the discovery server.
 */
public enum ActionType {
    /** Added in the discovery server. */
    ADDED,
    /** Changed in the discovery server. */
    MODIFIED,
    /** Deleted from the discovery server. */
    DELETED
}
//缓存数据更新:遍历列表数据,将添加和修改的数据添加本地注册表中,将删除类型的数据从本地注册表删除 private void updateDelta(Applications delta) { int deltaCount = 0; for (Application app : delta.getRegisteredApplications()) { for (InstanceInfo instance : app.getInstances()) { Applications applications = getApplications(); String instanceRegion = instanceRegionChecker.getInstanceRegion(instance); if (!instanceRegionChecker.isLocalRegion(instanceRegion)) { Applications remoteApps = remoteRegionVsApps.get(instanceRegion); if (null == remoteApps) { remoteApps = new Applications(); remoteRegionVsApps.put(instanceRegion, remoteApps); } applications = remoteApps; } ++deltaCount; if (ActionType.ADDED.equals(instance.getActionType())) { Application existingApp = applications.getRegisteredApplications(instance.getAppName()); if (existingApp == null) { applications.addApplication(app); } //将添加类型数据直接添加到本地注册表 logger.debug("Added instance {} to the existing apps in region {}", instance.getId(), instanceRegion); applications.getRegisteredApplications(instance.getAppName()).addInstance(instance); } else if (ActionType.MODIFIED.equals(instance.getActionType())) { Application existingApp = applications.getRegisteredApplications(instance.getAppName()); if (existingApp == null) { applications.addApplication(app); } //将修改类型数据直接添加到本地注册表 logger.debug("Modified instance {} to the existing apps ", instance.getId()); applications.getRegisteredApplications(instance.getAppName()).addInstance(instance); } else if (ActionType.DELETED.equals(instance.getActionType())) { Application existingApp = applications.getRegisteredApplications(instance.getAppName()); if (existingApp == null) { applications.addApplication(app); } //将删除类型的数据从本地注册表删除 logger.debug("Deleted instance {} to the existing apps ", instance.getId()); applications.getRegisteredApplications(instance.getAppName()).removeInstance(instance); } } } logger.debug("The total number of instances fetched by the delta processor : {}", deltaCount); 
getApplications().setVersion(delta.getVersion()); getApplications().shuffleInstances(clientConfig.shouldFilterOnlyUpInstances()); for (Applications applications : remoteRegionVsApps.values()) { applications.setVersion(delta.getVersion()); applications.shuffleInstances(clientConfig.shouldFilterOnlyUpInstances()); } }
boolean register() throws Throwable { logger.info(PREFIX + "{}: registering service...", appPathIdentifier); EurekaHttpResponse<Void> httpResponse; try { //注册服务:将数据封装到InstanceInfo httpResponse = eurekaTransport.registrationClient.register(instanceInfo); } catch (Exception e) { logger.warn(PREFIX + "{} - registration failed {}", appPathIdentifier, e.getMessage(), e); throw e; } if (logger.isInfoEnabled()) { logger.info(PREFIX + "{} - registration status: {}", appPathIdentifier, httpResponse.getStatusCode()); } return httpResponse.getStatusCode() == 204; }
private void initScheduledTasks() { if (clientConfig.shouldFetchRegistry()) { //1、注册表缓存刷新时间:默认30秒 //通过eureka.client.registry-fetch-interval-seconds 设置 // registry cache refresh timer int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds(); int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound(); scheduler.schedule( new TimedSupervisorTask( "cacheRefresh", scheduler, cacheRefreshExecutor, registryFetchIntervalSeconds, TimeUnit.SECONDS, expBackOffBound, new CacheRefreshThread() ), registryFetchIntervalSeconds, TimeUnit.SECONDS); } //2、发送心跳定时器:默认30秒发送一次 if (clientConfig.shouldRegisterWithEureka()) { int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs(); int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound(); logger.info("Starting heartbeat executor: " + "renew interval is: {}", renewalIntervalInSecs); // Heartbeat timer scheduler.schedule( new TimedSupervisorTask( "heartbeat", scheduler, heartbeatExecutor, renewalIntervalInSecs, TimeUnit.SECONDS, expBackOffBound, new HeartbeatThread() ), renewalIntervalInSecs, TimeUnit.SECONDS); //3、注册定时器 // InstanceInfo replicator instanceInfoReplicator = new InstanceInfoReplicator( this, instanceInfo, clientConfig.getInstanceInfoReplicationIntervalSeconds(), 2); // burstSize statusChangeListener = new ApplicationInfoManager.StatusChangeListener() { @Override public String getId() { return "statusChangeListener"; } @Override public void notify(StatusChangeEvent statusChangeEvent) { if (InstanceStatus.DOWN == statusChangeEvent.getStatus() || InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) { // log at warn level if DOWN was involved logger.warn("Saw local status change event {}", statusChangeEvent); } else { logger.info("Saw local status change event {}", statusChangeEvent); } instanceInfoReplicator.onDemandUpdate(); } }; //注册应用状态改变监控器 if (clientConfig.shouldOnDemandUpdateStatusChange()) { 
applicationInfoManager.registerStatusChangeListener(statusChangeListener); } //启动定时任务 instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds()); } else { logger.info("Not registering with Eureka server per configuration"); } }
1、scheduler初始化并延迟执行TimedSupervisorTask
2、TimedSupervisorTask将task提交到executor中执行,task和executor在初始化TimedSupervisorTask时传入
3、task正常执行,TimedSupervisorTask将自己提交到scheduler,延迟delay时间后再次执行
4、task超时执行,计算新的delay时间,TimedSupervisorTask将自己提交到scheduler,延迟delay时间后再次执行
public class TimedSupervisorTask extends TimerTask { private static final Logger logger = LoggerFactory.getLogger(TimedSupervisorTask.class); private final Counter timeoutCounter; private final Counter rejectedCounter; private final Counter throwableCounter; private final LongGauge threadPoolLevelGauge; private final ScheduledExecutorService scheduler; private final ThreadPoolExecutor executor; private final long timeoutMillis; private final Runnable task; private final AtomicLong delay; private final long maxDelay; public TimedSupervisorTask(String name, ScheduledExecutorService scheduler, ThreadPoolExecutor executor, int timeout, TimeUnit timeUnit, int expBackOffBound, Runnable task) { this.scheduler = scheduler; this.executor = executor; this.timeoutMillis = timeUnit.toMillis(timeout); this.task = task; this.delay = new AtomicLong(timeoutMillis); this.maxDelay = timeoutMillis * expBackOffBound; // Initialize the counters and register. timeoutCounter = Monitors.newCounter("timeouts"); rejectedCounter = Monitors.newCounter("rejectedExecutions"); throwableCounter = Monitors.newCounter("throwables"); threadPoolLevelGauge = new LongGauge(MonitorConfig.builder("threadPoolUsed").build()); Monitors.registerObject(name, this); } @Override public void run() { Future<?> future = null; try { //执行任务:submit future = executor.submit(task); threadPoolLevelGauge.set((long) executor.getActiveCount()); //等待执行任务结果 future.get(timeoutMillis, TimeUnit.MILLISECONDS); // block until done or timeout //执行完成,设置下次任务执行频率(时间间隔) delay.set(timeoutMillis); threadPoolLevelGauge.set((long) executor.getActiveCount()); } catch (TimeoutException e) { //任务超时,设置下次任务执行频率 logger.warn("task supervisor timed out", e); timeoutCounter.increment(); long currentDelay = delay.get(); long newDelay = Math.min(maxDelay, currentDelay * 2); delay.compareAndSet(currentDelay, newDelay); } catch (RejectedExecutionException e) { //任务拒绝,统计拒绝任务次数 if (executor.isShutdown() || scheduler.isShutdown()) { logger.warn("task 
supervisor shutting down, reject the task", e); } else { logger.warn("task supervisor rejected the task", e); } rejectedCounter.increment(); } catch (Throwable e) { if (executor.isShutdown() || scheduler.isShutdown()) { logger.warn("task supervisor shutting down, can't accept the task"); } else { logger.warn("task supervisor threw an exception", e); } throwableCounter.increment(); } finally { //取消未结束的任务 if (future != null) { future.cancel(true); } //如果定时任务未关闭,定义下一次任务 if (!scheduler.isShutdown()) { scheduler.schedule(this, delay.get(), TimeUnit.MILLISECONDS); } } }}
/**
 * Runnable handed to the cache-refresh supervisor; each run refreshes the registry.
 */
class CacheRefreshThread implements Runnable {
    @Override
    public void run() {
        refreshRegistry();
    }
}
@VisibleForTesting void refreshRegistry() { try { boolean isFetchingRemoteRegionRegistries = isFetchingRemoteRegionRegistries(); boolean remoteRegionsModified = false; // This makes sure that a dynamic change to remote regions to fetch is honored. String latestRemoteRegions = clientConfig.fetchRegistryForRemoteRegions(); if (null != latestRemoteRegions) { String currentRemoteRegions = remoteRegionsToFetch.get(); if (!latestRemoteRegions.equals(currentRemoteRegions)) { // Both remoteRegionsToFetch and AzToRegionMapper.regionsToFetch need to be in sync synchronized (instanceRegionChecker.getAzToRegionMapper()) { if (remoteRegionsToFetch.compareAndSet(currentRemoteRegions, latestRemoteRegions)) { String[] remoteRegions = latestRemoteRegions.split(","); remoteRegionsRef.set(remoteRegions); instanceRegionChecker.getAzToRegionMapper().setRegionsToFetch(remoteRegions); remoteRegionsModified = true; } else { logger.info("Remote regions to fetch modified concurrently," + " ignoring change from {} to {}", currentRemoteRegions, latestRemoteRegions); } } } else { // Just refresh mapping to reflect any DNS/Property change instanceRegionChecker.getAzToRegionMapper().refreshMapping(); } } //判断Region是否改变(Server地址),用于决定全量拉取还是增量拉取 boolean success = fetchRegistry(remoteRegionsModified); if (success) { registrySize = localRegionApps.get().size(); lastSuccessfulRegistryFetchTimestamp = System.currentTimeMillis(); } //打印更新注册表缓存后变化 if (logger.isDebugEnabled()) { StringBuilder allAppsHashCodes = new StringBuilder(); allAppsHashCodes.append("Local region apps hashcode: "); allAppsHashCodes.append(localRegionApps.get().getAppsHashCode()); allAppsHashCodes.append(", is fetching remote regions? 
"); allAppsHashCodes.append(isFetchingRemoteRegionRegistries); for (Map.Entry<String, Applications> entry : remoteRegionVsApps.entrySet()) { allAppsHashCodes.append(", Remote region: "); allAppsHashCodes.append(entry.getKey()); allAppsHashCodes.append(" , apps hashcode: "); allAppsHashCodes.append(entry.getValue().getAppsHashCode()); } logger.debug("Completed cache refresh task for discovery. All Apps hash code is {} ", allAppsHashCodes); } } catch (Throwable e) { logger.error("Cannot fetch registry from server", e); } }
/**
 * Runnable handed to the heartbeat supervisor; records the time of each successful renewal.
 */
private class HeartbeatThread implements Runnable {
    @Override
    public void run() {
        boolean renewed = renew();
        if (renewed) {
            lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();
        }
    }
}
续约核心参数:appName,InstanceId
1、调用HTTP发送心跳到Server端
2、如果请求状态码为404表示Server中不存在当前实例,线程会重新调用注册方法进行注册
3、如果请求状态码为200表示续约成功
boolean renew() { EurekaHttpResponse<InstanceInfo> httpResponse; try { //调用HTTP发现心跳到Server:主要参数-appName,InstanceId httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null); logger.debug(PREFIX + "{} - Heartbeat status: {}", appPathIdentifier, httpResponse.getStatusCode()); //Server中不存在当前实例为404,线程查重新注册 if (httpResponse.getStatusCode() == 404) { REREGISTER_COUNTER.increment(); logger.info(PREFIX + "{} - Re-registering apps/{}", appPathIdentifier, instanceInfo.getAppName()); long timestamp = instanceInfo.setIsDirtyWithTime(); boolean success = register(); if (success) { instanceInfo.unsetIsDirty(timestamp); } return success; } //续约成功返回200 return httpResponse.getStatusCode() == 200; } catch (Throwable e) { logger.error(PREFIX + "{} - was unable to send heartbeat!", appPathIdentifier, e); return false; } }
private void initScheduledTasks() { ... ... //检查InstanceInfo数据是否变化,有变化重新注册 // InstanceInfo replicator instanceInfoReplicator = new InstanceInfoReplicator( this, instanceInfo, clientConfig.getInstanceInfoReplicationIntervalSeconds(), 2); // burstSize //监听应用状态改变,状态改变后重新发起注册 statusChangeListener = new ApplicationInfoManager.StatusChangeListener() { @Override public String getId() { return "statusChangeListener"; } @Override public void notify(StatusChangeEvent statusChangeEvent) { if (InstanceStatus.DOWN == statusChangeEvent.getStatus() || InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) { // log at warn level if DOWN was involved logger.warn("Saw local status change event {}", statusChangeEvent); } else { logger.info("Saw local status change event {}", statusChangeEvent); } instanceInfoReplicator.onDemandUpdate(); } }; if (clientConfig.shouldOnDemandUpdateStatusChange()) { applicationInfoManager.registerStatusChangeListener(statusChangeListener); } instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds()); } else { logger.info("Not registering with Eureka server per configuration"); } }
class InstanceInfoReplicator implements Runnable { InstanceInfoReplicator(DiscoveryClient discoveryClient, InstanceInfo instanceInfo, int replicationIntervalSeconds, int burstSize) { this.discoveryClient = discoveryClient; this.instanceInfo = instanceInfo; //创建定时任务 this.scheduler = Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder() .setNameFormat("DiscoveryClient-InstanceInfoReplicator-%d") .setDaemon(true) .build()); this.scheduledPeriodicRef = new AtomicReference<Future>(); this.started = new AtomicBoolean(false); this.rateLimiter = new RateLimiter(TimeUnit.MINUTES); this.replicationIntervalSeconds = replicationIntervalSeconds; this.burstSize = burstSize; this.allowedRatePerMinute = 60 * this.burstSize / this.replicationIntervalSeconds; logger.info("InstanceInfoReplicator onDemand update allowed rate per min is {}", allowedRatePerMinute); } public void run() { try { //刷新InstanceInfo中服务实例信息 discoveryClient.refreshInstanceInfo(); //如果数据发生改变,返回数据更新时间 Long dirtyTimestamp = instanceInfo.isDirtyWithTime(); if (dirtyTimestamp != null) { //注册实例信息,重新更新状态 discoveryClient.register(); instanceInfo.unsetIsDirty(dirtyTimestamp); } } catch (Throwable t) { logger.warn("There was a problem with the instance info replicator", t); } finally { //执行下一个延时任务 Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS); scheduledPeriodicRef.set(next); } }}
void refreshInstanceInfo() { //更新新服务信息 applicationInfoManager.refreshDataCenterInfoIfRequired(); //更新租约信息 applicationInfoManager.refreshLeaseInfoIfRequired(); InstanceStatus status; try { //调用getHealthCheckHandler检查服务实例变化 status = getHealthCheckHandler().getStatus(instanceInfo.getStatus()); } catch (Exception e) { logger.warn("Exception from healthcheckHandler.getStatus, setting status to DOWN", e); status = InstanceStatus.DOWN; } if (null != status) { applicationInfoManager.setInstanceStatus(status); } }
@PreDestroy @Override public synchronized void shutdown() { if (isShutdown.compareAndSet(false, true)) { logger.info("Shutting down DiscoveryClient ..."); //注销监听器 if (statusChangeListener != null && applicationInfoManager != null) { applicationInfoManager.unregisterStatusChangeListener(statusChangeListener.getId()); } //取消定时任务 cancelScheduledTasks(); //服务下线 // If APPINFO was registered if (applicationInfoManager != null && clientConfig.shouldRegisterWithEureka() && clientConfig.shouldUnregisterOnShutdown()) { applicationInfoManager.setInstanceStatus(InstanceStatus.DOWN); unregister(); } //关闭Jersy客户端 if (eurekaTransport != null) { eurekaTransport.shutdown(); } //关闭相关Monitor heartbeatStalenessMonitor.shutdown(); registryStalenessMonitor.shutdown(); logger.info("Completed shut down of DiscoveryClient"); } }