logback+kafka+ELK实现日志记录
前言
环境准备
配置Logback
pom文件改造
新增logback-spring.xml
boot配置文件一下
- 怎么去下载安装 我就不多说了,直接上代码。
日志平台
业务思路
用户请求进入后台根据搜索的参数去查询内容 返回所有信息,返回json数据,当用户查看详情后 根据查询到的内容里面的traceId去es里查询和traceId相等的所有日志信息 最后返回给前端
业务流程
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传
日志查询结构图
日志查询模块包括日志精准查询、日志的模糊查询。精准查询可以根据异常码、时间间隔,准确的查出该用户这段时间内做了什么事情。而模糊查询,可以根据关键字,查出这段时间内包含该关键字的日志记录。
需要在yml文件里面添加kafka配置信息
spring:
kafka:
bootstrap-servers: 10.10.3.107:9092 # kafka的ip和端口
consumer:
group-id: kbaas-log
#指定消费者是否自动提交偏移量,默认是true,设置false需要自己什么时候提交偏移量
enable-auto-commit: true
#自动提交偏移量的频率
auto-commit-interval: 1000
#kafka的反序列化
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer #对应java数据类型
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
producer:
#序列化
#设置value.serializer为org.apache.kafka.common.serialization.StringSerializer,设置value的序列化方式为字符串,我们可以发送string类型的消息
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
在pom文件导入log-spring-boot-starter的jar和kafka,logback的jar
<!--kafka依赖-->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>1.0.0</version>
</dependency>
<!--logback-kafka-appender依赖-->
<dependency>
<groupId>com.github.danielwegener</groupId>
<artifactId>logback-kafka-appender</artifactId>
<version>0.2.0-RC2</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.2.3</version>
<scope>runtime</scope>
</dependency>
<!--导入ELK为json-->
<!-- logback-->
<dependency>
<groupId>com.ks.kbaas</groupId>
<artifactId>kbaas-commons-core</artifactId>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-core</artifactId>
</dependency>
<dependency>
<groupId>net.logstash.logback</groupId>
<artifactId>logstash-logback-encoder</artifactId>
<version>5.1</version>
</dependency>
创建自定义注解:Auth
@Target({ElementType.METHOD, ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface Auth {
/**
* @Description: 模块名称
* @Author PANCHAO
* @param
* @return java.lang.String
* @Date 17:42 2020/5/12
*/
String serve();
/**
* @Description: 操作的描述
* @Author PANCHAO
* @param
* @return java.lang.String
* @Date 17:42 2020/5/12
*/
String default();
}
注解介绍
/**
* @Target能标注在注解类型上
* ElementType
* TYPE
* 类和接口
* FIELD
* 属性
* METHOD
* 方法
* PARAMETER
* 方法中的参数
* CONSTRUCTOR
* 构造器
* ANNOTATION_TYPE
* 注解类型
* PACKAGE
* 包
* 指明了当前的注解类能标注在类的哪些部位
* @Target({ElementType.TYPE,ElementType.METHOD})
* 说明这个被标注的注解,既可以标注在类上也可以标注在方法上
*/
@Target(ElementType.TYPE)
/**
* 是否让@Description这个注解出现在帮助文档中
*/
@Documented
/**
* @Retention
* 说明@Description注解的作用域
* RetentionPolicy.SOURCE
* @Description这个注解在源文件中起作用
* RetentionPolicy.CLASS
* @Description这个注解即在源文件中起作用又在class文件中起作用
RetentionPolicy.RUNTIME
@Description在源文件、class文件、运行的过程中都起作用
*/
@Retention(RetentionPolicy.RUNTIME)
public @interface Description {
String value();//属性
}
3.在方法上加自定义注解
@GetMapping("/test")
@Auto(default = "操作内容",serve = "服务名")
public void LogSearch() {
}
4.需要在启动类上加@EnableLogging注解
@SpringBootApplication
@EnableDiscoveryClient
@EnableLogging
public class KbaasLogApplication {
public static void main(String[] args) {
SpringApplication.run(KbaasLogApplication.class, args);
}
}
logback配置文件修改
<?xml version="1.0" encoding="UTF-8"?>
<configuration scan="true" scanPeriod="60 seconds" debug="false">
<!-- 日志格式 -->
<property name="CONSOLE_LOG_PATTERN" value="%d{yyyy/MM/dd-HH:mm:ss} [%thread] %-5level %logger- %msg%n"/>
<!-- 控制台日志 -->
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<!-- encoder的默认实现类是ch.qos.logback.classic.encoder.PatternLayoutEncoder -->
<encoder>
<pattern>${CONSOLE_LOG_PATTERN}</pattern>
<charset>UTF-8</charset>
</encoder>
</appender>
<appender name="KafkaAppender" class="com.github.danielwegener.logback.kafka.KafkaAppender">
<encoder charset="UTF-8" class="net.logstash.logback.encoder.LoggingEventCompositeJsonEncoder">
<providers>
<timestamp>
<timeZone>UTC</timeZone>
</timestamp>
<pattern>
<pattern>
{
"timestamp":"%d{yyyy-MM-dd HH:mm:ss}",
"level": "%p",
"logger" : "%logger",
"logThread": "%thread",
"serverName": "${server-name}",
"message": "%{msg}",
"traceId": "%X{traceId}",
"stackTrace": "%ex",
"ip": "%X{ip}",
"host" :"%property",
"pid": "${PID:-}",
"url" :"%X{url}",
"parameter": "%X{parameter}",
"parentSpanId": "%X{X-B3-ParentSpanId:-}",
"serve" : "%X{serve}",
"default" : "%X{default}"
}
</pattern>
</pattern>
</providers>
</encoder>
<!--kafka topic 需要与配置文件里面的topic一致 否则kafka会沉默并鄙视你-->
<topic>kbaas</topic>
<keyingStrategy class="com.github.danielwegener.logback.kafka.keying.HostNameKeyingStrategy"/>
<deliveryStrategy class="com.github.danielwegener.logback.kafka.delivery.AsynchronousDeliveryStrategy"/>
<producerConfig>bootstrap.servers=kafka的ip和端口</producerConfig>
<producerConfig>acks=1</producerConfig>
<!-- wait up to 1000ms and collect log messages before sending them as a batch -->
<producerConfig>linger.ms=1000</producerConfig>
<!-- even if the producer buffer runs full, do not block the application but start to drop messages -->
<producerConfig>max.block.ms=0</producerConfig>
</appender>
<root level="INFO">
<appender-ref ref="STDOUT"/>
<appender-ref ref="KafkaAppender"/>
</root>
</configuration>
通过AOP做日志收集
@Aspect
@Component
public class LogAspect {
@Resource
Slf4jUtils log;
/**
* 拦截所有控制器方法
*/
@Pointcut("execution(public * com.CC.CCC.CCCCC.controller..*.*(..))")
public void webLog() {
}
@Around("webLog()")
public Object aroundMethod(ProceedingJoinPoint joinPoint) throws Exception {
/*result为连接点的放回结果*/
Object result = null;
/*result为连接点的放回结果*/
ServletRequestAttributes attributes = (ServletRequestAttributes) RequestContextHolder.getRequestAttributes();
HttpServletRequest request = attributes.getRequest();
// 记录下请求内容
//如果是表单,参数值是普通键值对。如果是application/json否则request.getParameter是取不到的。
String ip = request.getRemoteAddr();
String method = joinPoint.getSignature().getDeclaringTypeName() + "." + joinPoint.getSignature().getName();
JSONObject jsonObject = new JSONObject();
List<Object> argList = new ArrayList<>();
if ("application/json".equals(request.getHeader("Content-Type"))) {
for (Object arg : joinPoint.getArgs()) {
// request/response无法使用toJSON
if (arg instanceof HttpServletRequest) {
argList.add("request");
} else if (arg instanceof HttpServletResponse) {
argList.add("response");
} else {
argList.add(JSONObject.toJSON(arg));
}
}
} else {
//记录请求的键值对
for (String key : request.getParameterMap().keySet()) {
JSONObject js = new JSONObject();
js.put(key, request.getParameter(key));
argList.add(js);
}
}
MDC.put("url", request.getRequestURL().toString());
MDC.put("ip", ip);
MDC.put("method", method);
MDC.put("arg", JSON.toJSON(argList).toString());
/*执行目标方法*/
try {
/*返回通知方法*/
result = joinPoint.proceed();
} catch (Throwable e) {
/*异常通知方法*/
MDC.put("throwable", e.getMessage());
StackTraceElement[] sta = e.getStackTrace();
StringBuffer str = new StringBuffer();
for (int i = 0; i < 15 && i < sta.length; i++) {
str.append(sta[i] + "\n");
}
log.error(str.toString());
//抛出异常
throw (Exception) e.fillInStackTrace();
}
/*后置通知*/
jsonObject.put("message", result);
log.info(jsonObject.toJSONString());
return result;
}
}
这里我通过MDC扩展了logback输出,所以需要在logback.xml中新增这几个,如:
"ip": "%X{ip}",
"url": "%X{url}",
"method": "%X{method}",
"arg": "%X{arg}",
"throwable": "%X{throwable}"
以上是日志记录logback发送kafka的代码操作步骤