Background
I currently work at a company in the new retail industry. The e-commerce mall and CRM systems generate a large volume of business data every day. We need to lightly process this data and land it in a Hive database for colleagues in other departments to use.
Architecture
The pipeline is webApi + Kafka + Flume + HDFS; this post mainly documents how Flume is used.
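The post does not show the producing side, so for context here is a minimal sketch of how the webApi layer might publish a change record to Kafka. The envelope fields (topic, tableName, data, timeMillis, shardGrp) are inferred from how KafkaCommonObj is used in the interceptor further down; the topic name, shard group, sample row, and serializer settings are assumptions, not the actual production code.

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import com.alibaba.fastjson.JSONObject;

public class TableChangeProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        // same broker list placeholder the Flume agent config uses below
        props.put("bootstrap.servers", "kafkaServer:port");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // Envelope fields are assumptions inferred from KafkaCommonObj in the interceptor.
        JSONObject envelope = new JSONObject();
        envelope.put("topic", "shop_table1");                    // hypothetical topic, ends with _table1
        envelope.put("tableName", "table1");
        envelope.put("data", "[{\"id\":1,\"name\":\"demo\"}]");  // row(s) as a JSON string
        envelope.put("timeMillis", System.currentTimeMillis());
        envelope.put("shardGrp", "grp0");                        // hypothetical shard group

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("shop_table1", envelope.toJSONString()));
        }
    }
}

The Flume Kafka source below subscribes to topics ending in _table1, so a message produced this way would flow through the agent, the collector, and finally into the table's HDFS directory.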
Flume Setup
The deployment uses a 3-agent + 2-collector layout (see the linked reference for setup details).
What's different
Unlike the reference setup, we are not dealing with a single table but with many. That raises a question: how do we pair each flume-agent with the right flume-collector, given that data is stored separately by tableName?
The approach we settled on was to map each table to its own port.
Table-to-port mapping
| tableName | port |
| --- | --- |
| table1 | 50010 |
| table2 | 50011 |
| table3 | 50012 |
| table4 | 50013 |
| table5 | 50014 |
Agent configuration
table1.sources = seqGenSrc
table1.channels = fileChannel
table1.sinks = k1 k2
# set sink group
table1.sinkgroups = g1
# The source pulls data from Kafka
# see https://flume.apache.org/FlumeUserGuide.html#kafka-source
table1.sources.seqGenSrc.type = org.apache.flume.source.kafka.KafkaSource
# Kafka ZooKeeper address (legacy property, left commented out)
# table1.sources.seqGenSrc.zookeeperConnect = zk:port
# Kafka broker list
table1.sources.seqGenSrc.kafka.bootstrap.servers = kafkaServer:port
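# subscribe to every topic whose name ends with _table1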
table1.sources.seqGenSrc.kafka.topics.regex = .*\_table1$
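# consumer group id ('groupId' is the older property name; newer Flume releases use kafka.consumer.group.id)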
table1.sources.seqGenSrc.groupId = flume_coupon_list
table1.sources.seqGenSrc.kafka.consumer.auto.offset.reset = earliest
# other Kafka consumer properties can be set the same way: table1.sources.seqGenSrc.kafka.<property>
table1.sources.seqGenSrc.channels = fileChannel
table1.channels.fileChannel.type = file
table1.channels.fileChannel.checkpointDir = /data/flume/kafka/table1/chk
table1.channels.fileChannel.dataDirs = /data/flume/kafka/table1/data
table1.channels.fileChannel.capacity = 3000
table1.channels.fileChannel.transactionCapacity = 1500
# set sink1
table1.sinks.k1.channel = fileChannel
# table1.sinks.k1.requiredAcks = 1
table1.sinks.k1.type = avro
table1.sinks.k1.hostname = flume-collector1
table1.sinks.k1.port = 50010
# set sink2
table1.sinks.k2.channel = fileChannel
# table1.sinks.k2.requiredAcks = 1
table1.sinks.k2.type = avro
table1.sinks.k2.hostname = flume-collector2
table1.sinks.k2.port = 50010
#set sink group
table1.sinkgroups.g1.sinks = k1 k2
#set failover
table1.sinkgroups.g1.processor.type = failover
table1.sinkgroups.g1.processor.priority.k1 = 1
table1.sinkgroups.g1.processor.priority.k2 = 10
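# the failover processor always routes to the highest-priority sink that is alive,
# so k2 (flume-collector2) is the primary and k1 only takes over if it fails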
table1.sinkgroups.g1.processor.maxpenalty = 10000
Collector1 configuration
#set Agent name
table1.sources = seqGenSrc
table1.channels = fileChannel
table1.sinks = hdfsSink
# avro source: receives events forwarded by the upstream agents
table1.sources.seqGenSrc.type = avro
table1.sources.seqGenSrc.bind = flume-collector1
table1.sources.seqGenSrc.port = 50010
table1.sources.seqGenSrc.interceptors = i1
table1.sources.seqGenSrc.interceptors.i1.type = www.lyl.com.flume.interceptor.CrmInterceptor$Builder
table1.sources.seqGenSrc.channels = fileChannel
# File channel: buffers the events received by the source on local disk
# see https://flume.apache.org/FlumeUserGuide.html#file-channel
table1.channels.fileChannel.type = file
table1.channels.fileChannel.checkpointDir = /data/flume/kafka/table1/chk
table1.channels.fileChannel.dataDirs = /data/flume/kafka/table1/data
table1.channels.fileChannel.capacity = 3000
table1.channels.fileChannel.transactionCapacity = 1500
# HDFS sink
table1.sinks.hdfsSink.type = hdfs
# target path on HDFS; %{topic} and %{shardGrp} are event headers set by CrmInterceptor
table1.sinks.hdfsSink.hdfs.path = hdfs://LYL/%{topic}/%{shardGrp}/%Y/%m/%d
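# note: the %Y/%m/%d time escapes require a 'timestamp' event header (or hdfs.useLocalTimeStamp = true);
# the interceptor below sets one via CommonConstants.TIMESTAMP, assumed here to be the standard 'timestamp' key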
# how long (seconds) before the in-progress file is rolled into the final target file; default 30; 0 disables time-based rolling
table1.sinks.hdfsSink.hdfs.rollInterval = 79200
# roll the in-progress file into the target file once it reaches this size in bytes; 0 disables size-based rolling
table1.sinks.hdfsSink.hdfs.rollSize = 0
table1.sinks.hdfsSink.hdfs.rollCount = 0
# raised to avoid: InterruptedException: Timed out before HDFS call was made. Your hdfs.callTimeout might be set too low or HDFS calls are taking too long.
table1.sinks.hdfsSink.hdfs.callTimeout = 3600000
table1.sinks.hdfsSink.hdfs.minBlockReplicas = 1
table1.sinks.hdfsSink.hdfs.writeFormat = Text
table1.sinks.hdfsSink.hdfs.fileType = DataStream
# number of events written to HDFS per flush batch
table1.sinks.hdfsSink.hdfs.batchSize = 1000
table1.sinks.hdfsSink.hdfs.threadsPoolSize = 100
table1.sinks.hdfsSink.hdfs.idleTimeout = 3600
#table1.sinks.hdfsSink.hdfs.filePrefix = %H
#table1.sinks.hdfsSink.hdfs.fileSuffix = txt
# channel this sink drains from
table1.sinks.hdfsSink.channel = fileChannel
Collector2 configuration
#set Agent name
table1.sources = seqGenSrc
table1.channels = fileChannel
table1.sinks = hdfsSink
# avro source: receives events forwarded by the upstream agents
table1.sources.seqGenSrc.type = avro
table1.sources.seqGenSrc.bind = flume-collector2
table1.sources.seqGenSrc.port = 50010
table1.sources.seqGenSrc.interceptors = i1
table1.sources.seqGenSrc.interceptors.i1.type = www.lyl.com.flume.interceptor.CrmInterceptor$Builder
table1.sources.seqGenSrc.channels = fileChannel
# File channel: buffers the events received by the source on local disk
# see https://flume.apache.org/FlumeUserGuide.html#file-channel
table1.channels.fileChannel.type = file
table1.channels.fileChannel.checkpointDir = /data/flume/kafka/table1/chk
table1.channels.fileChannel.dataDirs = /data/flume/kafka/table1/data
table1.channels.fileChannel.capacity = 3000
table1.channels.fileChannel.transactionCapacity = 1500
# HDFS sink
table1.sinks.hdfsSink.type = hdfs
# target path on HDFS; %{topic} and %{shardGrp} are event headers set by CrmInterceptor
table1.sinks.hdfsSink.hdfs.path = hdfs://LYL/%{topic}/%{shardGrp}/%Y/%m/%d
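# as in Collector1: the %Y/%m/%d escapes rely on the timestamp header set by the interceptor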
# how long (seconds) before the in-progress file is rolled into the final target file; default 30; 0 disables time-based rolling
table1.sinks.hdfsSink.hdfs.rollInterval = 79200
# roll the in-progress file into the target file once it reaches this size in bytes; 0 disables size-based rolling
table1.sinks.hdfsSink.hdfs.rollSize = 0
table1.sinks.hdfsSink.hdfs.rollCount = 0
# raised to avoid: InterruptedException: Timed out before HDFS call was made. Your hdfs.callTimeout might be set too low or HDFS calls are taking too long.
table1.sinks.hdfsSink.hdfs.callTimeout = 3600000
table1.sinks.hdfsSink.hdfs.minBlockReplicas = 1
table1.sinks.hdfsSink.hdfs.writeFormat = Text
table1.sinks.hdfsSink.hdfs.fileType = DataStream
# number of events written to HDFS per flush batch
table1.sinks.hdfsSink.hdfs.batchSize = 1000
table1.sinks.hdfsSink.hdfs.threadsPoolSize = 100
table1.sinks.hdfsSink.hdfs.idleTimeout = 3600
#table1.sinks.hdfsSink.hdfs.filePrefix = %H
#table1.sinks.hdfsSink.hdfs.fileSuffix = txt
# channel this sink drains from
table1.sinks.hdfsSink.channel = fileChannel
Interceptor
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import java.util.ArrayList;
import java.util.List;
/**
 * Pass-through base interceptor: the list variant delegates to the
 * single-event intercept() and drops events that come back null.
 *
 * @author lyl
 * @date 2019/1/7
 */
public class BaseInterceptor implements Interceptor {

    @Override
    public void initialize() {
    }

    @Override
    public Event intercept(Event event) {
        return event;
    }

    @Override
    public List<Event> intercept(List<Event> list) {
        List<Event> resultList = new ArrayList<>();
        for (Event event : list) {
            Event r = intercept(event);
            if (r != null) {
                resultList.add(r);
            }
        }
        return resultList;
    }

    @Override
    public void close() {
    }
}
// ------------------------- CrmInterceptor -------------------------
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import org.apache.commons.lang.StringUtils;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import org.apache.log4j.Logger;
import www.lyl.com.flume.model.CommonConstants;
import www.lyl.com.flume.model.KafkaCommonObj;
import www.lyl.com.flume.util.Tool;
import java.util.Map;
/**
 * Unwraps the Kafka message envelope (KafkaCommonObj), rewrites the event body as
 * newline-separated rows rendered by the matching model class, and adds the
 * topic / shardGrp / timestamp headers used by the collector's HDFS sink path.
 *
 * @author lyl
 * @date 2019/2/28
 */
public class CrmInterceptor extends BaseInterceptor {

    private Logger logger = Logger.getLogger(CrmInterceptor.class);

    private CrmInterceptor() {
    }
    @Override
    public Event intercept(Event event) {
        String topic = "";
        String shardGrp = "";
        try {
            Map<String, String> headers = event.getHeaders();
            String msg = new String(event.getBody());
            if (StringUtils.isNotBlank(msg)) {
                KafkaCommonObj kco = JSON.parseObject(msg, KafkaCommonObj.class);
                logger.info("msg:" + msg);
                topic = kco.getTopic();
                String tableName = kco.getTableName();
                String className = Tool.lineToHump(tableName);
                String modelPath = "www.lyl.com.flume.model";
                Class cls = Class.forName(modelPath + "." + className);
                String dataJson = kco.getData();
                Object obj = JSON.parse(dataJson);
                StringBuffer sb = new StringBuffer();
                if (obj instanceof JSONArray) {
                    JSONArray jsonArray = JSON.parseArray(dataJson);
                    logger.info("records in this batch: " + jsonArray.size());
                    if (jsonArray.size() > 0) {
                        for (int i = 0; i < jsonArray.size(); i++) {
                            sb.append(JSONObject.parseObject(jsonArray.getString(i), cls).toString());
                            if (i != jsonArray.size() - 1) {
                                sb.append("\n");
                            }
                        }
                    }
                } else {
                    logger.info("records in this batch: 1");
                    // the original post also ran dataJson.replace(sp, nul) here with
                    // constants that are not shown, so that step is omitted
                    sb.append(JSON.parseObject(dataJson, cls).toString());
                }
                event.setBody(sb.toString().trim().getBytes());
                headers.put(CommonConstants.TIMESTAMP, String.valueOf(kco.getTimeMillis()));
                shardGrp = kco.getShardGrp();
            }
            headers.put("topic", topic);
            headers.put("shardGrp", shardGrp);
        } catch (Exception e) {
            logger.error("intercept fail", e);
        }
        return event;
    }
    public static class Builder implements Interceptor.Builder {

        @Override
        public Interceptor build() {
            return new CrmInterceptor();
        }

        @Override
        public void configure(Context context) {
        }
    }
}
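The interceptor depends on a few classes the post does not include: KafkaCommonObj (the Kafka message envelope), CommonConstants, Tool.lineToHump, and one model class per table whose toString() renders a delimited row. The sketch below is a guess at their shape, reconstructed only from how they are used above; the field names, the header key value, the tab delimiter, and the Table1 example are assumptions.

// In the real project these live under www.lyl.com.flume.model and www.lyl.com.flume.util,
// each public and in its own file; they are collapsed into one listing here.

// Hypothetical envelope, reconstructed from the getters CrmInterceptor calls.
public class KafkaCommonObj {
    private String topic;      // Kafka topic the record came from
    private String tableName;  // source table name, e.g. "coupon_list"
    private String data;       // row payload as a JSON string (object or array)
    private long timeMillis;   // event time used by the HDFS time-based path
    private String shardGrp;   // shard group used in the HDFS path

    public String getTopic() { return topic; }
    public void setTopic(String topic) { this.topic = topic; }
    public String getTableName() { return tableName; }
    public void setTableName(String tableName) { this.tableName = tableName; }
    public String getData() { return data; }
    public void setData(String data) { this.data = data; }
    public long getTimeMillis() { return timeMillis; }
    public void setTimeMillis(long timeMillis) { this.timeMillis = timeMillis; }
    public String getShardGrp() { return shardGrp; }
    public void setShardGrp(String shardGrp) { this.shardGrp = shardGrp; }
}

// Hypothetical constants holder; the header key is assumed to be the standard
// "timestamp" key that the HDFS sink's %Y/%m/%d escapes read.
class CommonConstants {
    static final String TIMESTAMP = "timestamp";
}

// Hypothetical per-table model; toString() decides the column layout written
// to HDFS (a tab-separated line is an assumption).
class Table1 {
    private Long id;
    private String name;

    public void setId(Long id) { this.id = id; }
    public void setName(String name) { this.name = name; }

    @Override
    public String toString() {
        return id + "\t" + name;
    }
}

// Hypothetical utility: turns a snake_case table name such as "coupon_list"
// into the CamelCase class name resolved via Class.forName ("CouponList").
class Tool {
    static String lineToHump(String line) {
        StringBuilder sb = new StringBuilder();
        boolean upper = true;                 // capitalize the first letter too
        for (char c : line.toCharArray()) {
            if (c == '_') { upper = true; continue; }
            sb.append(upper ? Character.toUpperCase(c) : c);
            upper = false;
        }
        return sb.toString();
    }
}

With this layout, adding a new table mostly means adding a model class plus another copy of the agent and collector configuration, which is exactly the pain point described in the next section.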
Limitations
Every table needs its own copy of this configuration, so any future change has to be repeated for each table, which is tedious. Early on we considered matching all tables with a single wildcard configuration and telling them apart inside the interceptor (the header values are accessible from the configuration), but a single process would have had to absorb too much load, so we split the work per table after all.