Elastic Stock之日志收集:下篇
在上篇我们搭建了Elastic Stock(ES、Kibana、Logstash)来收集文件和微服务日志,统一收集,集中管理,对于开发、运维和DB来说都相当方便,给业务升级、扩展带来了更多的可能性和优化空间。但是这个方案在对于大型项目时还是有不足的,比如一个交易系统,或者一个抢单系统,在某个时刻突然流量暴增,在这个情况下作为数据收集中心的Logstash很可能会被撑爆,解决方案针对不同业务系统大家有自己的处理方式。使用消息队列对流量进行削峰这是比较通用的方案,同样也是Elastic支持的方案,可选的组件有redis、kafka、rabbitmq,这里我们选择使用rabbitmq来做数据采集队列以使我们的数据采集系统更加稳定。
此博客使用的环境
RabbitMQ:3.8.2
Logstash:6.2.2
spring-rabbit:2.1.2.RELEASE
Logstash集成RabbitMQ
此方案对于是不是SpringBoot项目没有要求。
因为看到网上大部分文章都是基于SpringBoot的所以特别给大家说一句:使用ELK+RabbitMQ采集日志等数据SpringMVC项目也没问题的。
RabbitMQ配置
RabbitMQ新建用户的时候不要忘了选择用户角色哦。不然会出现[com.rabbitmq.client.impl.ForgivingExceptionHandler] [AMQP Connection 192.168.91.129:5672] [] - An unexpected connection driver error occured java.net.SocketException: Socket Closed
RabbitMQ部署
RabbitMQ设置
首先新建一个exchange
再新建一个queue
绑定queue到exchange
以上是为了减少博客篇幅做了最小化配置,如果有其它要求请根据具体需要做配置。
调整Logstash管道配置
test.conf
input {
file {
path => ["/usr/share/logstash/pipeline/test.log"]
start_position => "beginning"
}
tcp {
# host:port就是上面appender中的 destination,这里其实把logstash作为服务,使用9600端口接收logback发出的消息
# host => "127.0.0.1"
port => 5044
mode => "server"
tags => ["tags"]
codec => json_lines
}
# 用于rabbitmq数据输入源配置
rabbitmq {
host => "172.17.0.5"
port => 5672
user => admin
password => admin
durable => true
queue => "q_logstash"
codec => plain
}
}
filter {
mutate {
gsub => ["message", "\r", ""]
}
dissect {
mapping => {"message" => "%{date} %{+date} [%{task} %{+task}] [%{type}] %{class} - %{info}"}
}
}
output {
stdout { codec => rubydebug }
#输出到es , es的IP指定为docker网络IP
elasticsearch { hosts => "172.17.0.2:9200" }
}
日志出现以下内容表示rabbitmq已经成功连接。
给项目使用RabbitMQ来传递日志到Logstash
引入依赖
<!-- https://mvnrepository.com/artifact/org.springframework.amqp/spring-rabbit -->
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>2.1.2.RELEASE</version>
</dependency>
Logback配置
logback.xml
<!-- 增加RabbitMQ传递日志配置-->
<appender name="AMQP" class="org.springframework.amqp.rabbit.logback.AmqpAppender">
<layout>
<pattern>
<![CDATA[%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50}.%method %line - %msg%n ]]>
</pattern>
</layout>
<host>192.168.91.129</host>
<port>5672</port>
<username>loguser02</username>
<password>loguser02</password>
<!-- <username>admin</username>
<password>admin</password>-->
<applicationId>rabbitlog</applicationId>
<routingKeyPattern>rabbitlog</routingKeyPattern>
<declareExchange>true</declareExchange>
<exchangeType>direct</exchangeType>
<exchangeName>ex_logstash</exchangeName>
<generateId>true</generateId>
<charset>UTF-8</charset>
<durable>true</durable>
<deliveryMode>PERSISTENT</deliveryMode>
</appender>
<!-- ... ... -->
<!-- 日志输出级别-->
<root level="INFO">
<appender-ref ref="STDOUT"/>
<appender-ref ref="rollingFile"/>
<!-- <appender-ref ref="stash"></appender-ref>-->
<appender-ref ref="AMQP"></appender-ref>
</root>
测试
启动项目
出现以下日志表明Logback与RabbitMQ连接正常。
Logstash终端显示如下日志
Kibana监控
以上后没问题既表示我们的ELK+RabbitMQ工作正常。
数据采集扩展:Filebeat
FILEBEAT是Elastic stock中Beat套件中的一个,针对文件的轻量型日志采集器。
当您要面对成百上千的服务器、虚拟机和容器生成的日志时,Filebeat
将为您提供一种轻量型方法,用于转发和汇总日志与文件,让简单的事情不再繁杂。启动 Filebeat 后,打开 Logs UI,直接在
Kibana 中观看对您的文件进行 tail
操作的过程。通过搜索栏按照服务、应用程序、主机、数据中心或者其他条件进行筛选,以跟踪您的全部汇总日志中的异常行为。Filebeat 能够读取并转发日志行,如果出现中断,还会在一切恢复正常后,从中断前停止的位置继续开始。
Filebeat 让简单的事情简单化
对于一些中间件、系统等的文件日志建议还是使用beats来采集日志,不要直连Logstash,特别是业务系统部件(nginx、keepalived、mysql…)较多时。
针对于Filebeat的介绍官方已经介绍的很生动。
对于它的使用官方手册也已经介绍的很详细了。
所以我就不再这里再搬运了,大家有需要自己到官网看吧。
Logstash要使用Filebeat的话要增加配置如下
我已经把TCP监听注释掉了,因为会端口冲突。
通常不建议大家用直连Logstash的方式而是通过消息队列、Filebeat来采集数据。