Source
/**
 * Flink source that loads a configuration entry from Nacos, keeps it up to date through a
 * Nacos listener, and periodically emits the current wall-clock time (as a string) downstream.
 *
 * <p>On every tick the currently known configuration is parsed as {@code key=value} lines and
 * printed to standard output.
 *
 * <p>Threading: the configuration text is written by the Nacos listener callback and read by the
 * thread executing {@link #run}, so it is held in a {@code volatile} field. The {@code running}
 * flag is likewise {@code volatile} because {@link #cancel()} is invoked from a different thread
 * than {@link #run}.
 */
public class FlinkNacosSource extends RichSourceFunction<String> {

    /** Timeout, in milliseconds, for the initial configuration fetch from Nacos. */
    private static final long NACOS_TIMEOUT_MS = 5000L;

    /** Pause, in milliseconds, between two consecutive emissions. */
    private static final long EMIT_INTERVAL_MS = 2000L;

    private Properties properties;
    private String dataId;
    private String group;
    private String serverAddr;

    // Runtime-only state created in open(); transient so it never takes part in the
    // serialization of the function object.
    private transient ConfigService configService;
    private transient Listener listener;
    private transient volatile String config;

    // Deliberately NOT transient: the initial value `true` must survive serialization of the
    // function, otherwise the deserialized copy would start with `false` and never loop.
    private volatile boolean running = true;

    /** Creates an unconfigured source; {@link #open} fails unless an address is provided. */
    public FlinkNacosSource() { }

    /**
     * Creates a source for one Nacos configuration entry.
     *
     * @param properties Nacos client properties; when non-null they take precedence over
     *                   {@code serverAddr}
     * @param dataId     data id of the configuration entry
     * @param group      group of the configuration entry
     * @param serverAddr Nacos server address, used only when {@code properties} is null
     */
    public FlinkNacosSource(Properties properties, String dataId, String group, String serverAddr) {
        this.properties = properties;
        this.dataId = dataId;
        this.group = group;
        this.serverAddr = serverAddr;
    }

    /**
     * Connects to Nacos, fetches the initial configuration and registers a listener that
     * replaces the cached configuration whenever Nacos reports a change.
     *
     * @throws IllegalStateException if neither properties nor a non-blank server address was given
     * @throws Exception             if the Nacos client cannot be created or queried
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        if (properties != null) {
            configService = NacosFactory.createConfigService(properties);
        } else if (!org.apache.commons.lang3.StringUtils.isBlank(serverAddr)) {
            configService = NacosFactory.createConfigService(serverAddr);
        } else {
            throw new IllegalStateException(
                    "I have to know the absolute address of Nacos,Otherwise, it will not work");
        }

        // May be null when the entry does not exist or the fetch timed out; run() tolerates that.
        config = configService.getConfig(dataId, group, NACOS_TIMEOUT_MS);

        // Keep a reference so the listener can be unregistered again in close().
        listener = new Listener() {
            @Override
            public Executor getExecutor() {
                // NOTE(review): null presumably lets the Nacos client pick the callback thread
                // - confirm against the Nacos SDK documentation.
                return null;
            }

            @Override
            public void receiveConfigInfo(String configInfo) {
                config = configInfo;
            }
        };
        configService.addListener(dataId, group, listener);
    }

    /**
     * Emits the current timestamp every {@value #EMIT_INTERVAL_MS} ms until {@link #cancel()} is
     * called, printing the parsed configuration entries before each emission.
     *
     * @param sourceContext context used to emit records
     * @throws Exception if the thread is interrupted while sleeping
     */
    @Override
    public void run(SourceContext<String> sourceContext) throws Exception {
        while (running) {
            Thread.sleep(EMIT_INTERVAL_MS);
            if (!running) {
                // Cancelled while sleeping: do not emit another record.
                break;
            }
            // Read the volatile field once so parsing works on a consistent snapshot.
            String snapshot = config;
            if (snapshot != null) {
                printEntries(snapshot);
            }
            sourceContext.collect(String.valueOf(System.currentTimeMillis()));
        }
    }

    /**
     * Prints every {@code key=value} line of the given configuration text to standard output.
     * Lines are separated by LF or CRLF; lines without {@code '='} (for example blank lines) are
     * skipped, and only the first {@code '='} separates key from value so values may contain it.
     */
    private static void printEntries(String text) {
        for (String line : text.split("\\r?\\n")) {
            int separator = line.indexOf('=');
            if (separator < 0) {
                continue;
            }
            String key = line.substring(0, separator);
            String value = line.substring(separator + 1);
            System.out.println(key + "=" + value);
        }
    }

    /** Requests {@link #run} to stop after its current iteration. */
    @Override
    public void cancel() {
        // A DingTalk alert could be sent from here.
        running = false;
    }

    /** Unregisters the Nacos listener that was added in {@link #open}. */
    @Override
    public void close() throws Exception {
        try {
            if (configService != null && listener != null) {
                configService.removeListener(dataId, group, listener);
            }
        } finally {
            listener = null;
            super.close();
        }
    }

    /** Fluent builder for {@link FlinkNacosSource}. */
    public static class Builder {
        private Properties properties;
        private String dataId;
        private String group;
        private String serverAddr;

        /** Sets the Nacos client properties; when set they take precedence over the address. */
        public Builder setProperties(Properties properties) {
            this.properties = properties;
            return this;
        }

        /** Sets the data id of the configuration entry to read. */
        public Builder setDataId(String dataId) {
            this.dataId = dataId;
            return this;
        }

        /** Sets the group of the configuration entry to read. */
        public Builder setGroup(String group) {
            this.group = group;
            return this;
        }

        /** Sets the Nacos server address, used when no properties are supplied. */
        public Builder setIp(String serverAddr) {
            this.serverAddr = serverAddr;
            return this;
        }

        /** Creates the source from the values collected so far. */
        public FlinkNacosSource build() {
            return new FlinkNacosSource(this.properties, this.dataId, this.group, this.serverAddr);
        }

        /**
         * Creates the source from the values collected so far.
         *
         * <p>Kept for backward compatibility; equivalent to {@link #build()}.
         */
        public FlinkNacosSource builder() {
            return build();
        }
    }
}