系统中有一个新的需求:通过 Flume 的 TaildirSource 读取 rotate 文件的数据。所谓 rotate 文件,是指日志文件大小达到指定值时,系统会自动将其重命名,作为历史文件备份,然后继续写入新的日志文件。例如指定 log 文件的大小上限为 1M,系统不断向 log 文件写入日志,当该文件达到 1M 时,就会产生 log.1、log.2 这样的历史文件。但在配合使用 TaildirSource 监听 log 文件时发现,无法再生成 log.1、log.2 这样的历史文件。也就是说,只要 TaildirSource 正在监听该文件,该文件达到指定大小后就不能被重命名。原因是文件被 Flume 进程以不允许重命名的方式打开着。这和在 Windows 系统中无法重命名一个正在被打开的文件是同一类问题。
下面先分析问题产生的原因,再给出具体的修改方法。
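研究 TaildirSource 的源代码后发现,负责读取文件的 TailFile 类使用的是 RandomAccessFile。在 Windows 上,RandomAccessFile 打开文件时不带 FILE_SHARE_DELETE 共享标志,因此文件被打开期间无法被重命名或删除。而 NIO 的 FileChannel.open 默认会带上该标志,所以不会阻止重命名。注意,这与"阻塞/非阻塞"无关:FileChannel 本身同样是阻塞式的。因此需要把 RandomAccessFile 替换为 FileChannel,并且在构造方法中必须通过 FileChannel 自身的静态方法 open 来获取该对象: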
FileChannel inChannel = FileChannel.open(Paths.get(file.getAbsolutePath()), StandardOpenOption.READ);
其实用一个简单的 Java 测试类就能验证 RandomAccessFile 和 FileChannel 在读取文件时的差异。用 RandomAccessFile 打开并读取文件时,无法修改该文件的名称;推测 java.io 中其他传统 IO 流(如 FileInputStream)也是如此。而使用 FileChannel 则可以修改。但要注意,获取 FileChannel 时不能使用下面的方式:
RandomAccessFile aFile = new RandomAccessFile("data/nio-data.txt", "rw");
FileChannel inChannel = aFile.getChannel();
通过上面这种方式获取到的 FileChannel,底层使用的仍然是 RandomAccessFile 打开的文件句柄,因此在读取期间同样无法对文件进行重命名。
经过测试,在 TaildirSource 源代码的 TailFile 类中,将 RandomAccessFile 的相关调用替换为 FileChannel 的对应方法,即可解决该问题。修改后的类源代码如下:
package com.centrify.platform.analytics.agent.mysource;
import static org.apache.flume.source.taildir.TaildirSourceConfigurationConstants.BYTE_OFFSET_HEADER_KEY;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.List;
import java.util.Map;
import org.apache.flume.Event;
import org.apache.flume.event.EventBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import com.google.common.collect.Lists;
/**
 * Tails a single file line by line, turning each line into a Flume {@link Event}.
 *
 * <p>The file is read through a {@link FileChannel} obtained from {@link FileChannel#open}
 * instead of a {@code RandomAccessFile}, so that the file being tailed can still be renamed
 * (for example by log rotation) while it is open. NOTE(review): the rename restriction this
 * avoids is Windows-specific ({@code RandomAccessFile} opens files without delete/rename
 * sharing); confirm behavior on the target platform.
 *
 * <p>This class does no synchronization; presumably each instance is used by a single reader
 * thread (TODO confirm against callers).
 */
public class TailFile {
  private static final Logger logger = LoggerFactory.getLogger(TailFile.class);
  private static final byte BYTE_NL = (byte) 10;
  private static final byte BYTE_CR = (byte) 13;
  private static final int BUFFER_SIZE = 8192;
  /** Sentinel for {@link #bufferPos}: the current chunk is exhausted, read a new one. */
  private static final int NEED_READING = -1;

  /** Open channel on the tailed file; {@code null} after {@link #close()}. */
  private FileChannel inChannel;
  private final String path;
  private final long inode;
  private long pos;
  private long lastUpdated;
  private boolean needTail;
  private final Map<String, String> headers;
  /** Most recently read chunk of the file. */
  private byte[] buffer;
  /** Bytes of an incomplete line carried over from previously read chunks. */
  private byte[] oldBuffer;
  /** Index in {@link #buffer} of the next unconsumed byte, or {@link #NEED_READING}. */
  private int bufferPos;
  /** File offset of the start of the next line to be returned. */
  private long lineReadPos;

  /**
   * Opens {@code file} for reading and positions it at {@code pos}.
   *
   * @param file the file to tail
   * @param headers headers associated with this file, returned by {@link #getHeaders()}
   * @param inode inode identifying the file
   * @param pos byte offset to start reading from; values {@code <= 0} read from the start
   * @throws IOException if the file cannot be opened or positioned
   */
  public TailFile(File file, Map<String, String> headers, long inode, long pos)
      throws IOException {
    // Must be FileChannel.open, not RandomAccessFile#getChannel: a channel obtained from a
    // RandomAccessFile shares its handle and still blocks renaming the file.
    FileChannel channel =
        FileChannel.open(Paths.get(file.getAbsolutePath()), StandardOpenOption.READ);
    if (pos > 0) {
      try {
        channel.position(pos);
      } catch (IOException e) {
        // Do not leak the freshly opened channel when positioning fails.
        try {
          channel.close();
        } catch (IOException closeFailure) {
          e.addSuppressed(closeFailure);
        }
        throw e;
      }
      lineReadPos = pos;
    }
    this.inChannel = channel;
    this.path = file.getAbsolutePath();
    this.inode = inode;
    this.pos = pos;
    this.lastUpdated = 0L;
    this.needTail = true;
    this.headers = headers;
    this.oldBuffer = new byte[0];
    this.bufferPos = NEED_READING;
  }

  /** Returns the channel used to read the file, or {@code null} once {@link #close()} ran. */
  public FileChannel getFileChannel() {
    return inChannel;
  }

  public String getPath() {
    return path;
  }

  public long getInode() {
    return inode;
  }

  public long getPos() {
    return pos;
  }

  public long getLastUpdated() {
    return lastUpdated;
  }

  public boolean needTail() {
    return needTail;
  }

  public Map<String, String> getHeaders() {
    return headers;
  }

  /** Returns the file offset of the start of the next line to be read. */
  public long getLineReadPos() {
    return lineReadPos;
  }

  public void setPos(long pos) {
    this.pos = pos;
  }

  public void setLastUpdated(long lastUpdated) {
    this.lastUpdated = lastUpdated;
  }

  public void setNeedTail(boolean needTail) {
    this.needTail = needTail;
  }

  public void setLineReadPos(long lineReadPos) {
    this.lineReadPos = lineReadPos;
  }

  /**
   * Moves the read position to {@code pos} if {@code path} and {@code inode} identify this file.
   *
   * @return {@code true} if this file matched and was repositioned, {@code false} otherwise
   * @throws IOException if repositioning the channel fails
   */
  public boolean updatePos(String path, long inode, long pos) throws IOException {
    if (this.inode != inode || !this.path.equals(path)) {
      return false;
    }
    setPos(pos);
    updateFilePos(pos);
    logger.info("Updated position, file: " + path + ", inode: " + inode + ", pos: " + pos);
    return true;
  }

  /**
   * Seeks the channel to {@code pos} and discards any buffered or partially read data.
   *
   * @throws IOException if repositioning the channel fails
   */
  public void updateFilePos(long pos) throws IOException {
    inChannel.position(pos);
    lineReadPos = pos;
    bufferPos = NEED_READING;
    oldBuffer = new byte[0];
  }

  /**
   * Reads up to {@code numEvents} lines, one event per line.
   *
   * @param numEvents maximum number of events to return
   * @param backoffWithoutNL if {@code true}, a trailing line without a separator is not
   *     returned; the read position is rewound to its start instead
   * @param addByteOffset if {@code true}, each event gets the line's start offset in the
   *     {@code BYTE_OFFSET_HEADER_KEY} header
   * @return the events read, possibly empty
   * @throws IOException if reading the file fails
   */
  public List<Event> readEvents(int numEvents, boolean backoffWithoutNL,
      boolean addByteOffset) throws IOException {
    List<Event> events = Lists.newLinkedList();
    for (int i = 0; i < numEvents; i++) {
      Event event = readEvent(backoffWithoutNL, addByteOffset);
      if (event == null) {
        break;
      }
      events.add(event);
    }
    return events;
  }

  /** Reads one line as an event, or returns {@code null} if none is (yet) available. */
  private Event readEvent(boolean backoffWithoutNL, boolean addByteOffset) throws IOException {
    long lineStart = getLineReadPos();
    LineResult line = readLine();
    if (line == null) {
      return null;
    }
    if (backoffWithoutNL && !line.lineSepInclude) {
      logger.info("Backing off in file without newline: "
          + path + ", inode: " + inode + ", pos: " + inChannel.position());
      // Rewind so the incomplete line is re-read once its separator has been written.
      updateFilePos(lineStart);
      return null;
    }
    Event event = EventBuilder.withBody(line.line);
    if (addByteOffset) {
      event.getHeaders().put(BYTE_OFFSET_HEADER_KEY, String.valueOf(lineStart));
    }
    return event;
  }

  /**
   * Reads the next chunk (at most {@link #BUFFER_SIZE} bytes) into {@link #buffer}.
   *
   * <p>{@code buffer} is trimmed to the bytes actually read, so a short read (e.g. the file
   * shrank after {@code size()} was checked) never leaves zero padding that would otherwise
   * be emitted as line data.
   */
  private void readFile() throws IOException {
    long remaining = inChannel.size() - inChannel.position();
    int toRead = (int) Math.max(0L, Math.min(remaining, (long) BUFFER_SIZE));
    byte[] chunk = new byte[toRead];
    ByteBuffer byteBuffer = ByteBuffer.wrap(chunk);
    // A single read() may return fewer bytes than requested; keep going until full or EOF.
    while (byteBuffer.hasRemaining()) {
      if (inChannel.read(byteBuffer) <= 0) {
        break;
      }
    }
    int bytesRead = byteBuffer.position();
    if (bytesRead < chunk.length) {
      byte[] trimmed = new byte[bytesRead];
      System.arraycopy(chunk, 0, trimmed, 0, bytesRead);
      chunk = trimmed;
    }
    buffer = chunk;
    bufferPos = 0;
  }

  /** Returns a new array holding {@code a[startIdxA, +lenA)} followed by {@code b[startIdxB, +lenB)}. */
  private byte[] concatByteArrays(byte[] a, int startIdxA, int lenA,
      byte[] b, int startIdxB, int lenB) {
    byte[] c = new byte[lenA + lenB];
    System.arraycopy(a, startIdxA, c, 0, lenA);
    System.arraycopy(b, startIdxB, c, lenA, lenB);
    return c;
  }

  /**
   * Reads the next line from the file.
   *
   * <p>Lines end at LF; a CR directly before the LF is stripped as well. At end of file, any
   * leftover bytes are returned as a line with {@code lineSepInclude == false}.
   *
   * @return the next line, or {@code null} if no data is currently available
   * @throws IOException if reading the file fails
   */
  public LineResult readLine() throws IOException {
    LineResult lineResult = null;
    while (true) {
      if (bufferPos == NEED_READING) {
        if (inChannel.position() < inChannel.size()) {
          readFile();
          if (buffer.length == 0) {
            // Nothing could be read right now; keep carried-over bytes and retry next call.
            bufferPos = NEED_READING;
            break;
          }
        } else {
          // End of file: hand back leftover bytes as a line without a separator.
          if (oldBuffer.length > 0) {
            lineResult = new LineResult(false, oldBuffer);
            oldBuffer = new byte[0];
            setLineReadPos(lineReadPos + lineResult.line.length);
          }
          break;
        }
      }
      for (int i = bufferPos; i < buffer.length; i++) {
        if (buffer[i] == BYTE_NL) {
          int oldLen = oldBuffer.length;
          // Don't copy the trailing LF itself.
          int lineLen = i - bufferPos;
          // For Windows line endings, also drop the CR preceding the LF; it is either in this
          // chunk's line segment or the last byte carried over in oldBuffer.
          if (i > bufferPos && buffer[i - 1] == BYTE_CR) {
            lineLen -= 1;
          } else if (oldBuffer.length > 0 && oldBuffer[oldBuffer.length - 1] == BYTE_CR) {
            oldLen -= 1;
          }
          lineResult = new LineResult(true,
              concatByteArrays(oldBuffer, 0, oldLen, buffer, bufferPos, lineLen));
          // Advance past the whole line including its separator bytes.
          setLineReadPos(lineReadPos + (oldBuffer.length + (i - bufferPos + 1)));
          oldBuffer = new byte[0];
          if (i + 1 < buffer.length) {
            bufferPos = i + 1;
          } else {
            bufferPos = NEED_READING;
          }
          break;
        }
      }
      if (lineResult != null) {
        break;
      }
      // No LF in the rest of this chunk: carry the bytes over and read the next chunk.
      oldBuffer = concatByteArrays(oldBuffer, 0, oldBuffer.length,
          buffer, bufferPos, buffer.length - bufferPos);
      bufferPos = NEED_READING;
    }
    return lineResult;
  }

  /**
   * Closes the underlying channel and records the close time in {@code lastUpdated}.
   *
   * <p>Safe to call more than once; calls after the first do nothing. Close failures are
   * logged rather than thrown.
   */
  public void close() {
    if (inChannel == null) {
      return;
    }
    try {
      inChannel.close();
      setLastUpdated(System.currentTimeMillis());
    } catch (IOException e) {
      logger.error("Failed closing file: " + path + ", inode: " + inode, e);
    } finally {
      // Drop the reference even if close() failed, so getFileChannel() reports it closed.
      inChannel = null;
    }
  }

  /** One line read from the file, and whether it was terminated by a line separator. */
  private static final class LineResult {
    final boolean lineSepInclude;
    final byte[] line;

    LineResult(boolean lineSepInclude, byte[] line) {
      this.lineSepInclude = lineSepInclude;
      this.line = line;
    }
  }
}