一、背景说明
随着Jdk 升级到1.8后,项目中使用parallelStream 等并行流调用方式逐渐增多,在进行jstack 查看线程时,发现大量的ForkJoinPool.commonPool-worker-xx线程,但是无法具体定位到是哪个业务线在使用,考虑使用自定义线程池使用parallelStream,详细代码如下所示
二、自定义线程池
具体实现代码如下所示:
package com.timer.utils;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinWorkerThread;
import java.util.concurrent.atomic.AtomicLong;
public class ForkJoinPoolFactory {
//自定义线程池
private static class DefaultForkJoinPoolThread extends ForkJoinWorkerThread {
protected DefaultForkJoinPoolThread(ForkJoinPool pool) {
super(pool);
}
}
/**
* 获取线程创建工厂
* @param trheadName 线程名称
* @return
*/
private ForkJoinPool.ForkJoinWorkerThreadFactory getForkJoinFactory(String trheadName){
ForkJoinPool.ForkJoinWorkerThreadFactory factory = new ForkJoinPool.ForkJoinWorkerThreadFactory() {
final AtomicLong count = new AtomicLong(0);
@Override
public ForkJoinWorkerThread newThread(ForkJoinPool pool) {
DefaultForkJoinPoolThread thread = new DefaultForkJoinPoolThread(pool);
thread.setName(trheadName+"-"+count.getAndIncrement());
return thread;
}
};
return factory;
}
/**
* 获取并行线程池
* @param trheadName 线程名称
* @return
*/
public ForkJoinPool get(String trheadName){
ForkJoinPool.ForkJoinWorkerThreadFactory factory = getForkJoinFactory(trheadName);
ForkJoinPool customThreadPool = new ForkJoinPool(4,
factory,
null,
false);
return customThreadPool;
}
/**
* 获取并行线程池
* @param parallelism 线程数量
* @param trheadName 线程名称
* @return
*/
public ForkJoinPool get(int parallelism,String trheadName){
ForkJoinPool.ForkJoinWorkerThreadFactory factory = getForkJoinFactory(trheadName);
ForkJoinPool customThreadPool = new ForkJoinPool(parallelism,
factory,
null,
false);
return customThreadPool;
}
public static void main(String[] args) {
try{
/***指定线程名称调用**/
ForkJoinPool customThreadPool = new ForkJoinPoolFactory().get(4,"测试线程");
List<Long> names = new ArrayList<>();
for(int i = 1;i<50;i++){
names.add(new Long(i));
}
/***调用方式一 存在返回值 支持返回值 子线程全部走完后,才会继续向下走**/
Object obj = customThreadPool.submit(
new Callable<Object>() {
@Override
public Object call() {
names.parallelStream().forEach(o->{
System.out.println("方式一===="+Thread.currentThread().getName()+"===="+o);
});
return null;
}
}
).get();
/***与上面代码效果一致**/
Object obj2 = customThreadPool.submit(()->{names.parallelStream().forEach(o->
{
System.out.println("方式一===="+Thread.currentThread().getName()+"===="+o);
});
}).get();
System.out.println("=======================1");
/***调用方式二 不需要返回值 完全并行,不等待子线程是否执行完成 **/
customThreadPool.submit(()->{
names.parallelStream().forEach(o->{
System.out.println("方式二===="+Thread.currentThread().getName()+"===="+o);
});
});
System.out.println("=======================2");
}catch (Throwable e){}
}
}