并发编程15
jdk内部线程池ExecutorService
- ThreadPoolExecutor
- ScheduledExecutorService
线程池状态
-
ThreadPoolExecutor 使用 int 的高 3 位来表示线程池状态,低 29 位表示线程数量
-
状态 value 说明 RUNNING(当线程池创建出来的初始状态) 111 能接受任务,能执行阻塞任务 SHUTDOWN(调用shutdown方法) 000 不接受新任务,能执行阻塞任务 肯定可以 執行正在執行的任務 STOP(调用shutDownNow) 001 不接受新任务,打断正在执行的任务,丢弃阻塞任务 TIDYING(中间状态) 010 任务全部执行完,活动线程也没了 TERMINATED(终结状态) 011 线程池终结 -
package BingFaBianCheng.bingFaBianCheng15.shadow.threadPool; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class TestThreadPoolStatus { public static void main(String[] args) { ExecutorService executorService = Executors.newFixedThreadPool(1); // 如果已经提交了9个任务,执行5个了,被shutdown了 // 此时再提交新任务不会再执行 // 但是阻塞队列中的4个任务会继续执行 // SHUTDOWN状态 executorService.shutdown(); // TIDYING中间状态,执行shutdown(),执行完阻塞队列中的任务, // 就会变成TIDYING中间状态 // 上一个状态停留一下,就会变成TERMINATED状态 // STOP状态 // 打断正在执行的任务,丢弃队列中的任务 executorService.shutdownNow(); } }
ThreadPoolExecutor
-
ThreadPoolExecutor 使用 int 的高 3 位来表示线程池状态,低 29 位表示线程数量
-
线程池的状态包括线程池中线程数量和线程运行状态两个参数
-
rs表示线程状态,wc表示工作线程数量
-
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); private static final int COUNT_BITS = Integer.SIZE - 3; private static final int CAPACITY = (1 << COUNT_BITS) - 1; // runState is stored in the high-order bits private static final int RUNNING = -1 << COUNT_BITS; private static final int SHUTDOWN = 0 << COUNT_BITS; private static final int STOP = 1 << COUNT_BITS; private static final int TIDYING = 2 << COUNT_BITS; private static final int TERMINATED = 3 << COUNT_BITS; // Packing and unpacking ctl private static int runStateOf(int c) { return c & ~CAPACITY; } private static int workerCountOf(int c) { return c & CAPACITY; } private static int ctlOf(int rs, int wc) { return rs | wc; }
-
为了保证原子性,利用ctlOf指令去操作两个参数
构造方法
-
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)
-
1、核心线程数
- 上节课自定义线程池中的核心线程set实际没有核心的意义,实际只有那一种线程类型
- 与核心线程相反的是空闲线程
- 核心线程数+空闲线程数 = 最大线程数
- 线程池是懒惰性的,不会一开始就创建线程,它是有任务提交的时候才会有创建线程
-
2、最大线程数(除核心线程之外,叫空闲线程,或者应急线程数)
-
3、针对空闲线程的存活时间 如果超时了则把空闲的线程kill
-
4、针对3的时间单位
-
5、任务存放的队列
-
6、线程工厂,主要是产生线程—给线程起个自定义名字
扫描二维码关注公众号,回复: 12476607 查看本文章 -
7、拒绝策略RejectedExecutionHandler,内部四种策略
- 默认策略—AbortPolicy
代码示例
package BingFaBianCheng.bingFaBianCheng15.shadow.threadPool;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
/**
* @Author 钢牌讲师--子路
* 演示 空闲线程和核心线程的概念---空闲线程和核心线程都会从队列当中去获取任务 随机
* 前提是空闲线程被弃用
**/
@Slf4j(topic = "e")
public class TestThreadExecutorPool {
public static void main(String[] args) {
AtomicInteger atomicInteger = new AtomicInteger(0);
//懒惰性---不会再一开始就创建线程,他是有任务提交的时候才会创建线程
ThreadPoolExecutor threadPoolExecutor
= new ThreadPoolExecutor(1,2,
3, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(1),
(r)->{
return new Thread(r,"t"+atomicInteger.incrementAndGet());
},new ThreadPoolExecutor.AbortPolicy());
for (int i = 0; i <2 ; i++) {
threadPoolExecutor.execute(new MyTask(i));
};
}
// 实现runnable接口
// 为了打印到底是哪个线程在执行哪个任务
static class MyTask implements Runnable {
private int taskNum;
public MyTask(int num) {
this.taskNum = num;
}
@Override
public void run() {
// log.debug("线程数目{}", executor.getActiveCount());
log.debug("线程名称:{} 正在执行task{}", Thread.currentThread().getName(), taskNum);
try {
Thread.currentThread().sleep(1000);
//log.debug("线程数目{}", executor.getActiveCount());
} catch (InterruptedException e) {
e.printStackTrace();
}
log.debug("task{}执行完毕============", taskNum);
}
}
}
-
ThreadPoolExcutor的默认命名是pool-1-thread-1
-
第一个1是第几个线程池,第二个1是该线程中的第几个线程
测试结果
-
t1 – task0
-
t1 – task1
-
t2 – task2
-
t1 – task0
-
t2 – task1
-
t2 – task2
-
因为模拟的线程池只有1个核心线程,阻塞队列容量为1,空闲线程为1
-
从队列取出来后空闲线程和核心线程到底谁执行任务,是随机选择的,前提是空闲线程被启用了
-
如果执行任务数是2,全部都是t1执行
-
如果执行任务数是4,直接指向拒绝策略,报异常
-
getActiveCount是正在执行任务的线程数目
-
getPoolSize是获取仍然存活的线程数目
工作方式
-
线程池中刚开始没有线程,当一个任务提交给线程池后,线程池会创建一个新线程来执行任务
-
当线程数达到 核心线程数上限,这时再加入任务,新加的任务会被加入队列当中去,前提是有界队列,任务超过了队列大小时,会创建 maximumPoolSize - corePoolSize 数目的线程数目,作为空闲线程来执行任务
-
如果线程到达 maximumPoolSize 仍然有新任务这时会执行拒绝策略
-
所以如果是用的无界队列,空闲时间,最大线程数,存活时间单位这三个变量就没有意义了
工厂方法
newFixedThreadPool
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory
threadFactory) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory);
}
ExecutorService executorService = Executors.newFixedThreadPool(n);
- 核心线程数 == 最大线程数(没有救急线程被创建),因此也无需超时时间
- 阻塞队列是无界的,可以放任意数量的任务
- 适用于任务量已知,相对耗时的任务(生成固定数目的线程来慢慢的执行任务,同一时刻无法执行的任务会被放到无界阻塞队列中阻塞)
newCachedThreadPool
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
ExecutorService executorService = Executors.newCachedThreadPool()
-
工作中相对更常用
-
核心线程数是 0, 最大线程数是 Integer.MAX_VALUE,全部都是空闲线程60s后回收
-
一个可根据需要创建新线程的线程池,如果现有线程没有可用的,则创建一个新线程并添加到池中,如果有被使用完但是还没销毁的线程,就复用该线程。终止并从缓存中移除那些已有 60 秒钟未被使用的线程。因此,长时间保持空闲的线程池不会使用任何资源
-
这种线程池比较灵活,对于执行很多短期异步任务的程序而言,这些线程池通常可提高程序性能(因为短期(小于60秒),一个线程执行完任务后,可以很快的复用)
-
t1执行task1慢,还没有执行完,task2又来了,此时会创建一个新的线程t2来执行task2,如果任务执行时间短,t1会继续执行task2,相当于是缓存下来了
-
使用同步队列SynchronousQueue队列,生成的空闲线程数会远小于提交的任务数,但是如果任务时间长一直没执行完,就会去创建新的线程,但是还是大大小于无界队列创建的线程数目,如果使用无界队列,会一直去无限创建线程。
-
SynchronousQueue队列
-
package BingFaBianCheng.bingFaBianCheng15.shadow.threadPool; import lombok.extern.slf4j.Slf4j; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.TimeUnit; /** * @Author 钢牌讲师--子路 **/ @Slf4j(topic = "e") public class SynchronousQueueTest { public static void main(String[] args) throws InterruptedException { SynchronousQueue<String> synchronousQueue = new SynchronousQueue<>(); new Thread(()->{ log.debug("start put 1"); try { synchronousQueue.put("1"); } catch (InterruptedException e) { e.printStackTrace(); } log.debug("end put 1"); log.debug("start put 2"); try { synchronousQueue.put("2"); } catch (InterruptedException e) { e.printStackTrace(); } log.debug("end put 2"); },"t1").start(); TimeUnit.SECONDS.sleep(1); new Thread(()->{ log.debug("start take 1"); try { synchronousQueue.take(); } catch (InterruptedException e) { e.printStackTrace(); } log.debug("end take 1"); },"t2").start(); } }
-
t1的软方法会一次put 1 和 2
-
但是不会执行put 2,会阻塞住
-
除非有其他线程take 1,才会去执行put 2
理解
- cache线程池的线程数量无限,但队列是阻塞的,无法无限入队,每一个新任务入队后,都会判断有没有可用线程池可用,有就会复用,不会创建新的线程池
- fixed线程池线程数量有限,队列无效,可以无限入队
newSingleThreadExecutor
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
ExecutorService executorService = Executors.newSingleThreadExecutor();
- 希望多个任务排队执行,线程数固定为 1,任务数多于 1 时,会放入无界队列排队,任务执行完毕,这唯一的线程也不会被释放。
- 区别于自己创建一个单线程串行执行任务,如果任务执行失败而终止那么没有任何补救措施,而线程池还会新建一个线程,保证池的正常工作
- Executors.newSingleThreadExecutor() 线程个数始终为1,不能修改 ,Executors.newFixedThreadPool(1) 初始时为1,以后还可以修改,对外暴露的是 ThreadPoolExecutor对象,可以强转后调用 setCorePoolSize 等方法进行修改;
提交任务
提交一个任务
void execute(Runnable command);
提交一个任务有返回值
Future<?> submit(Runnable task);//无返回值
Future<?> submit(Callable task);//有返回值
提交所有的任务
List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws
InterruptedException;
提交 tasks 中所有任务,哪个任务先成功执行完毕,返回此任务执行结果,其它任务取消
T invokeAny(Collection<? extends Callable<T>> tasks) throws
InterruptedException, ExecutionException;
shutdown
线程池状态变为SHUTDOWN
不会接收新任务
但已提交任务会执行完(包含在阻塞队列中的任务)
不会阻塞调用线程的执行
void shutdown();
线程池状态变为 STOP
不会接收新任务
会将队列中的任务返回
并用 interrupt 的方式中断正在执行的任务
List<Runnable> shutdownNow();
调用 shutdown后,调用线程并不会等待所有任务运行结束,可以利用此方法等待
boolean awaitTermination(long timeout, TimeUnit unit) throws
InterruptedException;
防止线程中的某个任务出现问题,可能发生死锁,而没有继续往后执行,执行这个方法后,在指定时间后就会继续执行该方法后面的代码,用来在线程池执行完之后需要做的一些工作,比如回收资源等。