Eureka客户端注册流程
本文的内容是Eureka Client的服务注册的主要流程,如果你是第一次使用Eureka Client,你可以先看这里入门.
目录
入口
本文是基于spring-cloud-netflix 1.4.0.RELEASE版本
spring cloud的入口都相对容易找.eureka客户端注册的入口类是EurekaDiscoveryClientConfiguration和EurekaClientAutoConfiguration:
EurekaDiscoveryClientConfiguration:
@Configuration
@EnableConfigurationProperties
@ConditionalOnClass(EurekaClientConfig.class)
@ConditionalOnProperty(value = "eureka.client.enabled", matchIfMissing = true)
public class EurekaDiscoveryClientConfiguration {
//省略
}
由注解@ConditionalOnClass(EurekaClientConfig.class)可知:只要EurekaClientConfig类在classpath下就会实例化该bean.
EurekaClientAutoConfiguration:
@Configuration
@EnableConfigurationProperties
@ConditionalOnClass(EurekaClientConfig.class)
@Import(DiscoveryClientOptionalArgsConfiguration.class)
@ConditionalOnBean(EurekaDiscoveryClientConfiguration.Marker.class)
@ConditionalOnProperty(value = "eureka.client.enabled", matchIfMissing = true)
@AutoConfigureBefore({ NoopDiscoveryClientAutoConfiguration.class,
CommonsClientAutoConfiguration.class, ServiceRegistryAutoConfiguration.class })
@AutoConfigureAfter(name = {"org.springframework.cloud.autoconfigure.RefreshAutoConfiguration",
"org.springframework.cloud.netflix.eureka.EurekaDiscoveryClientConfiguration",
"org.springframework.cloud.client.serviceregistry.AutoServiceRegistrationAutoConfiguration"})
public class EurekaClientAutoConfiguration {
//省略
}
由@ConditionalOnBean(EurekaDiscoveryClientConfiguration.Marker.class)可以看出EurekaClientAutoConfiguration的实例化依赖于EurekaDiscoveryClientConfiguration。所以只要你依赖了spring-cloud-starter-eureka包就会启动EurekaClientAutoConfiguration,而该类初始化了eurekaServiceRegistry和discoveryClient。 所以即便你不加@EnableDiscoveryClient注解在你的Application启动类上,只要你加载了它的依赖就会自动做服务注册和发现。
EurekaDiscoveryClientConfiguration类还做了两件事:
一件是监听RefreshScopeRefreshedEvent事件(跟自动配置相关),另外一件是实例化EurekaHealthCheckHandler(跟健康检查相关)
先着重看第一件事:
@Configuration
@ConditionalOnClass(RefreshScopeRefreshedEvent.class)
protected static class EurekaClientConfigurationRefresher {
@Autowired(required = false)
private EurekaClient eurekaClient;
@Autowired(required = false)
private EurekaAutoServiceRegistration autoRegistration;
@EventListener(RefreshScopeRefreshedEvent.class)
public void onApplicationEvent(RefreshScopeRefreshedEvent event) {
//This will force the creation of the EurkaClient bean if not already created
//to make sure the client will be reregistered after a refresh event
if(eurekaClient != null) {
eurekaClient.getApplications();
}
if (autoRegistration != null) {
// register in case meta data changed
this.autoRegistration.stop();
this.autoRegistration.start();
}
}
}
可以看到这里用到了EurekaAutoServiceRegistration,从名字就可以看出这个是Eureka 自动注册的关键类了。
EurekaAutoServiceRegistration
先看它的实现的接口:
public class EurekaAutoServiceRegistration implements AutoServiceRegistration, SmartLifecycle, Ordered {
private int order = 0;
//...省略
}
由于实现了SmartLifecycle接口,该类会在所有的bean实例化之后被调用其start()方法:
@Override
public void start() {
// only set the port if the nonSecurePort or securePort is 0 and this.port != 0
if (this.port.get() != 0) {
if (this.registration.getNonSecurePort() == 0) {
this.registration.setNonSecurePort(this.port.get());
}
if (this.registration.getSecurePort() == 0 && this.registration.isSecure()) {
this.registration.setSecurePort(this.port.get());
}
}
// only initialize if nonSecurePort is greater than 0 and it isn't already running
// because of containerPortInitializer below
if (!this.running.get() && this.registration.getNonSecurePort() > 0) {
this.serviceRegistry.register(this.registration);
this.context.publishEvent(
new InstanceRegisteredEvent<>(this, this.registration.getInstanceConfig()));
this.running.set(true);
}
}
该方法比较简单,就是第一次运行时将客户端信息注册。 至于注册信息具体有哪些,我会专门写篇博客来讲,请期待…
ServiceRegistry
@Override
public void register(EurekaRegistration reg) {
maybeInitializeClient(reg);
if (log.isInfoEnabled()) {
log.info("Registering application " + reg.getInstanceConfig().getAppname()
+ " with eureka with status "
+ reg.getInstanceConfig().getInitialStatus());
}
reg.getApplicationInfoManager()
.setInstanceStatus(reg.getInstanceConfig().getInitialStatus());
if (reg.getHealthCheckHandler() != null) {
reg.getEurekaClient().registerHealthCheck(reg.getHealthCheckHandler());
}
}
这里最重要的一步是在ApplicationInfoManager中设置当前的状态.那么设置状态又发生了什么呢? 接下来继续看ApplicationInfoManager类:
public synchronized void setInstanceStatus(InstanceStatus status) {
InstanceStatus next = instanceStatusMapper.map(status);
if (next == null) {
return;
}
InstanceStatus prev = instanceInfo.setStatus(next);
if (prev != null) {
for (StatusChangeListener listener : listeners.values()) {
try {
listener.notify(new StatusChangeEvent(prev, next));
} catch (Exception e) {
logger.warn("failed to notify listener: {}", listener.getId(), e);
}
}
}
}
instanceStatusMapper.map(status)这一步应该是为未来留下的接口,现有的实现只是return status。instanceInfo.setStatus(next)会将参数next和之前的状态进行比较.如果不一致就实例信息是脏数据isInstanceInfoDirty标记为true,并记录最后修改时间lastDirtyTimestamp,并返回之前的状态:
public synchronized InstanceStatus setStatus(InstanceStatus status) {
if (this.status != status) {
InstanceStatus prev = this.status;
this.status = status;
setIsDirty();
return prev;
}
return null;
}
如果有状态返回,则表示状态发生了变化,则需要通知所有的监听者发生了StatusChangeEvent事件. > 目前没看到该事件有监听器负责监听并处理,那么服务注册是怎么发送到服务端的呢?继续往下看
public void registerStatusChangeListener(StatusChangeListener listener) {
listeners.put(listener.getId(), listener);
}
通过查找ApplicationInfoManager的registerStatusChangeListener方法的使用可以看到:
跳进去可以看到:
statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
@Override
public String getId() {
return "statusChangeListener";
}
@Override
public void notify(StatusChangeEvent statusChangeEvent) {
if (InstanceStatus.DOWN == statusChangeEvent.getStatus() ||
InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) {
// log at warn level if DOWN was involved
logger.warn("Saw local status change event {}", statusChangeEvent);
} else {
logger.info("Saw local status change event {}", statusChangeEvent);
}
instanceInfoReplicator.onDemandUpdate();
}
};
if (clientConfig.shouldOnDemandUpdateStatusChange()) {
applicationInfoManager.registerStatusChangeListener(statusChangeListener);
}
setInstanceStatus方法中的notify实际上是就做了两件事,1是记录状态的变化,2是调用instanceInfoReplicator.onDemandUpdate()方法。其实现如下:
public boolean onDemandUpdate() {
if (rateLimiter.acquire(burstSize, allowedRatePerMinute)) {
scheduler.submit(new Runnable() {
@Override
public void run() {
logger.debug("Executing on-demand update of local InstanceInfo");
Future latestPeriodic = scheduledPeriodicRef.get();
if (latestPeriodic != null && !latestPeriodic.isDone()) {
logger.debug("Canceling the latest scheduled update, it will be rescheduled at the end of on demand update");
latestPeriodic.cancel(false);
}
InstanceInfoReplicator.this.run();
}
});
return true;
} else {
logger.warn("Ignoring onDemand update due to rate limiter");
return false;
}
}
简单说明下rateLimiter.acquire(burstSize, allowedRatePerMinute)这个限流是通过你配置参数中的replicationIntervalSeconds来指定的。防止单位时间内状态变化太快。
该方法的主要内容是像scheduler提交一个作业.该作业的内容是将scheduledPeriodicRef中没有完成的作业取消. > 实际上scheduledPeriodicRef中的未完成作业就是scheduler中未完成的作业:
Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);
scheduledPeriodicRef.set(next);
然后调用this.run():
public void run() {
try {
discoveryClient.refreshInstanceInfo();
Long dirtyTimestamp = instanceInfo.isDirtyWithTime();
if (dirtyTimestamp != null) {
discoveryClient.register();
instanceInfo.unsetIsDirty(dirtyTimestamp);
}
} catch (Throwable t) {
logger.warn("There was a problem with the instance info replicator", t);
} finally {
Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);
scheduledPeriodicRef.set(next);
}
}
它首先会刷新本地的实例信息discoveryClient.refreshInstanceInfo()(这一步的细节在服务发现博客再细聊),然后通过isInstanceInfoDirty和lastDirtyTimestamp来判断是否是脏数据,如果是的话就调用discoveryClient.register()注册到eureka server中。 然后继续看discoveryClient.register():
boolean register() throws Throwable {
logger.info(PREFIX + appPathIdentifier + ": registering service...");
EurekaHttpResponse<Void> httpResponse;
try {
httpResponse = eurekaTransport.registrationClient.register(instanceInfo);
} catch (Exception e) {
logger.warn("{} - registration failed {}", PREFIX + appPathIdentifier, e.getMessage(), e);
throw e;
}
if (logger.isInfoEnabled()) {
logger.info("{} - registration status: {}", PREFIX + appPathIdentifier, httpResponse.getStatusCode());
}
return httpResponse.getStatusCode() == 204;
}
eurekaTransport.registrationClient.register(instanceInfo)最终会调到AbstractJerseyEurekaHttpClient的register方法:
public EurekaHttpResponse<Void> register(InstanceInfo info) {
String urlPath = "apps/" + info.getAppName();
ClientResponse response = null;
try {
Builder resourceBuilder = jerseyClient.resource(serviceUrl).path(urlPath).getRequestBuilder();
addExtraHeaders(resourceBuilder);
response = resourceBuilder
.header("Accept-Encoding", "gzip")
.type(MediaType.APPLICATION_JSON_TYPE)
.accept(MediaType.APPLICATION_JSON)
.post(ClientResponse.class, info);
return anEurekaHttpResponse(response.getStatus()).headers(headersOf(response)).build();
} finally {
if (logger.isDebugEnabled()) {
logger.debug("Jersey HTTP POST {}/{} with instance {}; statusCode={}", serviceUrl, urlPath, info.getId(),
response == null ? "N/A" : response.getStatus());
}
if (response != null) {
response.close();
}
}
}
这里主要是通过rest(url:http://host:port/eureka/apps/{appName})请求将注册信息注册到eureka server中。
小结:总得来说eureka client的服务注册的流程还是挺简单的。通过状态的变化来notify ApplicationInfoManager的匿名StatusChangeListener监听器,监听器通过scheduler调度进行注册客户端实例信息.