1、首先设置一个观察者类,方便获取zookeeper中的信息
package com.topologyauditdemo;
import java.util.List;
import org.apache.zookeeper.WatchedEvent;import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
public class Observer {

    /** ZooKeeper client handle; created in the constructor, released by {@link #close()}. */
    private ZooKeeper zk = null;

    /**
     * Connects to the ZooKeeper ensemble and installs a default watcher that
     * keeps a persistent watch on the root path "/".
     *
     * @param zookeeperUrl   comma-separated host:port list of the ensemble
     * @param sessionTimeout session timeout in milliseconds
     * @throws Exception if the client cannot be created
     */
    public Observer(String zookeeperUrl, int sessionTimeout) throws Exception {
        zk = new ZooKeeper(zookeeperUrl, sessionTimeout, new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                // Callback invoked whenever a watched event fires.
                System.out.println("event.getType() is : " + event.getType() + ",event.getPath() is : " + event.getPath() + "!");
                // ZooKeeper watches are one-shot: re-register on "/" so we keep
                // receiving notifications for the root path.
                try {
                    // Guard: this callback can fire before the constructor has
                    // finished assigning the 'zk' field.
                    if (zk != null) {
                        zk.getChildren("/", true);
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });
    }

    /**
     * Lists the children of a znode, re-arming the watch on it.
     *
     * @param znodePath absolute znode path
     * @return child node names
     * @throws Exception on ZooKeeper errors
     */
    public List<String> getChildren(String znodePath) throws Exception {
        return zk.getChildren(znodePath, true);
    }

    /**
     * Reads a znode's data as a UTF-8 string (no watch is set).
     *
     * @param znodePath absolute znode path
     * @return the node payload decoded as UTF-8
     * @throws Exception on ZooKeeper errors
     */
    public String getData(String znodePath) throws Exception {
        byte[] data = zk.getData(znodePath, false, new Stat());
        // Decode explicitly as UTF-8 rather than the platform default charset.
        return new String(data, java.nio.charset.StandardCharsets.UTF_8);
    }

    /** Closes the ZooKeeper session. */
    public void close() {
        try {
            zk.close();
        } catch (InterruptedException e) {
            // Restore the interrupt flag instead of swallowing it.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }
    }
}
2、从zookeeper中获取数据
package com.topologyauditdemo;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import com.alibaba.fastjson.JSONArray;
import com.travelsky.common.TopObject;
import com.travelsky.common.Utils;
public class Entry {

    /**
     * Monitor entry point: reads the list of topology ids to audit, resolves
     * every partition znode under each topology, then periodically (every
     * {@code timeWindow} ms) samples offsets and writes them to HBase.
     */
    public static void main(String[] args) {
        Observer ob = null;
        try {
            // JSON file listing the topology ids to audit.
            String topListPath = "D:\\TopListPath.json";
            // Config file: LDAP user/password, ZooKeeper address, etc.
            String confPath = "D:\\EnvConfig.json";
            // Read the topology id list and parse it into a List<String>.
            String configContent = Utils.readFile(topListPath, 200);
            List<String> topList = JSONArray.parseArray(configContent, String.class);
            // Observer watching the ZooKeeper ensemble (10s session timeout).
            ob = new Observer("10.*.*.*:2181,10.*.*.*:2181,10.*.*.*:2181", 10000);
            // Absolute ZK paths of every partition node of every audited topology.
            List<String> topologyChildrenPartitionPaths = new ArrayList<String>();
            for (String top : topList) {
                String topologyPath = "/storm-kafka-topology/" + top;
                // Children are the per-partition nodes (e.g. "partition_1").
                for (String partition : ob.getChildren(topologyPath)) {
                    topologyChildrenPartitionPaths.add(topologyPath + "/" + partition);
                }
            }
            long startTimeStamp = System.currentTimeMillis();
            int count = 0;
            long timeWindow = 20000L;
            List<TopObject> topPartitionList = new ArrayList<TopObject>();
            // Reuse a single writer instead of allocating one per cycle.
            HbaseEntry he = new HbaseEntry();
            while (true) {
                count++;
                for (String partitionPath : topologyChildrenPartitionPaths) {
                    TopObject topObj = Utils.genTopObject(ob, startTimeStamp, timeWindow * count, partitionPath);
                    Utils.getTopicInformation(ob, topObj);
                    topPartitionList.add(topObj);
                }
                // Persist this cycle's samples, then reset the batch.
                he.putTopObjToHbase(confPath, "STORM-OFFSET-MONITOR", topPartitionList);
                topPartitionList.clear();
                // Sleep until the start of the next window. Guard against a
                // negative duration (Thread.sleep throws IllegalArgumentException)
                // when a cycle took longer than the window itself.
                long sleepMillis = startTimeStamp + timeWindow * count - System.currentTimeMillis();
                if (sleepMillis > 0) {
                    Thread.sleep(sleepMillis);
                }
                if (func()) {
                    break;
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // Always release the ZooKeeper session, even on error
            // (the original leaked it whenever an exception was thrown).
            if (ob != null) {
                ob.close();
            }
        }
    }

    /** Loop-termination hook; always false, so the monitor runs indefinitely. */
    static boolean func() {
        return false;
    }
}
3、hbase的入口代码
package com.topologyauditdemo;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.BufferedMutatorParams;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.PageFilter;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Bytes;
import com.travelsky.common.ConnectionUtils;
import com.travelsky.common.TopObject;
import com.travelsky.common.Utils;
public class HbaseEntry {

    // Row-key field separator '+' and the single column family "t".
    // Bytes.toBytes always encodes UTF-8, unlike String.getBytes() which
    // uses the platform default charset.
    static byte[] ROWKEY_SEPARATOR = Bytes.toBytes("+");
    static byte[] COLUMN_FAMILY = Bytes.toBytes("t");

    /**
     * Writes the collected topology/partition offset samples into HBase.
     * Row-key layout: topologyName + '+' + partitionNum(int, 4B) + '+'
     * + startTimeStamp(long, 8B) + '+' + topicName.
     * Column: family "t", qualifier = timeFromStart (long),
     * value = kafkaOffset - stormOffset (the consumer lag).
     *
     * @param confPath  path of the JSON config file (LDAP credentials, ZK address, ...)
     * @param tableName target HBase table name
     * @param topObList samples to persist
     */
    public void putTopObjToHbase(String confPath, String tableName, List<TopObject> topObList) {
        Configuration conf = new Configuration();
        // Load connection settings from the external config file.
        Utils.loadEnvProperties(conf, confPath);
        Connection connection = null;
        Table table = null;
        try {
            User user = ConnectionUtils.createUser(conf);
            connection = ConnectionFactory.createConnection(conf, user);
            table = connection.getTable(TableName.valueOf(tableName));
            // Report puts that exhausted their retries instead of failing silently.
            BufferedMutator.ExceptionListener listener = new BufferedMutator.ExceptionListener() {
                @Override
                public void onException(RetriesExhaustedWithDetailsException e, BufferedMutator mutator) {
                    for (int i = 0; i < e.getNumExceptions(); i++) {
                        System.out.println("Failed to send put " + e.getRow(i) + ".");
                    }
                }
            };
            BufferedMutatorParams params = new BufferedMutatorParams(TableName.valueOf(tableName)).listener(listener);
            params.writeBufferSize(1000000L);
            // try-with-resources guarantees the mutator is flushed and closed
            // even when a put throws (the original leaked it on that path).
            try (BufferedMutator mutator = connection.getBufferedMutator(params)) {
                for (TopObject topOb : topObList) {
                    // Compose the row key: name + partition + startTime + topic.
                    byte[] topologyNameBytes = Bytes.add(Bytes.toBytes(topOb.getTopologyName()), ROWKEY_SEPARATOR);
                    byte[] partitionNumBytes = Bytes.add(Bytes.toBytes(topOb.getPartitionNum()), ROWKEY_SEPARATOR);
                    byte[] startTimeBytes = Bytes.add(Bytes.toBytes(topOb.getStartTimeStamp()), ROWKEY_SEPARATOR);
                    byte[] topicNameBytes = Bytes.toBytes(topOb.getTopic());
                    byte[] rowkey = Bytes.add(Bytes.add(topologyNameBytes, partitionNumBytes, startTimeBytes), topicNameBytes);
                    Put put = new Put(rowkey);
                    // qualifier = elapsed time from start; value = offset delta (lag).
                    put.addColumn(COLUMN_FAMILY, Bytes.toBytes(topOb.getTimeFromStart()), Bytes.toBytes(topOb.getkOffset() - topOb.getsOffset()));
                    mutator.mutate(put);
                }
                mutator.flush();
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            // Null-check each resource: the original threw NPE here whenever
            // createConnection or getTable failed before assignment.
            try {
                if (table != null) {
                    table.close();
                }
                if (connection != null) {
                    connection.close();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}
4、写了一个入口程序,把写进HBase中的字节数据读出并转换成CSV表格的形式展现出来
package com.topologyauditexport;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Bytes;
import com.travelsky.common.ConnectionUtils;
import com.travelsky.common.Utils;
public class Entry {

    // CSV field separator. Bytes.toBytes encodes UTF-8 regardless of the
    // platform default charset.
    static byte[] FILE_SAPERATOR = Bytes.toBytes(",");

    /** Export entry point: dumps the HBase monitor table into a CSV file. */
    public static void main(String[] args) {
        try {
            myFunc();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    /**
     * Scans the STORM-OFFSET-MONITOR table and writes one CSV line per cell:
     * topologyname,partition,topic,delta,startTime,timeFromStart.
     * The row key is parsed as: name + '+' + partition(int, 4B) + '+'
     * + startTime(long, 8B) + '+' + topic (see HbaseEntry's key layout).
     *
     * @throws InterruptedException declared for callers; nothing here sleeps
     */
    public static void myFunc() throws InterruptedException {
        // Config file with LDAP credentials, ZK address, etc.
        String confPath = "D:\\EnvConfig.json";
        Configuration conf = new Configuration();
        Utils.loadEnvProperties(conf, confPath);
        Connection connection = null;
        Table table = null;
        // Destination CSV file.
        String fileName = "D:\\topologyaudit.csv";
        OutputStream outst = null;
        try {
            outst = new FileOutputStream(fileName);
            // CSV header line.
            outst.write(Bytes.toBytes("topologyname,partition,topic,delta,startTime,timeFromStart\n"));
            User user = ConnectionUtils.createUser(conf);
            connection = ConnectionFactory.createConnection(conf, user);
            // Source HBase table written by the monitor.
            table = connection.getTable(TableName.valueOf("STORM-OFFSET-MONITOR"));
            System.out.println("-----------");
            Scan scan = new Scan();
            scan.setCaching(1000);
            // try-with-resources: the scanner holds server-side resources and
            // must be closed (the original never closed it).
            try (ResultScanner rs = table.getScanner(scan)) {
                for (Result r : rs) {
                    byte[] rowkey = r.getRow();
                    // Locate the three '+' separators. Only the first needs a
                    // search; the others are at fixed offsets because the
                    // partition is a 4-byte int and the timestamp an 8-byte long.
                    int firstPlusSymbolPos = 0;
                    int secondPlusSymbolPos = 0;
                    int thirdPlusSymbolPos = 0;
                    for (int i = 0; i < rowkey.length; i++) {
                        if (rowkey[i] == '+') {
                            firstPlusSymbolPos = i;
                            secondPlusSymbolPos = firstPlusSymbolPos + 5;
                            thirdPlusSymbolPos = secondPlusSymbolPos + 9;
                            break;
                        }
                    }
                    for (Cell cell : r.listCells()) {
                        // topologyname: key start up to the first '+'.
                        outst.write(Bytes.copy(rowkey, 0, firstPlusSymbolPos));
                        outst.write(FILE_SAPERATOR);
                        // partition: 4-byte int after the first '+'.
                        outst.write(Bytes.toBytes(String.valueOf(Bytes.toInt(rowkey, firstPlusSymbolPos + 1, 4))));
                        outst.write(FILE_SAPERATOR);
                        // topic: everything after the third '+'.
                        outst.write(Bytes.copy(rowkey, thirdPlusSymbolPos + 1, rowkey.length - 1 - thirdPlusSymbolPos));
                        outst.write(FILE_SAPERATOR);
                        // delta: cell value (kafka offset - storm offset).
                        outst.write(Bytes.toBytes(String.valueOf(Bytes.toLong(CellUtil.cloneValue(cell)))));
                        outst.write(FILE_SAPERATOR);
                        // startTime: 8-byte long after the second '+'.
                        outst.write(Bytes.toBytes(String.valueOf(Bytes.toLong(rowkey, secondPlusSymbolPos + 1, 8))));
                        outst.write(FILE_SAPERATOR);
                        // timeFromStart: the cell qualifier (long).
                        outst.write(Bytes.toBytes(String.valueOf(Bytes.toLong(CellUtil.cloneQualifier(cell)))));
                        outst.write(Bytes.toBytes("\n"));
                    }
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            // Null-check each resource: the original threw NPE here whenever
            // any of them failed to be created.
            try {
                if (outst != null) {
                    outst.close();
                }
                if (table != null) {
                    table.close();
                }
                if (connection != null) {
                    connection.close();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}
至此,代码内容基本完全搞定,一些工具类将不做展示。