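This post walks through the main flow of service registration in Eureka Client. If this is your first time using Eureka Client, you may want to read an introductory guide first.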

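Entry point

This post is based on spring-cloud-netflix 1.4.0.RELEASE.

Entry points in Spring Cloud are usually easy to find. For Eureka client registration, the entry classes are EurekaDiscoveryClientConfiguration and EurekaClientAutoConfiguration: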

EurekaDiscoveryClientConfiguration:

@Configuration
@EnableConfigurationProperties
@ConditionalOnClass(EurekaClientConfig.class)
@ConditionalOnProperty(value = "eureka.client.enabled", matchIfMissing = true)
public class EurekaDiscoveryClientConfiguration {
    // omitted
}

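From the annotation @ConditionalOnClass(EurekaClientConfig.class) we can see that this bean is instantiated whenever the EurekaClientConfig class is on the classpath, provided eureka.client.enabled has not been set to false.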

EurekaClientAutoConfiguration:

@Configuration
@EnableConfigurationProperties
@ConditionalOnClass(EurekaClientConfig.class)
@Import(DiscoveryClientOptionalArgsConfiguration.class)
@ConditionalOnBean(EurekaDiscoveryClientConfiguration.Marker.class)
@ConditionalOnProperty(value = "eureka.client.enabled", matchIfMissing = true)
@AutoConfigureBefore({ NoopDiscoveryClientAutoConfiguration.class,
		CommonsClientAutoConfiguration.class, ServiceRegistryAutoConfiguration.class })
@AutoConfigureAfter(name = {"org.springframework.cloud.autoconfigure.RefreshAutoConfiguration",
		"org.springframework.cloud.netflix.eureka.EurekaDiscoveryClientConfiguration",
		"org.springframework.cloud.client.serviceregistry.AutoServiceRegistrationAutoConfiguration"})
public class EurekaClientAutoConfiguration {
    // omitted
}

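@ConditionalOnBean(EurekaDiscoveryClientConfiguration.Marker.class) shows that EurekaClientAutoConfiguration is only instantiated when EurekaDiscoveryClientConfiguration is. So as soon as you depend on the spring-cloud-starter-eureka package, EurekaClientAutoConfiguration is activated, and that class initializes eurekaServiceRegistry and discoveryClient. In other words, even if you do not put @EnableDiscoveryClient on your Application class, service registration and discovery happen automatically once the dependency is loaded.

EurekaDiscoveryClientConfiguration also does two more things:

It listens for RefreshScopeRefreshedEvent (related to configuration refresh), and it instantiates EurekaHealthCheckHandler (related to health checks).

Let's focus on the first one: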

@Configuration
@ConditionalOnClass(RefreshScopeRefreshedEvent.class)
protected static class EurekaClientConfigurationRefresher {

    @Autowired(required = false)
    private EurekaClient eurekaClient;

    @Autowired(required = false)
    private EurekaAutoServiceRegistration autoRegistration;

    @EventListener(RefreshScopeRefreshedEvent.class)
    public void onApplicationEvent(RefreshScopeRefreshedEvent event) {
        //This will force the creation of the EurekaClient bean if not already created
        //to make sure the client will be reregistered after a refresh event
        if(eurekaClient != null) {
            eurekaClient.getApplications();
        }
        if (autoRegistration != null) {
            // register in case meta data changed
            this.autoRegistration.stop();
            this.autoRegistration.start();
        }
    }
}

This is where EurekaAutoServiceRegistration comes in. As the name suggests, it is the key class behind Eureka's automatic registration.

EurekaAutoServiceRegistration

First, look at the interfaces it implements:

public class EurekaAutoServiceRegistration implements AutoServiceRegistration, SmartLifecycle, Ordered {
    private int order = 0;
    // ... omitted
}

Because it implements the SmartLifecycle interface, its start() method is invoked once all beans have been instantiated:

@Override
public void start() {
    // only set the port if the nonSecurePort or securePort is 0 and this.port != 0
    if (this.port.get() != 0) {
        if (this.registration.getNonSecurePort() == 0) {
            this.registration.setNonSecurePort(this.port.get());
        }

        if (this.registration.getSecurePort() == 0 && this.registration.isSecure()) {
            this.registration.setSecurePort(this.port.get());
        }
    }

    // only initialize if nonSecurePort is greater than 0 and it isn't already running
    // because of containerPortInitializer below
    if (!this.running.get() && this.registration.getNonSecurePort() > 0) {

        this.serviceRegistry.register(this.registration);

        this.context.publishEvent(
                new InstanceRegisteredEvent<>(this, this.registration.getInstanceConfig()));
        this.running.set(true);
    }
}

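This method is fairly simple: the first time it runs, it registers the client's information. Exactly what that registration information contains will be covered in a separate post.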
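
To make the lifecycle concrete, here is a minimal sketch in plain Java. It is not the Spring Cloud class: AutoRegistrationSketch, its calls list, and the "register:"/"deregister" strings are made up for illustration. Only the guard conditions mirror start() above, and stop() is assumed to deregister and clear the running flag, which is what allows the stop()/start() pair in the refresh listener to register again.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, simplified stand-in for EurekaAutoServiceRegistration.
class AutoRegistrationSketch {
    private final AtomicBoolean running = new AtomicBoolean(false);
    private final AtomicInteger port = new AtomicInteger(0);
    private int nonSecurePort;
    final List<String> calls = new ArrayList<>(); // records register/deregister calls

    AutoRegistrationSketch(int configuredNonSecurePort) {
        this.nonSecurePort = configuredNonSecurePort;
    }

    // Stands in for the embedded container reporting its actual port.
    void setPort(int actualPort) {
        port.set(actualPort);
    }

    void start() {
        // Fall back to the container port only when no port was configured.
        if (port.get() != 0 && nonSecurePort == 0) {
            nonSecurePort = port.get();
        }
        // Register once: skipped while already running or while the port is unknown.
        if (!running.get() && nonSecurePort > 0) {
            calls.add("register:" + nonSecurePort);
            running.set(true);
        }
    }

    // Assumed behaviour: deregister and allow a later start() to register again.
    void stop() {
        calls.add("deregister");
        running.set(false);
    }

    public static void main(String[] args) {
        AutoRegistrationSketch reg = new AutoRegistrationSketch(0);
        reg.start();       // port still unknown, nothing happens
        reg.setPort(8080);
        reg.start();       // registers with port 8080
        reg.start();       // already running, ignored
        reg.stop();        // first half of the refresh handling
        reg.start();       // second half: registers again
        System.out.println(reg.calls); // [register:8080, deregister, register:8080]
    }
}
```

The running flag is what makes repeated start() calls harmless, so the refresh listener can call stop() and start() without worrying about double registration.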

ServiceRegistry

@Override
public void register(EurekaRegistration reg) {
    maybeInitializeClient(reg);

    if (log.isInfoEnabled()) {
        log.info("Registering application " + reg.getInstanceConfig().getAppname()
                + " with eureka with status "
                + reg.getInstanceConfig().getInitialStatus());
    }

    reg.getApplicationInfoManager()
            .setInstanceStatus(reg.getInstanceConfig().getInitialStatus());

    if (reg.getHealthCheckHandler() != null) {
        reg.getEurekaClient().registerHealthCheck(reg.getHealthCheckHandler());
    }
}

The most important step here is setting the current status on ApplicationInfoManager. So what happens when the status is set? Let's continue with the ApplicationInfoManager class:

public synchronized void setInstanceStatus(InstanceStatus status) {
    InstanceStatus next = instanceStatusMapper.map(status);
    if (next == null) {
        return;
    }

    InstanceStatus prev = instanceInfo.setStatus(next);
    if (prev != null) {
        for (StatusChangeListener listener : listeners.values()) {
            try {
                listener.notify(new StatusChangeEvent(prev, next));
            } catch (Exception e) {
                logger.warn("failed to notify listener: {}", listener.getId(), e);
            }
        }
    }
}

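instanceStatusMapper.map(status) looks like an extension point left for the future; the existing implementation simply returns status. instanceInfo.setStatus(next) compares next with the previous status. If they differ, it marks the instance info as dirty (isInstanceInfoDirty is set to true), records the modification time in lastDirtyTimestamp, and returns the previous status: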

public synchronized InstanceStatus setStatus(InstanceStatus status) {
    if (this.status != status) {
        InstanceStatus prev = this.status;
        this.status = status;
        setIsDirty();
        return prev;
    }
    return null;
}
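
The return-value convention is easy to miss, so here is a small sketch of it. InstanceStatusSketch and its Status enum are hypothetical stand-ins, not the real InstanceInfo; the point is only that a changed status yields the previous value and an unchanged one yields null.

```java
// Hypothetical, trimmed-down model of InstanceInfo's status handling.
class InstanceStatusSketch {
    enum Status { STARTING, UP, DOWN }

    private Status status = Status.STARTING;
    private boolean dirty = false;
    private Long lastDirtyTimestamp; // null until the first change

    // Returns the previous status when the value changed, otherwise null.
    synchronized Status setStatus(Status next) {
        if (this.status != next) {
            Status prev = this.status;
            this.status = next;
            dirty = true;
            lastDirtyTimestamp = System.currentTimeMillis();
            return prev;
        }
        return null;
    }

    synchronized boolean isDirty() {
        return dirty;
    }

    synchronized Long lastDirtyTimestamp() {
        return lastDirtyTimestamp;
    }

    public static void main(String[] args) {
        InstanceStatusSketch info = new InstanceStatusSketch();
        System.out.println(info.setStatus(Status.UP)); // STARTING
        System.out.println(info.setStatus(Status.UP)); // null
        System.out.println(info.isDirty());            // true
    }
}
```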

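If a status is returned, the status has changed, and every registered listener must be notified of a StatusChangeEvent.

Note: so far we have not seen any listener that handles this event, so how does the registration actually get sent to the server? Read on. Listeners are added through registerStatusChangeListener: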

public void registerStatusChangeListener(StatusChangeListener listener) {
    listeners.put(listener.getId(), listener);
}
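
As a rough illustration of this bookkeeping, the sketch below keeps listeners in a map keyed by id and isolates failures while notifying. StatusListenerSketch, its Listener interface, and the helper method are hypothetical; the real types are ApplicationInfoManager and StatusChangeListener.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical miniature of the listener bookkeeping in ApplicationInfoManager.
class StatusListenerSketch {
    interface Listener {
        String getId();
        void onChange(String previous, String current);
    }

    private final Map<String, Listener> listeners = new ConcurrentHashMap<>();

    void register(Listener listener) {
        // Keyed by id: registering the same id twice replaces the earlier listener.
        listeners.put(listener.getId(), listener);
    }

    void notifyListeners(String previous, String current) {
        for (Listener listener : listeners.values()) {
            try {
                listener.onChange(previous, current);
            } catch (RuntimeException e) {
                // One failing listener must not prevent the others from running.
                System.err.println("failed to notify listener: " + listener.getId());
            }
        }
    }

    // Illustration-only helper that builds a listener writing into a shared list.
    static Listener recording(String id, String label, List<String> sink) {
        return new Listener() {
            public String getId() { return id; }
            public void onChange(String previous, String current) {
                sink.add(label + ": " + previous + "->" + current);
            }
        };
    }

    public static void main(String[] args) {
        List<String> seen = new ArrayList<>();
        StatusListenerSketch manager = new StatusListenerSketch();
        manager.register(recording("statusChangeListener", "first", seen));
        manager.register(recording("statusChangeListener", "second", seen)); // same id
        manager.notifyListeners("STARTING", "UP");
        System.out.println(seen); // [second: STARTING->UP]
    }
}
```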

Searching for usages of ApplicationInfoManager's registerStatusChangeListener method leads to a call site in DiscoveryClient. Jumping into it, we see:

statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
    @Override
    public String getId() {
        return "statusChangeListener";
    }

    @Override
    public void notify(StatusChangeEvent statusChangeEvent) {
        if (InstanceStatus.DOWN == statusChangeEvent.getStatus() ||
                InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) {
            // log at warn level if DOWN was involved
            logger.warn("Saw local status change event {}", statusChangeEvent);
        } else {
            logger.info("Saw local status change event {}", statusChangeEvent);
        }
        instanceInfoReplicator.onDemandUpdate();
    }
};

if (clientConfig.shouldOnDemandUpdateStatusChange()) {
    applicationInfoManager.registerStatusChangeListener(statusChangeListener);
}

So the notify call made from setInstanceStatus does just two things: it logs the status change, and it calls instanceInfoReplicator.onDemandUpdate(), which is implemented as follows:

public boolean onDemandUpdate() {
    if (rateLimiter.acquire(burstSize, allowedRatePerMinute)) {
        scheduler.submit(new Runnable() {
            @Override
            public void run() {
                logger.debug("Executing on-demand update of local InstanceInfo");

                Future latestPeriodic = scheduledPeriodicRef.get();
                if (latestPeriodic != null && !latestPeriodic.isDone()) {
                    logger.debug("Canceling the latest scheduled update, it will be rescheduled at the end of on demand update");
                    latestPeriodic.cancel(false);
                }

                InstanceInfoReplicator.this.run();
            }
        });
        return true;
    } else {
        logger.warn("Ignoring onDemand update due to rate limiter");
        return false;
    }
}

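A quick note on rateLimiter.acquire(burstSize, allowedRatePerMinute): the rate limit is derived from the replicationIntervalSeconds value in your configuration. It keeps on-demand updates from firing too often when the status changes rapidly within a short period.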
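
To get a feel for the numbers, here is a simplified token-bucket sketch. It is not Eureka's RateLimiter; OnDemandRateLimitSketch and its methods are hypothetical. The formula 60 * burstSize / replicationIntervalSeconds and the values burstSize = 2 and a 30-second interval reflect my reading of the Eureka client source and should be treated as assumptions.

```java
// Simplified token bucket; not Eureka's actual RateLimiter implementation.
class OnDemandRateLimitSketch {
    private final int burstSize;
    private final long refillIntervalMillis; // time needed to earn one token
    private long tokens;
    private long lastRefillMillis;

    OnDemandRateLimitSketch(int burstSize, int replicationIntervalSeconds, long nowMillis) {
        this.burstSize = burstSize;
        // Guard against a rate of 0 for very long intervals.
        int perMinute = Math.max(1, allowedRatePerMinute(burstSize, replicationIntervalSeconds));
        this.refillIntervalMillis = 60_000L / perMinute;
        this.tokens = burstSize;
        this.lastRefillMillis = nowMillis;
    }

    // Assumed derivation of the per-minute budget from the replication interval.
    static int allowedRatePerMinute(int burstSize, int replicationIntervalSeconds) {
        return 60 * burstSize / replicationIntervalSeconds;
    }

    // Takes one token if available; the caller passes the clock to keep the sketch deterministic.
    synchronized boolean acquire(long nowMillis) {
        long earned = (nowMillis - lastRefillMillis) / refillIntervalMillis;
        if (earned > 0) {
            tokens = Math.min(burstSize, tokens + earned);
            lastRefillMillis += earned * refillIntervalMillis;
        }
        if (tokens > 0) {
            tokens--;
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(allowedRatePerMinute(2, 30)); // 4
        OnDemandRateLimitSketch limiter = new OnDemandRateLimitSketch(2, 30, 0L);
        System.out.println(limiter.acquire(0L));      // true  (first burst token)
        System.out.println(limiter.acquire(1L));      // true  (second burst token)
        System.out.println(limiter.acquire(2L));      // false (bucket empty)
        System.out.println(limiter.acquire(15_000L)); // true  (one token earned after 15 s)
    }
}
```

Under these assumptions, two status changes in quick succession are both replicated immediately, while a third is dropped with the "Ignoring onDemand update due to rate limiter" warning and picked up by the next periodic run instead.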

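The main job of this method is to submit a task to the scheduler. That task first cancels any unfinished task held in scheduledPeriodicRef.

Note: the unfinished task in scheduledPeriodicRef is simply the scheduler's own pending task, which is set like this: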

Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);
scheduledPeriodicRef.set(next);
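
The cancel-and-reschedule pattern can be tried out in isolation with the JDK's ScheduledExecutorService. The sketch below is hypothetical and contains no Eureka types: ReplicatorSchedulingSketch, the runs counter, and pendingPeriodic() are invented for the example, rate limiting is left out, and onDemandUpdate() returns a Future (the original returns a boolean) so the caller can wait for it.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical miniature of the replicator's scheduling, without any Eureka types.
class ReplicatorSchedulingSketch implements Runnable {
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    private final AtomicReference<Future<?>> scheduledPeriodicRef = new AtomicReference<>();
    private final long intervalSeconds;
    final AtomicInteger runs = new AtomicInteger();

    ReplicatorSchedulingSketch(long intervalSeconds) {
        this.intervalSeconds = intervalSeconds;
    }

    void start() {
        scheduledPeriodicRef.set(scheduler.schedule(this, intervalSeconds, TimeUnit.SECONDS));
    }

    // Runs the replication right away instead of waiting for the next periodic slot.
    Future<?> onDemandUpdate() {
        return scheduler.submit(() -> {
            Future<?> latestPeriodic = scheduledPeriodicRef.get();
            if (latestPeriodic != null && !latestPeriodic.isDone()) {
                latestPeriodic.cancel(false); // run() below schedules a fresh one
            }
            run();
        });
    }

    @Override
    public void run() {
        try {
            runs.incrementAndGet(); // stands in for "refresh and register if dirty"
        } finally {
            // Each run queues exactly one follow-up task.
            scheduledPeriodicRef.set(scheduler.schedule(this, intervalSeconds, TimeUnit.SECONDS));
        }
    }

    Future<?> pendingPeriodic() {
        return scheduledPeriodicRef.get();
    }

    void shutdown() {
        scheduler.shutdownNow();
    }

    public static void main(String[] args) throws Exception {
        ReplicatorSchedulingSketch replicator = new ReplicatorSchedulingSketch(3600);
        replicator.start();
        Future<?> first = replicator.pendingPeriodic();
        replicator.onDemandUpdate().get();                          // wait for the on-demand run
        System.out.println(first.isCancelled());                    // true
        System.out.println(replicator.runs.get());                  // 1
        System.out.println(replicator.pendingPeriodic() != first);  // true
        replicator.shutdown();
    }
}
```

Cancelling the pending task before running keeps the schedule from doubling up: without it, every on-demand update would add one more periodic chain, since run() always reschedules itself.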

After cancelling, the task calls InstanceInfoReplicator.this.run():

public void run() {
    try {
        discoveryClient.refreshInstanceInfo();

        Long dirtyTimestamp = instanceInfo.isDirtyWithTime();
        if (dirtyTimestamp != null) {
            discoveryClient.register();
            instanceInfo.unsetIsDirty(dirtyTimestamp);
        }
    } catch (Throwable t) {
        logger.warn("There was a problem with the instance info replicator", t);
    } finally {
        Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);
        scheduledPeriodicRef.set(next);
    }
}
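
The pairing of isDirtyWithTime() and unsetIsDirty(dirtyTimestamp) is worth a closer look. The sketch below models my understanding of it: the flag is cleared only if no newer change arrived while the registration was in flight. DirtyFlagSketch is a hypothetical class, and the comparison inside unsetIsDirty is an assumption about InstanceInfo rather than something shown in this post.

```java
// Hypothetical model of the dirty-flag handshake between InstanceInfo and the replicator.
class DirtyFlagSketch {
    private boolean dirty;
    private long lastDirtyTimestamp;

    synchronized void setIsDirty(long nowMillis) {
        dirty = true;
        lastDirtyTimestamp = nowMillis;
    }

    // Returns the timestamp of the latest change while dirty, otherwise null.
    synchronized Long isDirtyWithTime() {
        return dirty ? lastDirtyTimestamp : null;
    }

    // Clears the flag only if nothing changed after the snapshot that was just registered.
    synchronized void unsetIsDirty(long registeredTimestamp) {
        if (lastDirtyTimestamp <= registeredTimestamp) {
            dirty = false;
        }
    }

    public static void main(String[] args) {
        DirtyFlagSketch info = new DirtyFlagSketch();
        info.setIsDirty(100L);
        Long snapshot = info.isDirtyWithTime();     // 100: the replicator starts registering
        info.setIsDirty(150L);                      // status changes again mid-registration
        info.unsetIsDirty(snapshot);                // ignored, a newer change is pending
        System.out.println(info.isDirtyWithTime()); // 150
        info.unsetIsDirty(150L);
        System.out.println(info.isDirtyWithTime()); // null
    }
}
```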

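run() first refreshes the local instance info via discoveryClient.refreshInstanceInfo() (the details of this step are left for the service discovery post). It then uses isInstanceInfoDirty and lastDirtyTimestamp to decide whether the data is dirty, and if so calls discoveryClient.register() to register with the Eureka server. Finally, in the finally block, it schedules the next run. Now let's look at discoveryClient.register():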

boolean register() throws Throwable {
    logger.info(PREFIX + appPathIdentifier + ": registering service...");
    EurekaHttpResponse<Void> httpResponse;
    try {
        httpResponse = eurekaTransport.registrationClient.register(instanceInfo);
    } catch (Exception e) {
        logger.warn("{} - registration failed {}", PREFIX + appPathIdentifier, e.getMessage(), e);
        throw e;
    }
    if (logger.isInfoEnabled()) {
        logger.info("{} - registration status: {}", PREFIX + appPathIdentifier, httpResponse.getStatusCode());
    }
    return httpResponse.getStatusCode() == 204;
}

eurekaTransport.registrationClient.register(instanceInfo) eventually reaches the register method of AbstractJerseyEurekaHttpClient:

public EurekaHttpResponse<Void> register(InstanceInfo info) {
    String urlPath = "apps/" + info.getAppName();
    ClientResponse response = null;
    try {
        Builder resourceBuilder = jerseyClient.resource(serviceUrl).path(urlPath).getRequestBuilder();
        addExtraHeaders(resourceBuilder);
        response = resourceBuilder
                .header("Accept-Encoding", "gzip")
                .type(MediaType.APPLICATION_JSON_TYPE)
                .accept(MediaType.APPLICATION_JSON)
                .post(ClientResponse.class, info);
        return anEurekaHttpResponse(response.getStatus()).headers(headersOf(response)).build();
    } finally {
        if (logger.isDebugEnabled()) {
            logger.debug("Jersey HTTP POST {}/{} with instance {}; statusCode={}", serviceUrl, urlPath, info.getId(),
                    response == null ? "N/A" : response.getStatus());
        }
        if (response != null) {
            response.close();
        }
    }
}

This sends the registration info to the Eureka server through a REST POST request (URL: http://host:port/eureka/apps/{appName}).
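
As a small illustration of how the request is addressed and how the result is judged, consider the sketch below. RegisterRequestSketch is a hypothetical helper, and the host localhost:8761 and the application name ORDER-SERVICE are placeholder values; only the "apps/" + appName path and the 204 check come from the code shown above.

```java
import java.net.URI;

// Hypothetical helper mirroring how the register request is addressed.
class RegisterRequestSketch {
    // Builds serviceUrl + "apps/" + appName, tolerating a missing trailing slash.
    static URI registerUri(String serviceUrl, String appName) {
        String base = serviceUrl.endsWith("/") ? serviceUrl : serviceUrl + "/";
        return URI.create(base + "apps/" + appName);
    }

    // DiscoveryClient.register() treats only HTTP 204 as a successful registration.
    static boolean isRegistered(int statusCode) {
        return statusCode == 204;
    }

    public static void main(String[] args) {
        // Prints http://localhost:8761/eureka/apps/ORDER-SERVICE
        System.out.println(registerUri("http://localhost:8761/eureka/", "ORDER-SERVICE"));
        System.out.println(isRegistered(204)); // true
        System.out.println(isRegistered(200)); // false
    }
}
```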

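Summary: overall, the service registration flow of the Eureka client is fairly simple. A status change notifies the anonymous StatusChangeListener registered with ApplicationInfoManager, and that listener uses the scheduler to register the client's instance info with the server.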