JAVA线程池解析
什么是线程池
线程池顾名思义就是事先创建若干个可执行的线程放入一个池(容器)中,需要的时候就从池中获取线程不用自行创建,使用完毕不需要销毁线程而是放回池中,从而减少创建和销毁线程对象的开销
线程池带来的好处
- 降低资源消耗
- 提高相应速度
- 提高线程的可管理性
线程池可选择的阻塞队列
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;
基于数组的有界阻塞队列
@Test
public void arrayBlockingQueue() throws InterruptedException {
/**
* 基于数组的有界阻塞队列,队列容量为10
*/
ArrayBlockingQueue queue =
new ArrayBlockingQueue<Integer>(10);
// 循环向队列添加元素
for (int i = 0; i < 20; i++) {
queue.put(i);
System.out.println("向队列中添加值:" + i);
}
}
基于链表的有界/无界阻塞队列
@Test
public void linkedBlockingQueue() throws InterruptedException {
/**
* 基于链表的有界/无界阻塞队列,队列容量为10
*/
LinkedBlockingQueue queue =
new LinkedBlockingQueue<Integer>();
// 循环向队列添加元素
for (int i = 0; i < 20; i++) {
queue.put(i);
System.out.println("向队列中添加值:" + i);
}
}
同步移交阻塞队列
(直接提交) 如果不希望任务在队列中等待而是希望将任务直接移交给工作线程,可使用SynchronousQueue作为等待队列。SynchronousQueue不是一个真正的队列,而是一种线程之间移交的机制。要将一个元素放入SynchronousQueue中,必须有另一个线程正在等待接收这个元素。只有在使用无界线程池或者有饱和策略时才建议使用该队列。
工作队列的默认选项是 SynchronousQueue,此策略可以 避免在处理可能具有内部依赖性的请求集时出现锁。
该Queue本身的特性,在某次添加元素后必须等待其他线程取走后才能继续添加。
@Test
public void test() throws InterruptedException {
/**
* 同步移交阻塞队列
*/
SynchronousQueue queue = new SynchronousQueue<Integer>();
// 插入值
new Thread(() -> {
try {
queue.put(1);
System.out.println("插入成功");
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
// 删除值
new Thread(() -> {
try {
queue.take();
System.out.println("删除成功");
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
Thread.sleep(1000L * 60);
}
线程池可选择的饱和策略
- AbortPolicy终止策略(默认)
- DiscardPolicy抛弃策略
- DiscardOldestPolicy抛弃旧任务策略
- CallerRunsPolicy调用者运行策略
线程池的执行示意图
- 主线程执行execute,将任务提交到核心线程池,核心线程池进行处理或新增核心线程
- 如果核心线程池满了,则会将任务提交到阻塞队列里面,核心线程会轮询消费阻塞队列
- 阻塞队列也满了则会将任务提交到最大线程池里面,让非核心线程处理
- 非核心线程也增加到了最大线程池,则会引起饱和策略
常用的线程池
常用的线程池之newCachedThreadPool
/**
* 线程数量无限的线程池(核心线程为0,最大线程数是无限大的值,默认的阻塞队列是同步移交策略,意味着有一个任务就有有一个线程去消费,然后去接受另一个任务,这个线程池会创建无数个线程最终系统崩溃)
*
* @return
*/
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
}
常用的线程池之newFixedThreadPool
/**
* 线程数量固定线程池(线程个数虽然固定了,但是无界的队列是没有限制的,任务队列也是会把内存挤爆的)
* @param nThreads
* @return
*/
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
}
常用的线程池之newSingleThreadExecutor
/**
* 单一线程池
*/
public static ExecutorService newSingleThreadExecutor() {
return new Executors.FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
向线程池提交任务
import org.junit.Test;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
public class RunTest {
@Test
public void submitTest()
throws ExecutionException, InterruptedException {
// 创建线程池
ExecutorService threadPool =
Executors.newCachedThreadPool();
/**
* 利用submit方法提交任务,接收任务的返回结果
*/
Future<Integer> future = threadPool.submit(() -> {
Thread.sleep(1000L * 10);
return 2 * 5;
});
/**
* 阻塞方法,直到任务有返回值后,才向下执行
*/
Integer num = future.get();
System.out.println("执行结果:" + num);
}
@Test
public void executeTest() throws InterruptedException {
// 创建线程池
ExecutorService threadPool =
Executors.newCachedThreadPool();
/**
* 利用execute方法提交任务,没有返回结果
*/
threadPool.execute(() -> {
try {
Thread.sleep(1000L * 10);
} catch (InterruptedException e) {
e.printStackTrace();
}
Integer num = 2 * 5;
System.out.println("执行结果:" + num);
});
Thread.sleep(1000L * 1000);
}
}
线程池的状态
线程池饱的四种饱和策略使用与代码调试
定义线程池
/**
* 线程池
*/
private static ThreadPoolExecutor executor =
new ThreadPoolExecutor(
// 核心线程数和最大线程数
2, 3,
// 线程空闲后的存活时间
60L, TimeUnit.SECONDS,
// 有界阻塞队列
new LinkedBlockingQueue<Runnable>(5)
);
重写线程执行
/**
* 任务
*/
class Task implements Runnable {
/**
* 任务名称
*/
private String taskName;
public Task(String taskName) {
this.taskName = taskName;
}
@Override
public void run() {
System.out.println("线程[ " + Thread.currentThread().getName()
+ " ]正在执行[ " + this.taskName + " ]任务...");
try {
Thread.sleep(1000L * 5);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("线程[ " + Thread.currentThread().getName()
+ " ]已执行完[ " + this.taskName + " ]任务!!!");
}
}
终止策略
demo
/**
* 终止策略
* TODO 抛出异常,拒绝任务提交
*/
@Test
public void abortPolicyTest() {
// 设置饱和策略为 终止策略
executor.setRejectedExecutionHandler(
new ThreadPoolExecutor.AbortPolicy());
for (int i = 1; i <= 10; i++) {
try {
// 提交10个线程任务
executor.execute(new Task("线程任务" + i));
} catch (Exception e) {
System.err.println(e);
}
}
// 关闭线程池
executor.shutdown();
}
执行结果
结果说明(线程池的执行过程)
- 2个任务进入核心线程
- 第3个到第7个任务,会暂存到任务队列中,因为有界队列定义为5
- 第8个任务,会启动最大线程,去执行
- 第9个第10任务,没有线程可以去执行,被终止抛出
抛弃策略
demo
/**
* 抛弃策略
* TODO 直接丢弃掉新提交的任务
*/
@Test
public void discardPolicyTest() {
// 设置饱和策略为 抛弃策略
executor.setRejectedExecutionHandler(
new ThreadPoolExecutor.DiscardPolicy());
for (int i = 1; i <= 10; i++) {
try {
// 提交10个线程任务
executor.execute(new Task("线程任务" + i));
} catch (Exception e) {
System.err.println(e);
}
}
// 关闭线程池
executor.shutdown();
}
执行结果
结果说明
线程任务第9与第10因为已满则被直接抛弃
抛弃旧任务策略
demo
/**
* 抛弃旧任务策略
* TODO 丢弃掉任务队列中的旧任务,暂存新提交的任务
*/
@Test
public void discardOldestPolicyTest() {
// 设置饱和策略为 抛弃旧任务策略
executor.setRejectedExecutionHandler(
new ThreadPoolExecutor.DiscardOldestPolicy());
for (int i = 1; i <= 10; i++) {
try {
// 提交10个线程任务
executor.execute(new Task("线程任务" + i));
} catch (Exception e) {
System.err.println(e);
}
}
// 关闭线程池
executor.shutdown();
}
执行结果
结果说明
线程3与线程4进入优先进入队列等待,也是最先被抛弃
调用者运行策略
demo
/**
* 调用者运行策略
* TODO 借用主线程来执行多余任务
*/
@Test
public void callerRunsPolicyTest() {
// 设置饱和策略为 调用者运行策略
executor.setRejectedExecutionHandler(
new ThreadPoolExecutor.CallerRunsPolicy());
for (int i = 1; i <= 10; i++) {
try {
// 提交10个线程任务
executor.execute(new Task("线程任务" + i));
} catch (Exception e) {
System.err.println(e);
}
}
// 关闭线程池
executor.shutdown();
}
执行结果
结果说明
线程任务9与10因为满了,则让主程执行,这就是调用者运行策略