Source 是负责接收数据到 Flume Agent 的组件。
Source 组件可以处理各种类型、各种格式的日志数据,包括 avro、thrift、exec、jms、spooling directory、netcat、sequence generator、syslog、http、legacy。
官方提供的source类型已经很多,但是有时候并不能满足实际开发当中的需求,此时我们就需要根据实际需求自定义某些 source。
官方也提供了自定义source的接口:https://flume.apache.org/FlumeDeveloperGuide.html#source
根据官方说明自定义 MySource 需要继承 AbstractSource
类并实现 Configurable
和 PollableSource
接口。
实现相应方法:
getBackOffSleepIncrement()//暂不用
getMaxBackOffSleepInterval()//暂不用
configure(Context context)//初始化context(读取配置文件内容)
process()//获取数据封装成event并写入channel,这个方法将被循环调用。
//使用场景:读取MySQL数据或者其他文件系统。
1 需求
使用 Flume 接收数据,并给每条数据添加前缀,输出到控制台。
前缀可从flume配置文件中配置。
2 分析
3 编码
2 编写 Java 代码
package com.demo.day01;
import org.apache.flume.Context;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.PollableSource;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.SimpleEvent;
import org.apache.flume.source.AbstractSource;
import java.util.HashMap;
public class MySource extends AbstractSource implements Configurable, PollableSource {
private String preText;
private Long delayTime;
public void configure(Context context) {
preText = context.getString("preText", "atguigu");
delayTime = context.getLong("delayTime");
}
public Status process() throws EventDeliveryException {
/*
1. 获取数据 2. 封装event 3. 写入channel
*/
// 1. 使用for循环模拟获取数据
try {
for (int i = 0; i < 5; i++) {
//2. 封装事件
SimpleEvent event = new SimpleEvent();
event.setHeaders(new HashMap<String, String>());
event.setBody((preText + i).getBytes());
// 3. 写入 channel
getChannelProcessor().processEvent(event);
Thread.sleep(delayTime);
}
} catch (Exception e) {
e.printStackTrace();
return Status.BACKOFF;
}
return Status.READY;
}
public long getBackOffSleepIncrement() {
return 0;
}
public long getMaxBackOffSleepInterval() {
return 0;
}
}