本文主要介绍线程池的概念,原理,以及简单实现一个线程池,若文中有不足或错误之处,请指出(ps:感激涕零,不要让我陷入错误的误区。。。)
一:线程池的基本概念和原理
在此之前,先来思考一个问题,为啥要用线程池呢?
线程越多,不一定就会执行的越快,受到CPU的影响,我们要控制线程的数量,线程池它的一个作用,就是用来管理线程的数量的。一般在计算机中,线程的数量是CPU数量的1到2倍。
线程池主要有下面几部分组成:
1:线程池管理器。用来管理线程池的,主要可以创建线程池,销毁线程池等。
2:工作线程。执行任务的线程,可以循环的去执行不同的任务。
3:任务接口。必须要去实现的接口,主要用来线程的调度,任务的收尾以及任务的状态等。
4:工作队列。存放任务的队列。
实现一个线程池,首先需要知道线程池的基本原理,我们知道线程池有个顶级接口Excutor,初始化一个线程池,有以下几个参数需要了解:
ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(
5, //核心线程数
10, //最大线程数
5, //若最大线程数5s内没有任务执行,就销毁
TimeUnit.SECONDS, //时间单位
new LinkedBlockingDeque<>() //队列
);
1:核心线程数,线程池的核心线程,当有任务提交过来的时候就创建一个线程,执行任务。
2:工作队列,当核心线程数量达到我们设置的值时,就去判断工作队列是不是已经满了,要是没满,就把任务放入工作队列。
3:最大线程数,当工作队列已满,这时候就看是否达到了最大线程数,要是没有,就建立线程执行任务,要是达到了最大线程数,就执行拒绝策略。
二:实现一个简单的线程池
在实现之前,需要思考:实现线程池,需要去实现什么?怎么实现?需要做什么事情?
根据上面的介绍知道,实现一个线程池,肯定需要一个队列去存放任务,还需要一个集合去存放线程:
/** 需要一个任务队列,用来存放任务 */
private BlockingDeque<Runnable> blockingDeque;
/** 需要一个集合,存放工作线程 */
private static List<Thread> workerList;
那么接下来还需要做什么呢?既然任务队列和线程都有了,接下来就是从任务队列拿任务,然后线程循环的去执行任务:
/** 工作线程执行任务,需要可以循环的执行 */
public class Worker extends Thread {
private ThreadPoolDemo threadPool;
//构造方法
public Worker(ThreadPoolDemo pool) {
this.threadPool = pool;
}
@Override
public void run() {
//判断条件,线程是否关闭,任务队列中是否还有任务
while (isWorker == false || blockingDeque.size() > 0) {
Runnable task = null;
try {
if (isWorker) {
//非阻塞
task = blockingDeque.poll();
} else {
//阻塞
task = blockingDeque.take();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
if (task != null) {
task.run();
}
}
}
}
下面就是初始化线程池的方法了,指定线程的数量,队列的大小等:
/** 初始化一个线程池
* 传入参数:线程数,工作队列数
*/
public ThreadPoolDemo(int threadSize, int queueSize) {
//初始化队列
blockingDeque = new LinkedBlockingDeque<>(queueSize);
//初始化工作线程集合(线程安全的)
workerList = Collections.synchronizedList(new ArrayList<>());
for (int i = 0; i < threadSize; i ++) {
Worker worker = new Worker(this);
//启动线程
worker.start();
//把线程放入线程集合
workerList.add(worker);
}
}
然后,就是提供一个对外的接口,用来提交任务,下面提供一个阻塞和一个非阻塞的提交任务接口:
/** 提交任务的接口,阻塞方式 */
public void submit(Runnable task) {
try {
//关闭线程池,就不会有任务提交
if (!this.isWorker) {
this.blockingDeque.put(task);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
/** 提交任务的接口,非阻塞方式 */
public boolean execute(Runnable task) {
//关闭线程池,就不会有任务提交
if (this.isWorker) {
return false;
} else {
return this.blockingDeque.offer(task);
}
}
最后,就是关闭线程池,关闭线程池需要满足几个条件:
1:当线程池关闭的时候,不会有新的任务提交过来了;
2:线程再去拿任务的时候,就不能再阻塞了;
3:已经阻塞的线程,需要打断阻塞。
/** 标志位,代表线程池是否关闭 */
public volatile boolean isWorker = false;
/**
* 线程池关闭时:
* 1:不会有新的任务提交
* 2:队列中的任务提交时,不会再阻塞
* 3:已经阻塞的线程,需要打断阻塞
* 4:需要队列中的任务提交完毕
*/
public void shutDown() {
this.isWorker = true;
for (Thread worker : workerList) {
//线程阻塞的话,就要打断阻塞
if (worker.getState().equals(Thread.State.BLOCKED) ||
worker.getState().equals(Thread.State.WAITING)) {
worker.interrupt();
}
}
}
好了,到此基本的线程池就实现了。下面写一个测试:
public static void main(String[] args) {
ThreadPoolDemo pool = new ThreadPoolDemo(3, 6);
for (int i = 0; i<6; i++) {
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
System.out.println("任务正在被提交");
}
});
pool.submit(thread);
}
}
最后的最后,知识有限,可能说的不够全面和深入,见谅。
--- 我自是年少,韶华倾付