主要功能说明
监控节点应用配置文件nodes.txt,若文件有变化(监控文件最后修改时间),载入变化的目标监控节点。
应用将定期请求http连接,接收回应,完成一次心跳检测,支持失败次数和发送间隔配置,以及错误处理和回应信息处理的接口配置。
环境依赖
oracle jdk 1.8
http-client-4.5.2
http-core-4.4.4
nodes.txt格式
每行形如
127.0.0.1:80,81,82
1
主要类说明
FileChangeWatcher 继承Thread,每间隔一段时间检查文件是否被修改。并产生动作listener.doFileChange();
@Override
public void run() {
long last = getLastModified();
while(true){
long tmp = getLastModified();
if(tmp != last){
if(listener != null){
listener.doFileChange();
}
last = tmp;
}
try {
TimeUnit.MILLISECONDS.sleep(timeMs);
} catch (InterruptedException e) {
logger.error("interrupted", e);
}
}
}
HostStore 接口,提供host信息列表
public interface HostStore {
Map<String, Set<Integer>> getHostAsMap();
List<String> getHostAsList();
}
ChangableHostStore
public interface ChangableHostStore extends HostStore{
interface HostChangeListener{
void doHostChange();
}
void setHostChangeListener(HostChangeListener listener);
}
FileHostStore 实现ChangableHostStore接口,提供解析nodes.txt文件的功能
public Map<String, Set<Integer>> loadHostFromFile(File f){
Map<String, Set<Integer>> result = new HashMap<>();
try(BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream(f)))){
String tmp;
while((tmp = br.readLine()) != null){
String[] split = tmp.split(":");
String[] split1 = split[1].split(",");
Set<Integer> collect = Stream.of(split1)
.map(Integer::decode)
.collect(Collectors.toSet());
if(result.containsKey(split[0])){
result.get(split[0]).addAll(collect);
}else{
result.put(split[0], collect);
}
}
} catch (FileNotFoundException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
return result;
}
AppMonitor主要的心跳检测入口
构造参数
public AppMonitor(File file, int failRetryTime, int intervalSendMs){
this.file = file;//监测的文件nodes.txt
this.failRetryTime = failRetryTime;//连接失败重试次数
this.intervalSendMs = intervalSendMs; //心跳包发送间隔时间
isHostUpdated = new AtomicBoolean(false);//节点列表是否更新标志
removeIfFail = false;//失败节点是否不再监测
client = newClient();//httpclient创建
hostStore = new FileHostStore(file, this);//专门负责监测文件变化
eventThreadPool = Executors.newFixedThreadPool(EVENTTHREADNUM);//事件处理线程池
loadHostList();//载入节点
}
重头戏,启动监控任务
public void start(){
String host;
while(true) {
while (hostList.size() == 0);
/*
同步锁定hostList,还原isHostUpdated的状态
*/
synchronized (hostList){
if (hostList.size() == 0)
continue;
host = hostList.remove();
isHostUpdated.set(false);
}
int restTry = failRetryTime + 1;//剩余的连接次数
do {
HttpGet httpGet = new HttpGet("http://" + host);
try (CloseableHttpResponse response = client.execute(httpGet)){
if (response.getStatusLine().getStatusCode() == 200) {//这里只视正常相应的为心跳检测成功!
if(needDealRtnMsg) {
String msg = readMsgFromResponse(response);
submitRtnDeal(msg);//向事件处理池提交返回信息处理
}
}else{
continue;
}
} catch (IOException e) {
logger.error("IOException");
continue;
}
break;//若会执行到此,表示心跳检测成功,那么restTry必然大于0,以下的失败处理用的是逆否命题!
}while(--restTry > 0);
if(restTry <= 0){
submitFailDeal(host);//向事件处理池提交错误处理
}
try {
TimeUnit.MILLISECONDS.sleep(intervalSendMs);
} catch (InterruptedException e) {
logger.error("wrong occur in sleeping", e);
}
if(removeIfFail && restTry <= 0){
continue;
}
/*
锁定同步hostList,在两次锁定hostlist被更改,那么,丢掉这个host
*/
synchronized (hostList) {
if (isHostUpdated.get()) {
continue;
}
hostList.add(host);
}
}
}
资源区(7分)
代码多数有jdk1.8的新语法,看不习惯?你来打我啊
资源太贵?没时间重构,写得渣,害羞 →_→
java code