学习目标:
熟悉 soul 的数据同步机制中的 zk 同步
学习内容:
1.zk数据同步原理
2.配置
3.源码分析
学习时间:2021年1月23号 早6-8点
学习产出:
-
zk数据同步原理
主要是依赖 zookeeper 的 watch 机制,soul-web 会监听配置的节点,soul-admin 在启动的时候,会将数据全量写入 zookeeper,后续数据发生变更时,会增量更新 zookeeper 的节点,与此同时,soul-web 会监听配置信息的节点,一旦有信息变更时,会更新本地缓存。
zk节点信息:
-
配置
admin配置修改
soul:
sync:
zookeeper:
url: localhost:2181
sessionTimeout: 5000
connectionTimeout: 2000
bootstrap配置修改:
soul :
sync:
zookeeper:
url: localhost:2181
sessionTimeout: 5000
connectionTimeout: 2000
依赖:
<!--soul data sync start use zookeeper-->
<dependency>
<groupId>org.dromara</groupId>
<artifactId>soul-spring-boot-starter-sync-data-zookeeper</artifactId>
<version>${project.version}</version>
</dependency>
-
源码分析
admin启动过程,zk相关的实例化:- ZookeeperConfiguration 实例化zkClient
- DataSyncConfiguration 实例化zookeeperDataChangedListener,并注入zkClient,坚挺zkClient的变化
- ZookeeperDataInit zk初始化,注入监听对象
bootstrap启动过程
1. ZookeeperSyncDataConfiguration 加载初始化zkClient,
@Bean public ZkClient zkClient(final ZookeeperConfig zookeeperConfig) { return new ZkClient(zookeeperConfig.getUrl(), zookeeperConfig.getSessionTimeout(), zookeeperConfig.getConnectionTimeout()); }
初始化ZookeeperSyncDataService,注入zkclient,并注入监听对象:
pluginSubscriber,metaSubscribers, authSubscribers@Bean public SyncDataService syncDataService(final ObjectProvider<ZkClient> zkClient, final ObjectProvider<PluginDataSubscriber> pluginSubscriber, final ObjectProvider<List<MetaDataSubscriber>> metaSubscribers, final ObjectProvider<List<AuthDataSubscriber>> authSubscribers) { log.info("you use zookeeper sync soul data......."); return new ZookeeperSyncDataService(zkClient.getIfAvailable(), pluginSubscriber.getIfAvailable(), metaSubscribers.getIfAvailable(Collections::emptyList), authSubscribers.getIfAvailable(Collections::emptyList)); }
-
启动soul-examples-http,修改admin后台数据观察zk节点变化
启动前节点信息:
启动后:
admin后台修改数据,数据同步源码解读
修改SelectorList
ZookeeperDataChangedListener 监听数据变化private void upsertZkNode(final String path, final Object data) { if (!zkClient.exists(path)) { zkClient.createPersistent(path, true); } zkClient.writeData(path, data); }
问题点1:
zk节点变更为什么没有 触发CommonPluginDataSubscriber 讲节点信息同步到本地的操作?
```
private <T> void subscribeDataHandler(final T classData, final DataEventTypeEnum dataType) {
Optional.ofNullable(classData).ifPresent(data -> {
if (data instanceof PluginData) {
PluginData pluginData = (PluginData) data;
if (dataType == DataEventTypeEnum.UPDATE) {
BaseDataCache.getInstance().cachePluginData(pluginData);
Optional.ofNullable(handlerMap.get(pluginData.getName())).ifPresent(handler -> handler.handlerPlugin(pluginData));
} else if (dataType == DataEventTypeEnum.DELETE) {
BaseDataCache.getInstance().removePluginData(pluginData);
Optional.ofNullable(handlerMap.get(pluginData.getName())).ifPresent(handler -> handler.removePlugin(pluginData));
}
} else if (data instanceof SelectorData) {
SelectorData selectorData = (SelectorData) data;
if (dataType == DataEventTypeEnum.UPDATE) {
BaseDataCache.getInstance().cacheSelectData(selectorData);
Optional.ofNullable(handlerMap.get(selectorData.getPluginName())).ifPresent(handler -> handler.handlerSelector(selectorData));
} else if (dataType == DataEventTypeEnum.DELETE) {
BaseDataCache.getInstance().removeSelectData(selectorData);
Optional.ofNullable(handlerMap.get(selectorData.getPluginName())).ifPresent(handler -> handler.removeSelector(selectorData));
}
} else if (data instanceof RuleData) {
RuleData ruleData = (RuleData) data;
if (dataType == DataEventTypeEnum.UPDATE) {
BaseDataCache.getInstance().cacheRuleData(ruleData);
Optional.ofNullable(handlerMap.get(ruleData.getPluginName())).ifPresent(handler -> handler.handlerRule(ruleData));
} else if (dataType == DataEventTypeEnum.DELETE) {
BaseDataCache.getInstance().removeRuleData(ruleData);
Optional.ofNullable(handlerMap.get(ruleData.getPluginName())).ifPresent(handler -> handler.removeRule(ruleData));
}
}
});
}
```