目录
logstash配置文件示例
input {
beats {
port => "5011"
}
}
filter {
if "include" in [tags] {
mutate{
split => ["message","123"]
add_field => {
"beforeinclude" => "%{[message][0]}"
}
}
mutate{
gsub => ["beforeinclude", ",", ""]
}
mutate{
add_field => {
"StandardizedMessage" => "%{beforeinclude}123%{[message][1]}"
}
}
}
if "include" in [tags] or "exclude" in [tags]{
#分割“
mutate{
add_field => {
"StandardizedMessage2" => "%{StandardizedMessage}"
}
}
mutate{
#貌似不能用双引号分割
gsub => ["StandardizedMessage2", "\"", "||"]
}
mutate{
split => ["StandardizedMessage2","||"]
add_field => {
"PAGE" => "%{[StandardizedMessage2][3]}"
}
}
mutate{
split => ["StandardizedMessage"," "]
add_field => {
"URL" => "%{[StandardizedMessage][7]}"
}
add_field => {
"INFO" => "%{[StandardizedMessage][0]}"
}
}
mutate{
gsub => ["INFO", "\"", ""]
}
truncate {
fields => [ "URL" ]
length_bytes => 1000
}
grok {
match => { "message" => "%{HTTPDATE:requesttimestamp}" }
}
mutate{
add_field => {
"tem1" => "%{requesttimestamp}"
}
remove_field => [ "requesttimestamp" ]
}
date{
match=>["tem1","dd/MMM/yyyy:HH:mm:ss Z"]
target=>"logdatetime"
}
mutate{
remove_field => [ "tem1" ]
}
ruby{
code => "event.set('tem2', (event.get('logdatetime').time.localtime + 8*60*60))"
}
mutate{
add_field => {
"tmp3" => "%{tem2}"
}
remove_field => [ "tem2" ]
}
mutate{
split => ["tmp3", "."]
add_field => {
"ACCESS_TIME" => "%{[tmp3][0]}"
}
remove_field => [ "tmp3" ]
}
mutate{
gsub => ["ACCESS_TIME", "T", " "]
}
}
}
output{
if "exclude" in [tags] or "include" in [tags] {
jdbc {
driver_jar_path => "/home/elk/jar/jdbc/mysql-connector-java-5.1.46-bin.jar"
driver_class => "com.mysql.jdbc.Driver"
connection_string => "jdbc:mysql://192.168.2.123:3306/cm?user=123&password=123"
statement => [ "INSERT INTO buff (URL,ACCESS_TIME) VALUES(?, ?)", "URL","ACCESS_TIME"]
}
}
}
可以参考Logstash最佳实践
http://doc.yonyoucloud.com/doc/logstash-best-practice-cn/index.html
logstash的配置文件总共分为三个部分 输入input 处理filter 输出output
输入input
filebeat
从filebeat中传输过来数据(端口为filebeat中配置的端口)
input {
beats {
port => "5011"
}
}
标准输入(屏幕)
input{stdin{}}
读取文件(File)
input
file {
path => ["/var/log/*.log", "/var/log/message"]
type => "system"
start_position => "beginning"
}
}
有一些比较有用的配置项,可以用来指定 FileWatch 库的行为:
discover_interval
logstash 每隔多久去检查一次被监听的 path 下是否有新文件。默认值是 15 秒。
exclude
不想被监听的文件可以排除出去,这里跟 path 一样支持 glob 展开。
sincedb_path
如果你不想用默认的 $HOME/.sincedb(Windows 平台上在 C:\Windows\System32\config\systemprofile\.sincedb),可以通过这个配置定义 sincedb 文件到其他位置。
sincedb_write_interval
logstash 每隔多久写一次 sincedb 文件,默认是 15 秒。
stat_interval
logstash 每隔多久检查一次被监听文件状态(是否有更新),默认是 1 秒。
start_position
logstash 从什么位置开始读取文件数据,默认是结束位置,也就是说 logstash 进程会以类似 tail -F 的形式运行。如果你是要导入原有数据,把这个设定改成 "beginning",logstash 进程就从头开始读取,有点类似 cat,但是读到最后一行不会终止,而是继续变成 tail -F。
读取网络数据(TCP)
input {
tcp {
port => 8888
mode => "server"
ssl_enable => false
}
}
读取mysql
input {
stdin { }
jdbc {
jdbc_connection_string => "jdbc:mysql://localhost:3306/database"
jdbc_user => "root"
jdbc_password => "root"
jdbc_driver_library => "C:/Program Files (x86)/MySQL/Connector.J 5.1/mysql-connector-java-5.1.40-bin.jar"
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_paging_enabled => "true"
jdbc_page_size => "50000"<br>
statement => "SELECT * FROM session"<br>
schedule => "* * * * *"
}
}
处理filter
基础知识
变量message为logstash 从input得到的一整条字符串。
可以用"%{AAA}"代表变量名AAA这个变量的值
"%{[AAA][3]}"代表变量名AAA这个数组变量(通常用split分割后)的第四个变量的值(数组从0开始)
filter 区段之内,是顺序执行的。
if使用
第一种用法是if 标签
如果这一次进入的流是job-exclude标签或job-include标签,则执行大括号内的操作
if "job-exclude" in [tags] or "job-include" in [tags]{
mutate{
split => ["message","123"]
add_field => {
"beforeexclude" => "%{[message][0]}"
}
}
}
第二种用法是变量的表达式
if "123" in [Name] {
mutate { remove_field => [ "Name" ] }
}
如果Name变量中有123这个字符串,则。。。
Grok 正则捕获
在grok中,支持以正则表达式的方式提取所需要的信息,其中,正则表达式又分两种,一种是内置的正则表达式(可以满足我们大部分的要求),一种是自定义的正则表达式,形式分别如下:
# 内置的正则表达式方式,下面的写法表示从输入的消息中提取IP字段,并命名为sip
%{IP:sip}
# 自定义的正则表达式,开始与终止符分别为(?与?),下面的写法表示获取除,以外的字符,并命名为log_type
(?<log_type>[^,]+?)
以一个具体的例子来说明grok用法,假定需要解析的消息内容样本如下:
日志类型:僵尸网络日志, 源IP:192.168.101.251, 源端口:63726, 目的IP:124.127.48.41, 目的端口:1390, 攻击类型:异常流量, 严重级别:低, 系统动作:被记录, URL:-\n
为了快速判断我们的正则表达式是否正确,不妨进行在线测试,地址为(在线测试地址)[http://grokdebug.herokuapp.com/],最后的结果为:
^日志类型:(?<log_type>[^,]+?), 源IP:%{IP:sip}, 源端口:%{NUMBER:sport:int}, 目的IP:%{IP:dip}, 目的端口:%{NUMBER:dport:int}, 攻击类型:(?<att_type>[^,]+?), 严重级别:(?<slevel>[^,]{1,}?), 系统动作:(?<sys_act>[^,]{1,}?), URL:(?<url>.+)$
在上面的表达式中,为了提高正则表达式的解析效率,我们需要进行整行匹配,于是添加了正则表达式的开始与结尾字符“^$”,此外为了便于统计与分析,我们还需要对相关类型进行转换(如将字符串转换为整数),例如我们所需要的端口为整数,表达式为%{NUMBER:dport:int}。需要注意的是,grok也就支持两种数据类型转换,分别为float与int。
例子
grok {
match => { "message" => "%{HTTPDATE:requesttimestamp}" }
}
将message中匹配httpdate格式的东西放入变量requesttimestamp
时间处理(Date)
filter {
grok {
match => ["message", "%{HTTPDATE:logdate}"]
}
date {
match => ["logdate", "dd/MMM/yyyy:HH:mm:ss Z"]
}
}
注意:时区偏移量只需要用一个字母 Z 即可。
将变量匹配为相应格式
具体格式:
http://joda-time.sourceforge.net/apidocs/org/joda/time/format/DateTimeFormat.html
数据修改(Mutate)(重要)
filters/mutate 插件是 Logstash 另一个重要插件。它提供了丰富的基础类型数据处理能力。
变量增减
mutate{
add_field => {
"StandardizedMessage2" => "%{StandardizedMessage}"
}
}
新建变量名为StandardizedMessage2,它的值为名字为StandardizedMessage这个变量的值
mutate{
remove_field => [ "tem1" ]
}
删除tem1这个变量
类型转换
类型转换是 filters/mutate 插件最初诞生时的唯一功能。
可以设置的转换类型包括:"integer","float" 和 "string"。示例如下:
filter {
mutate {
convert => ["request_time", "float"]
}
}
注意:mutate 除了转换简单的字符值,还支持对数组类型的字段进行转换,即将 ["1","2"] 转换成 [1,2]。但不支持对哈希类型的字段做类似处理。有这方面需求的可以采用稍后讲述的 filters/ruby 插件完成。
字符串处理
gsub(替换)
仅对字符串类型字段有效
例如
gsub => ["beforeinclude", ",", "|"]
将beforeinclude这个变量的逗号换为|
gsub => ["urlparams", "[\\?#]", "_"]
split(分割)
filter {
mutate {
split => ["message", "|"]
}
}
随意输入一串以|分割的字符,比如 "123|321|adfd|dfjld*=123",可以看到如下输出:
{
"message" => [
[0] "123",
[1] "321",
[2] "adfd",
[3] "dfjld*=123"
],
"@version" => "1",
"@timestamp" => "2014-08-20T15:58:23.120Z",
"host" => "raochenlindeMacBook-Air.local"
}
注意split分割是永久性的,如果要用两种东西分割,请先用变量A=这个变量B,再分割变量a和b
具体使用如
mutate{
split => ["message","123"]
add_field => {
"before" => "%{[message][0]}"
}
}
join(合并)
仅对数组类型字段有效
我们在之前已经用 split 割切的基础再 join 回去。配置改成:
filter {
mutate {
split => ["message", "|"]
}
mutate {
join => ["message", ","]
}
}
12|23 改为12,34
merge(合并数组)
合并两个数组或者哈希字段。把后面的变量合并到前面的那个
依然在之前 split 的基础上继续:
filter {
mutate {
split => ["message", "|"]
}
mutate {
merge => ["message", "message"]
}
}
把后面的变量合并到前面的那个
我们会看到输出:
{
"message" => [
[0] "123",
[1] "321",
[2] "adfd",
[3] "dfjld*=123",
[4] "123",
[5] "321",
[6] "adfd",
[7] "dfjld*=123"
],
"@version" => "1",
"@timestamp" => "2014-08-20T16:05:53.711Z",
"host" => "raochenlindeMacBook-Air.local"
}
如果 src 字段是字符串,会自动先转换成一个单元素的数组再合并。把上一示例中的来源字段改成 "host":
filter {
mutate {
split => ["message", "|"]
}
mutate {
merge => ["message", "host"]
}
}
结果变成:
{
"message" => [
[0] "123",
[1] "321",
[2] "adfd",
[3] "dfjld*=123",
[4] "raochenlindeMacBook-Air.local"
],
"@version" => "1",
"@timestamp" => "2014-08-20T16:07:53.533Z",
"host" => [
[0] "raochenlindeMacBook-Air.local"
]
}
看,目的字段 "message" 确实多了一个元素,但是来源字段 "host" 本身也由字符串类型变成数组类型了!
rename
重命名某个字段,如果目的字段已经存在,会被覆盖掉:
filter {
mutate {
rename => ["syslog_host", "host"]
}
}
update
更新某个字段的内容。如果字段不存在,不会新建。
replace
作用和 update 类似,但是当字段不存在的时候,它会起到 add_field 参数一样的效果,自动添加新的字段。
执行次序
需要注意的是,filter/mutate 内部是有执行次序的。其次序如下:
rename(event) if @rename
update(event) if @update
replace(event) if @replace
convert(event) if @convert
gsub(event) if @gsub
uppercase(event) if @uppercase
lowercase(event) if @lowercase
strip(event) if @strip
remove(event) if @remove
split(event) if @split
join(event) if @join
merge(event) if @merge
filter_matched(event)
而 filter_matched 这个 filters/base.rb 里继承的方法也是有次序的。
@add_field.each do |field, value|
end
@remove_field.each do |field|
end
@add_tag.each do |tag|
end
@remove_tag.each do |tag|
end
输出output
mysql写入
output{
if "exclude" in [tags] or "include" in [tags] {
jdbc {
driver_jar_path => "/home/elk/jar/jdbc/mysql-connector-java-5.1.46-bin.jar"
driver_class => "com.mysql.jdbc.Driver"
connection_string => "jdbc:mysql://192.168.2.123:3306/cm?user=123&password=123"
statement => [ "INSERT INTO buff (URL,ACCESS_TIME) VALUES(?, ?)", "URL","ACCESS_TIME"]
}
}
}
注意 jar包位置,数据库位置,用户名密码,以及变量的数量与values的?,后面的那些为变量的名称,
意味第一个?的值为url变量的值
标准输出(Stdout)
output {
stdout {
codec => rubydebug
workers => 2
}
}
解释
输出插件统一具有一个参数是 workers。Logstash 为输出做了多线程的准备。
其次是 codec 设置。codec 的作用在之前已经讲过。可能除了 codecs/multiline ,其他 codec 插件本身并没有太多的设置项。所以一般省略掉后面的配置区段。换句话说。上面配置示例的完全写法应该是:
output {
stdout {
codec => rubydebug {
}
workers => 2
}
}
单就 outputs/stdout 插件来说,其最重要和常见的用途就是调试。所以在不太有效的时候,加上命令行参数 -vv 运行,查看更多详细调试信息。
保存成文件(File)
配置示例
output {
file {
path => "/home/elk/logstash-6.3.1/job.log"
codec => line { format => "custom format: %{message}"}
}
}