最近在忙于搭建公司内部的应用日志分析系统,鉴于公司架构的要求,所有的服务都必须双活。对于应用日志分析系统,现在普遍采用的都是ELK stack的框架,前端部分的kibana是必选项。而如果需要将kibana做成双活,则两个kibana之间需要做一定的同步(因为共享一个elasticsearch集群作为数据存储,则两个kibana之间在执行alarm,report等动作时,需要一定的同步机制,避免同一个alarm或者report在两台服务器上发两遍)。当然,同步的方法有很多,你可以选择直接使用elasticsearch作为同步的服务器,不过这会涉及到更多的开发工作和数据的设计工作。我是一个懒人,毕竟日志分析系统上使用到了kafka和zookeeper,那还是用zookeeper来做分布式同步吧,毕竟这货就是拿来干这个的。
但kibana是一个nodejs开发的应用,弄起来还是不如java顺手,这里分享一下,其中踩过的一些坑。
需用同步的是kibana是的一个插件sentinl。这是一个类似于x-pack的watcher的插件,用于告警和报告服务。因为安装了两个kibana服务器,一旦订制了一条告警规则,规则会被存储于elasticsearch,两个服务器都会定时出发,造成同一个告警双发的问题。为了解决这个问题,需要在两台服务器间做同步。因为kibana是用node开发,因此需要使用到node上的node-zookeeper。这里需要注意的是,该模块是一个node native module,需要在对应的平台上编译,具体方法见之前的两篇博文:用native C++模块扩展Node.js和使用docker编译不同平台上的node native module。
栗子
先来看个简单的例子,该例子是node-zookeeper里自带的test.js:
var ZooKeeper = require ("zookeeper");
var zk = new ZooKeeper({
connect: "localhost:8888" // zk server的服务器地址和监听的端口号
,timeout: 200000 // 以毫秒为单位
,debug_level: ZooKeeper.ZOO_LOG_LEVEL_WARN
,host_order_deterministic: false
});
zk.connect(function (err) {
if(err) throw err;
console.log ("zk session established, id=%s", zk.client_id);
zk.a_create ("/node.js1", "some value", ZooKeeper.ZOO_SEQUENCE | ZooKeeper.ZOO_EPHEMERAL, function (rc, error, path) {
if (rc != 0) {
console.log ("zk node create result: %d, error: '%s', path=%s", rc, error, path);
} else {
console.log ("created zk node %s", path);
process.nextTick(function () {
zk.close ();
});
}
});
});
其中:
- connect: 包含主机名和ZooKeeper服务器的端口。
- timeout:以毫秒为单位,表示ZooKeeper等待客户端通信的最长时间,之后会声明会话已死亡。ZooKeeper的会话一般设置超时时间5-10秒。
- debug_level:设置日志的输出级别,有四种级别:ZOO_LOG_LEVEL_ERROR, ZOO_LOG_LEVEL_WARN, ZOO_LOG_LEVEL_INFO, ZOO_LOG_LEVEL_DEBUG
- host_order_deterministic: 初始化zk客户端实例后,该实例是否是按确定顺序去连接ZooKeeper Server集群中的主机,直到连接成功,或者该会话被断开。
常见API
- connect():连接ZooKeeper Server
- a_create (path, data, flags, path_cb): 创建一个znode,并赋值,可以决定这个znode的节点类型(永久、临时、永久有序、临时有序)
- a_get(path, watch, data_cb): path: 我们想要获取数据的zonde节点路径。 watch: 表示我们是否想要监听该节点后续的数据变更。data_cb(rc ,error, stat, data): rc:return code,0为成功。 error:错误信息。stat:znode的元数据信息。data: znode中的数据。
- a_set( path, data, version, stat_cb ): 需要注意的是,ZooKeeper并不允许局部写入或读取znode的数据,当设置一个znode节点的数据或读取时,znode节点的内容或被整个替换或全部读取出来。path: 我们想要设置数据的zonde节点路径。data:我们想要设置的数据,一个znode节点可以包含任何数据,数据存储为字节数组(byte array)。字节数组的具体格式特定于每个应用的实现,ZooKeeper不直接提供解析的支持,用户可以使用如Protobuf、Thrift、Avro或MessagePack等序列化协议来处理保存在znode中的数据格式,一般UTF-8编码的字符串就够用了。version:znode的version,从stat中抽取出来的。data_cb(rc, error, stat): 设置数据的回调。
- close(): 关闭客户端连接
- a_exists(path, watch, stat_cb): 判断znode是否存在
- adelete( path, version, voidcb ):删除znode,结尾加上”“是为了不和保留字”delete”冲突。。。
- a_get_children(path, true, function (rc, error, children_cb): 获取path指定的节点下的子节点,返回的是一个未排序的数组
使用zookeeper做锁
创建一个zookeeperHelper.js
'use strict'
const ZooKeeper = require('zookeeper');
const Promise = require('bluebird');
const _ = require('lodash');
'development';
let connect = 'localhost:2181';
let timeout = 20000; // client的超时时间,单位毫秒。server端在该时间内没有收到心跳会判断客户端掉线
let path = '/sentinl';
let debug_level = ZooKeeper.ZOO_LOG_LEVEL_WARN;
let host_order_deterministic = false;
let defaultInitOpt = {
connect,
timeout,
debug_level,
host_order_deterministic
};
class ZK {
constructor(opt) {
this.opt = opt;
this._initZk();
}
_initZook() {
this.zookeeper = new ZooKeeper(this.opt || defaultInitOpt);
}
_initZk() {
this.zookeeper = new ZooKeeper(this.opt || defaultInitOpt);
}
registZk() {
let self = this;
self.zookeeper.connect(function (err, client) {
if (err) throw err;
console.log('zk session established, id=%s', self.zookeeper.client_id);
self.client = client;
// create parent node
client.a_create(path, null, ZooKeeper.ZOO_PERSISTENT, function (rc, error, path) {
if (rc != 0) {
console.log("zk node create result: %d, error: '%s', path=%s", rc, error, path);
} else {
console.log("created zk node %s", path);
}
})
// create children node
client.a_create(path + '/' + 'alarm', null, ZooKeeper.ZOO_SEQUENCE | ZooKeeper.ZOO_EPHEMERAL, function (rc, error, path) {
if (rc != 0) {
console.log("zk node create result: %d, error: '%s', path=%s", rc, error, path);
} else {
let pathArr = path.split('/');
self.node = pathArr[pathArr.length - 1];
console.log("mynode is %s", self.node);
}
})
})
}
getLock() {
let self = this;
return new Promise((resolve, reject) => {
console.log('zk session established, id=%s', self.zookeeper.client_id);
self.client.a_get_children(path, true, function (rc, error, children) {
if (rc !== 0) {
console.log('zk node get result: %d, error: "%s", stat=%s, children=%s', rc, error, stat, children);
} else {
children = children.sort();
console.log('get zk children: ' + children);
if (children[0] === self.node) {
resolve(true);
}
else {
resolve(false);
}
}
})
});
}
}
module.exports = ZK;
将该模块拷贝到sentinl的server/lib/目录。同时更新sentinl的package.json文件。加入以下dependency
"zookeeper": "^3.4.9"
通过以下命令,下载zookeeper:
npm update
修改schedule.js文件:
import ZookeeperHelper from './zookeeperHelper';
/**
* Schedules and executes watchers in background
*/
export default function Scheduler(server) {
const config = getConfiguration(server);
let watcher;
let client;
const zkHelper = new ZookeeperHelper();
...
/* Run Watcher in interval */
server.sentinlStore.schedule[task._id].later = later.setInterval(() => {
zkHelper.getLock().then(function (result) {
if(result)
{
server.log(['status', 'info', 'Sentinl', 'zookeeper'],
'get the lock, trigger watcher ' + task._id);
watching(task);
}
else {
server.log(['status', 'info', 'Sentinl', 'zookeeper'],
'not get the lock');
}
}, function (error) {
server.log(['status', 'error', 'Sentinl', 'scheduler'],error);
})
}, interval);
这样,就可以通过一个简单的zookeeper锁,保证同一个时刻只有一个kibana服务器能够响应告警和报告服务