一、场景
最近做工作的时候,我们需要将树结构的数据存入到Redis中,由于数据量比较大,在存入Redis的时候使用了多线程。
二、四种方式创建线程池的方式
我以前的文章总结过四种方式:https://blog.csdn.net/weixin_42228950/article/details/100583510
三、为什么阿里不允许使用Executors创建线程池?
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
我们通过JDK四种创建线程池的方式源码可以看到,实际也是通过ThreadPoolExecutor来构建的线程池,那么阿里为什么要禁止使用呢?
newCachedThreadPool:可缓存的线程池,corePoolSize = 1,maximumPoolSize = Interger.MAX_VALUE;队列使用的是SynousQueue,它的内部没有结构来存储任何元素,如果任务数很大,而创建的那个线程一直没有处理完成任务,就会一直创建新线程来处理,最终有可能会造成OOM。
newFixedThreadPool:固定线程数量的线程池,corePoolSize=maximumPoolSize,采用的无界阻塞队列LinkedBlockingQueue,如果任务量很大,线程池来不及处理,就会将任务一直添加到队列中,最终有可能会造成OOM。
newSingleThreadExecutor:单例线程池,corePoolSize = maximumPoolSize = 1,采用是无界队列LinkedBlockingQueue如果任务量很大,线程池来不及处理,最终有可能会造成OOM。
newScheduledThreadPool:周期线程池,不做分析。
四、ThreadPoolExecutor构建单例线程池
单例线程池代码如下:
package com.bootMybatis.thread;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.springframework.stereotype.Component;
import java.util.concurrent.*;
/**
* 线程池单例
* @Date: 2020/7/25 22:06
*/
@Component
public class ThreadPoolSingleton {
private static volatile ExecutorService executorService;
private static int corePoolSize = 10;//核心池大小
private static int maximumPoolSize = 15;//最大线程数
private static long keepAliveTime = 10;//活跃时间
private static int capacity = 45;//队列大小
public ThreadPoolSingleton(){}
public static ExecutorService getExecutorService() {
if (null == executorService){
synchronized (ThreadPoolSingleton.class) {
if (executorService == null) {
ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("test-pool-%d").build();
BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<Runnable>(capacity);
executorService = new ThreadPoolExecutor(corePoolSize,maximumPoolSize,keepAliveTime,TimeUnit.DAYS,workQueue,threadFactory,new ThreadPoolExecutor.AbortPolicy());
}
}
}
return executorService;
}
}
线程类代码:
package com.bootMybatis.thread;
import com.bootMybatis.mapper.StudentMapper;
import com.bootMybatis.model.Student;
import java.util.List;
/**
* @Author: yd
* @Date: 2020/7/27 20:04
*/
public class BatchInsertDataRunnable implements Runnable{
private StudentMapper studetMapper;
private List<Student> list;
public BatchInsertDataRunnable(StudentMapper studetMapper, List<Student> list) {
this.studetMapper = studetMapper;
this.list = list;
}
@Override
public void run() {
// 执行业务
}
}
线程类调用类:
public void insertData(){
try {
ThreadPoolSingleton.getExecutorService().execute(new BatchInsertDataRunnable(studetMapper,new ArrayList<Student>()));
}catch (Exception e){
}
}
五、如何设置线程池参数
corePoolSize:线程池核心线程数量
maximumPoolSize:线程池最大线程数量
keepAliveTime:非核心线程的超时时长
unit:超时时长单位
workQueue :线程池中的队列
handler:当线程池无法处理任务时的处理策略
我们一般把任务分为三种:CPU密集型的任务、IO密集型任务、混合型任务;
CPU密集型的任务:,指的是系统的硬盘、内存性能相对CPU要好很多,此时,系统运作大部分的状况是CPU Loading 100%,CPU要读/写I/O(硬盘/内存),I/O在很短的时间就可以完成,而CPU还有许多运算要处理,CPU Loading很高。
设置线程大小一般为:CPU数量+1
IO密集型任务:CPU性能相对硬盘、内存要好很多,系统运作时,大部分的状况是CPU在等I/O (硬盘/内存) 的读/写操作,此时CPU Loading并不高。
设置线程大小一般为:CPU数量 * 2
混合型任务:要根据实际情况分析。
总结:线程等待时间所占比例越高,需要越多线程。线程CPU时间所占比例越高,需要越少线程。