自定义线程池
ThreadPoolExecutor的构造方法
public ThreadPoolExecutor(
int corePoolSize, //核心线程数目
int maximumPoolSize, //线程池最大线程数
long keepAliveTime, //闲置的非核心线程最大存活时间
TimeUnit unit, //存活时间的时间单位
BlockingQueue<Runnable> workQueue, //保存待执行任务的阻塞队列
ThreadFactory threadFactory, //创建线程的工厂
RejectedExecutionHandler handler) //饱和策略,当阻塞队列满并且线程数超过maximumPoolSize时采取的策略
复制代码
参数核心:corePoolSize,maximumPoolSize和workQueue,感觉ThreadFactory和RejectedExecutionHandler属于辅助。
任务类 Task
/**
* 任务类
*/
public class Task implements Runnable{
//运行的第几个线程
private final AtomicLong count=new AtomicLong(0);
//将ThreadPoolExecutor作为属性,用来查看当前Executor中阻塞队列的大小
private ThreadPoolExecutor threadPoolExecutor;
Task(ThreadPoolExecutor threadPoolExecutor){
this.threadPoolExecutor=threadPoolExecutor;
}
@Override
public void run() {
long l=this.count.getAndIncrement();
System.out.println(Thread.currentThread().getName()+" Start..."+l
+" blocking queue size: "+threadPoolExecutor.getQueue().size());
try{
Thread.sleep(3*1000);
}catch (InterruptedException ie){
System.out.println(ie.getMessage());
}
System.out.println(Thread.currentThread().getName()+" End..."+l);
}
}
复制代码
线程工厂类 CustomizeThreadFactory
/**
* 自定义线程工厂
*/
public class CustomizeThreadFactory implements ThreadFactory {
private final String factoryName;
//创建的线程序列号,与factoryName作为线程名
private final AtomicLong nextId;
CustomizeThreadFactory(String factoryName){
this.factoryName=factoryName;
this.nextId=new AtomicLong(1);
}
@Override
public Thread newThread(Runnable r) {
String threadName=this.factoryName+"-Task-"+this.nextId.getAndIncrement();
return new Thread(r,threadName);
}
}
复制代码
饱和策略类 CustomizeRejectedExecutionHandler
/**
* 自定义饱和策略
*/
public class CustomizeRejectedExecutionHandler implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
System.out.println("task rejected. " + executor.toString());
}
}
复制代码
自定义线程池
/**
* 自定义线程池
*/
public class ThreadPoolTest {
public static void main(String[] args) {
ThreadPoolExecutor threadPoolExecutor=new ThreadPoolExecutor(
5, //corePoolSize,核心线程数5
10, //maximumPoolSize,最大线程数10
10, //闲置的非核心线程最大存活时间 10s
TimeUnit.SECONDS, //存活时间单位秒
new ArrayBlockingQueue<Runnable>(100), //保存待执行任务的阻塞队列 100
new CustomizeThreadFactory("ThreadPool-1"), //创建线程的工厂
new CustomizeRejectedExecutionHandler()); //饱和策略,当阻塞队列满并且线程数超过maximumPoolSize时采取的策略
Task task=new Task(threadPoolExecutor);
for(int i=0;i<10;i++){
threadPoolExecutor.execute(task);
}
threadPoolExecutor.shutdown();
}
}
复制代码
运行
ThreadPool-1-Task-2 Start...1 blocking queue size: 5
ThreadPool-1-Task-5 Start...4 blocking queue size: 5
ThreadPool-1-Task-3 Start...3 blocking queue size: 5
ThreadPool-1-Task-4 Start...2 blocking queue size: 5
ThreadPool-1-Task-1 Start...0 blocking queue size: 5
# 这里停顿了3秒,即执行run()中的sleep
ThreadPool-1-Task-3 End...3
ThreadPool-1-Task-1 End...0
ThreadPool-1-Task-1 Start...5 blocking queue size: 3
ThreadPool-1-Task-4 End...2
ThreadPool-1-Task-2 End...1
ThreadPool-1-Task-5 End...4
ThreadPool-1-Task-4 Start...7 blocking queue size: 2
ThreadPool-1-Task-3 Start...6 blocking queue size: 3
ThreadPool-1-Task-2 Start...8 blocking queue size: 0
ThreadPool-1-Task-5 Start...9 blocking queue size: 0
复制代码
自定义线程池中核心线程数为5,最大线程数为10,任务的阻塞队列大小100。
for循环中提交了10个任务让线程池中线程执行,但由运行结果可知线程池先让5个线程来执行任务,并非是直接10个线程来运行10个任务。为什么?答案在execute()源码中。
execute(Runnable command)源码分析
源码
ps:源码中的注释便说明了代码执行情况
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();//ctl:AtomicInteger,高3位为线程池状态,低29位为线程个数
//if①
if (workerCountOf(c) < corePoolSize) {//判断线程个数是否小于定义的核心线程数
if (addWorker(command, true))//创建一个worker线程传入Runnable,true表示核心线程,false为非核心
return;
c = ctl.get();
}
//if②
if (isRunning(c) && workQueue.offer(command)) {//workQueue.offer(command)将任务添加到阻塞队列中
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
//if③
else if (!addWorker(command, false))
reject(command);//执行饱和策略
}
复制代码
逻辑图
executor()执行逻辑图如下:
完整逻辑图如下:baijiahao.baidu.com/s?id=168166…
运行结果解析
线程池属性:核心线程5,最大线程数10,阻塞队列最多存放100个任务。
for循环提交10个任务,一开始线程池中线程数为0,在执行if①时判断成立,创建一个核心线程(重复5次,创建5个核心线程);
当for执行到第6次时,此时线程池中线程数为5,执行if①判断失败,继续向下执行if②将任务存放到阻塞任务队列中(重复5次,因为队列大小为10>剩余任务数5,所以剩下的任务都能放到阻塞队列中)。if②中的语句为重新判断线程池的状态和线程数与这里的分析无关。
此次过程没有创建非核心线程来处理任务更没有触发饱和策略(即10个任务都是由5个核心线程执行的)。
修改自定义线程池属性查看运行结果
由之前的运行结果和逻辑图可知,线程池优先让核心线程把任务处理完(要不说是核心线程,核心就得多干活),当核心线程处理不了(阻塞队列满)时,再创建非核心线程来帮他们处理任务。
创建非核心线程
什么时候会创建非核心线程呢?由源码可知,当阻塞队列满且线程池未满时,用户再提交一个任务会创建非核心线程。
修改线程池属性
修改阻塞队列大小为4,查看运行结果。
/**
* 自定义线程池
*/
public class ThreadPoolTest {
public static void main(String[] args) {
ThreadPoolExecutor threadPoolExecutor=new ThreadPoolExecutor(
5, //corePoolSize,核心线程数5
10, //maximumPoolSize,最大线程数10
10, //闲置的非核心线程最大存活时间
TimeUnit.SECONDS, //存活时间单位秒
new ArrayBlockingQueue<Runnable>(4), //保存待执行任务的阻塞队列
new CustomizeThreadFactory("ThreadPool-1"), //创建线程的工厂
new CustomizeRejectedExecutionHandler()); //饱和策略,当阻塞队列满并且线程数超过maximumPoolSize时采取的策略
Task task=new Task(threadPoolExecutor);
for(int i=0;i<10;i++){
threadPoolExecutor.execute(task);
}
threadPoolExecutor.shutdown();
}
}
复制代码
运行结果
# 一开始创建了6个线程来执行任务
ThreadPool-1-Task-2 Start...0 blocking queue size: 0
ThreadPool-1-Task-4 Start...3 blocking queue size: 4
ThreadPool-1-Task-5 Start...4 blocking queue size: 4
ThreadPool-1-Task-3 Start...2 blocking queue size: 0
ThreadPool-1-Task-1 Start...1 blocking queue size: 0
ThreadPool-1-Task-6 Start...5 blocking queue size: 4
ThreadPool-1-Task-1 End...1
ThreadPool-1-Task-4 End...3
ThreadPool-1-Task-5 End...4
ThreadPool-1-Task-2 End...0
ThreadPool-1-Task-3 End...2
ThreadPool-1-Task-6 End...5
ThreadPool-1-Task-3 Start...9 blocking queue size: 0
ThreadPool-1-Task-4 Start...8 blocking queue size: 1
ThreadPool-1-Task-5 Start...7 blocking queue size: 2
ThreadPool-1-Task-1 Start...6 blocking queue size: 3
ThreadPool-1-Task-1 End...6
ThreadPool-1-Task-3 End...9
ThreadPool-1-Task-4 End...8
ThreadPool-1-Task-5 End...7
复制代码
分析
线程池属性:核心线程大小5,最大线程数10,阻塞队列大小4。
for循环提交10个任务,前5次循环创建5个核心线程执行任务;第6次循环会将任务放到阻塞队列中,因为此次阻塞队列大小为4,所以从第6次到第9次循环会把阻塞队列填满;当第10次for循环时阻塞队列满(if①和if②都不满足),则执行if③创建一个非核心线程来执行任务。所以一开始会有6个线程执行任务。
触发饱和策略
什么时候会触发饱和策略呢?由源码可知,当阻塞队列满并且线程池达到最大线程数时会触发饱和策略。
修改线程池属性
将阻塞队列大小改为4,最大线程数和核心线程数都为5。
/**
* 自定义线程池
*/
public class ThreadPoolTest {
public static void main(String[] args) {
ThreadPoolExecutor threadPoolExecutor=new ThreadPoolExecutor(
5, //corePoolSize,核心线程数5
5, //maximumPoolSize,最大线程数5
10, //闲置的非核心线程最大存活时间
TimeUnit.SECONDS, //存活时间单位秒
new ArrayBlockingQueue<Runnable>(4), //保存待执行任务的阻塞队列
new CustomizeThreadFactory("ThreadPool-1"), //创建线程的工厂
new CustomizeRejectedExecutionHandler()); //饱和策略,当阻塞队列满并且线程数超过maximumPoolSize时采取的策略
Task task=new Task(threadPoolExecutor);
for(int i=0;i<10;i++){
threadPoolExecutor.execute(task);
}
threadPoolExecutor.shutdown();
}
}
复制代码
运行结果
ThreadPool-1-Task-2 Start...0 blocking queue size: 4
ThreadPool-1-Task-4 Start...3 blocking queue size: 4
# 触发了饱和策略
task rejected. java.util.concurrent.ThreadPoolExecutor@6d6f6e28[Running, pool size = 5, active threads = 5, queued tasks = 4, completed tasks = 0]
ThreadPool-1-Task-1 Start...2 blocking queue size: 4
ThreadPool-1-Task-3 Start...1 blocking queue size: 4
ThreadPool-1-Task-5 Start...4 blocking queue size: 4
ThreadPool-1-Task-2 End...0
ThreadPool-1-Task-1 End...2
ThreadPool-1-Task-2 Start...5 blocking queue size: 3
ThreadPool-1-Task-4 End...3
ThreadPool-1-Task-4 Start...7 blocking queue size: 1
ThreadPool-1-Task-5 End...4
ThreadPool-1-Task-3 End...1
ThreadPool-1-Task-5 Start...8 blocking queue size: 0
ThreadPool-1-Task-1 Start...6 blocking queue size: 2
ThreadPool-1-Task-2 End...5
ThreadPool-1-Task-1 End...6
ThreadPool-1-Task-4 End...7
ThreadPool-1-Task-5 End...8
复制代码
分析
线程池属性:核心线程数5,最大线程数5,阻塞队列大小4。
for循环提交10个任务,同上面分析一样,当执行第10次循环之前,有5个核心线程,阻塞队列满大小为4;提交第10个任务时if①和if②都失败,执行if③时,创建非核心线程失败(因为在执行完第5次for时线程池就满了),此时执行饱和策略,第10个任务也被抛弃了。
结果中没有任务9。
问题:1000个并发线程,10台机器,每个机器4核,设计线程大小
链接:baijiahao.baidu.com/s?id=168166…
只看了一下大致分析,其中关于tomcat线程池的设计还没有看,负载均衡也没看。。。
参考:《Java并发编程之美》
<https://baijiahao.baidu.com/s?id=1681664088671801338&wfr=spider&for=pc>
复制代码