LogTypeinterCeptor(数据类型分发)
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
public class LogTypeinterCeptor implements Interceptor {
public void initialize() {
}
public Event intercept(Event event) {
byte[] body = event.getBody();
String json = new String(body);
String logType = "";
if (json.trim().contains("start")){
logType="start";
}else {
logType="event";
}
Map<String, String> headers = event.getHeaders();
headers.put("logType",logType);
return event;
}
public List<Event> intercept(List<Event> events) {
ArrayList<Event> interceprs = new ArrayList<Event>(events.size());
for (Event event : events) {
Event intercept = intercept(event);
if (intercept != null){
interceprs.add(intercept);
}
}
return interceprs;
}
public void close() {
}
public static class Builder implements Interceptor.Builder{
public Interceptor build() {
return new LogTypeinterCeptor();
}
public void configure(Context context) {
}
}
}
LogETLinterCeptor(数据清洗拦截器)
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import java.util.ArrayList;
import java.util.List;
public class LogETLinterCeptor implements Interceptor {
public void initialize() {
}
public Event intercept(Event event) {
//字段长度不够的过滤掉
//1595155563297|
// {"cm":{"ln":"-86.8","sv":"V2.0.7","os":"8.1.6","g":"[email protected]","mid":"m192","nw":"3G","l":"pt","vc":"3","hw":"750*1134","ar":"MX","uid":"u614","t":"1595082160416","la":"30.3","md":"sumsung-1","vn":"1.1.9","ba":"Sumsung","sr":"C"},"ap":"appclient.AppMain","et":[{"ett":"1595136335771","en":"display","kv":{"newsid":"n054","action":"1","extend1":"1","place":"0","category":"74"}},
// {"ett":"1595102971707","en":"notification","kv":{"ap_time":"1595069742264","action":"4","type":"4","content":""}},
// {"ett":"1595087536026","en":"active_foreground","kv":{"access":"1","push_id":"2"}},
// {"ett":"1595060909215","en":"active_background","kv":{"active_source":"1"}},
// {"ett":"1595120709374","en":"error","kv":{"errorDetail":"at cn.lift.dfdfdf.control.CommandUtil.getInfo(CommandUtil.java:67)\\n at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)\\n at java.lang.reflect.Method.invoke(Method.java:606)\\n","errorBrief":"at cn.lift.dfdf.web.AbstractBaseController.validInbound(AbstractBaseController.java:72)"}}]}
byte[] body = event.getBody();
final String json = new String(body);
if (LogUtils.validated(json)) {
return event;
}
return null;
}
public List<Event> intercept(List<Event> events) {
ArrayList<Event> listEvent = new ArrayList<Event>(events.size());
//循环遍历event,过滤掉返回为空值的
for (Event event : events) {
Event intercept = intercept(event);
if (intercept != null) {
//将不为空的值返回
listEvent.add(intercept);
}
}
return listEvent;
}
public void close() {
}
public static class Builder implements Interceptor.Builder {
public Interceptor build() {
return new LogETLinterCeptor();
}
public void configure(Context context) {
}
}
}
LogETLinterCeptor(定义的方法类)
import org.apache.commons.lang.math.NumberUtils;
public class LogUtils {
//具体的校验方法
public static boolean validated(String json) {
//切割数据
String[] jsonArray = json.split("\\|");
//1595155563297|
// {"cm":{"ln":"-86.8","sv":"V2.0.7","os":"8.1.6","g":"[email protected]","mid":"m192","nw":"3G","l":"pt","vc":"3","hw":"750*1134","ar":"MX","uid":"u614","t":"1595082160416","la":"30.3","md":"sumsung-1","vn":"1.1.9","ba":"Sumsung","sr":"C"},"ap":"appclient.AppMain","et":[{"ett":"1595136335771","en":"display","kv":{"newsid":"n054","action":"1","extend1":"1","place":"0","category":"74"}},
// {"ett":"1595102971707","en":"notification","kv":{"ap_time":"1595069742264","action":"4","type":"4","content":""}},
// {"ett":"1595087536026","en":"active_foreground","kv":{"access":"1","push_id":"2"}},
// {"ett":"1595060909215","en":"active_background","kv":{"active_source":"1"}},
// {"ett":"1595120709374","en":"error","kv":{"errorDetail":"at cn.lift.dfdfdf.control.CommandUtil.getInfo(CommandUtil.java:67)\\n at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)\\n at java.lang.reflect.Method.invoke(Method.java:606)\\n","errorBrief":"at cn.lift.dfdf.web.AbstractBaseController.validInbound(AbstractBaseController.java:72)"}}]}
//判断长度
if (jsonArray.length < 2) {
return false;
}
//校验服务端时间是否13位、是否纯数字,
if (jsonArray[0].length() != 13 || !NumberUtils.isDigits(jsonArray[0])) {
return false;
}
//过滤掉空格,以{开头、以}结尾
if (!jsonArray[1].trim().startsWith("{") || !jsonArray[1].trim().endsWith("}")){
return false;
}
return true;
}
}