Project upgrade requirement: use Flume to collect logs from a past time window and store them in HDFS, with the path layout xxx/ymd=%Y-%m-%d/h=%H.
The approach is straightforward:
1) Send Event objects to the Flume agent through Flume's RpcClient, putting a timestamp into the Event headers:
headers.put("timestamp", data.getTime().toString());
2) Configure the Flume agent's properties:
a1.sinks.k1.hdfs.path = %{hdfsPath}/%{data_type}/ymd=%Y-%m-%d/h=%H
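For context, a fuller sink configuration for this kind of time-based bucketing might look like the sketch below (the agent/sink names a1/k1 follow the line above; the fileType and writeFormat values match what the SDK's own tests use):

```properties
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = %{hdfsPath}/%{data_type}/ymd=%Y-%m-%d/h=%H
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.writeFormat = Text
# Fallback: ignore the timestamp header and bucket by the agent's local time.
# a1.sinks.k1.hdfs.useLocalTimeStamp = true
```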
The result, however, was not what we expected: the historical logs were still not archived by their event time.
A check of the Flume User Manual showed the timestamp format was wrong: the header must carry an epoch timestamp in milliseconds.
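The client-side fix is a one-line change. A sketch, assuming data.getTime() returns a java.util.Date (as the original snippet suggests):

```java
import java.util.Date;
import java.util.HashMap;
import java.util.Map;

public class TimestampHeaderFix {
    public static void main(String[] args) {
        Map<String, String> headers = new HashMap<>();
        Date eventTime = new Date(); // stands in for data.getTime()

        // Wrong: Date.toString() yields e.g. "Sun Mar 13 07:06:40 UTC 2011",
        // which Long.valueOf on the sink side cannot parse.
        // headers.put("timestamp", eventTime.toString());

        // Right: epoch milliseconds, rendered as a decimal string.
        headers.put("timestamp", String.valueOf(eventTime.getTime()));

        System.out.println(headers.get("timestamp"));
    }
}
```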
With the problem solved, it is worth a quick walk through the source.
Flume conveniently keeps the HDFS sink code in the flume-hdfs-sink module. Open the HDFSEventSink class; its core loop is:
public Status process() throws EventDeliveryException {
  Channel channel = getChannel();
  Transaction transaction = channel.getTransaction();
  transaction.begin();
  try {
    Set<BucketWriter> writers = new LinkedHashSet<>();
    int txnEventCount = 0;
    for (txnEventCount = 0; txnEventCount < batchSize; txnEventCount++) {
      Event event = channel.take();
      if (event == null) {
        break;
      }

      // reconstruct the path name by substituting place holders
      // NOTE: the event headers are parsed here to build realPath
      String realPath = BucketPath.escapeString(filePath, event.getHeaders(),
          timeZone, needRounding, roundUnit, roundValue, useLocalTime);
      String realName = BucketPath.escapeString(fileName, event.getHeaders(),
          timeZone, needRounding, roundUnit, roundValue, useLocalTime);

      String lookupPath = realPath + DIRECTORY_DELIMITER + realName;
      BucketWriter bucketWriter;
      HDFSWriter hdfsWriter = null;
      // Callback to remove the reference to the bucket writer from the
      // sfWriters map so that all buffers used by the HDFS file
      // handles are garbage collected.
      WriterCallback closeCallback = new WriterCallback() {
        @Override
        public void run(String bucketPath) {
          LOG.info("Writer callback called.");
          synchronized (sfWritersLock) {
            sfWriters.remove(bucketPath);
          }
        }
      };
      ...
}
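BucketPath.escapeString is Flume's own implementation; as a rough illustration of what it does with time-based escape sequences (a sketch with a simplified token set, not the real code), the %Y/%m/%d/%H tokens are resolved against the event's timestamp header, and %{key} tokens against arbitrary headers:

```java
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.TimeZone;

public class BucketPathSketch {
    // Simplified stand-in for BucketPath.escapeString:
    // handles only %Y %m %d %H and %{header} tokens.
    static String escape(String pattern, Map<String, String> headers, TimeZone tz) {
        long ts = Long.parseLong(headers.get("timestamp")); // must be epoch millis
        Date d = new Date(ts);
        String out = pattern;
        out = out.replace("%Y", fmt("yyyy", d, tz));
        out = out.replace("%m", fmt("MM", d, tz));
        out = out.replace("%d", fmt("dd", d, tz));
        out = out.replace("%H", fmt("HH", d, tz));
        // %{key} -> header value
        for (Map.Entry<String, String> e : headers.entrySet()) {
            out = out.replace("%{" + e.getKey() + "}", e.getValue());
        }
        return out;
    }

    static String fmt(String p, Date d, TimeZone tz) {
        SimpleDateFormat f = new SimpleDateFormat(p);
        f.setTimeZone(tz);
        return f.format(d);
    }

    public static void main(String[] args) {
        Map<String, String> headers = new HashMap<>();
        headers.put("timestamp", "0"); // 1970-01-01T00:00:00Z
        headers.put("data_type", "access_log");
        String path = escape("/logs/%{data_type}/ymd=%Y-%m-%d/h=%H",
                headers, TimeZone.getTimeZone("UTC"));
        System.out.println(path); // /logs/access_log/ymd=1970-01-01/h=00
    }
}
```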
BucketPath parses the event headers, applies the escape-sequence substitution, and produces realPath.
Open that file and find the concrete implementation, replaceShorthand:
protected static String replaceShorthand(char c, Map<String, String> headers,
    TimeZone timeZone, boolean needRounding, int unit, int roundDown,
    boolean useLocalTimestamp, long ts) {

  String timestampHeader = null;
  try {
    if (!useLocalTimestamp) {
      timestampHeader = headers.get("timestamp");
      Preconditions.checkNotNull(timestampHeader, "Expected timestamp in " +
          "the Flume event headers, but it was null");
      ts = Long.valueOf(timestampHeader);
    } else {
      timestampHeader = String.valueOf(ts);
    }
  } catch (NumberFormatException e) {
    throw new RuntimeException("Flume wasn't able to parse timestamp header"
        + " in the event to resolve time based bucketing. Please check that"
        + " you're correctly populating timestamp header (for example using"
        + " TimestampInterceptor source interceptor).", e);
  }
  ...
}
timestampHeader = headers.get("timestamp") confirms that the sink reads the timestamp header and converts it with Long.valueOf(timestampHeader), so the value must be an epoch time in milliseconds. The surrounding code also shows the fallback behavior: with useLocalTimestamp set, the header is ignored and the agent's local time ts is used instead, while a header value that cannot be parsed as a long fails with the RuntimeException above. A value in the wrong format or unit is exactly what caused our mis-archived logs.
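The wrong unit is easy to demonstrate without Flume. If the header carries an epoch time in seconds rather than milliseconds, Long.valueOf parses it without complaint, but the resulting bucket collapses into January 1970:

```java
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.TimeZone;

public class WrongUnitDemo {
    public static void main(String[] args) {
        long seconds = 1300000000L;     // an epoch time in seconds
        long millis = seconds * 1000L;  // the same instant in milliseconds

        SimpleDateFormat f = new SimpleDateFormat("yyyy-MM-dd/HH");
        f.setTimeZone(TimeZone.getTimeZone("UTC"));

        // Interpreted as milliseconds (what the sink does), a seconds-precision
        // value lands in a January 1970 bucket:
        System.out.println(f.format(new Date(seconds))); // 1970-01-16/01
        System.out.println(f.format(new Date(millis)));  // 2011-03-13/07
    }
}
```

(A header that is not numeric at all, such as a Date.toString() value, fails Long.valueOf outright and triggers the RuntimeException shown in replaceShorthand.)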
PS: the SDK's test suite shows the correct usage. From the TestHDFSEventSink class:
@Test
public void testTextAppend() throws InterruptedException, LifecycleException,
    EventDeliveryException, IOException {

  LOG.debug("Starting...");
  final long rollCount = 3;
  final long batchSize = 2;
  final String fileName = "FlumeData";
  String newPath = testPath + "/singleTextBucket";
  int totalEvents = 0;
  int i = 1, j = 1;

  // clear the test directory
  Configuration conf = new Configuration();
  FileSystem fs = FileSystem.get(conf);
  Path dirPath = new Path(newPath);
  fs.delete(dirPath, true);
  fs.mkdirs(dirPath);

  Context context = new Context();
  // context.put("hdfs.path", testPath + "/%Y-%m-%d/%H");
  context.put("hdfs.path", newPath);
  context.put("hdfs.filePrefix", fileName);
  context.put("hdfs.rollCount", String.valueOf(rollCount));
  context.put("hdfs.batchSize", String.valueOf(batchSize));
  context.put("hdfs.writeFormat", "Text");
  context.put("hdfs.fileType", "DataStream");

  Configurables.configure(sink, context);

  Channel channel = new MemoryChannel();
  Configurables.configure(channel, context);

  sink.setChannel(channel);
  sink.start();

  Calendar eventDate = Calendar.getInstance();
  List<String> bodies = Lists.newArrayList();

  // push the event batches into channel
  for (i = 1; i < 4; i++) {
    Transaction txn = channel.getTransaction();
    txn.begin();
    for (j = 1; j <= batchSize; j++) {
      Event event = new SimpleEvent();
      eventDate.clear();
      eventDate.set(2011, i, i, i, 0); // yy mm dd
      event.getHeaders().put("timestamp",
          String.valueOf(eventDate.getTimeInMillis()));
      event.getHeaders().put("hostname", "Host" + i);
      String body = "Test." + i + "." + j;
      event.setBody(body.getBytes());
      bodies.add(body);
      channel.put(event);
      totalEvents++;
    }
    txn.commit();
    txn.close();

    // execute sink to process the events
    sink.process();
  }

  sink.stop();

  // loop through all the files generated and check their contains
  FileStatus[] dirStat = fs.listStatus(dirPath);
  Path[] fList = FileUtil.stat2Paths(dirStat);

  // check that the roll happened correctly for the given data
  long expectedFiles = totalEvents / rollCount;
  if (totalEvents % rollCount > 0) expectedFiles++;
  Assert.assertEquals("num files wrong, found: " +
      Lists.newArrayList(fList), expectedFiles, fList.length);
  verifyOutputTextFiles(fs, conf, dirPath.toUri().getPath(), fileName, bodies);
}
Note how the test populates the header: event.getHeaders().put("timestamp", String.valueOf(eventDate.getTimeInMillis())), i.e. epoch milliseconds. That is the correct usage.