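This article continues the previous one, Apollo core source code analysis (part 1): Portal publishes configuration, Admin Service sends ReleaseMessage, Config Service notifies clients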
3. Apollo Client
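The figure above briefly describes how the Apollo client works:
- The client keeps a long connection with the server, so it receives pushed configuration updates as soon as they happen (implemented with HTTP Long Polling)
- The client also periodically pulls the application's latest configuration from the Apollo configuration center server
  - This is a fallback mechanism that keeps configuration from going stale if the push mechanism fails
  - The periodic pull reports the client's local version, so for periodic pulls the server normally returns 304 - Not Modified
  - By default the pull runs every 5 minutes; the client can override this at runtime by setting the System Property apollo.refreshInterval, in minutes
- After the client obtains the application's latest configuration from the server, it keeps it in memory
- The client also caches the configuration fetched from the server on the local file system
  - When the service is unavailable or the network is down, the configuration can still be restored from this local copy
- The application can get the latest configuration from the Apollo client and subscribe to configuration update notifications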
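To make the refresh-interval rule concrete, here is a minimal sketch of resolving the pull interval: 5 minutes by default, overridable through the apollo.refreshInterval System Property in minutes. The class name RefreshIntervalResolver and the choice to ignore malformed or non-positive values are assumptions of this sketch; the real client resolves this inside its own configuration utilities.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical helper (not Apollo's API): resolves the periodic pull interval.
// Default is 5 minutes; the System Property "apollo.refreshInterval" overrides it (unit: minutes).
final class RefreshIntervalResolver {
  static final long DEFAULT_REFRESH_INTERVAL_MINUTES = 5;

  private RefreshIntervalResolver() {}

  /** Returns the interval in minutes; missing, malformed or non-positive values fall back to the default. */
  static long refreshIntervalMinutes() {
    String raw = System.getProperty("apollo.refreshInterval");
    if (raw == null || raw.trim().isEmpty()) {
      return DEFAULT_REFRESH_INTERVAL_MINUTES;
    }
    try {
      long value = Long.parseLong(raw.trim());
      return value > 0 ? value : DEFAULT_REFRESH_INTERVAL_MINUTES;
    } catch (NumberFormatException e) {
      return DEFAULT_REFRESH_INTERVAL_MINUTES;
    }
  }

  /** Same interval converted to milliseconds, e.g. for a ScheduledExecutorService. */
  static long refreshIntervalMillis() {
    return TimeUnit.MINUTES.toMillis(refreshIntervalMinutes());
  }
}
```

In practice the property is passed on the command line, for example java -Dapollo.refreshInterval=10 -jar app.jar.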
At the code level, the Apollo Client reads its configuration from Config Service. The client's polling has two parts:
- RemoteConfigRepository periodically polls Config Service's config-read endpoint /configs/{appId}/{clusterName}/{namespace:.+}
  - For details, see the configs/{appId}/{clusterName}/{namespace:.+} endpoint of com.ctrip.framework.apollo.configservice.controller.ConfigController
- RemoteConfigLongPollService long-polls Config Service's change-notification endpoint /notifications/v2
  - When a new notification arrives, it triggers RemoteConfigRepository to poll the config-read endpoint /configs/{appId}/{clusterName}/{namespace:.+} immediately
  - The logic of this endpoint is analyzed in detail in the "Config Service notifies clients" part of the previous article
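Apollo combines push and pull: the client's periodic polling guarantees eventual consistency, and its long polling is a real-time supplement to the periodic polling. The HTTP long-polling data synchronization in the Soul gateway follows the same idea.
- Each Namespace corresponds to one RemoteConfigRepository
- Multiple RemoteConfigRepository instances register with the single, globally unique RemoteConfigLongPollService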
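The push-pull combination can be sketched with a single scheduled executor: a fixed-rate task pulls periodically (eventual consistency), and a notification callback submits an immediate pull (real-time supplement). PushPullRepository and the Supplier standing in for the HTTP call are hypothetical; this only models the idea, not Apollo's classes.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

// Hypothetical model of push + pull. Pull: scheduleAtFixedRate. Push: onNotified() triggers an immediate pull.
final class PushPullRepository {
  private final Supplier<String> remote;            // stands in for the HTTP call to Config Service
  private final ScheduledExecutorService executor;  // one thread serializes all background syncs
  private volatile String cached;

  PushPullRepository(Supplier<String> remote, long periodMillis) {
    this.remote = remote;
    this.executor = Executors.newSingleThreadScheduledExecutor(r -> {
      Thread t = new Thread(r, "push-pull-repository");
      t.setDaemon(true);
      return t;
    });
    trySync(); // initial synchronous load, similar to trySync() in the constructor above
    executor.scheduleAtFixedRate(this::trySync, periodMillis, periodMillis, TimeUnit.MILLISECONDS);
  }

  /** Push path: called when a long-poll notification arrives; pulls right away. */
  CompletableFuture<Void> onNotified() {
    return CompletableFuture.runAsync(this::trySync, executor);
  }

  private synchronized void trySync() {
    try {
      cached = remote.get();
    } catch (RuntimeException ex) {
      // keep the previous value; the next periodic or pushed sync retries
    }
  }

  String get() {
    return cached;
  }

  void shutdown() {
    executor.shutdownNow();
  }
}
```

Catching the exception inside the task matters: a ScheduledExecutorService silently cancels a periodic task whose run throws.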
1) ConfigRepository
```java
public interface ConfigRepository {
  /**
   * Read the configuration
   *
   * @return config
   */
  Properties getConfig();

  /**
   * Set the upstream Repository. Mainly used by LocalFileConfigRepository:
   * read the configuration from Config Service and cache it in a local file
   *
   * @param upstreamConfigRepository
   */
  void setUpstreamRepository(ConfigRepository upstreamConfigRepository);

  /**
   * Add a RepositoryChangeListener
   *
   * @param listener
   */
  void addChangeListener(RepositoryChangeListener listener);

  /**
   * Remove a RepositoryChangeListener
   *
   * @param listener
   */
  void removeChangeListener(RepositoryChangeListener listener);
}
```
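The setUpstreamRepository comment describes a chain: a local cache repository takes a remote repository as its upstream, copies its configuration and listens for its changes. The following is a simplified model of that chaining idea; SimpleRepository, InMemoryRemoteRepository and LocalCacheRepository are hypothetical names, and a Properties field stands in for the local file.

```java
import java.util.List;
import java.util.Properties;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.BiConsumer;

// Hypothetical, simplified repository contract: read config, listen for (namespace, newConfig) changes.
interface SimpleRepository {
  Properties getConfig();
  void addChangeListener(BiConsumer<String, Properties> listener);
}

// Stands in for the remote repository that talks to Config Service.
class InMemoryRemoteRepository implements SimpleRepository {
  private final String namespace;
  private final List<BiConsumer<String, Properties>> listeners = new CopyOnWriteArrayList<>();
  private volatile Properties config = new Properties();

  InMemoryRemoteRepository(String namespace) { this.namespace = namespace; }

  public Properties getConfig() { return config; }
  public void addChangeListener(BiConsumer<String, Properties> l) { listeners.add(l); }

  /** Simulates a newly released configuration arriving from Config Service. */
  void publish(Properties newConfig) {
    config = newConfig;
    for (BiConsumer<String, Properties> l : listeners) {
      l.accept(namespace, newConfig);
    }
  }
}

// Stands in for the local-file repository: copies from upstream and forwards changes downstream.
class LocalCacheRepository implements SimpleRepository {
  private final List<BiConsumer<String, Properties>> listeners = new CopyOnWriteArrayList<>();
  private volatile Properties cache = new Properties(); // a real implementation would persist this to a file

  void setUpstreamRepository(SimpleRepository upstream) {
    cache = copy(upstream.getConfig());                 // initial sync from upstream
    upstream.addChangeListener((namespace, props) -> {  // follow every upstream change
      cache = copy(props);
      for (BiConsumer<String, Properties> l : listeners) {
        l.accept(namespace, cache);
      }
    });
  }

  public Properties getConfig() { return cache; }
  public void addChangeListener(BiConsumer<String, Properties> l) { listeners.add(l); }

  private static Properties copy(Properties source) {
    Properties p = new Properties();
    p.putAll(source);
    return p;
  }
}
```

Because each layer only knows its upstream through the common interface, layers can be stacked without the remote repository knowing that a file cache exists.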
The implementation classes of ConfigRepository are shown in the figure below:
AbstractConfigRepository
com.ctrip.framework.apollo.internals.AbstractConfigRepository
Implements the ConfigRepository interface; the abstract base class for config repositories
```java
public abstract class AbstractConfigRepository implements ConfigRepository {
  private static final Logger logger = LoggerFactory.getLogger(AbstractConfigRepository.class);
  /**
   * List of RepositoryChangeListeners
   */
  private List<RepositoryChangeListener> m_listeners = Lists.newCopyOnWriteArrayList();
  protected PropertiesFactory propertiesFactory = ApolloInjector.getInstance(PropertiesFactory.class);

  /**
   * Try to synchronize
   *
   * @return whether the synchronization succeeded
   */
  protected boolean trySync() {
    try {
      // synchronize
      sync();
      return true;
    } catch (Throwable ex) {
      Tracer.logEvent("ApolloConfigException", ExceptionUtil.getDetailMessage(ex));
      logger
          .warn("Sync config failed, will retry. Repository {}, reason: {}", this.getClass(), ExceptionUtil
              .getDetailMessage(ex));
    }
    return false;
  }

  protected abstract void sync();

  @Override
  public void addChangeListener(RepositoryChangeListener listener) {
    if (!m_listeners.contains(listener)) {
      m_listeners.add(listener);
    }
  }

  @Override
  public void removeChangeListener(RepositoryChangeListener listener) {
    m_listeners.remove(listener);
  }

  /**
   * Notify the listeners
   *
   * @param namespace
   * @param newProperties
   */
  protected void fireRepositoryChange(String namespace, Properties newProperties) {
    // iterate over the RepositoryChangeListeners
    for (RepositoryChangeListener listener : m_listeners) {
      try {
        // notify the listener
        listener.onRepositoryChange(namespace, newProperties);
      } catch (Throwable ex) {
        Tracer.logError(ex);
        logger.error("Failed to invoke repository change listener {}", listener.getClass(), ex);
      }
    }
  }
}
```
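Two details of fireRepositoryChange are worth isolating: the copy-on-write list lets listeners be added or removed while a notification is being delivered, and the per-listener try/catch keeps one failing listener from blocking the rest. A minimal sketch with a hypothetical ChangeListener type standing in for RepositoryChangeListener; it uses CopyOnWriteArrayList.addIfAbsent, which performs the contains-then-add check atomically, unlike the two separate calls in addChangeListener above.

```java
import java.util.Properties;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for RepositoryChangeListener.
interface ChangeListener {
  void onRepositoryChange(String namespace, Properties newProperties);
}

// Sketch of the listener bookkeeping in AbstractConfigRepository.
class ListenerRegistry {
  private final CopyOnWriteArrayList<ChangeListener> listeners = new CopyOnWriteArrayList<>();
  final AtomicInteger failures = new AtomicInteger(); // counted here; the real code logs via Tracer/logger

  void addChangeListener(ChangeListener listener) {
    listeners.addIfAbsent(listener); // atomic de-duplication
  }

  void removeChangeListener(ChangeListener listener) {
    listeners.remove(listener);
  }

  void fireRepositoryChange(String namespace, Properties newProperties) {
    // Iterates over a snapshot, so concurrent add/remove cannot throw ConcurrentModificationException.
    for (ChangeListener listener : listeners) {
      try {
        listener.onRepositoryChange(namespace, newProperties);
      } catch (Throwable ex) {
        failures.incrementAndGet(); // isolate the failure and keep notifying the others
      }
    }
  }
}
```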
RemoteConfigRepository
com.ctrip.framework.apollo.internals.RemoteConfigRepository
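Extends the AbstractConfigRepository abstract class; the remote config repository. It pulls configuration from Config Service and caches it in memory, refreshing the cache both periodically and in real time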
(1) Constructor
```java
public class RemoteConfigRepository extends AbstractConfigRepository {
  private static final Logger logger = LoggerFactory.getLogger(RemoteConfigRepository.class);
  private static final Joiner STRING_JOINER = Joiner.on(ConfigConsts.CLUSTER_NAMESPACE_SEPARATOR);
  private static final Joiner.MapJoiner MAP_JOINER = Joiner.on("&").withKeyValueSeparator("=");
  private static final Escaper pathEscaper = UrlEscapers.urlPathSegmentEscaper();
  private static final Escaper queryParamEscaper = UrlEscapers.urlFormParameterEscaper();
  private final ConfigServiceLocator m_serviceLocator;
  private final HttpUtil m_httpUtil;
  private final ConfigUtil m_configUtil;
  /**
   * Remote config long-polling service
   */
  private final RemoteConfigLongPollService remoteConfigLongPollService;
  /**
   * AtomicReference to the ApolloConfig; caches the configuration
   */
  private volatile AtomicReference<ApolloConfig> m_configCache;
  /**
   * Namespace name
   */
  private final String m_namespace;
  /**
   * ScheduledExecutorService
   */
  private final static ScheduledExecutorService m_executorService;
  /**
   * AtomicReference to a ServiceDTO (Config Service information)
   */
  private final AtomicReference<ServiceDTO> m_longPollServiceDto;
  /**
   * AtomicReference to the ApolloNotificationMessages
   */
  private final AtomicReference<ApolloNotificationMessages> m_remoteMessages;
  /**
   * RateLimiter for loading configuration
   */
  private final RateLimiter m_loadConfigRateLimiter;
  /**
   * Whether to force a refresh.
   * If true, one extra round of pulling configuration from Config Service is allowed.
   * It is set to true when RemoteConfigRepository learns that Config Service has a configuration update.
   */
  private final AtomicBoolean m_configNeedForceRefresh;
  /**
   * Retry schedule policy on failure
   */
  private final SchedulePolicy m_loadConfigFailSchedulePolicy;
  private static final Gson GSON = new Gson();

  static {
    // single-thread pool
    m_executorService = Executors.newScheduledThreadPool(1,
        ApolloThreadFactory.create("RemoteConfigRepository", true));
  }

  /**
   * Constructor.
   *
   * @param namespace the namespace
   */
  public RemoteConfigRepository(String namespace) {
    m_namespace = namespace;
    m_configCache = new AtomicReference<>();
    m_configUtil = ApolloInjector.getInstance(ConfigUtil.class);
    m_httpUtil = ApolloInjector.getInstance(HttpUtil.class);
    m_serviceLocator = ApolloInjector.getInstance(ConfigServiceLocator.class);
    remoteConfigLongPollService = ApolloInjector.getInstance(RemoteConfigLongPollService.class);
    m_longPollServiceDto = new AtomicReference<>();
    m_remoteMessages = new AtomicReference<>();
    m_loadConfigRateLimiter = RateLimiter.create(m_configUtil.getLoadConfigQPS());
    m_configNeedForceRefresh = new AtomicBoolean(true);
    m_loadConfigFailSchedulePolicy = new ExponentialSchedulePolicy(m_configUtil.getOnErrorRetryInterval(),
        m_configUtil.getOnErrorRetryInterval() * 8);
    // try to sync the configuration
    this.trySync();
    // schedule the periodic refresh task
    this.schedulePeriodicRefresh();
    // register itself with RemoteConfigLongPollService to get real-time update notifications
    this.scheduleLongPollingRefresh();
  }

  // ... other members omitted
}
```
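The RemoteConfigRepository constructor calls trySync() to try to synchronize the configuration, schedulePeriodicRefresh() to schedule the periodic refresh task, and scheduleLongPollingRefresh() to register itself with RemoteConfigLongPollService so that configuration updates are delivered in real time.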
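The constructor also builds an ExponentialSchedulePolicy bounded by getOnErrorRetryInterval() and 8 times that value. The sketch below shows one plausible exponential backoff with those semantics: the first failure waits the lower bound, each further failure doubles the wait up to the upper bound, and a success resets it. The class name and the exact doubling and reset rules are assumptions, not a copy of Apollo's class.

```java
// Sketch of an exponential retry-delay policy (assumed semantics, hypothetical class name).
class ExponentialBackoff {
  private final long lowerBound;
  private final long upperBound;
  private long lastDelay; // 0 means "no failure since the last success"

  ExponentialBackoff(long lowerBound, long upperBound) {
    this.lowerBound = lowerBound;
    this.upperBound = upperBound;
  }

  /** Records a failure and returns how long to wait before the next attempt. */
  synchronized long fail() {
    lastDelay = lastDelay == 0 ? lowerBound : Math.min(lastDelay * 2, upperBound);
    return lastDelay;
  }

  /** Records a success, so the next failure starts from the lower bound again. */
  synchronized void success() {
    lastDelay = 0;
  }
}
```

With bounds (1, 8) this yields 1, 2, 4, 8, 8, ... until a success; the long-polling service further below uses bounds of 1 and 120 seconds.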
(2) trySync()
```java
public abstract class AbstractConfigRepository implements ConfigRepository {
  /**
   * Try to synchronize
   *
   * @return whether the synchronization succeeded
   */
  protected boolean trySync() {
    try {
      // synchronize
      sync();
      return true;
    } catch (Throwable ex) {
      Tracer.logEvent("ApolloConfigException", ExceptionUtil.getDetailMessage(ex));
      logger
          .warn("Sync config failed, will retry. Repository {}, reason: {}", this.getClass(), ExceptionUtil
              .getDetailMessage(ex));
    }
    return false;
  }

  protected abstract void sync();

  // ... other members omitted
}
```
RemoteConfigRepository implements the sync() method:
```java
public class RemoteConfigRepository extends AbstractConfigRepository {
  @Override
  protected synchronized void sync() {
    Transaction transaction = Tracer.newTransaction("Apollo.ConfigService", "syncRemoteConfig");
    try {
      // get the cached ApolloConfig
      ApolloConfig previous = m_configCache.get();
      // 1) load the ApolloConfig from Config Service
      ApolloConfig current = loadApolloConfig();
      // reference equals means HTTP 304
      // if they differ, the configuration was updated: store it in the cache
      if (previous != current) {
        logger.debug("Remote Config refreshed!");
        // update the cache
        m_configCache.set(current);
        // 2) publish the repository change and notify the listeners
        this.fireRepositoryChange(m_namespace, this.getConfig());
      }
      if (current != null) {
        Tracer.logEvent(String.format("Apollo.Client.Configs.%s", current.getNamespaceName()),
            current.getReleaseKey());
      }
      transaction.setStatus(Transaction.SUCCESS);
    } catch (Throwable ex) {
      transaction.setStatus(ex);
      throw ex;
    } finally {
      transaction.complete();
    }
  }

  private ApolloConfig loadApolloConfig() {
    // rate limiting
    if (!m_loadConfigRateLimiter.tryAcquire(5, TimeUnit.SECONDS)) {
      // wait at most 5 seconds
      try {
        TimeUnit.SECONDS.sleep(5);
      } catch (InterruptedException e) {
        // ignore
      }
    }
    String appId = m_configUtil.getAppId();
    String cluster = m_configUtil.getCluster();
    String dataCenter = m_configUtil.getDataCenter();
    String secret = m_configUtil.getAccessKeySecret();
    Tracer.logEvent("Apollo.Client.ConfigMeta", STRING_JOINER.join(appId, cluster, m_namespace));
    // compute the number of retries
    int maxRetries = m_configNeedForceRefresh.get() ? 2 : 1;
    long onErrorSleepTime = 0; // 0 means no sleep
    Throwable exception = null;
    // get the addresses of all Config Services
    List<ServiceDTO> configServices = getConfigServices();
    String url = null;
    // retry up to maxRetries times until success; each round iterates over all ServiceDTOs
    retryLoopLabel:
    for (int i = 0; i < maxRetries; i++) {
      // shuffle the Config Service addresses
      List<ServiceDTO> randomConfigServices = Lists.newLinkedList(configServices);
      Collections.shuffle(randomConfigServices);
      // try first the Config Service that sent the change notification,
      // and clear the reference once taken so it is not preferred again
      // Access the server which notifies the client first
      if (m_longPollServiceDto.get() != null) {
        randomConfigServices.add(0, m_longPollServiceDto.getAndSet(null));
      }
      // iterate over all Config Service addresses
      for (ServiceDTO configService : randomConfigServices) {
        // sleep before pulling from Config Service again
        if (onErrorSleepTime > 0) {
          logger.warn(
              "Load config failed, will retry in {} {}. appId: {}, cluster: {}, namespaces: {}",
              onErrorSleepTime, m_configUtil.getOnErrorRetryIntervalTimeUnit(), appId, cluster, m_namespace);
          try {
            m_configUtil.getOnErrorRetryIntervalTimeUnit().sleep(onErrorSleepTime);
          } catch (InterruptedException e) {
            // ignore
          }
        }
        // assemble the config query URL
        url = assembleQueryConfigUrl(configService.getHomepageUrl(), appId, cluster, m_namespace,
            dataCenter, m_remoteMessages.get(), m_configCache.get());
        logger.debug("Loading config from {}", url);
        // create the HttpRequest
        HttpRequest request = new HttpRequest(url);
        if (!StringUtils.isBlank(secret)) {
          Map<String, String> headers = Signature.buildHttpHeaders(url, appId, secret);
          request.setHeaders(headers);
        }
        Transaction transaction = Tracer.newTransaction("Apollo.ConfigService", "queryConfig");
        transaction.addData("Url", url);
        try {
          // send the request and get the HttpResponse
          HttpResponse<ApolloConfig> response = m_httpUtil.doGet(request, ApolloConfig.class);
          // reset the force-refresh flag to false
          m_configNeedForceRefresh.set(false);
          // mark success
          m_loadConfigFailSchedulePolicy.success();
          transaction.addData("StatusCode", response.getStatusCode());
          transaction.setStatus(Transaction.SUCCESS);
          // no new configuration: return the cached ApolloConfig
          if (response.getStatusCode() == 304) {
            logger.debug("Config server responds with 304 HTTP status code.");
            return m_configCache.get();
          }
          // new configuration: return the new ApolloConfig
          ApolloConfig result = response.getBody();
          logger.debug("Loaded config for {}: {}", m_namespace, result);
          return result;
        } catch (ApolloConfigStatusCodeException ex) {
          ApolloConfigStatusCodeException statusCodeException = ex;
          // config not found
          if (ex.getStatusCode() == 404) {
            String message = String.format(
                "Could not find config for namespace - appId: %s, cluster: %s, namespace: %s, " +
                    "please check whether the configs are released in Apollo!",
                appId, cluster, m_namespace);
            statusCodeException = new ApolloConfigStatusCodeException(ex.getStatusCode(),
                message);
          }
          Tracer.logEvent("ApolloConfigException", ExceptionUtil.getDetailMessage(statusCodeException));
          transaction.setStatus(statusCodeException);
          exception = statusCodeException;
          if (ex.getStatusCode() == 404) {
            break retryLoopLabel;
          }
        } catch (Throwable ex) {
          Tracer.logEvent("ApolloConfigException", ExceptionUtil.getDetailMessage(ex));
          transaction.setStatus(ex);
          exception = ex;
        } finally {
          transaction.complete();
        }
        // if force refresh, do normal sleep, if normal config load, do exponential sleep
        onErrorSleepTime = m_configNeedForceRefresh.get() ? m_configUtil.getOnErrorRetryInterval() :
            m_loadConfigFailSchedulePolicy.fail();
      }
    }
    String message = String.format(
        "Load Apollo Config failed - appId: %s, cluster: %s, namespace: %s, url: %s",
        appId, cluster, m_namespace, url);
    throw new ApolloConfigException(message, exception);
  }

  // ... other members omitted
}
```
In sync(), step 1) calls loadApolloConfig() to load the configuration from Config Service, and step 2) calls AbstractConfigRepository#fireRepositoryChange(namespace, newProperties) to publish the repository change and notify the corresponding listeners.
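The "reference equals means HTTP 304" comment in sync() relies on loadApolloConfig() returning the cached object itself on a 304, so a plain != check tells sync() whether to update the cache and fire listeners. The sketch below isolates that trick; the Response type and the Function standing in for the HTTP call are hypothetical.

```java
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;

// Sketch of the reference-equality change check used by sync(); types are hypothetical stand-ins.
final class ReferenceEqualitySync {
  /** Hypothetical HTTP result: status code plus body (body unused for 304). */
  static final class Response {
    final int statusCode;
    final Map<String, String> body;

    Response(int statusCode, Map<String, String> body) {
      this.statusCode = statusCode;
      this.body = body;
    }
  }

  private final AtomicReference<Map<String, String>> cache = new AtomicReference<>();
  private final Function<Map<String, String>, Response> http; // receives the cached config, e.g. to send its version
  int fired; // how many times listeners would have been notified

  ReferenceEqualitySync(Function<Map<String, String>, Response> http) {
    this.http = http;
  }

  private Map<String, String> load() {
    Response response = http.apply(cache.get());
    if (response.statusCode == 304) {
      return cache.get(); // the very same reference: "not modified"
    }
    return response.body; // a new object: configuration changed
  }

  synchronized void sync() {
    Map<String, String> previous = cache.get();
    Map<String, String> current = load();
    if (previous != current) { // reference comparison on purpose, not equals()
      cache.set(current);
      fired++;                 // fireRepositoryChange(...) would run here
    }
  }

  Map<String, String> get() {
    return cache.get();
  }
}
```

The check is cheap and avoids comparing whole configurations; it depends on the server answering 304 whenever the client's version is current.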
(3) schedulePeriodicRefresh()
```java
public class RemoteConfigRepository extends AbstractConfigRepository {
  private void schedulePeriodicRefresh() {
    logger.debug("Schedule periodic refresh with interval: {} {}",
        m_configUtil.getRefreshInterval(), m_configUtil.getRefreshIntervalTimeUnit());
    // schedule a task that refreshes the configuration periodically
    m_executorService.scheduleAtFixedRate(
        new Runnable() {
          @Override
          public void run() {
            Tracer.logEvent("Apollo.ConfigService", String.format("periodicRefresh: %s", m_namespace));
            logger.debug("refresh config for namespace: {}", m_namespace);
            // try to sync the configuration
            trySync();
            Tracer.logEvent("Apollo.Client.Version", Apollo.VERSION);
          }
        }, m_configUtil.getRefreshInterval(), m_configUtil.getRefreshInterval(),
        m_configUtil.getRefreshIntervalTimeUnit());
  }

  // ... other members omitted
}
```
(4) scheduleLongPollingRefresh()
```java
public class RemoteConfigRepository extends AbstractConfigRepository {
  private void scheduleLongPollingRefresh() {
    // register itself with RemoteConfigLongPollService for real-time update notifications;
    // when RemoteConfigLongPollService detects an update in this repository's Namespace
    // through long polling, it calls back onLongPollNotified()
    remoteConfigLongPollService.submit(m_namespace, this);
  }

  /**
   * When long polling detects a configuration update, submit a task that syncs the configuration
   *
   * @param longPollNotifiedServiceDto
   * @param remoteMessages
   */
  public void onLongPollNotified(ServiceDTO longPollNotifiedServiceDto, ApolloNotificationMessages remoteMessages) {
    // remember the Config Service that reported the update; the next sync tries it first
    m_longPollServiceDto.set(longPollNotifiedServiceDto);
    m_remoteMessages.set(remoteMessages);
    // submit the sync task
    m_executorService.submit(new Runnable() {
      @Override
      public void run() {
        // set the force-refresh flag to true
        m_configNeedForceRefresh.set(true);
        // try to sync the configuration
        trySync();
      }
    });
  }

  // ... other members omitted
}
```
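onLongPollNotified() stores the notifying server, and loadApolloConfig() then shuffles all Config Service addresses for load balancing but puts that server first, clearing the reference so it is preferred only once. The sketch below models this ordering with plain strings standing in for ServiceDTO; CandidateOrdering is a hypothetical name. It calls getAndSet(null) once instead of get() followed by getAndSet(null), which avoids inserting null if another thread clears the reference in between.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;
import java.util.concurrent.atomic.AtomicReference;

// Sketch of candidate ordering: shuffled addresses, with the notifying server tried first (once).
final class CandidateOrdering {
  private final AtomicReference<String> longPollServer = new AtomicReference<>();

  /** Mirrors m_longPollServiceDto.set(...) in onLongPollNotified(). */
  void onLongPollNotified(String notifyingServer) {
    longPollServer.set(notifyingServer);
  }

  List<String> orderedCandidates(List<String> configServices, Random random) {
    List<String> candidates = new ArrayList<>(configServices);
    Collections.shuffle(candidates, random);          // spread load across Config Services
    String preferred = longPollServer.getAndSet(null); // take it and clear it atomically
    if (preferred != null) {
      candidates.add(0, preferred); // may duplicate an entry, as in the original code
    }
    return candidates;
  }
}
```

Preferring the notifying server makes sense because it is known to have seen the new release already, whereas another instance might briefly lag behind.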
2) RemoteConfigLongPollService
com.ctrip.framework.apollo.internals.RemoteConfigLongPollService
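The remote config long-polling service. It long-polls Config Service's change-notification endpoint /notifications/v2. When a new notification arrives, it triggers RemoteConfigRepository to immediately poll Config Service's config-read endpoint /configs/{appId}/{clusterName}/{namespace:.+}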
Constructor:
```java
public class RemoteConfigLongPollService {
  private static final Logger logger = LoggerFactory.getLogger(RemoteConfigLongPollService.class);
  private static final Joiner STRING_JOINER = Joiner.on(ConfigConsts.CLUSTER_NAMESPACE_SEPARATOR);
  private static final Joiner.MapJoiner MAP_JOINER = Joiner.on("&").withKeyValueSeparator("=");
  private static final Escaper queryParamEscaper = UrlEscapers.urlFormParameterEscaper();
  private static final long INIT_NOTIFICATION_ID = ConfigConsts.NOTIFICATION_ID_PLACEHOLDER;
  //90 seconds, should be longer than server side's long polling timeout, which is now 60 seconds
  private static final int LONG_POLLING_READ_TIMEOUT = 90 * 1000;
  /**
   * ExecutorService for long polling
   */
  private final ExecutorService m_longPollingService;
  /**
   * Whether long polling has been stopped
   */
  private final AtomicBoolean m_longPollingStopped;
  /**
   * Retry schedule policy on failure
   */
  private SchedulePolicy m_longPollFailSchedulePolicyInSecond;
  /**
   * RateLimiter for long polling
   */
  private RateLimiter m_longPollRateLimiter;
  /**
   * Whether long polling has started
   */
  private final AtomicBoolean m_longPollStarted;
  /**
   * Multimap of the namespaces being long-polled
   * key: namespace name
   * value: set of RemoteConfigRepository
   */
  private final Multimap<String, RemoteConfigRepository> m_longPollNamespaces;
  /**
   * Map of notification ids
   * key: namespace name
   * value: latest notification id
   */
  private final ConcurrentMap<String, Long> m_notifications;
  /**
   * Map of notification messages
   * key: namespace name
   * value: ApolloNotificationMessages
   */
  private final Map<String, ApolloNotificationMessages> m_remoteNotificationMessages;//namespaceName -> watchedKey -> notificationId
  private Type m_responseType;
  private static final Gson GSON = new Gson();
  private ConfigUtil m_configUtil;
  private HttpUtil m_httpUtil;
  private ConfigServiceLocator m_serviceLocator;

  /**
   * Constructor.
   */
  public RemoteConfigLongPollService() {
    m_longPollFailSchedulePolicyInSecond = new ExponentialSchedulePolicy(1, 120); //in second
    m_longPollingStopped = new AtomicBoolean(false);
    m_longPollingService = Executors.newSingleThreadExecutor(
        ApolloThreadFactory.create("RemoteConfigLongPollService", true));
    m_longPollStarted = new AtomicBoolean(false);
    m_longPollNamespaces =
        Multimaps.synchronizedSetMultimap(HashMultimap.<String, RemoteConfigRepository>create());
    m_notifications = Maps.newConcurrentMap();
    m_remoteNotificationMessages = Maps.newConcurrentMap();
    m_responseType = new TypeToken<List<ApolloConfigNotification>>() {
    }.getType();
    m_configUtil = ApolloInjector.getInstance(ConfigUtil.class);
    m_httpUtil = ApolloInjector.getInstance(HttpUtil.class);
    m_serviceLocator = ApolloInjector.getInstance(ConfigServiceLocator.class);
    m_longPollRateLimiter = RateLimiter.create(m_configUtil.getLongPollQPS());
  }

  // ... other members omitted
}
```
submit():
```java
public class RemoteConfigLongPollService {
  /**
   * Submit a RemoteConfigRepository to the long-polling task
   *
   * @param namespace
   * @param remoteConfigRepository
   * @return
   */
  public boolean submit(String namespace, RemoteConfigRepository remoteConfigRepository) {
    // add to m_longPollNamespaces
    boolean added = m_longPollNamespaces.put(namespace, remoteConfigRepository);
    // add to m_notifications
    m_notifications.putIfAbsent(namespace, INIT_NOTIFICATION_ID);
    // start the long-polling task if it has not been started yet
    if (!m_longPollStarted.get()) {
      startLongPolling();
    }
    return added;
  }

  // ... other members omitted
}
```
startLongPolling() starts the long-polling task:
```java
public class RemoteConfigLongPollService {
  /**
   * Start the long-polling task
   */
  private void startLongPolling() {
    // mark the task as started with CAS; if it is already started, do not start it again
    if (!m_longPollStarted.compareAndSet(false, true)) {
      //already started
      return;
    }
    try {
      final String appId = m_configUtil.getAppId();
      final String cluster = m_configUtil.getCluster();
      final String dataCenter = m_configUtil.getDataCenter();
      final String secret = m_configUtil.getAccessKeySecret();
      // initial delay of the long-polling task, in milliseconds
      final long longPollingInitialDelayInMills = m_configUtil.getLongPollingInitialDelayInMills();
      // submit the long-polling task; it keeps running in a loop
      m_longPollingService.submit(new Runnable() {
        @Override
        public void run() {
          // initial wait
          if (longPollingInitialDelayInMills > 0) {
            try {
              logger.debug("Long polling will start in {} ms.", longPollingInitialDelayInMills);
              TimeUnit.MILLISECONDS.sleep(longPollingInitialDelayInMills);
            } catch (InterruptedException e) {
              //ignore
            }
          }
          // perform long polling
          doLongPollingRefresh(appId, cluster, dataCenter, secret);
        }
      });
    } catch (Throwable ex) {
      m_longPollStarted.set(false);
      ApolloConfigException exception =
          new ApolloConfigException("Schedule long polling refresh failed", ex);
      Tracer.logError(exception);
      logger.warn(ExceptionUtil.getDetailMessage(exception));
    }
  }

  // ... other members omitted
}
```
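submit() and startLongPolling() together implement a "register many, start once" pattern: each namespace gets a placeholder notification id via putIfAbsent (so an id already learned from the server is not overwritten), and compareAndSet guarantees the background loop starts at most once, even if several repositories submit concurrently. A minimal sketch; LongPollRegistry is hypothetical, and the placeholder value -1 is an assumption standing in for ConfigConsts.NOTIFICATION_ID_PLACEHOLDER.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Sketch of the submit()/startLongPolling() pattern (hypothetical class).
final class LongPollRegistry {
  static final long INIT_NOTIFICATION_ID = -1L; // assumed placeholder value for this sketch
  final Map<String, Long> notifications = new ConcurrentHashMap<>();
  final AtomicInteger startCount = new AtomicInteger(); // how many times the loop was started
  private final AtomicBoolean started = new AtomicBoolean(false);

  void submit(String namespace) {
    notifications.putIfAbsent(namespace, INIT_NOTIFICATION_ID); // keep an id already learned
    if (!started.get()) { // cheap read first, as in submit()
      startLongPolling();
    }
  }

  private void startLongPolling() {
    if (!started.compareAndSet(false, true)) {
      return; // another caller won the race
    }
    startCount.incrementAndGet(); // the real code submits the polling Runnable here
  }
}
```

The real startLongPolling() also resets the flag to false if submitting the task throws, so a later submit() can retry.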
doLongPollingRefresh() performs the long polling:
```java
public class RemoteConfigLongPollService {
  /**
   * Perform long polling
   *
   * @param appId
   * @param cluster
   * @param dataCenter
   * @param secret
   */
  private void doLongPollingRefresh(String appId, String cluster, String dataCenter, String secret) {
    final Random random = new Random();
    ServiceDTO lastServiceDto = null;
    // loop until stopped or the thread is interrupted
    while (!m_longPollingStopped.get() && !Thread.currentThread().isInterrupted()) {
      // rate limiting
      if (!m_longPollRateLimiter.tryAcquire(5, TimeUnit.SECONDS)) {
        //wait at most 5 seconds
        try {
          TimeUnit.SECONDS.sleep(5);
        } catch (InterruptedException e) {
          //ignore
        }
      }
      Transaction transaction = Tracer.newTransaction("Apollo.ConfigService", "pollNotification");
      String url = null;
      try {
        // pick a Config Service address
        if (lastServiceDto == null) {
          List<ServiceDTO> configServices = getConfigServices();
          lastServiceDto = configServices.get(random.nextInt(configServices.size()));
        }
        // assemble the long-polling notification URL
        url =
            assembleLongPollRefreshUrl(lastServiceDto.getHomepageUrl(), appId, cluster, dataCenter,
                m_notifications);
        logger.debug("Long polling from {}", url);
        // create the HttpRequest and set its read timeout
        HttpRequest request = new HttpRequest(url);
        request.setReadTimeout(LONG_POLLING_READ_TIMEOUT);
        if (!StringUtils.isBlank(secret)) {
          Map<String, String> headers = Signature.buildHttpHeaders(url, appId, secret);
          request.setHeaders(headers);
        }
        transaction.addData("Url", url);
        // send the request and get the HttpResponse
        final HttpResponse<List<ApolloConfigNotification>> response =
            m_httpUtil.doGet(request, m_responseType);
        logger.debug("Long polling response: {}, url: {}", response.getStatusCode(), url);
        // new notifications: update the local caches
        if (response.getStatusCode() == 200 && response.getBody() != null) {
          updateNotifications(response.getBody());
          updateRemoteNotifications(response.getBody());
          transaction.addData("Result", response.getBody().toString());
          // notify the corresponding RemoteConfigRepository instances
          notify(lastServiceDto, response.getBody());
        }
        // no new notification: randomly reset the Config Service address so that the next
        // request may go to a different Config Service (load balancing)
        //try to load balance
        if (response.getStatusCode() == 304 && random.nextBoolean()) {
          lastServiceDto = null;
        }
        // mark success
        m_longPollFailSchedulePolicyInSecond.success();
        transaction.addData("StatusCode", response.getStatusCode());
        transaction.setStatus(Transaction.SUCCESS);
      } catch (Throwable ex) {
        lastServiceDto = null;
        Tracer.logEvent("ApolloConfigException", ExceptionUtil.getDetailMessage(ex));
        transaction.setStatus(ex);
        long sleepTimeInSecond = m_longPollFailSchedulePolicyInSecond.fail();
        logger.warn(
            "Long polling failed, will retry in {} seconds. appId: {}, cluster: {}, namespaces: {}, long polling url: {}, reason: {}",
            sleepTimeInSecond, appId, cluster, assembleNamespaces(), url, ExceptionUtil.getDetailMessage(ex));
        try {
          TimeUnit.SECONDS.sleep(sleepTimeInSecond);
        } catch (InterruptedException ie) {
          //ignore
        }
      } finally {
        transaction.complete();
      }
    }
  }

  // ... other members omitted
}
```
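Stripped of rate limiting, server selection, request signing and backoff, the loop above reduces to: send the notification ids we know, treat "no change" as a signal to poll again immediately, and on a change update the stored ids and notify. The sketch below is a simplified model under those assumptions; NotificationServer is a hypothetical interface that returns null for 304 and a namespace-to-id map for 200.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;

// Hypothetical server contract: null means 304 Not Modified, otherwise namespace -> new notification id.
interface NotificationServer {
  Map<String, Long> poll(Map<String, Long> knownNotificationIds) throws Exception;
}

// Simplified long-polling loop modelled on doLongPollingRefresh().
final class LongPollLoop {
  final Map<String, Long> notifications = new ConcurrentHashMap<>();
  final AtomicBoolean stopped = new AtomicBoolean(false);
  private final NotificationServer server;
  private final BiConsumer<String, Long> onChange; // plays the role of notify(...)

  LongPollLoop(NotificationServer server, BiConsumer<String, Long> onChange) {
    this.server = server;
    this.onChange = onChange;
  }

  void run() {
    while (!stopped.get() && !Thread.currentThread().isInterrupted()) {
      try {
        // Send a snapshot of the ids we know, so the server can tell what is new to us.
        Map<String, Long> changed = server.poll(new HashMap<>(notifications));
        if (changed == null) {
          continue; // 304: nothing changed within the server-side timeout, poll again
        }
        for (Map.Entry<String, Long> e : changed.entrySet()) {
          if (notifications.containsKey(e.getKey())) { // only namespaces we watch
            notifications.put(e.getKey(), e.getValue());
            onChange.accept(e.getKey(), e.getValue());
          }
        }
      } catch (Exception ex) {
        // The real loop backs off exponentially (1s..120s) and switches servers here.
        try {
          Thread.sleep(10);
        } catch (InterruptedException ie) {
          Thread.currentThread().interrupt(); // restore the flag so the loop exits
        }
      }
    }
  }
}
```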
notify() notifies the corresponding RemoteConfigRepository instances:
```java
public class RemoteConfigLongPollService {
  /**
   * Notify the corresponding RemoteConfigRepository instances
   *
   * @param lastServiceDto
   * @param notifications
   */
  private void notify(ServiceDTO lastServiceDto, List<ApolloConfigNotification> notifications) {
    if (notifications == null || notifications.isEmpty()) {
      return;
    }
    // iterate over the ApolloConfigNotifications
    for (ApolloConfigNotification notification : notifications) {
      String namespaceName = notification.getNamespaceName();
      //create a new list to avoid ConcurrentModificationException
      List<RemoteConfigRepository> toBeNotified =
          Lists.newArrayList(m_longPollNamespaces.get(namespaceName));
      // get the remote ApolloNotificationMessages and clone it
      ApolloNotificationMessages originalMessages = m_remoteNotificationMessages.get(namespaceName);
      ApolloNotificationMessages remoteMessages = originalMessages == null ? null : originalMessages.clone();
      //since .properties are filtered out by default, so we need to check if there is any listener for it
      toBeNotified.addAll(m_longPollNamespaces
          .get(String.format("%s.%s", namespaceName, ConfigFileFormat.Properties.getValue())));
      // notify each RemoteConfigRepository
      for (RemoteConfigRepository remoteConfigRepository : toBeNotified) {
        try {
          // notify
          remoteConfigRepository.onLongPollNotified(lastServiceDto, remoteMessages);
        } catch (Throwable ex) {
          Tracer.logError(ex);
        }
      }
    }
  }

  // ... other members omitted
}
```
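The registry behind m_longPollNamespaces and notify() can be modelled with a concurrent map of sets: several repositories may watch one namespace, and a notification for "application" must also reach whatever registered as "application.properties". In this sketch Consumer<String> stands in for RemoteConfigRepository.onLongPollNotified(...), the ".properties" suffix is hard-coded (an assumption matching ConfigFileFormat.Properties), and NamespaceRegistry is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

// Sketch of the namespace -> repositories registry used by notify() (hypothetical class).
final class NamespaceRegistry {
  private final Map<String, Set<Consumer<String>>> watchers = new ConcurrentHashMap<>();

  /** Like m_longPollNamespaces.put(...): returns false if this repository was already registered. */
  boolean submit(String namespace, Consumer<String> repository) {
    return watchers.computeIfAbsent(namespace, k -> ConcurrentHashMap.newKeySet()).add(repository);
  }

  void notifyNamespace(String namespaceName) {
    // Copy into a new list first, so concurrent submit() calls cannot disturb the iteration.
    List<Consumer<String>> toBeNotified = new ArrayList<>(
        watchers.getOrDefault(namespaceName, Collections.<Consumer<String>>emptySet()));
    // A notification for "x" also concerns repositories registered as "x.properties".
    toBeNotified.addAll(
        watchers.getOrDefault(namespaceName + ".properties", Collections.<Consumer<String>>emptySet()));
    for (Consumer<String> repository : toBeNotified) {
      try {
        repository.accept(namespaceName);
      } catch (RuntimeException ex) {
        // one failing repository must not block the others (the real code logs via Tracer)
      }
    }
  }
}
```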
3) How the Spring integration works
Starting with version 3.1, Spring added ConfigurableEnvironment and PropertySource:
- ConfigurableEnvironment
  - Spring's ApplicationContext contains an Environment (which implements the ConfigurableEnvironment interface)
  - A ConfigurableEnvironment itself holds many PropertySources
- PropertySource
  - A property source
  - Can be thought of as a collection of Key-Value properties
At runtime the structure looks like this:
Note that PropertySources are ordered by priority: if a key exists in several property sources, the property source that comes first wins.
So, for the example in the figure above:
- env.getProperty("key1") -> value1
- env.getProperty("key2") -> value2
- env.getProperty("key3") -> value4
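How Apollo integrates with Spring/Spring Boot: during application startup, Apollo fetches the configuration from the remote server, wraps it into a PropertySource and inserts it at the first position, as shown in the figure below: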
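The first-match lookup rule, and why inserting at the first position gives Apollo's values top priority, can be modelled in a few lines of plain Java. OrderedPropertySources is a hypothetical stand-in, not Spring's API, and its example data is made up for illustration rather than taken from the figure.

```java
import java.util.LinkedList;
import java.util.Map;

// Minimal model (not Spring's API) of ordered property sources: the first source containing a key wins.
final class OrderedPropertySources {
  private final LinkedList<Map<String, String>> sources = new LinkedList<>();

  void addLast(Map<String, String> source) {
    sources.addLast(source);
  }

  /** Highest priority: this is where an integration like Apollo's inserts its source. */
  void addFirst(Map<String, String> source) {
    sources.addFirst(source);
  }

  String getProperty(String key) {
    for (Map<String, String> source : sources) {
      if (source.containsKey(key)) {
        return source.get(key);
      }
    }
    return null;
  }
}
```

In Spring itself the equivalent step is wrapping the values in a PropertySource and calling environment.getPropertySources().addFirst(...) on the ConfigurableEnvironment.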
4. A general design model for configuration centers
1) Configuration tiers
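Configuration in a configuration center needs to be tiered to improve reusability. For example, the center may store global configuration, data-center configuration and node configuration. Node configuration takes precedence over data-center configuration, which in turn takes precedence over global configuration. In other words, the node configuration is read first; if it does not exist, the data-center configuration is read, and finally the global configuration.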
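The tiered read order can be sketched as a lookup that walks the tiers from highest to lowest priority and stops at the first hit. TieredConfig, the three-tier split and the Map-based storage are assumptions for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Sketch of tiered lookup: node overrides data center, which overrides global (hypothetical class).
final class TieredConfig {
  private final List<Map<String, String>> tiersByPriority;

  TieredConfig(Map<String, String> node, Map<String, String> dataCenter, Map<String, String> global) {
    this.tiersByPriority = Arrays.asList(node, dataCenter, global); // highest priority first
  }

  Optional<String> get(String key) {
    for (Map<String, String> tier : tiersByPriority) {
      String value = tier.get(key);
      if (value != null) {
        return Optional.of(value); // first tier that defines the key wins
      }
    }
    return Optional.empty();
  }
}
```

Only the values that actually differ need to be stored at the node or data-center level; everything else is reused from the global tier.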
2) How change push is implemented
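Once configuration is stored, we need to decide how to push configuration changes to the services that use it, so that configuration can change dynamically. There are generally two approaches: polling (pull mode) and long-polling push (push mode).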
Pull mode:
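With polling, the application registers a listener with the configuration center client. The client periodically (for example, every minute) checks whether the configuration it needs has changed, and if so, triggers the listener so that the application is notified of the change.
But how do we tell whether the configuration has changed? Returning the full configuration on every request is not an option, because the configuration center's bandwidth would become the bottleneck.
The solution is to store, for every configuration item in the center, an extra MD5 value computed from that item. Whenever the item changes, its MD5 changes too. When the client fetches the configuration, it also fetches and stores the MD5. On each poll, the client first checks whether its stored MD5 matches the one in the configuration center. Only if they differ, which means the stored configuration item has changed, does it pull the latest configuration from the center.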
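The MD5 comparison can be sketched with the JDK's MessageDigest: compute a digest per configuration item, then on each poll compare the client's stored digests with the server's and pull only the items whose digests differ or are new. ConfigDigest and the per-item map layout are hypothetical; every Java platform is required to provide the MD5 algorithm.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Sketch of MD5-based change detection for the pull mode (hypothetical helper).
final class ConfigDigest {
  private ConfigDigest() {}

  /** Hex-encoded MD5 of one configuration item's content. */
  static String md5(String itemContent) {
    try {
      byte[] digest = MessageDigest.getInstance("MD5")
          .digest(itemContent.getBytes(StandardCharsets.UTF_8));
      StringBuilder hex = new StringBuilder(32);
      for (byte b : digest) {
        hex.append(String.format("%02x", b)); // %x prints a byte as unsigned
      }
      return hex.toString();
    } catch (NoSuchAlgorithmException e) {
      throw new IllegalStateException("MD5 is not available", e);
    }
  }

  /** Items whose server digest differs from (or is missing in) the client's stored digests. */
  static Set<String> changedItems(Map<String, String> localDigests, Map<String, String> serverDigests) {
    Set<String> changed = new TreeSet<>();
    for (Map.Entry<String, String> e : serverDigests.entrySet()) {
      if (!e.getValue().equals(localDigests.get(e.getKey()))) {
        changed.add(e.getKey());
      }
    }
    return changed;
  }
}
```

Each poll then transfers only 32-character digests; full content is fetched only for the items returned by changedItems.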
Push mode:
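Push mode usually relies on HTTP long polling: the configuration center server keeps, for each connection, the list of configuration items that connection cares about. When the center detects a configuration change, it pushes the change to the client through that connection. This requires keeping connections open and maintaining the mapping between connections and configuration items, so it is more complex to implement than polling, but it delivers configuration changes in near real time.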
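On the server side, long polling comes down to parking each waiting request together with the configuration item it watches, completing it when that item changes, and completing it empty when a timeout expires (the HTTP layer would then answer 304). The sketch below models this with CompletableFuture (completeOnTimeout requires Java 9+); LongPollHub is hypothetical, and wiring the futures to real asynchronous HTTP responses is left out. The previous article describes how Apollo's Config Service handles this step.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;

// Server-side sketch of long polling: parked requests keyed by the configuration item they watch.
final class LongPollHub {
  private final Map<String, List<CompletableFuture<String>>> waiting = new ConcurrentHashMap<>();

  /** Parks a request watching `key`; completes with the new value, or with null after the timeout. */
  CompletableFuture<String> watch(String key, long timeoutMillis) {
    CompletableFuture<String> future = new CompletableFuture<>();
    List<CompletableFuture<String>> list = waiting.computeIfAbsent(key, k -> new CopyOnWriteArrayList<>());
    list.add(future);
    future.completeOnTimeout(null, timeoutMillis, TimeUnit.MILLISECONDS) // null -> "304 Not Modified"
        .whenComplete((value, ex) -> list.remove(future));             // forget finished requests
    return future;
  }

  /** Called when a configuration item changes; returns how many parked requests were completed. */
  int publish(String key, String newValue) {
    List<CompletableFuture<String>> list =
        waiting.getOrDefault(key, Collections.<CompletableFuture<String>>emptyList());
    int notified = 0;
    for (CompletableFuture<String> f : list) { // copy-on-write iteration tolerates concurrent removal
      if (f.complete(newValue)) {
        notified++;
      }
    }
    return notified;
  }
}
```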
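Apollo combines push and pull: the client's periodic polling guarantees eventual consistency, and its long polling supplements the periodic polling with real-time updates.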
3) How to keep the configuration center highly available
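After fetching configuration, the configuration center client writes it synchronously to an in-memory cache and asynchronously to a file cache. The memory cache reduces how often the client talks to the configuration center and speeds up configuration reads; the file cache serves disaster recovery. If the configuration center is down when the application restarts, the application falls back to the configuration stored in the file. It will not receive change notifications (the configuration center is down), but it can still start up, which makes this a degradation strategy.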
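The two cache layers can be sketched as follows: a successful remote load updates the in-memory copy synchronously and the file copy asynchronously, and at startup a failed remote load falls back to the last file copy. CachedConfigLoader, the Supplier standing in for the remote call and the file location are assumptions of this sketch.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

// Sketch of memory cache (sync) + file cache (async) with file fallback at startup (hypothetical class).
final class CachedConfigLoader {
  private final Supplier<Properties> remote; // stands in for the call to the configuration center
  private final Path cacheFile;
  private volatile Properties memory;

  CachedConfigLoader(Supplier<Properties> remote, Path cacheFile) {
    this.remote = remote;
    this.cacheFile = cacheFile;
  }

  /** Loads from remote; the returned future completes once the file cache has been written. */
  CompletableFuture<Void> refresh() {
    Properties fresh = remote.get();                            // throws if the center is unavailable
    memory = fresh;                                             // synchronous memory update
    return CompletableFuture.runAsync(() -> writeFile(fresh)); // asynchronous file update
  }

  /** Startup path: try the remote first, degrade to the local file copy if that fails. */
  Properties loadOnStartup() {
    try {
      refresh();
    } catch (RuntimeException remoteDown) {
      memory = readFile();
    }
    return memory;
  }

  private void writeFile(Properties props) {
    try (OutputStream out = Files.newOutputStream(cacheFile)) {
      props.store(out, "local config cache");
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  private Properties readFile() {
    Properties props = new Properties();
    try (InputStream in = Files.newInputStream(cacheFile)) {
      props.load(in);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return props;
  }
}
```

Writing the file asynchronously keeps disk latency off the read path; the cost is that a crash right after a refresh may leave a slightly older file copy.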