在前面的文章中介绍了zk的数据同步方式,但是当时只是从soul-bootstrap这个工程的角度去解析的。但是soul-bootstrap的启动也是要依赖于soul-admin的,那么本篇文章就继续从soul-admin工程的角度来看看。
关于zk数据同步的方式,从soul-admin的角度来解析的话,它数据同步的答题步骤如下:
- 启动构建ZookeeperDataInit类
- 执行ZookeeperDataInit类中的run方法(源头是SpringApplication.run(…)方法)
- 执行ZookeeperDataInit类中的run方法中的syncAll方法
- 最后是元数据的同步(中间也涉及其他的数据同步:appAuth、plugin、selector、ruleData)
DataSyncConfiguration
前面提到了ZookeeperDataInit的初始化构建,而它的构建是在DataSyncConfiguration配置类中进行的。那就看看构建的代码:
@Configuration
public class DataSyncConfiguration {
@Configuration
@ConditionalOnProperty(prefix = "soul.sync.zookeeper", name = "url")
@Import(ZookeeperConfiguration.class)
static class ZookeeperListener {
@Bean
@ConditionalOnMissingBean(ZookeeperDataChangedListener.class)
public DataChangedListener zookeeperDataChangedListener(final ZkClient zkClient) {
return new ZookeeperDataChangedListener(zkClient);
}
@Bean
@ConditionalOnMissingBean(ZookeeperDataInit.class)
public ZookeeperDataInit zookeeperDataInit(final ZkClient zkClient, final SyncDataService syncDataService) {
return new ZookeeperDataInit(zkClient, syncDataService);
}
}
}
上面@ConditionalOnProperty注解的意思是指,可以通过配置文件来判断configuration是否需要被注入。@Import注解的意思应该都是了解的,这里就不提了。@ConditionalOnMissingBean注解是指当容器中存在这个bean时就不需要再构建了。
ZookeeperConfiguration
我们有看到@Import注解导入了ZookeeperConfiguration的配置类,这里就捎带的看一下,代码如下:
@EnableConfigurationProperties(ZookeeperProperties.class)
public class ZookeeperConfiguration {
@Bean
@ConditionalOnMissingBean(ZkClient.class)
public ZkClient zkClient(final ZookeeperProperties zookeeperProp) {
return new ZkClient(zookeeperProp.getUrl(), zookeeperProp.getSessionTimeout(), zookeeperProp.getConnectionTimeout());
}
}
这里看到了@EnableConfigurationProperties这个注解,这个注解的作用是开启@ConfigurationProperties,而@ConfigurationProperties的作用是将配置文件转换成类对象,便于修改或者获取值。
既然如此,那就可以来看看ZookeeperProperties 这个类,如下:
@Data
@ConfigurationProperties(prefix = "soul.sync.zookeeper")
public class ZookeeperProperties {
private String url;
private Integer sessionTimeout;
private Integer connectionTimeout;
private String serializer;
}
从上面的代码变印证了前面对@EnableConfigurationProperties注解的解释了。这里@ConfigurationProperties注解里的prefix所对应的参数便在application.yml文件中。
ZookeeperDataChangedListener
这个类就是在前面说的ZookeeperConfiguration配置类中进行构建的,代码如下:
public class ZookeeperDataChangedListener implements DataChangedListener {
private final ZkClient zkClient;
public ZookeeperDataChangedListener(final ZkClient zkClient) {
this.zkClient = zkClient;
}
// 此方法在数据变化后,Spring事件发布器发布后,在onApplication方法中进行调用的
@Override
public void onAppAuthChanged(final List<AppAuthData> changed, final DataEventTypeEnum eventType) {
for (AppAuthData data : changed) {
String appAuthPath = ZkPathConstants.buildAppAuthPath(data.getAppKey());
// delete
if (eventType == DataEventTypeEnum.DELETE) {
deleteZkPath(appAuthPath);
continue;
}
// create or update
insertZkNode(appAuthPath, data);
}
}
// 同上
@SneakyThrows
@Override
public void onMetaDataChanged(final List<MetaData> changed, final DataEventTypeEnum eventType) {
for (MetaData data : changed) {
String metaDataPath = ZkPathConstants.buildMetaDataPath(URLEncoder.encode(data.getPath(), "UTF-8"));
// delete
if (eventType == DataEventTypeEnum.DELETE) {
deleteZkPath(metaDataPath);
continue;
}
// create or update
insertZkNode(metaDataPath, data);
}
}
// 同上
@Override
public void onPluginChanged(final List<PluginData> changed, final DataEventTypeEnum eventType) {
for (PluginData data : changed) {
String pluginPath = ZkPathConstants.buildPluginPath(data.getName());
// delete
if (eventType == DataEventTypeEnum.DELETE) {
deleteZkPathRecursive(pluginPath);
String selectorParentPath = ZkPathConstants.buildSelectorParentPath(data.getName());
deleteZkPathRecursive(selectorParentPath);
String ruleParentPath = ZkPathConstants.buildRuleParentPath(data.getName());
deleteZkPathRecursive(ruleParentPath);
continue;
}
//create or update
insertZkNode(pluginPath, data);
}
}
// 同上
@Override
public void onSelectorChanged(final List<SelectorData> changed, final DataEventTypeEnum eventType) {
if (eventType == DataEventTypeEnum.REFRESH && !changed.isEmpty()) {
String selectorParentPath = ZkPathConstants.buildSelectorParentPath(changed.get(0).getPluginName());
deleteZkPathRecursive(selectorParentPath);
}
for (SelectorData data : changed) {
String selectorRealPath = ZkPathConstants.buildSelectorRealPath(data.getPluginName(), data.getId());
if (eventType == DataEventTypeEnum.DELETE) {
deleteZkPath(selectorRealPath);
continue;
}
String selectorParentPath = ZkPathConstants.buildSelectorParentPath(data.getPluginName());
createZkNode(selectorParentPath);
//create or update
insertZkNode(selectorRealPath, data);
}
}
@Override
public void onRuleChanged(final List<RuleData> changed, final DataEventTypeEnum eventType) {
if (eventType == DataEventTypeEnum.REFRESH && !changed.isEmpty()) {
String selectorParentPath = ZkPathConstants.buildRuleParentPath(changed.get(0).getPluginName());
deleteZkPathRecursive(selectorParentPath);
}
for (RuleData data : changed) {
String ruleRealPath = ZkPathConstants.buildRulePath(data.getPluginName(), data.getSelectorId(), data.getId());
if (eventType == DataEventTypeEnum.DELETE) {
deleteZkPath(ruleRealPath);
continue;
}
String ruleParentPath = ZkPathConstants.buildRuleParentPath(data.getPluginName());
createZkNode(ruleParentPath);
//create or update
insertZkNode(ruleRealPath, data);
}
}
private void insertZkNode(final String path, final Object data) {
createZkNode(path);
zkClient.writeData(path, data);
}
private void createZkNode(final String path) {
if (!zkClient.exists(path)) {
zkClient.createPersistent(path, true);
}
}
private void deleteZkPath(final String path) {
if (zkClient.exists(path)) {
zkClient.delete(path);
}
}
private void deleteZkPathRecursive(final String path) {
if (zkClient.exists(path)) {
zkClient.deleteRecursive(path);
}
}
}
上面代码是ZookeeperDataChangedListener 这个类的所有代码,至于其实现的接口这里也不多少了。我们还是看看这里的onXXX这些方法具体的调用位置,如下:
@Component
public class DataChangedEventDispatcher implements ApplicationListener<DataChangedEvent>, InitializingBean {
// 省略n行代码
@Override
@SuppressWarnings("unchecked")
public void onApplicationEvent(final DataChangedEvent event) {
for (DataChangedListener listener : listeners) {
switch (event.getGroupKey()) {
case APP_AUTH:
listener.onAppAuthChanged((List<AppAuthData>) event.getSource(), event.getEventType());
break;
case PLUGIN:
listener.onPluginChanged((List<PluginData>) event.getSource(), event.getEventType());
break;
case RULE:
listener.onRuleChanged((List<RuleData>) event.getSource(), event.getEventType());
break;
case SELECTOR:
listener.onSelectorChanged((List<SelectorData>) event.getSource(), event.getEventType());
break;
case META_DATA:
listener.onMetaDataChanged((List<MetaData>) event.getSource(), event.getEventType());
break;
default:
throw new IllegalStateException("Unexpected value: " + event.getGroupKey());
}
}
}
// 省略n行代码
}
上面的onApplicationEvent方法便是接收事件后,然后根据其GoupKey,进行分发处理,这里的getSource就是具体的插件数据,另外一个是事件类型。
ZookeeperDataInit
这个类是进行数据初始化的类,因为从命名上就可以看出来,代码如下:
public class ZookeeperDataInit implements CommandLineRunner {
private final ZkClient zkClient;
private final SyncDataService syncDataService;
// 构造函数
public ZookeeperDataInit(final ZkClient zkClient, final SyncDataService syncDataService) {
this.zkClient = zkClient;
this.syncDataService = syncDataService;
}
// 这里的调用是源于 SpringApplication.run(....)
@Override
public void run(final String... args) {
String pluginPath = ZkPathConstants.PLUGIN_PARENT;
String authPath = ZkPathConstants.APP_AUTH_PARENT;
String metaDataPath = ZkPathConstants.META_DATA;
if (!zkClient.exists(pluginPath) && !zkClient.exists(authPath) && !zkClient.exists(metaDataPath)) {
syncDataService.syncAll(DataEventTypeEnum.REFRESH);
}
}
}
从run方法可以看到有调用syncAll这个方法,那么就看看这个方法:
@Service("syncDataService")
public class SyncDataServiceImpl implements SyncDataService {
// ......
@Override
public boolean syncAll(final DataEventTypeEnum type) {
appAuthService.syncData();
List<PluginData> pluginDataList = pluginService.listAll();
eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.PLUGIN, type, pluginDataList));
List<SelectorData> selectorDataList = selectorService.listAll();
eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.SELECTOR, type, selectorDataList));
List<RuleData> ruleDataList = ruleService.listAll();
eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.RULE, type, ruleDataList));
metaDataService.syncData();
return true;
}
// .........
}
上面代码中其实主要看metaDataService.syncData()就好,如下:
@Slf4j
@Service("metaDataService")
public class MetaDataServiceImpl implements MetaDataService {
// ......
@Override
public void syncData() {
List<MetaDataDO> all = metaDataMapper.findAll();
if (CollectionUtils.isNotEmpty(all)) {
eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.META_DATA, DataEventTypeEnum.REFRESH, MetaDataTransfer.INSTANCE.mapToDataAll(all)));
}
}
// .......
}
从上面的代码中我们可以看到eventPublisher调用了publishEvent方法,用于发布事件。
SyncDataServiceImpl
在这个类中我们只要看如下代码即可:
public void syncData() {
List<MetaDataDO> all = metaDataMapper.findAll();
if (CollectionUtils.isNotEmpty(all)) {
eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.META_DATA, DataEventTypeEnum.REFRESH, MetaDataTransfer.INSTANCE.mapToDataAll(all)));
}
}
此方法是用于手动更新数据时调用的,代码如下:
@RestController
@RequestMapping("/plugin")
public class PluginController {
// ......
@PostMapping("/syncPluginAll")
public SoulAdminResult syncPluginAll() {
boolean success = syncDataService.syncAll(DataEventTypeEnum.REFRESH);
if (success) {
return SoulAdminResult.success(SoulResultMessage.SYNC_SUCCESS);
} else {
return SoulAdminResult.error(SoulResultMessage.SYNC_FAIL);
}
}
// .......
}
总结
本篇文章大体介绍了soul-admin端启动时,做了一些调用流程。启动时构建zk的初始化类,并回调了run方法,在这个方法里做了全部数据的同步。然后是介绍了手动调用,在前端更新数据时,然后发布事件,最后处理事件。