线上日志结构化收集
写这篇文章的起因是因为在我们的生产环境中,部署节点比较多,机器也比较多,平时看日志只能一台一台登录的去查看,很难受。
单纯使用ELK的话,日志内容是非结构化的,就是一大坨文字,非常不直观,因此把我线上结构化日志的流程分享一下
1.日志规范定义
不管使用什么方式收集,日志的来源是一定要符合我们预想的结构的,否则日志结构化就无从谈起,所以第一步,在我们所有的java应用中首先要规范logback.xml或者其他的日志文件格式,以下是目前我规范出的日志格式
<pattern><![CDATA[
%n%-4r | %d{yyyy-MM-dd HH:mm:ss} | %X{method} | %X{requestURIWithQueryString} | [ip=%X{remoteHost}, ref=%X{referrer}, ua=%X{userAgent}] | %level | %logger{35} | %m%n
]]></pattern>
其日志格式分别为 线程id | 时间 | http请求方式 | http请求路径 | 请求方的信息 ip、设备等 | 日志级别 | 日志内容
这里使用了" | " 来分隔不同的数据,方便我们转换的时候能更加方便一点
2. filebeat配置
https://www.elastic.co/cn/downloads/past-releases/
elastic旗下的技术栈都可以在这个网址下载,还可以任意选择版本。
我是推荐在自己本地使用传统方式进行安装,而不是homeberw或者npm,因为方便控制。
至于filebeat是什么,我在这里就不细讲了,filebeat属于elastic旗下beats的一种,专门用来收集日志用,并且消耗资源非常小,轻量级采集器。
如果只使用 filebeat + logstash 那么配置方式如下
filebeat.inputs:
- type: log
enabled: true
paths:
- /Users/lvqiushi/idea/engine/logs/test.log
# 多行日志合并
multiline.pattern: ^[0-9][0-9][0-9][0-9]
multiline.negate: true
multiline.match: after
multiline.timeout: 10s
# 额外增加的字段
fields:
serverip: vm58
appname: qkcheat
filebeat.config.modules:
path: ${
path.config}/modules.d/*.yml
reload.enabled: false
setup.template.settings:
index.number_of_shards: 3
setup.kibana:
output:
logstash:
hosts: ["127.0.0.1:5044"]
processors:
- drop_fields:
#fields: ["beat", "input", "source", "offset","id","metadata"]
#fields: ["beat.hostname", "beat.name", "beat.version", "input_type", "beat"]#
#fields: ["agent.hostname", "agent.name", "agent.version", "input_type", "beat"]
fields:
- beat
- host
- input
- source
- offset
- prospector
- tags
- agent
- ecs
- log
如果要接入kafka的话,只需要更改一下output就好了
output.kafka:
enabled: true
# kafka集群地址
hosts: [xxx","xxx","xxx"]
topic: 'xx'
partition.hash:
reachable_only: true
compression: gzip
重点讲一下日志合并问题,通常程序打印的错误日志是包括很多行的,所以要在filebeat或者logstash中将多行日志合并起来,我是推荐在filebeat采集的时候就合并的。
至于filebeat中 multiline.*具体的参数含义大家百度一下就可以知道了。
3 logstash配置
logstash配置的话会比较难一点(logstash 高版本自带了本文所使用的所有插件,如果是4.x或者5.x还需要安装插件),
首先要区分使用的是filebeat直连,还是 filebeat + kafka,因为如果经过了kafka这一层以后,收到的结构会变的不一样,需要转换一下,很坑。
至于解析日志的方式—
- 我目前所使用的的思路是,写ruby脚本,分隔我定义好的日志列分隔符,从而把所需的字段一个个取出去。
- 使用gork插件通过正则表达式的方式也可以实现,但是考虑到正则匹配会比较困难,且消耗的cpu要比简单的字符串分隔要高,就放弃了这种想法。
如果使用正则表达式匹配,可以在这个网址中进行尝试http://grokdebug.herokuapp.com/。
filebeat直连logstash下的配置
input {
# stdin{} 命令行输入调试使用
beats {
port => 5044
}
}
filter {
ruby {
code =>'
message = event.get("message")
arr = message.split(" | ")
if arr.length >= 4
event.set("thread", arr[0])
event.set("time", arr[1])
event.set("method", arr[2])
event.set("url", arr[3])
event.set("base_params", arr[4])
event.set("level", arr[5])
event.set("java_class", arr[6])
event.set("log", arr[7])
fileds = event.get("fields")
event.set("server_ip",fileds["serverip"] )
event.set("app_name",fileds["appname"] )
event.set("[@metadata][show]", "is")
end
'
remove_field => [ "message", "tags", "fileds"]
}
}
output {
if[@metadata][show] == "is" {
stdout {}
}
}
filebeat + kafka + logstash配置(只增加了一个json格式化,因为从kafka过来的mq是纯字符串)
input {
kafka {
bootstrap_servers=>"xx"
topics=>["xx"]
}
}
filter {
json {
source => "message"
}
ruby {
code =>'
message = event.get("message")
arr = message.split(" | ")
if arr.length >= 4
event.set("thread", arr[0])
event.set("time", arr[1])
event.set("method", arr[2])
event.set("url", arr[3])
event.set("base_params", arr[4])
event.set("level", arr[5])
event.set("java_class", arr[6])
event.set("log", arr[7])
fileds = event.get("fields")
event.set("server_ip",fileds["serverip"] )
event.set("app_name",fileds["appname"] )
event.set("[@metadata][show]", "is")
end
'
remove_field => [ "message", "tags", "fileds"]
}
}
output {
if[@metadata][show] == "is" {
stdout {}
}
}
上面配置中的output都是直接打印在了控制台内,进行调试试用,如果调试完成,再换成合适的output输出,考虑到日志量庞大的问题,我选择的是放到es中进行存储。
由于日志放进ES已经被格式化了,只需要再让前端写一个很简单很简单的页面,来方便查询就可以了。
至于为什么不直接用kibana,kibana的搜索条件实在是太难用了,搜索日志应该尽可能的简单好用为主,例如我们最常用的grep功能,所以只需要放两个搜索框即可。