Flume Data Loss Problem

When reading log files with Flume, a file that was still being written to in that same second was picked up by Flume and renamed with the .COMPLETED suffix, which then threw an exception.

The error log is as follows:

2017-05-10 18:09:13,530 (cluster-ClusterId{value='5912e6c933662351ead2de96', description='null'}-192.168.0.126:27017) [INFO - com.mongodb.diagnostics.logging.SLF4JLogger.info(SLF4JLogger.java:71)] Monitor thread successfully connected to server with description ServerDescription{address=192.168.0.126:27017, type=STANDALONE, state=CONNECTED, ok=true, version=ServerVersion{versionList=[3, 4, 3]}, minWireVersion=0, maxWireVersion=5, maxDocumentSize=16777216, roundTripTimeNanos=1929078}
2017-05-10 18:09:13,622 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - com.mongodb.diagnostics.logging.SLF4JLogger.info(SLF4JLogger.java:71)] Opened connection [connectionId{localValue:2, serverValue:135}] to 192.168.0.126:27017
2017-05-10 18:09:13,658 (SinkRunner-PollingRunner-DefaultSinkProcessor) [ERROR - org.riderzen.flume.sink.MongoSink.process(MongoSink.java:139)] can't process events
java.lang.IllegalStateException: close() called when transaction is OPEN - you must either commit or rollback first
	at com.google.common.base.Preconditions.checkState(Preconditions.java:172)
	at org.apache.flume.channel.BasicTransactionSemantics.close(BasicTransactionSemantics.java:179)
	at org.riderzen.flume.sink.MongoSink.parseEvents(MongoSink.java:231)
	at org.riderzen.flume.sink.MongoSink.process(MongoSink.java:137)
	at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67)
	at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145)
	at java.lang.Thread.run(Thread.java:745)
2017-05-10 18:09:13,662 (SinkRunner-PollingRunner-DefaultSinkProcessor) [ERROR - org.riderzen.flume.sink.MongoSink.parseEvents(MongoSink.java:223)] can't process events, drop it!


The temporary workaround was as follows:

The log files are first written to /data/logs/rawdata, and a shell script keeps the most recent file there while moving all the other files into the directory Flume is monitoring.

What it does: in the data directory, it keeps the most recent file (picked out by modification time) and moves all the other files into the new directory.

pro.sh

#!/bin/bash
# Keep the newest file (the one still being written to) in /data/logs/rawdata
# and move everything else into the directory Flume is watching.
cd /data/logs/rawdata || exit 1
while true; do
  time=$(date +%H:%M:%S)
  # every file except the most recently modified one
  server=$(ls | grep -v "$(ls -rt | tail -1)")
  if [[ -n "$server" ]] || [[ "$time" == "00:00:00" ]]; then
    # $server is deliberately unquoted: it may hold several file names
    mv $server /data/logs/flume
  fi
  sleep 2
done
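To keep the script running in the background you can start it with nohup; something like the following should work (the chmod/nohup invocation is my suggestion, not from the original post):

chmod +x pro.sh
nohup ./pro.sh > /dev/null 2>&1 &   # detach from the terminal so it keeps running after logout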


Note: first run more /proc/cpuinfo | grep "physical id" | uniq | wc -l to check how many physical CPUs the Linux machine has; if it is single-core, you can change the time check to "00:00:01" to run the task. You can also echo `date +%H:%M:%S` inside the loop to check how often it actually iterates, as in the sketch below.
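For example, a stripped-down loop (a minimal sketch, not part of pro.sh itself) that only prints the timestamp makes the interval easy to observe:

while true; do
  echo "tick: $(date +%H:%M:%S)"   # prints once per iteration, so you can see the loop frequency
  sleep 2
done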
If anyone has a better approach, please let me know :)

Later I found that after switching the channel to file mode, no more data was lost:

The configuration is as follows:

a1.sources = r1
a1.channels = c1
a1.sinks = s1

# spooling-directory source watching the directory pro.sh moves finished files into
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /data/logs/flume
#a1.sources.r1.inputCharset = UTF-8
a1.sources.r1.fileHeader = true
a1.sources.r1.interceptors = il
a1.sources.r1.interceptors.il.type = timestamp
#a1.sources.r1.fileHeaderKey.op = upsert
a1.sources.r1.channels = c1

# file channel (was memory before); buffered events survive agent restarts
#a1.channels.c1.type = memory
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /data/logs/tmp
a1.channels.c1.dataDirs = /data/logs/complate
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
#a1.channels.c1.keep-alive = 3

# MongoDB sink writing events into the dtc.log collection
a1.sinks.s1.type = org.riderzen.flume.sink.MongoSink
a1.sinks.s1.host = 192.168.0.126
a1.sinks.s1.port = 27017
a1.sinks.s1.model = single
a1.sinks.s1.db = dtc
a1.sinks.s1.collection = log
a1.sinks.s1.batch = 2
a1.sinks.s1.channel = c1
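
Assuming the configuration above is saved as something like /opt/flume/conf/mongo-file.conf (path and file name are my own choice, not from the original post), the agent can be started with the standard flume-ng command:

flume-ng agent --conf /opt/flume/conf --conf-file /opt/flume/conf/mongo-file.conf --name a1 -Dflume.root.logger=INFO,console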


Reposted from blog.csdn.net/thl331860203/article/details/71643335