线程池
先来总揽一下线程池结构
以上是线程池结构,常用的工具java.util.concurrent.Executors
结构如下
在Executors中常用的方法
Executors.newCachedThreadPool()
创建线程池核心poolSize = 0,最大poolSize=Integer.MAX_VALUE,线程任务执行完后,如果没有新任务,会在60s后被回收,使用SynchronousQueue队列做阻塞队列,队列容量是0,每次添加时,必须阻塞等待删除,每次删除都要等待添加,对任务工作时间比较小的任务,效率很高,对线程的影响如果一直有任务添加,并且正在工作的线程都没有结束,会一直new新线程出来执行,线程数量上限是Integer.MAX_VALUE.,默认拒绝策略,AbortPolicy,直接抛错,结束当前线程。
实例代码
private static void newCachedThreadPool() {
ExecutorService executorService = Executors.newCachedThreadPool();
for (int i = 0; i < 100; i++) {
executorService.execute(new Task());
}
executorService.shutdown();
}
Executors.newFixedThreadPool(5)
固定线程池大小的线程,核心poolSize=maxPoolSize,线程一旦启动,就不会被回收,如果其中某个线程挂掉了,会新启动一个线程来填充,使用LinkedBlockingQueue阻塞队列为等待队列,如果线程全都在执行,那么新添加进来的任务全部会放到阻塞队列中去,最大值是Integer.MAX_VALUE,默认拒绝策略,AbortPolicy,直接抛错,结束当前线程。
实例代码
private static void newFixedThreadPool() {
ExecutorService executorService = Executors.newFixedThreadPool(5);
for (int i = 0; i < 100; i++) {
executorService.execute(new Task());
}
executorService.shutdown();
}
Executors.newScheduledThreadPool(5)
编排任务的线程池,定时执行,必须初始化一个核心线程数量大小,最大线程数Integer.MAX_VALUE,线程一旦new出来就不会回收,使用DelayedWorkQueue任务编排的阻塞队列,默认拒绝策略,AbortPolicy,直接抛错,结束当前线程。
示例代码,有三种类型,
第一种是直接延迟一定时间后,执行一次任务;
第二种,在延迟一定时间后,第一次执行,后续执行等待前一个线程执行完毕后再延迟一定时候后执行;
第三种,在延迟一定时候后,第一次执行,后续执行等待前一个任务执行开始的时间计时延迟,如果已经到了就执行,如果没到等待时刻到来执行。
/**
* 在initialDelay时间延迟后第一次执行,之后以上一个任务开始时间点开始计算,每间隔delay后执行一次,如果delay已经到了
* 则立即执行,如果没有到,等待时刻到来再执行
*/
private static void scheduleAtFixedRate() {
ScheduledExecutorService executorService = Executors.newScheduledThreadPool(5);
executorService.scheduleAtFixedRate(new Task(), 5, 10, TimeUnit.SECONDS);
// executorService.shutdown();
}
/**
* 在initialDelay时间延迟后第一次执行,之后以上一个任务结束时间点开始计算,每间隔delay后执行一次
*/
private static void scheduleWithFixedDelay() {
ScheduledExecutorService executorService = Executors.newScheduledThreadPool(5);
executorService.scheduleWithFixedDelay(new Task(), 5, 10, TimeUnit.SECONDS);
// executorService.shutdown();
}
/**
* 延迟一定时间后执行
*/
private static void scheduledDelayThreadPool() {
ScheduledExecutorService executorService = Executors.newScheduledThreadPool(5);
executorService.schedule(new Task(), 5, TimeUnit.SECONDS);
executorService.shutdown();
}
Executors.newSingleThreadExecutor()
单线程的线程池,如果在任务时,线程挂掉了,会立即新启一个线程,最大线程1,核心线程1,阻塞队列LinkedBlockingQueue,不会回收线程,默认拒绝策略,AbortPolicy,直接抛错,结束当前线程。
/**
* 单线程的线程池,如果在任务时,线程挂掉了,会立即新启一个线程
*/
private static void newSingleThreadExecutor() {
ExecutorService executorService = Executors.newSingleThreadExecutor();
executorService.execute(new Task());
executorService.shutdown();
}
Executors.newWorkStealingPool() java 1.8
1.8新增的线程池
public static void newWorkStealingPool() throws InterruptedException {
//forkJoin
ExecutorService forkJoinPool = Executors.newWorkStealingPool();
List<CallableTask> callableTaskList = new ArrayList<>();
callableTaskList.add(new CallableTask());
callableTaskList.add(new CallableTask());
callableTaskList.add(new CallableTask());
List<Future<Integer>> futures = forkJoinPool.invokeAll(callableTaskList);
futures.forEach(integerFuture -> {
try {
System.out.println(integerFuture.get());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
});
}
这个实例不太好,下次想好了,再改哈,根据上面的层级关系图,forkJoin不属于ThreadPoolExecutor,接下来我们看看ForkJoinPool线程池的实现初始化时,线程数是内核数,非同步,异常处理为null
在使用ThreadPoolExecutor时,建议自己创建,不使用Executors工具类创建。用Executors里面对拒绝任务添加时很不友好,而使用自己创建ThreadPoolExecutor时,可以自己重写拒绝策略。
示例代码
private static void threadPoolExecutor() {
ExecutorService executorService = new ThreadPoolExecutor(4, 16, 10, TimeUnit.SECONDS, new LinkedBlockingQueue<>(1 << 4),new MyRejectedExecutionHandler());
}
public static class MyRejectedExecutionHandler implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
System.out.println("需要日志保存的消息。" + r.toString() + executor.toString());
}
}
优先级队列在线程池中的使用
模拟VIP用户,VIP等级高的优先执行。
public static class VIP implements Comparable<VIP>, Runnable {
private String name;
private int vipLevel;
public VIP(String name, int vipLevel) {
this.name = name;
this.vipLevel = vipLevel;
}
@Override
public int compareTo(VIP o) {
return Integer.compare(o.vipLevel, this.vipLevel);
}
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + " " + name + " 执行。vip等级" + vipLevel);
}
}
private static void threadPoolExecutor() {
PriorityBlockingQueue<Runnable> priorityBlockingQueue = new PriorityBlockingQueue();
ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("VIP 线程-%d").build();
ExecutorService executorService =
new ThreadPoolExecutor(4,
16,
10,
TimeUnit.SECONDS,
priorityBlockingQueue,
threadFactory,
new MyRejectedExecutionHandler());
for (int i = 0; i < 100; i++) {
int vipLevel = i % 2;
executorService.execute(new VIP("用户" + i, vipLevel));
}
}
总结
本章节对线程池的处理进行了讲解,需要多加练习。