编写不易,转载请注明:http://shihlei.iteye.com/blog/2306151
项目需要:
Flume收集日志,期望落地文件系统,按小时分割,并压缩保存。
Flume自带的File Roll Sink 只能按时间分割文件,不能定制存放目录,压缩文件等。所以自定义了Sink结合Log4j的RollingFileAppender的特性,完成该功能。
一 借助Log4j2 的Logger实现动态配置,filePattern分割文件压缩
package light.flume;

import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeUnit;

import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.core.Appender;
import org.apache.logging.log4j.core.Layout;
import org.apache.logging.log4j.core.LoggerContext;
import org.apache.logging.log4j.core.appender.RollingFileAppender;
import org.apache.logging.log4j.core.appender.rolling.TimeBasedTriggeringPolicy;
import org.apache.logging.log4j.core.appender.rolling.TriggeringPolicy;
import org.apache.logging.log4j.core.config.AppenderRef;
import org.apache.logging.log4j.core.config.Configuration;
import org.apache.logging.log4j.core.config.LoggerConfig;
import org.apache.logging.log4j.core.layout.PatternLayout;

/**
 * RollingFileLogger.
 *
 * <p>Built on Log4j2's {@code RollingFileAppender}: the target file, rolling
 * pattern and compression rule are injected at runtime by rewriting the live
 * Log4j2 configuration, so each instance writes to its own time-rolled (and,
 * depending on the pattern suffix, gzip-compressed) data file.
 *
 * @author shilei
 */
public class RollingFileLogger {

    private static final Logger logger = LogManager.getLogger(RollingFileLogger.class);

    /** Dedicated Log4j2 logger that actually writes the data file. */
    private final Logger fileWriter;

    private final String fileName;
    private final String filePattern;
    private final String appenderName;
    private final String loggerName;

    /**
     * Creates (or replaces) a rolling-file logger.
     *
     * @param loggerId
     *            unique id of this logger; reusing an id overwrites the
     *            previously registered configuration
     * @param fileName
     *            Log4j2 fileName, e.g. {@code logs/main.log}
     * @param filePattern
     *            Log4j2 filePattern, e.g.
     *            {@code logs/$${date:yyyy-MM}/main-%d{yyyy-MM-dd_hh_mm_ss}.log.gz}
     */
    public RollingFileLogger(String loggerId, String fileName, String filePattern) {
        this.fileName = fileName;
        this.filePattern = filePattern;

        this.appenderName = loggerId + "_appender";
        this.loggerName = loggerId + "_logger";

        // Parameterized logging instead of string concatenation.
        logger.info("fileName : {}", fileName);
        logger.info("filePattern : {}", filePattern);
        logger.info("appenderName : {}", appenderName);
        logger.info("loggerName : {}", loggerName);

        updateLoggerConfig();
        fileWriter = LogManager.getLogger(loggerName);
    }

    /**
     * Registers a RollingFileAppender and a matching LoggerConfig in the live
     * Log4j2 configuration, then pushes the update to all loggers.
     */
    private void updateLoggerConfig() {
        final LoggerContext ctx = (LoggerContext) LogManager.getContext(false);
        final Configuration config = ctx.getConfiguration();

        // Roll every 1 time unit of the smallest %d field, aligned ("modulate")
        // to the unit boundary.
        TriggeringPolicy policy = TimeBasedTriggeringPolicy.createPolicy("1", "true");
        // "%m%n": the event body is written verbatim, one record per line.
        // StandardCharsets.UTF_8 avoids a per-call Charset lookup.
        Layout<?> layout = PatternLayout.createLayout("%m%n", null, config, null,
                StandardCharsets.UTF_8, true, false, null, null);
        Appender appender = RollingFileAppender.createAppender(fileName, filePattern, "true",
                appenderName, "true", "", "true", policy, null, layout, null, "true", "false",
                null, config);
        appender.start();
        config.addAppender(appender);

        // Wire a dedicated logger (additivity=false) to the new appender only,
        // so records never leak into the root logger's appenders.
        AppenderRef ref = AppenderRef.createAppenderRef(appenderName, null, null);
        AppenderRef[] refs = new AppenderRef[] { ref };
        LoggerConfig loggerConfig = LoggerConfig.createLogger(false, Level.INFO, loggerName,
                "true", refs, null, config, null);
        loggerConfig.addAppender(appender, null, null);
        config.addLogger(loggerName, loggerConfig);

        ctx.updateLoggers();
    }

    /**
     * Appends one record (a single line) to the rolling file.
     *
     * @param msg record to write; emitted verbatim followed by a newline
     */
    public void write(String msg) {
        fileWriter.info("{}", msg);
    }

    /** Demo: two independent rolling loggers writing concurrently. */
    public static void main(String[] args) throws Exception {
        RollingFileLogger writer = new RollingFileLogger("1", "test/test.data",
                "test/data/${date:yyyy-MM}/test-%d{yyyy-MM-dd_hh_mm_ss}.log.gz");
        RollingFileLogger writer2 = new RollingFileLogger("2", "hehe/hehe.data",
                "hehe/data/${date:yyyy-MM}/test-%d{yyyy-MM-dd_hh_mm_ss}.log.gz");

        for (int i = 0; true; i++) {
            writer.write("hh" + i);
            writer2.write("hehe" + i);
            // NOTE(review): 100 MICROSECONDS is an extremely short pause for a
            // demo loop — confirm MILLISECONDS was not intended.
            TimeUnit.MICROSECONDS.sleep(100);
        }
    }
}
二 自定义Flume Sink,集成
package light.flume;

import java.nio.charset.StandardCharsets;

import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.Transaction;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Flume sink that writes event bodies through {@link RollingFileLogger}
 * (time-rolled, optionally gzip-compressed files).
 *
 * <p>Example configuration:
 * <pre>
 * a1.channels = c1
 * a1.sinks = k1
 * a1.sinks.k1.type = light.flume.RollingFileFlumeSink
 * a1.sinks.k1.channel = c1
 * a1.sinks.k1.sink.id = test
 * a1.sinks.k1.sink.filename = /tmp/flume_rollingfile_sink/rolling_file.log
 * a1.sinks.k1.sink.filepattern = /tmp/flume_rollingfile_sink/${date:yyyy-MM}/rolling_file-%d{yyyy-MM-dd}.log.gz
 * </pre>
 *
 * @author shilei
 */
public class RollingFileFlumeSink extends AbstractSink implements Configurable {

    private static final Logger logger = LoggerFactory.getLogger(RollingFileFlumeSink.class);

    private static final String SINK_ID = "sink.id";
    private static final String SINK_FILENAME = "sink.filename";
    private static final String SINK_FILEPATTERN = "sink.filepattern";

    private RollingFileLogger rollingFileLogger;

    @Override
    public void configure(Context context) {
        String sinkId = context.getString(SINK_ID, "1");
        String sinkFileName = context.getString(SINK_FILENAME,
                "/tmp/flume_rollingfile_sink/rolling_file.log");
        String sinkFilePattern = context.getString(SINK_FILEPATTERN,
                "/tmp/flume_rollingfile_sink/${date:yyyy-MM}/rolling_file-%d{yyyy-MM-dd}.log.gz");

        logger.info("{} : {} ", SINK_ID, sinkId);
        logger.info("{} : {} ", SINK_FILENAME, sinkFileName);
        logger.info("{} : {} ", SINK_FILEPATTERN, sinkFilePattern);

        rollingFileLogger = new RollingFileLogger(sinkId, sinkFileName, sinkFilePattern);
    }

    /**
     * Takes one event from the channel inside a transaction and hands its body
     * to the rolling-file logger.
     *
     * @return {@code READY} if an event was delivered, {@code BACKOFF} on an
     *         empty channel or any failure
     */
    @Override
    public Status process() throws EventDeliveryException {
        Status status;
        Channel ch = getChannel();
        Transaction txn = ch.getTransaction();
        txn.begin();
        try {
            Event event = ch.take();
            // take() returns null when the channel is empty; the original
            // would NPE on event.getBody() here.
            if (event == null) {
                txn.commit();
                return Status.BACKOFF;
            }

            handleEvent(event.getBody());
            txn.commit();
            status = Status.READY;
        } catch (Throwable t) {
            txn.rollback();
            // Actually log the failure instead of silently backing off.
            logger.error("Unable to deliver event", t);
            status = Status.BACKOFF;
            // Errors must not be swallowed — re-throw after rollback.
            if (t instanceof Error) {
                throw (Error) t;
            }
        } finally {
            // Must always close, even when an Error is re-thrown; otherwise the
            // next begin() fails with "begin() called when transaction is
            // COMPLETED!" (the original closed outside finally).
            txn.close();
        }
        return status;
    }

    /**
     * Decodes the event body as UTF-8 and appends it to the rolling file.
     * Write failures are logged and swallowed so one bad record does not
     * fail the whole transaction loop.
     *
     * @param msg raw event body bytes
     */
    public void handleEvent(byte[] msg) {
        // StandardCharsets.UTF_8 cannot throw UnsupportedEncodingException,
        // unlike new String(msg, "utf-8").
        String msgStr = new String(msg, StandardCharsets.UTF_8);
        try {
            rollingFileLogger.write(msgStr);
        } catch (Exception e) {
            // Fixed: original message ("Cookie inject error") was copy-pasted
            // from another project, and the SLF4J call had no placeholder for
            // its e.getMessage() argument.
            logger.error("Handle event error", e);
        }
    }
}
注:
官方 custom sink 的样例没有调用 txn.close() 关闭事务,运行时会报如下异常
2016-06-17 15:28:40,486 (SinkRunner-PollingRunner-DefaultSinkProcessor) [ERROR - org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:160)] Unable to deliver event. Exception follows. java.lang.IllegalStateException: begin() called when transaction is COMPLETED! at com.google.common.base.Preconditions.checkState(Preconditions.java:145) at org.apache.flume.channel.BasicTransactionSemantics.begin(BasicTransactionSemantics.java:131) at drizzt.injector.flume.InjectFlumeSink.process(InjectFlumeSink.java:22) at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68) at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147) at java.lang.Thread.run(Thread.java:745)
解决方案:添加:txn.close();
三 Flume 使用Sink
a1.channels = c1 a1.sinks = k1 a1.sinks.k1.type = light.flume.RollingFileFlumeSink a1.sinks.k1.channel = c1 a1.sinks.k1.sink.id = test a1.sinks.k1.sink.filename = /tmp/flume_rollingfile_sink/rolling_file.log a1.sinks.k1.sink.filepattern = /tmp/flume_rollingfile_sink/${date:yyyy-MM}/rolling_file-%d{yyyy-MM-dd_hh_mm_ss}.log.gz
文件见附件,可直接放到flume lib下使用。