Eureka核心源码解析(一):应用实例注册、续约、下线

2024-01-05 18:40

本文主要是介绍Eureka核心源码解析(一):应用实例注册、续约、下线,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

本文主要来解析Eureka应用实例注册、续约、下线的核心源码,基于1.9.8版本

一、应用实例注册

1、Eureka Client发起注册

Eureka Client向Eureka Server发起注册应用实例需要符合如下条件:

  • 配置eureka.registration.enabled=true,Eureka Client向Eureka Server发起注册应用实例的开关
  • InstanceInfo在Eureka Client和Eureka Server数据不一致

每次InstanceInfo发生属性变化时,标记isInstanceInfoDirty属性为true,表示InstanceInfo在Eureka Client和Eureka Server数据不一致,需要注册。另外,InstanceInfo刚被创建时,在Eureka Server不存在,也会被注册

当符合条件时,InstanceInfo不会立即向Eureka Server注册,而是后台线程定时注册

当InstanceInfo的状态(status)属性发生变化时,并且配置eureka.shouldOnDemandUpdateStatusChange=true(默认为true)时,立即向Eureka Server注册

1)、应用实例信息复制
public class DiscoveryClient implements EurekaClient {/*** 应用实例状态变更监听器*/private ApplicationInfoManager.StatusChangeListener statusChangeListener;/*** 应用实例信息复制器*/private InstanceInfoReplicator instanceInfoReplicator;private void initScheduledTasks() {if (clientConfig.shouldFetchRegistry()) {// registry cache refresh timerint registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();scheduler.schedule(new TimedSupervisorTask("cacheRefresh",scheduler,cacheRefreshExecutor,registryFetchIntervalSeconds,TimeUnit.SECONDS,expBackOffBound,new CacheRefreshThread()),registryFetchIntervalSeconds, TimeUnit.SECONDS);}// 向EurekaServer心跳执行器if (clientConfig.shouldRegisterWithEureka()) {int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();logger.info("Starting heartbeat executor: " + "renew interval is: {}", renewalIntervalInSecs);// Heartbeat timerscheduler.schedule(new TimedSupervisorTask("heartbeat",scheduler,heartbeatExecutor,renewalIntervalInSecs,TimeUnit.SECONDS,expBackOffBound,new HeartbeatThread()),renewalIntervalInSecs, TimeUnit.SECONDS);// 创建应用实例状态变更监听器// InstanceInfo replicatorinstanceInfoReplicator = new InstanceInfoReplicator(this,instanceInfo,clientConfig.getInstanceInfoReplicationIntervalSeconds(),2); // burstSizestatusChangeListener = new ApplicationInfoManager.StatusChangeListener() {@Overridepublic String getId() {return "statusChangeListener";}@Overridepublic void notify(StatusChangeEvent statusChangeEvent) {if (InstanceStatus.DOWN == statusChangeEvent.getStatus() ||InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) {// log at warn level if DOWN was involvedlogger.warn("Saw local status change event {}", statusChangeEvent);} else {logger.info("Saw local status change event {}", statusChangeEvent);}instanceInfoReplicator.onDemandUpdate();}};// 注册应用实例状态变更监听器if (clientConfig.shouldOnDemandUpdateStatusChange()) {applicationInfoManager.registerStatusChangeListener(statusChangeListener);}// 开启应用实例信息复制器instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());} else {logger.info("Not registering with Eureka server per configuration");}}

调用InstanceInfoReplicator的start(int initialDelayMs) 方法,开启应用实例信息复制器。实现代码如下:

class InstanceInfoReplicator implements Runnable {public void start(int initialDelayMs) {if (started.compareAndSet(false, true)) {// 设置应用实例信息数据不一致instanceInfo.setIsDirty();  // for initial register// 提交任务Future next = scheduler.schedule(this, initialDelayMs, TimeUnit.SECONDS);scheduledPeriodicRef.set(next);}}public void run() {try {// 刷新应用实例信息discoveryClient.refreshInstanceInfo();// 判断应用实例信息是否数据不一致Long dirtyTimestamp = instanceInfo.isDirtyWithTime();if (dirtyTimestamp != null) {// 发起注册discoveryClient.register();// 设置应用实例信息数据一致instanceInfo.unsetIsDirty(dirtyTimestamp);}} catch (Throwable t) {logger.warn("There was a problem with the instance info replicator", t);} finally {// 提交任务 不断循环定时执行任务Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);scheduledPeriodicRef.set(next);}}

调用DiscoveryClient的register() 方法,EurekaClient向Eureka Server注册应用实例

2)、发起注册应用实例
public class DiscoveryClient implements EurekaClient {boolean register() throws Throwable {logger.info(PREFIX + "{}: registering service...", appPathIdentifier);EurekaHttpResponse<Void> httpResponse;try {// 调用AbstractJerseyEurekaHttpClient的register方法httpResponse = eurekaTransport.registrationClient.register(instanceInfo);} catch (Exception e) {logger.warn(PREFIX + "{} - registration failed {}", appPathIdentifier, e.getMessage(), e);throw e;}if (logger.isInfoEnabled()) {logger.info(PREFIX + "{} - registration status: {}", appPathIdentifier, httpResponse.getStatusCode());}return httpResponse.getStatusCode() == Status.NO_CONTENT.getStatusCode();}
public abstract class AbstractJerseyEurekaHttpClient implements EurekaHttpClient {@Overridepublic EurekaHttpResponse<Void> register(InstanceInfo info) {String urlPath = "apps/" + info.getAppName();ClientResponse response = null;try {Builder resourceBuilder = jerseyClient.resource(serviceUrl).path(urlPath).getRequestBuilder();addExtraHeaders(resourceBuilder);response = resourceBuilder.header("Accept-Encoding", "gzip").type(MediaType.APPLICATION_JSON_TYPE).accept(MediaType.APPLICATION_JSON).post(ClientResponse.class, info);return anEurekaHttpResponse(response.getStatus()).headers(headersOf(response)).build();} finally {if (logger.isDebugEnabled()) {logger.debug("Jersey HTTP POST {}/{} with instance {}; statusCode={}", serviceUrl, urlPath, info.getId(),response == null ? "N/A" : response.getStatus());}if (response != null) {response.close();}}}

AbstractJerseyEurekaHttpClient的register()方法使用POST请求调用Eureka Server的apps/${APP_NAME}接口,参数为InstanceInfo,实现注册实例信息的注册

2、Eureka Server接收注册

Eureka Server接收注册核心流程如下图:

在这里插入图片描述

1)、接收注册请求
@Produces({"application/xml", "application/json"})
public class ApplicationResource {@POST@Consumes({"application/json", "application/xml"})public Response addInstance(InstanceInfo info,@HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) {logger.debug("Registering instance {} (replication={})", info.getId(), isReplication);// 参数合法性校验// validate that the instanceinfo contains all the necessary required fieldsif (isBlank(info.getId())) {return Response.status(400).entity("Missing instanceId").build();} else if (isBlank(info.getHostName())) {return Response.status(400).entity("Missing hostname").build();} else if (isBlank(info.getIPAddr())) {return Response.status(400).entity("Missing ip address").build();} else if (isBlank(info.getAppName())) {return Response.status(400).entity("Missing appName").build();} else if (!appName.equals(info.getAppName())) {return Response.status(400).entity("Mismatched appName, expecting " + appName + " but was " + info.getAppName()).build();} else if (info.getDataCenterInfo() == null) {return Response.status(400).entity("Missing dataCenterInfo").build();} else if (info.getDataCenterInfo().getName() == null) {return Response.status(400).entity("Missing dataCenterInfo Name").build();}// handle cases where clients may be registering with bad DataCenterInfo with missing dataDataCenterInfo dataCenterInfo = info.getDataCenterInfo();if (dataCenterInfo instanceof UniqueIdentifier) {String dataCenterInfoId = ((UniqueIdentifier) dataCenterInfo).getId();if (isBlank(dataCenterInfoId)) {boolean experimental = "true".equalsIgnoreCase(serverConfig.getExperimental("registration.validation.dataCenterInfoId"));if (experimental) {String entity = "DataCenterInfo of type " + dataCenterInfo.getClass() + " must contain a valid id";return Response.status(400).entity(entity).build();} else if (dataCenterInfo instanceof AmazonInfo) {AmazonInfo amazonInfo = (AmazonInfo) dataCenterInfo;String effectiveId = amazonInfo.get(AmazonInfo.MetaDataKey.instanceId);if (effectiveId == null) {amazonInfo.getMetadata().put(AmazonInfo.MetaDataKey.instanceId.getName(), info.getId());}} else {logger.warn("Registering DataCenterInfo of type {} without an appropriate id", dataCenterInfo.getClass());}}}// 注册应用实例信息registry.register(info, "true".equals(isReplication));// 返回204状态码return Response.status(204).build();  // 204 to be backwards compatible}  
public class PeerAwareInstanceRegistryImpl extends AbstractInstanceRegistry implements PeerAwareInstanceRegistry {@Overridepublic void register(final InstanceInfo info, final boolean isReplication) {// 续约过期时间int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS;if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {leaseDuration = info.getLeaseInfo().getDurationInSecs();}// 注册应用实例信息super.register(info, leaseDuration, isReplication);// Eureka Server复制replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);}

PeerAwareInstanceRegistryImpl中调用了父类AbstractInstanceRegistry的register(...)方法注册实例信息

2)、Lease
public class Lease<T> {enum Action {Register, Cancel, Renew};public static final int DEFAULT_DURATION_IN_SECS = 90;/*** InstanceInfo实体*/private T holder;/*** 取消注册时间戳*/private long evictionTimestamp;/*** 注册时间戳*/private long registrationTimestamp;/*** 开始服务时间戳*/private long serviceUpTimestamp;/*** 租约最后更新时间戳*/// Make it volatile so that the expiration task would see this quickerprivate volatile long lastUpdateTimestamp;/*** 租约持续时长(毫秒)*/private long duration;public Lease(T r, int durationInSecs) {holder = r;registrationTimestamp = System.currentTimeMillis();lastUpdateTimestamp = registrationTimestamp;duration = (durationInSecs * 1000);}
3)、注册应用实例信息

调用了AbstractInstanceRegistry的register(...)方法,注册实例信息,代码如下:

public abstract class AbstractInstanceRegistry implements InstanceRegistry {/*** 租约映射* key1:应用名* key2:应用实例信息编号* value:租约*/private final ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>> registry= new ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>>();public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {try {// 获取读锁read.lock();Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName());// 增加注册次数到监控REGISTER.increment(isReplication);// 获得应用实例信息对应的租约if (gMap == null) {final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap<String, Lease<InstanceInfo>>();// 添加应用gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap);if (gMap == null) {gMap = gNewMap;}}Lease<InstanceInfo> existingLease = gMap.get(registrant.getId());// Retain the last dirty timestamp without overwriting it, if there is already a leaseif (existingLease != null && (existingLease.getHolder() != null)) {Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp();Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp();logger.debug("Existing lease found (existing={}, provided={}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);// this is a > instead of a >= because if the timestamps are equal, we still take the remote transmitted// InstanceInfo instead of the server local copy.if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) {logger.warn("There is an existing lease and the existing lease's dirty timestamp {} is greater" +" than the one that is being registered {}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);logger.warn("Using the existing instanceInfo instead of the new instanceInfo as the registrant");registrant = existingLease.getHolder();}} else {// The lease does not exist and hence it is a new registrationsynchronized (lock) {if (this.expectedNumberOfClientsSendingRenews > 0) {// Since the client wants to register it, increase the number of clients sending renewsthis.expectedNumberOfClientsSendingRenews = this.expectedNumberOfClientsSendingRenews + 1;updateRenewsPerMinThreshold();}}logger.debug("No previous lease information found; it is new registration");}// 创建租约Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant, leaseDuration);if (existingLease != null) {lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());}// 添加到租约gMap.put(registrant.getId(), lease);// 添加到最近注册的调试队列synchronized (recentRegisteredQueue) {recentRegisteredQueue.add(new Pair<Long, String>(System.currentTimeMillis(),registrant.getAppName() + "(" + registrant.getId() + ")"));}// 添加到应用实例覆盖状态映射// This is where the initial state transfer of overridden status happensif (!InstanceStatus.UNKNOWN.equals(registrant.getOverriddenStatus())) {logger.debug("Found overridden status {} for instance {}. Checking to see if needs to be add to the "+ "overrides", registrant.getOverriddenStatus(), registrant.getId());if (!overriddenInstanceStatusMap.containsKey(registrant.getId())) {logger.info("Not found overridden id {} and hence adding it", registrant.getId());overriddenInstanceStatusMap.put(registrant.getId(), registrant.getOverriddenStatus());}}InstanceStatus overriddenStatusFromMap = overriddenInstanceStatusMap.get(registrant.getId());if (overriddenStatusFromMap != null) {logger.info("Storing overridden status {} from map", overriddenStatusFromMap);registrant.setOverriddenStatus(overriddenStatusFromMap);}// 获得应用实例最终状态,并设置应用实例的状态// Set the status based on the overridden status rulesInstanceStatus overriddenInstanceStatus = getOverriddenInstanceStatus(registrant, existingLease, isReplication);registrant.setStatusWithoutDirty(overriddenInstanceStatus);// 设置租约的开始服务的时间戳(只有第一次有效)// If the lease is registered with UP status, set lease service up timestampif (InstanceStatus.UP.equals(registrant.getStatus())) {lease.serviceUp();}// 设置应用实例信息的操作类型为添加registrant.setActionType(ActionType.ADDED);// 添加到最近租约变更记录队列recentlyChangedQueue.add(new RecentlyChangedItem(lease));// 设置租约的最后更新时间戳registrant.setLastUpdatedTimestamp();// 设置响应缓存过期invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress());logger.info("Registered instance {}/{} with status {} (replication={})",registrant.getAppName(), registrant.getId(), registrant.getStatus(), isReplication);} finally {// 释放锁read.unlock();}}

二、应用实例续约

1、Eureka Client发起续约

Eureka Client向Eureka Server发起注册应用实例成功后获得租约,Eureka Client固定间隔向Eureka Server发起续约(renew),避免租约过期

默认情况下,租约有效期为90秒,续约频率为30秒。两者比例为1:3,保证在网络异常等情况下,有三次重试的机会

1)、初始化定时任务

Eureka Client在初始化过程中,创建心跳线程,固定间隔向Eureka Server发起续约。实现代码如下:

public class DiscoveryClient implements EurekaClient {private void initScheduledTasks() {if (clientConfig.shouldFetchRegistry()) {// registry cache refresh timerint registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();scheduler.schedule(new TimedSupervisorTask("cacheRefresh",scheduler,cacheRefreshExecutor,registryFetchIntervalSeconds,TimeUnit.SECONDS,expBackOffBound,new CacheRefreshThread()),registryFetchIntervalSeconds, TimeUnit.SECONDS);}// 向EurekaServer心跳执行器if (clientConfig.shouldRegisterWithEureka()) {int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();logger.info("Starting heartbeat executor: " + "renew interval is: {}", renewalIntervalInSecs);// Heartbeat timerscheduler.schedule(new TimedSupervisorTask("heartbeat",scheduler,heartbeatExecutor,renewalIntervalInSecs,TimeUnit.SECONDS,expBackOffBound,new HeartbeatThread()),renewalIntervalInSecs, TimeUnit.SECONDS);// 创建应用实例状态变更监听器// InstanceInfo replicatorinstanceInfoReplicator = new InstanceInfoReplicator(this,instanceInfo,clientConfig.getInstanceInfoReplicationIntervalSeconds(),2); // burstSizestatusChangeListener = new ApplicationInfoManager.StatusChangeListener() {@Overridepublic String getId() {return "statusChangeListener";}@Overridepublic void notify(StatusChangeEvent statusChangeEvent) {if (InstanceStatus.DOWN == statusChangeEvent.getStatus() ||InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) {// log at warn level if DOWN was involvedlogger.warn("Saw local status change event {}", statusChangeEvent);} else {logger.info("Saw local status change event {}", statusChangeEvent);}instanceInfoReplicator.onDemandUpdate();}};// 注册应用实例状态变更监听器if (clientConfig.shouldOnDemandUpdateStatusChange()) {applicationInfoManager.registerStatusChangeListener(statusChangeListener);}// 开启应用实例信息复制器instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());} else {logger.info("Not registering with Eureka server per configuration");}}
2)、发起续约
public class DiscoveryClient implements EurekaClient {/*** 最后成功向Eureka Server心跳时间戳*/private volatile long lastSuccessfulHeartbeatTimestamp = -1;private class HeartbeatThread implements Runnable {public void run() {// 调用续约方法if (renew()) {lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();}}}boolean renew() {EurekaHttpResponse<InstanceInfo> httpResponse;try {// 调用AbstractJerseyEurekaHttpClient的sendHeartBeat方法httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);logger.debug(PREFIX + "{} - Heartbeat status: {}", appPathIdentifier, httpResponse.getStatusCode());if (httpResponse.getStatusCode() == Status.NOT_FOUND.getStatusCode()) {REREGISTER_COUNTER.increment();logger.info(PREFIX + "{} - Re-registering apps/{}", appPathIdentifier, instanceInfo.getAppName());long timestamp = instanceInfo.setIsDirtyWithTime();boolean success = register();if (success) {instanceInfo.unsetIsDirty(timestamp);}return success;}return httpResponse.getStatusCode() == Status.OK.getStatusCode();} catch (Throwable e) {logger.error(PREFIX + "{} - was unable to send heartbeat!", appPathIdentifier, e);return false;}}  

AbstractJerseyEurekaHttpClient的renew()方法使用PUT请求调用Eureka Server的apps/${APP_NAME}/${INSTANCE_INFO_ID}接口,参数为status、lastDirtyTimestamp、overriddenstatus,实现续约

2、Eureka Server接收续约

Eureka Server接收续约核心流程如下图:

在这里插入图片描述

1)、接收续约请求
@Produces({"application/xml", "application/json"})
public class InstanceResource {@PUTpublic Response renewLease(@HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication,@QueryParam("overriddenstatus") String overriddenStatus,@QueryParam("status") String status,@QueryParam("lastDirtyTimestamp") String lastDirtyTimestamp) {boolean isFromReplicaNode = "true".equals(isReplication);// 续约boolean isSuccess = registry.renew(app.getName(), id, isFromReplicaNode);// 续约失败// Not found in the registry, immediately ask for a registerif (!isSuccess) {logger.warn("Not Found (Renew): {} - {}", app.getName(), id);return Response.status(Status.NOT_FOUND).build();}// 比较InstanceInfo的lastDirtyTimestamp属性// Check if we need to sync based on dirty time stamp, the client// instance might have changed some valueResponse response;if (lastDirtyTimestamp != null && serverConfig.shouldSyncWhenTimestampDiffers()) {response = this.validateDirtyTimestamp(Long.valueOf(lastDirtyTimestamp), isFromReplicaNode);// Store the overridden status since the validation found out the node that replicates winsif (response.getStatus() == Response.Status.NOT_FOUND.getStatusCode()&& (overriddenStatus != null)&& !(InstanceStatus.UNKNOWN.name().equals(overriddenStatus))&& isFromReplicaNode) {registry.storeOverriddenStatusIfRequired(app.getAppName(), id, InstanceStatus.valueOf(overriddenStatus));}} else {response = Response.ok().build();}logger.debug("Found (Renew): {} - {}; reply status={}", app.getName(), id, response.getStatus());return response;}
public class PeerAwareInstanceRegistryImpl extends AbstractInstanceRegistry implements PeerAwareInstanceRegistry {public boolean renew(final String appName, final String id, final boolean isReplication) {// 续约if (super.renew(appName, id, isReplication)) {// Eureka Server复制replicateToPeers(Action.Heartbeat, appName, id, null, null, isReplication);return true;}return false;}

PeerAwareInstanceRegistryImpl中调用了父类AbstractInstanceRegistry的renew(...)方法续约实例信息

2)、续约应用实例信息

调用了AbstractInstanceRegistry的renew(...)方法,续约实例信息,代码如下:

public abstract class AbstractInstanceRegistry implements InstanceRegistry {public boolean renew(String appName, String id, boolean isReplication) {// 增加续约次数到监控RENEW.increment(isReplication);// 获取应用名对应的租约Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);Lease<InstanceInfo> leaseToRenew = null;if (gMap != null) {leaseToRenew = gMap.get(id);}// 租约不存在if (leaseToRenew == null) {RENEW_NOT_FOUND.increment(isReplication);logger.warn("DS: Registry: lease doesn't exist, registering resource: {} - {}", appName, id);return false;} else {InstanceInfo instanceInfo = leaseToRenew.getHolder();if (instanceInfo != null) {// touchASGCache(instanceInfo.getASGName());InstanceStatus overriddenInstanceStatus = this.getOverriddenInstanceStatus(instanceInfo, leaseToRenew, isReplication);if (overriddenInstanceStatus == InstanceStatus.UNKNOWN) {logger.info("Instance status UNKNOWN possibly due to deleted override for instance {}"+ "; re-register required", instanceInfo.getId());RENEW_NOT_FOUND.increment(isReplication);return false;}if (!instanceInfo.getStatus().equals(overriddenInstanceStatus)) {logger.info("The instance status {} is different from overridden instance status {} for instance {}. "+ "Hence setting the status to overridden status", instanceInfo.getStatus().name(),instanceInfo.getOverriddenStatus().name(),instanceInfo.getId());instanceInfo.setStatusWithoutDirty(overriddenInstanceStatus);}}// 新增续租每分钟次数renewsLastMin.increment();// 设置租约最后更新时间leaseToRenew.renew();return true;}}
public class Lease<T> {public void renew() {// 设置租约最后更新时间戳lastUpdateTimestamp = System.currentTimeMillis() + duration;}

续约的整个过程修改租约的过期时间,即使并发请求,也不会对数据的一致性产生影响,因此不需要像注册操作一样加锁

三、应用实例下线

1、Eureka Client发起下线

应用实例关闭时,Eureka Client向Eureka Server发起下线应用实例。需要满足如下条件才可发起:

  • 配置eureka.registration.enabled=true,应用实例开启注册开关。默认为false
  • 配置eureka.shouldUnregisterOnShutdown=true,应用实例开启关闭时下线开关。默认为true
public class DiscoveryClient implements EurekaClient {@PreDestroy@Overridepublic synchronized void shutdown() {if (isShutdown.compareAndSet(false, true)) {logger.info("Shutting down DiscoveryClient ...");if (statusChangeListener != null && applicationInfoManager != null) {applicationInfoManager.unregisterStatusChangeListener(statusChangeListener.getId());}cancelScheduledTasks();// If APPINFO was registeredif (applicationInfoManager != null&& clientConfig.shouldRegisterWithEureka() // eureka.registration.enabled=true&& clientConfig.shouldUnregisterOnShutdown()) { // eureka.shouldUnregisterOnShutdown=trueapplicationInfoManager.setInstanceStatus(InstanceStatus.DOWN);unregister();}if (eurekaTransport != null) {eurekaTransport.shutdown();}heartbeatStalenessMonitor.shutdown();registryStalenessMonitor.shutdown();logger.info("Completed shut down of DiscoveryClient");}}void unregister() {// It can be null if shouldRegisterWithEureka == falseif(eurekaTransport != null && eurekaTransport.registrationClient != null) {try {logger.info("Unregistering ...");// 调用AbstractJerseyEurekaHttpClient的cancel方法EurekaHttpResponse<Void> httpResponse = eurekaTransport.registrationClient.cancel(instanceInfo.getAppName(), instanceInfo.getId());logger.info(PREFIX + "{} - deregister  status: {}", appPathIdentifier, httpResponse.getStatusCode());} catch (Exception e) {logger.error(PREFIX + "{} - de-registration failed{}", appPathIdentifier, e.getMessage(), e);}}}
public abstract class AbstractJerseyEurekaHttpClient implements EurekaHttpClient {@Overridepublic EurekaHttpResponse<Void> cancel(String appName, String id) {String urlPath = "apps/" + appName + '/' + id;ClientResponse response = null;try {Builder resourceBuilder = jerseyClient.resource(serviceUrl).path(urlPath).getRequestBuilder();addExtraHeaders(resourceBuilder);response = resourceBuilder.delete(ClientResponse.class);return anEurekaHttpResponse(response.getStatus()).headers(headersOf(response)).build();} finally {if (logger.isDebugEnabled()) {logger.debug("Jersey HTTP DELETE {}/{}; statusCode={}", serviceUrl, urlPath, response == null ? "N/A" : response.getStatus());}if (response != null) {response.close();}}}

AbstractJerseyEurekaHttpClient的cancel()方法使用DELETE请求调用Eureka Server的apps/${APP_NAME}/${INSTANCE_INFO_ID}接口,实现应用实例信息的下线

2、Eureka Server接收下线

Eureka Server接收下线请求核心流程如下图:

在这里插入图片描述

1)、接收下线请求
@Produces({"application/xml", "application/json"})
public class InstanceResource {@DELETEpublic Response cancelLease(@HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) {try {// 下线boolean isSuccess = registry.cancel(app.getName(), id,"true".equals(isReplication));if (isSuccess) {logger.debug("Found (Cancel): {} - {}", app.getName(), id);return Response.ok().build();} else {logger.info("Not Found (Cancel): {} - {}", app.getName(), id);return Response.status(Status.NOT_FOUND).build();}} catch (Throwable e) {logger.error("Error (cancel): {} - {}", app.getName(), id, e);return Response.serverError().build();}}  
public class PeerAwareInstanceRegistryImpl extends AbstractInstanceRegistry implements PeerAwareInstanceRegistry {@Overridepublic boolean cancel(final String appName, final String id,final boolean isReplication) {// 下线if (super.cancel(appName, id, isReplication)) {// Eureka Server复制replicateToPeers(Action.Cancel, appName, id, null, null, isReplication);synchronized (lock) {if (this.expectedNumberOfClientsSendingRenews > 0) {// Since the client wants to cancel it, reduce the number of clients to send renewsthis.expectedNumberOfClientsSendingRenews = this.expectedNumberOfClientsSendingRenews - 1;updateRenewsPerMinThreshold();}}return true;}return false;}

PeerAwareInstanceRegistryImpl中调用了父类AbstractInstanceRegistry的cancel(...)方法下线应用实例信息

2)、下线应用实例信息

调用了AbstractInstanceRegistry的cancel(...)方法,下线应用实例信息,代码如下:

public abstract class AbstractInstanceRegistry implements InstanceRegistry {@Overridepublic boolean cancel(String appName, String id, boolean isReplication) {return internalCancel(appName, id, isReplication);}protected boolean internalCancel(String appName, String id, boolean isReplication) {try {// 获得读锁read.lock();// 增加取消注册次数到监控CANCEL.increment(isReplication);Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);Lease<InstanceInfo> leaseToCancel = null;// 移除租约映射if (gMap != null) {leaseToCancel = gMap.remove(id);}// 添加到最近取消注册的调试队列synchronized (recentCanceledQueue) {recentCanceledQueue.add(new Pair<Long, String>(System.currentTimeMillis(), appName + "(" + id + ")"));}// 移除应用实例覆盖状态映射InstanceStatus instanceStatus = overriddenInstanceStatusMap.remove(id);if (instanceStatus != null) {logger.debug("Removed instance id {} from the overridden map which has value {}", id, instanceStatus.name());}// 租约不存在if (leaseToCancel == null) {// 添加取消注册不存在到监控CANCEL_NOT_FOUND.increment(isReplication);logger.warn("DS: Registry: cancel failed because Lease is not registered for: {}/{}", appName, id);return false;} else {// 设置租约的取消注册时间戳leaseToCancel.cancel();// 添加到最近租约变更记录队列InstanceInfo instanceInfo = leaseToCancel.getHolder();String vip = null;String svip = null;if (instanceInfo != null) {instanceInfo.setActionType(ActionType.DELETED);recentlyChangedQueue.add(new RecentlyChangedItem(leaseToCancel));instanceInfo.setLastUpdatedTimestamp();vip = instanceInfo.getVIPAddress();svip = instanceInfo.getSecureVipAddress();}// 设置响应缓存过期invalidateCache(appName, vip, svip);logger.info("Cancelled instance {}/{} (replication={})", appName, id, isReplication);return true;}} finally {// 释放锁read.unlock();}}  
public class Lease<T> {public void cancel() {if (evictionTimestamp <= 0) {// 设置取消注册时间戳evictionTimestamp = System.currentTimeMillis();}}

参考:

Eureka 源码解析 —— 应用实例注册发现(一)之注册

Eureka 源码解析 —— 应用实例注册发现(二)之续租

Eureka 源码解析 —— 应用实例注册发现(三)之下线

这篇关于Eureka核心源码解析(一):应用实例注册、续约、下线的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/573789

相关文章

Qt实现网络数据解析的方法总结

《Qt实现网络数据解析的方法总结》在Qt中解析网络数据通常涉及接收原始字节流,并将其转换为有意义的应用层数据,这篇文章为大家介绍了详细步骤和示例,感兴趣的小伙伴可以了解下... 目录1. 网络数据接收2. 缓冲区管理(处理粘包/拆包)3. 常见数据格式解析3.1 jsON解析3.2 XML解析3.3 自定义

Golang HashMap实现原理解析

《GolangHashMap实现原理解析》HashMap是一种基于哈希表实现的键值对存储结构,它通过哈希函数将键映射到数组的索引位置,支持高效的插入、查找和删除操作,:本文主要介绍GolangH... 目录HashMap是一种基于哈希表实现的键值对存储结构,它通过哈希函数将键映射到数组的索引位置,支持

C语言中位操作的实际应用举例

《C语言中位操作的实际应用举例》:本文主要介绍C语言中位操作的实际应用,总结了位操作的使用场景,并指出了需要注意的问题,如可读性、平台依赖性和溢出风险,文中通过代码介绍的非常详细,需要的朋友可以参... 目录1. 嵌入式系统与硬件寄存器操作2. 网络协议解析3. 图像处理与颜色编码4. 高效处理布尔标志集合

Python使用getopt处理命令行参数示例解析(最佳实践)

《Python使用getopt处理命令行参数示例解析(最佳实践)》getopt模块是Python标准库中一个简单但强大的命令行参数处理工具,它特别适合那些需要快速实现基本命令行参数解析的场景,或者需要... 目录为什么需要处理命令行参数?getopt模块基础实际应用示例与其他参数处理方式的比较常见问http

Python利用ElementTree实现快速解析XML文件

《Python利用ElementTree实现快速解析XML文件》ElementTree是Python标准库的一部分,而且是Python标准库中用于解析和操作XML数据的模块,下面小编就来和大家详细讲讲... 目录一、XML文件解析到底有多重要二、ElementTree快速入门1. 加载XML的两种方式2.

Java的栈与队列实现代码解析

《Java的栈与队列实现代码解析》栈是常见的线性数据结构,栈的特点是以先进后出的形式,后进先出,先进后出,分为栈底和栈顶,栈应用于内存的分配,表达式求值,存储临时的数据和方法的调用等,本文给大家介绍J... 目录栈的概念(Stack)栈的实现代码队列(Queue)模拟实现队列(双链表实现)循环队列(循环数组

Java中的Lambda表达式及其应用小结

《Java中的Lambda表达式及其应用小结》Java中的Lambda表达式是一项极具创新性的特性,它使得Java代码更加简洁和高效,尤其是在集合操作和并行处理方面,:本文主要介绍Java中的La... 目录前言1. 什么是Lambda表达式?2. Lambda表达式的基本语法例子1:最简单的Lambda表

java解析jwt中的payload的用法

《java解析jwt中的payload的用法》:本文主要介绍java解析jwt中的payload的用法,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录Java解析jwt中的payload1. 使用 jjwt 库步骤 1:添加依赖步骤 2:解析 JWT2. 使用 N

Python中__init__方法使用的深度解析

《Python中__init__方法使用的深度解析》在Python的面向对象编程(OOP)体系中,__init__方法如同建造房屋时的奠基仪式——它定义了对象诞生时的初始状态,下面我们就来深入了解下_... 目录一、__init__的基因图谱二、初始化过程的魔法时刻继承链中的初始化顺序self参数的奥秘默认

Python结合PyWebView库打造跨平台桌面应用

《Python结合PyWebView库打造跨平台桌面应用》随着Web技术的发展,将HTML/CSS/JavaScript与Python结合构建桌面应用成为可能,本文将系统讲解如何使用PyWebView... 目录一、技术原理与优势分析1.1 架构原理1.2 核心优势二、开发环境搭建2.1 安装依赖2.2 验