前言
在日常问题排查中,我们经常在ELK中根据traceId来查询请求的日志链路,在同步请求中,根据traceId一站到底,很爽,那如果是异步请求该如何处理呢?项目中的异步请求都是结合线程池来开启异步线程,下面结合slf4j中的MDC和线程池来实现异步线程的traceId传递。
重写ThreadPoolTaskExecutor中方法
下面的工具类,分别在Callable和Runnable异步任务执行前通过MDC.setContextMap(context)设置请求映射上下文
import org.slf4j.MDC;
import org.springframework.util.CollectionUtils;
import java.util.Map;
import java.util.concurrent.Callable;
/**
* @desc: 定义MDC工具类,支持Runnable和Callable两种,目的就是为了把父线程的traceId设置给子线程
*/
public class MdcUtil {
public static <T> Callable<T> wrap(final Callable<T> callable, final Map<String, String> context) {
return () -> {
if (CollectionUtils.isEmpty(context)) {
MDC.clear();
} else {
MDC.setContextMap(context);
}
try {
return callable.call();
} finally {
// 清除子线程的,避免内存溢出,就和ThreadLocal.remove()一个原因
MDC.clear();
}
};
}
public static Runnable wrap(final Runnable runnable, final Map<String, String> context) {
return () -> {
if (CollectionUtils.isEmpty(context)) {
MDC.clear();
} else {
MDC.setContextMap(context);
}
try {
runnable.run();
} finally {
MDC.clear();
}
};
}
}
下面定义一个ThreadPoolMdcExecutor 类来继承ThreadPoolTaskExecutor 类,重写execute和submit方法
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
/**
* @desc: 把当前的traceId透传到子线程特意加的实现。
* 重点就是 MDC.getCopyOfContextMap(),此方法获取当前线程(父线程)的traceId
*/
public class ThreadPoolMdcExecutor extends ThreadPoolTaskExecutor {
@Override
public void execute(Runnable task) {
super.execute(MdcUtil.wrap(task, MDC.getCopyOfContextMap()));
}
@Override
public Future<?> submit(Runnable task) {
return super.submit(MdcUtil.wrap(task, MDC.getCopyOfContextMap()));
}
@Override
public <T> Future<T> submit(Callable<T> task) {
return super.submit(MdcUtil.wrap(task, MDC.getCopyOfContextMap()));
}
}
下面定义线程池,就可以使用ThreadPoolMdcExecutor
@Bean(name = "callBackExecutorConfig")
public Executor callBackExecutorConfig() {
ThreadPoolTaskExecutor executor = new ThreadPoolMdcExecutor();
// 配置核心线程数
executor.setCorePoolSize(10);
// 配置最大线程数
executor.setMaxPoolSize(20);
// 配置队列大小
executor.setQueueCapacity(200);
// 配置线程池中的线程的名称前缀
executor.setThreadNamePrefix("async-Thread-");
// rejection-policy:当pool已经达到max size的时候,如何处理新任务
// abort:在调用executor执行的方法中抛出异常 RejectedExecutionException
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
// 执行初始化
executor.initialize();
return executor;
}
定义好线程池之后,我们就可以使用callBackExecutorConfig线程池进行异步任务,避免异步线程中的traceId丢失。
线程池增强
上面是通过继承ThreadPoolTaskExecutor来,重写execute和submit方法,设置MDC.setContextMap(context)设置上下文,我们也可以通过实现TaskDecorator 接口来增强线程池
public class TaskDecoratorForMdc implements TaskDecorator {
@Override
public Runnable decorate(Runnable runnable) {
try {
Optional<Map<String, String>> contextMapOptional =ofNullable(MDC.getCopyOfContextMap());
return () -> {
try {
contextMapOptional.ifPresent(MDC::setContextMap);
runnable.run();
} finally {
MDC.clear();
}
};
} catch (Exception e) {
return runnable;
}
}
}
接下来,定义线程池,对线程池进行增强
@Bean(name = "callBackExecutorConfig")
public Executor callBackExecutorConfig() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor ();
// 配置核心线程数
executor.setCorePoolSize(10);
// 配置最大线程数
executor.setMaxPoolSize(20);
// 配置队列大小
executor.setQueueCapacity(200);
// 配置线程池中的线程的名称前缀
executor.setThreadNamePrefix("async-Thread-");
// rejection-policy:当pool已经达到max size的时候,如何处理新任务
// abort:在调用executor执行的方法中抛出异常 RejectedExecutionException
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
//线程池增强
threadPoolTaskExecutor.setTaskDecorator(new TaskDecoratorForMdc());
// 执行初始化
executor.initialize();
return executor;
}
总结
上面两种方式其实本质都是通过Mdc来进行异步线程间的traceId同步,可以看下Mdc的源码,最终还是通过InheritableThreadLocal来实现子线程获取父线程信息
public class BasicMDCAdapter implements MDCAdapter {
private InheritableThreadLocal<Map<String, String>> inheritableThreadLocal =
new InheritableThreadLocal<Map<String, String>>() {
protected Map<String, String> childValue(Map<String, String> parentValue) {
return parentValue == null ? null : new HashMap(parentValue);
}
};
//省略若干
......
}