A log collection system built on Flume and Solr
This article describes how to build a log collection and analysis system with Flume + Solr. Flume acts as the log collection middleware: it receives messages from applications, preprocesses them, and submits them to Solr. Once Solr has persisted and indexed the messages, they can be queried and retrieved conveniently. With search conditions you can quickly find the log entries you need, instead of digging through one log file after another, which makes fault diagnosis and data analysis far easier.
Below, drawing on my own hands-on experience, I will walk through installation, deployment, configuration, and first use step by step, along with the pitfalls I ran into in practice.
Environment requirements
Linux 6.5 or later
JDK 1.8 or later
ZooKeeper 3.4.6
Solr is clustered on top of ZooKeeper: each Solr node registers with ZooKeeper, which coordinates the cluster.
Configuring ZooKeeper
ZooKeeper runs in cluster mode here. There is plenty of material online on setting up a ZooKeeper cluster, so this article will not repeat it; instead it covers creating the solr node in ZooKeeper, which is where I got stuck when I first started with ZooKeeper.
Go to the ZooKeeper bin directory and run ./zkCli.sh -server 192.168.17.128:2181 to enter the ZooKeeper shell, then run
create /solr ""
to create the solr node.
Installing and configuring Solr
Installation:
This article describes the setup process based on solr-6.4.2.
http://archive.apache.org/dist/lucene/solr/6.4.2/solr-6.4.2.tgz
Download the archive and extract it to a directory of your choice. Then go to the bin directory and create a startup script start.sh with the following content:
#!/bin/sh
export ZK_HOST=192.168.17.128:2181/solr
./solr start -c -z $ZK_HOST -p 8393 -force
Run start.sh to start the Solr cluster. Since Solr 5.0, Jetty is bundled, so the Solr web application runs out of the box.
Open http://192.168.17.128:8393 in a browser to reach the Solr admin UI, as shown below:
Creating a collection:
- Go to SOLR_HOME/server/solr/configsets/ and copy sample_techproducts_configs/ to a sibling directory named as you like, e.g. hello
- Go to ./hello/conf and edit schema.xml to define the collection's fields; for details see https://www.cnblogs.com/DASOU/p/5907645.html
- Edit solrconfig.xml to configure the hard commit and soft commit policies; as shown below, set the openSearcher property to true. [Tune these policies to your own needs, balancing indexing throughput against real-time visibility; see https://blog.csdn.net/limengliang4007/article/details/78092252]
<autoCommit>
  <maxTime>${solr.autoCommit.maxTime:15000}</maxTime>
  <openSearcher>true</openSearcher>
</autoCommit>
<!-- softAutoCommit is like autoCommit except it causes a
     'soft' commit which only ensures that changes are visible
     but does not ensure that data is synced to disk. This is
     faster and more near-realtime friendly than a hard commit.
-->
<autoSoftCommit>
  <maxTime>${solr.autoSoftCommit.maxTime:-1}</maxTime>
</autoSoftCommit>
- Upload the configuration to ZooKeeper and create the collection. Back in SOLR_HOME/bin, run
./solr create_collection -c hello -d hello -n hello -shards 1 -replicationFactor 1 -p 8393
-c is the collection name, -d is the directory holding the collection's configuration, -n is the name under which that configuration is stored in ZooKeeper, -shards is the number of shards to create for the collection, and -replicationFactor is the number of replicas per shard [an odd number is recommended]
- Restart Solr to complete the Solr configuration
Installing and configuring Flume
Installation:
This article describes the setup process based on flume-1.7.0.
http://mirrors.hust.edu.cn/apache/flume/1.7.0/apache-flume-1.7.0-bin.tar.gz
Download the archive and extract it to a directory of your choice.
Configuration:
- Go to FLUME_HOME/conf/, rename flume-conf.properties.template to flume-conf.properties, and replace its content with the following; for an explanation of the settings see https://blog.csdn.net/qq_26418435/article/details/51604434
agent.sources = avroSrc1
agent.channels = fileChannel1 fileChannel2
agent.sinks = solrSink1 fileSink1
# Fan out the source to two kinds of channels
agent.sources.avroSrc1.type = avro
agent.sources.avroSrc1.port = 41418
agent.sources.avroSrc1.bind = 192.168.17.128
agent.sources.avroSrc1.channels = fileChannel1 fileChannel2
agent.sources.avroSrc1.selector.type = replicating
#channel
agent.channels.fileChannel1.type = SPILLABLEMEMORY
agent.channels.fileChannel1.memoryCapacity = 10000
#agent.channels.fileChannel1.overflowCapacity = 10000000
agent.channels.fileChannel1.checkpointDir = /opt/flume/data/channel1/checkpoint
agent.channels.fileChannel1.dataDirs = /opt/flume/data/channel1/data
agent.channels.fileChannel2.type = SPILLABLEMEMORY
agent.channels.fileChannel2.memoryCapacity = 10000
#agent.channels.fileChannel2.overflowCapacity = 10000000
agent.channels.fileChannel2.checkpointDir = /opt/flume/data/channel2/checkpoint
agent.channels.fileChannel2.dataDirs = /opt/flume/data/channel2/data
# Each sink's type must be defined
#solr sink to collect all data to solr to index
agent.sinks.solrSink1.type = org.apache.flume.sink.solr.morphline.MorphlineSolrSink
agent.sinks.solrSink1.channel = fileChannel1
agent.sinks.solrSink1.morphlineFile = /opt/flume/conf/morphline.conf
agent.sinks.solrSink1.morphlineId = morphline1
# file sink
agent.sinks.fileSink1.type = file_roll
agent.sinks.fileSink1.channel = fileChannel2
agent.sinks.fileSink1.sink.directory = /opt/flume/data/sink/41418
agent.sinks.fileSink1.sink.rollInterval = 86400
- In the same directory create a file named morphline.conf with the following content. collection is the collection created in Solr above, and zkHost is the ZooKeeper address the Solr cluster registered with.
SOLR_LOCATOR : {
  # Name of solr collection
  collection : "hello"

  # ZooKeeper ensemble
  zkHost : "192.168.17.128:2181/solr"

  # Max number of documents to pass per RPC from morphline to Solr Server
  batchSize : 10
}

morphlines : [
  {
    id : morphline1

    # Import all morphline commands in these java packages and their subpackages.
    # Other commands that may be present on the classpath are not visible to this morphline.
    importCommands : ["org.kitesdk.**", "org.apache.solr.**"]

    commands : [
      { readJson { } }

      {
        java {
          imports : """
            import com.fasterxml.jackson.databind.JsonNode;
            import org.kitesdk.morphline.base.Fields;
            import org.kitesdk.morphline.api.Record;
            import com.fasterxml.jackson.databind.node.JsonNodeType;
            import java.util.Map;
            import java.util.Iterator;
            // import com.cloudera.cdk.morphline.base.Fields; // use this for CDK
          """
          code : """
            JsonNode rootNode = (JsonNode) record.getFirstValue(Fields.ATTACHMENT_BODY);
            Record out = new Record();
            Iterator<Map.Entry<String, JsonNode>> fields = rootNode.fields();
            while (fields.hasNext()) {
              Map.Entry<String, JsonNode> node = fields.next();
              String key = node.getKey();
              JsonNode value = node.getValue();
              if (value == null) {
                continue;
              }
              // process int field
              if ("_t".equals(key) || "_l".equals(key)) {
                try {
                  out.put(key, value.asLong());
                } catch (Exception e) {}
              } else if (value.isBoolean()) {
                out.put(key, value.asBoolean());
              } else if (value.isLong()) {
                out.put(key, value.asLong());
              } else if (value.isDouble()) {
                out.put(key, value.asDouble());
              } else if (value.isTextual()) {
                out.put(key, value.asText());
              }
            }
            return child.process(out);
          """
        }
      }

      # convert timestamp field to native Solr timestamp format
      # such as 2012-09-06T07:14:34Z to 2012-09-06T07:14:34.000Z
      {
        convertTimestamp {
          field : _time
          inputFormats : ["unixTimeInMillis"]
          inputTimezone : UTC
          outputFormat : "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"
          outputTimezone : Asia/Chongqing
        }
      }
      {
        convertTimestamp {
          field : _rtime
          inputFormats : ["unixTimeInMillis"]
          inputTimezone : UTC
          outputFormat : "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"
          outputTimezone : Asia/Chongqing
        }
      }
      {
        convertTimestamp {
          field : _atime
          inputFormats : ["unixTimeInMillis"]
          inputTimezone : UTC
          outputFormat : "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"
          outputTimezone : Asia/Chongqing
        }
      }

      { logInfo { format : "output converted json: {}", args : ["@{}"] } }

      # load the record into a Solr server or MapReduce Reducer
      {
        loadSolr {
          solrLocator : ${SOLR_LOCATOR}
        }
      }
    ]
  }
]
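The three convertTimestamp commands above all do the same thing: parse an epoch-milliseconds field and re-render it in Solr's timestamp layout in the Asia/Chongqing zone. The conversion can be sketched in plain java.time (a standalone sketch of the same formatting rules, not the morphline runtime):

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

public class TimestampDemo {
    // Mirrors the morphline settings: input is unix time in milliseconds,
    // output pattern is yyyy-MM-dd'T'HH:mm:ss.SSS'Z' rendered in the
    // Asia/Chongqing zone (the trailing 'Z' is a literal, as in the config).
    static String convert(long unixTimeInMillis) {
        DateTimeFormatter fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'");
        return Instant.ofEpochMilli(unixTimeInMillis)
                .atZone(ZoneId.of("Asia/Chongqing"))
                .format(fmt);
    }

    public static void main(String[] args) {
        System.out.println(convert(0L)); // -> 1970-01-01T08:00:00.000Z
    }
}
```

One thing to be aware of: because the 'Z' in the pattern is a literal, the stored string shows UTC+8 wall-clock time while looking like a UTC timestamp. If you want true UTC values in Solr, keep outputTimezone at UTC.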
- Copy the Solr jars into FLUME_HOME/lib. These are the jars shipped with the Solr installation configured above. Be careful to use matching versions, otherwise Flume will fail to find the corresponding Solr collection. The Solr directories to copy from are: SOLR_HOME/contrib/morphlines-core/lib/, SOLR_HOME/dist/, SOLR_HOME/dist/solrj-lib/, SOLR_HOME/server/solr-webapp/webapp/WEB-INF/lib/
- One jar is still missing after that: kite-morphlines-json-1.1.0.jar. Download it and add it to FLUME_HOME/lib as well.
- Back in FLUME_HOME/bin, create a startup script start.sh with the following content:
#!/bin/sh
nohup ./flume-ng agent -n agent -c ../conf -f /opt/flume/conf/flume-conf.properties -Xmx1024m -Dflume.root.logger=INFO,console &
- Run the script. The logs can be followed in nohup.out in the current directory; if an exception reports that some directory does not exist, create it manually and restart Flume.
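The directories most likely to be missing are the channel and sink paths referenced in flume-conf.properties. A few mkdir -p commands are enough; for completeness, here is the same thing as a small java.nio helper (an illustrative sketch, with the base directory as a parameter):

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

public class FlumeDirs {
    // The directories referenced by fileChannel1, fileChannel2 and fileSink1
    // in flume-conf.properties, relative to the data base directory
    // (/opt/flume/data in the configuration above).
    static final String[] DIRS = {
        "channel1/checkpoint", "channel1/data",
        "channel2/checkpoint", "channel2/data",
        "sink/41418"
    };

    static void createAll(Path base) throws IOException {
        for (String dir : DIRS) {
            // createDirectories behaves like mkdir -p: it creates missing
            // parents and is a no-op if the directory already exists
            Files.createDirectories(base.resolve(dir));
        }
    }

    public static void main(String[] args) throws IOException {
        createAll(Paths.get(args.length > 0 ? args[0] : "/opt/flume/data"));
    }
}
```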
Application
A simple test class that sends a log message to the Flume avro source configured above:
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flume.Event;
import org.apache.flume.api.RpcClient;
import org.apache.flume.api.RpcClientFactory;
import org.apache.flume.event.EventBuilder;
import java.util.HashMap;
import java.util.Map;

public class FlumeTest {
    public void sendCase() throws Exception {
        ObjectMapper mapper = new ObjectMapper();
        // Connect to the avro source declared in flume-conf.properties
        RpcClient client = RpcClientFactory.getDefaultInstance("192.168.17.128", 41418);
        Map<String, String> message = messBuild("s00000000001", "19438340293", System.currentTimeMillis(), System.currentTimeMillis(),
                "POST", "{\"name\":\"Mike\"}", "{\"resp\":\"hello\"}");
        // Serialize the message map (not the mapper itself) as the event body
        Event event = EventBuilder.withBody(mapper.writeValueAsBytes(message));
        client.append(event);
        client.close();
    }

    public Map<String, String> messBuild(String sessionID, String phoneNo, long resultTime, long beginTime, String action, String req, String resp) {
        Map<String, String> logMap = new HashMap<>();
        logMap.put("_sid", sessionID);
        logMap.put("_pn", phoneNo);
        logMap.put("_time", String.valueOf(System.currentTimeMillis()));
        logMap.put("_atime", String.valueOf(beginTime));
        logMap.put("_rtime", String.valueOf(resultTime));
        logMap.put("_act", action);
        logMap.put("req", req);
        logMap.put("_ret", resp);
        return logMap;
    }
}
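The event body produced by writeValueAsBytes is plain JSON, and the readJson command in morphline.conf parses it back into fields on the Flume side. To make the wire format concrete, here is a minimal sketch of that serialization (hand-rolled so it stands alone; the test class above uses Jackson, which also handles full string escaping):

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class EventBodyDemo {
    // Serializes a flat String->String map as a JSON object, which is the
    // shape of the event body the Flume avro source receives. Only quotes
    // and backslashes are escaped here; use a real JSON library in practice.
    static String toJson(Map<String, String> fields) {
        StringBuilder sb = new StringBuilder("{");
        boolean first = true;
        for (Map.Entry<String, String> e : fields.entrySet()) {
            if (!first) sb.append(',');
            first = false;
            sb.append('"').append(escape(e.getKey())).append("\":\"")
              .append(escape(e.getValue())).append('"');
        }
        return sb.append('}').toString();
    }

    static String escape(String s) {
        return s.replace("\\", "\\\\").replace("\"", "\\\"");
    }

    public static void main(String[] args) {
        Map<String, String> log = new LinkedHashMap<>();
        log.put("_sid", "s00000000001");
        log.put("_act", "POST");
        System.out.println(toJson(log)); // -> {"_sid":"s00000000001","_act":"POST"}
    }
}
```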