String rowKey = String.format("%s-%s-%s", cal.getTimeInMillis(), randomKey, nonce.getAndIncrement());
如果要自定义rowkey,修改源代码是唯一的办法,RegexHbaseEventSerializer.java就是我们要修改的文件。我们可以新建一个类来继承,原始文件不要去修改。不过今天我测试的时候是直接修改源文件的。
我的需要的rowkey是:B13612145#1529637655#3#1530085147015#54
String rowKey = String.format("%s#%s#%s#%s#%s", machineNo, fileTimeStamp , fileNo , cal.getTimeInMillis(), nonce.getAndIncrement());
前3个字段都在文件名中,这就意味着,我必须解析文件名获得这3个字段,那么配置文件必须添加header:
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /data/flume/r1/data
a1.sources.r1.batchSize = 100
#a1.sources.r1.fileHeader = true
a1.sources.r1.basenameHeader = true
a1.sources.r1.channels = c1
OK,来整理一下思路,我需要解析文件名来获取3个字段作为rowkey组成部分,那么配置需要添加header,然后把源代码自动生成rowkey的规则替换成我们自己的规则,就是这么简单。
1. 新建解析文件名的方法:
public String splitFileName() {
for (Map.Entry<String, String> entry : headers.entrySet()) {
return entry.getValue();
}
return null;
}
既然是要解析文件名,很显然要知道怎么获取文件名,从代码可以知道headers.entrySet就是获取header的方法,因为我就一个header,所以一次循环就return结果。
2. 替换默认rowkey生成规则
protected byte[] getRowKey(Calendar cal) {
/*
* NOTE: This key generation strategy has the following properties:
*
* 1) Within a single JVM, the same row key will never be duplicated. 2)
* Amongst any two JVM's operating at different time periods (according
* to their respective clocks), the same row key will never be
* duplicated. 3) Amongst any two JVM's operating concurrently
* (according to their respective clocks), the odds of duplicating a
* row-key are non-zero but infinitesimal. This would require
* simultaneous collision in (a) the timestamp (b) the respective nonce
* and (c) the random string. The string is necessary since (a) and (b)
* could collide if a fleet of Flume agents are restarted in tandem.
*
* Row-key uniqueness is important because conflicting row-keys will
* cause data loss.
*/
this.fileName = splitFileName();
this.machineNo = fileName.split("_")[1];
this.fileTimeStamp = fileName.split("_")[2];
this.fileNo = fileName.split("_")[3].split("\\.")[0];
String rowKey = String.format("%s#%s#%s#%s#%s", machineNo, fileTimeStamp , fileNo , cal.getTimeInMillis(), nonce.getAndIncrement());
//String rowKey = String.format("%s-%s-%s", cal.getTimeInMillis(), randomKey, nonce.getAndIncrement());
return rowKey.getBytes(charset);
}
注释掉的那行就是默认的规则,新的是我自己要的规则。
就这样完成了,打个包替换之前的包,消费一个文件来测试,结果正如我们所期望的:
B13612145#1529637655#3#1530085147016#55 column=cf:cnc_exeprgname, timestamp=1530085147229, value=418
B13612145#1529637655#3#1530085147016#55 column=cf:cnc_rdspmeter[0], timestamp=1530085147229, value=0
B13612145#1529637655#3#1530085147016#55 column=cf:cnc_rdsvmeter, timestamp=1530085147229, value=7,7,93,0
B13612145#1529637655#3#1530085147016#55 column=cf:cnc_statinfo[3], timestamp=1530085147229, value=3
B13612145#1529637655#3#1530085147016#55 column=cf:ext_toolno, timestamp=1530085147229, value=30
B13612145#1529637655#3#1530085147017#56 column=cf:cnc_exeprgname, timestamp=1530085147229, value=418
B13612145#1529637655#3#1530085147017#56 column=cf:cnc_rdspmeter[0], timestamp=1530085147229, value=0
B13612145#1529637655#3#1530085147017#56 column=cf:cnc_rdsvmeter, timestamp=1530085147229, value=6,11,92,0
B13612145#1529637655#3#1530085147017#56 column=cf:cnc_statinfo[3], timestamp=1530085147229, value=3
B13612145#1529637655#3#1530085147017#56 column=cf:ext_toolno, timestamp=1530085147229, value=30
B13612145#1529637655#3#1530085147018#57 column=cf:cnc_exeprgname, timestamp=1530085147229, value=418
B13612145#1529637655#3#1530085147018#57 column=cf:cnc_rdspmeter[0], timestamp=1530085147229, value=0
B13612145#1529637655#3#1530085147018#57 column=cf:cnc_rdsvmeter, timestamp=1530085147229, value=6,7,93,0
B13612145#1529637655#3#1530085147018#57 column=cf:cnc_statinfo[3], timestamp=1530085147229, value=3
B13612145#1529637655#3#1530085147018#57 column=cf:ext_toolno, timestamp=1530085147229, value=30
B13612145#1529637655#3#1530085147018#58 column=cf:cnc_exeprgname, timestamp=1530085147229, value=418
B13612145#1529637655#3#1530085147018#58 column=cf:cnc_rdspmeter[0], timestamp=1530085147229, value=0
B13612145#1529637655#3#1530085147018#58 column=cf:cnc_rdsvmeter, timestamp=1530085147229, value=5,8,93,0
B13612145#1529637655#3#1530085147018#58 column=cf:cnc_statinfo[3], timestamp=1530085147229, value=3
B13612145#1529637655#3#1530085147018#58 column=cf:ext_toolno, timestamp=1530085147229, value=30
B13612145#1529637655#3#1530085147019#59 column=cf:cnc_exeprgname, timestamp=1530085147229, value=418
B13612145#1529637655#3#1530085147019#59 column=cf:cnc_rdspmeter[0], timestamp=1530085147229, value=0
B13612145#1529637655#3#1530085147019#59 column=cf:cnc_rdsvmeter, timestamp=1530085147229, value=7,8,93,0
B13612145#1529637655#3#1530085147019#59 column=cf:cnc_statinfo[3], timestamp=1530085147229, value=3
B13612145#1529637655#3#1530085147019#59 column=cf:ext_toolno, timestamp=1530085147229, value=30
今天测试的时候碰到2个问题:
1. 消费文件有几次出现文件名已经修改为.COMPLETE,但是我HBASE数据没有任何增加,而且没有报任何错误,。给我的感觉就是没有消费。测试了几次,都是如此,很是困惑,后来突然想起来之前有人提到过如果一个很大的文件需要放到spooldir目录会发生错误,因为文件一进去就会消费,但是文件又在拷贝过程。后来我改成先把原始文件名添加.COMPLETE,拷贝完成之后,再修改文件名去掉.COMPLETE.
2. 时间冲突
rowkey的规则里有时间,我有一个文件60行数据,消费之后只有48条,因为之前我同过spark 消费也出现过这个问题,因此很容易知道这是因为rowkey冲突了,导致数据覆盖了,因此把源文件的nonce.getAndIncrement()加到ROWKEY即可。
简单说就是循环的过程cal.getTimeInMillis()这个玩意会可能重复,很多人觉得微秒级别不应该出现重复,事实上我碰到过2次,因此现在对通过时间作为rowkey格外小心。