Executors线程工具类,位于java.util.concurrent包下面:
主要用来创建四类线程池:
1.newFixedThreadPool(int, ThreadFactory)
创建一个定长的线程池,指定线程的数量大小,可以控制线程的最大并发量,超出的线程会在队列中等待
源码如下:
/**
* Creates a thread pool that reuses a fixed number of threads
* operating off a shared unbounded queue. At any point, at most
* {@code nThreads} threads will be active processing tasks.
* If additional tasks are submitted when all threads are active,
* they will wait in the queue until a thread is available.
* If any thread terminates due to a failure during execution
* prior to shutdown, a new one will take its place if needed to
* execute subsequent tasks. The threads in the pool will exist
* until it is explicitly {@link ExecutorService#shutdown shutdown}.
*
* @param nThreads the number of threads in the pool
* @return the newly created thread pool
* @throws IllegalArgumentException if {@code nThreads <= 0}
*/
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
可以看到核心线程和最大线程数都是固定的,并且keepAliveTime为0,也就是当线程数大于核心时,此为终止前多余的空闲线程等待新任务的最长时间。始终保证线程池中的线程数固定,任务队列为LinkedBlockingQueue(无界队列),也就是只要有任务就放到缓存中去等待.
2.newCachedThreadPool()
创建一个可缓存的线程池,如果线程池长度超过处理需要(也就是有空闲线程),会在60s后回收空闲线程,如线程数不够是,就会创建新的线程
源码如下:
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
可以看到初始化的时候,线程是0,最大线程为Integer.MAX_VALUE(可认为无界限),回收空闲线程等待时间60s,任务队列为SynchronousQueue
因为SynchronousQueue没有存储功能,因此put和take会一直阻塞,直到有另一个线程已经准备好参与到交付过程中。仅当有足够多的消费者,并且总是有一个消费者准备好获取交付的工作时,才适合使用同步队列。
也就是相当于这个队列不会有缓存功能,只会阻塞,当有任务时候就马上执行,直到没有任务,没有任务的时候就会一直等待任务的到来
3.newScheduledThreadPool(int, ThreadFactory)
创建一个指定大小的线程池,支持定时和周期性任务执行,延时执行,类似一个定时器
源码如下:
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
点进去看到:
/**
* Creates a new {@code ScheduledThreadPoolExecutor} with the
* given core pool size.
*
* @param corePoolSize the number of threads to keep in the pool, even
* if they are idle, unless {@code allowCoreThreadTimeOut} is set
* @throws IllegalArgumentException if {@code corePoolSize < 0}
*/
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
同时查看该类的集成关系:
public class ScheduledThreadPoolExecutor
extends ThreadPoolExecutor
implements ScheduledExecutorService
可以看到他继承了ThreadPoolExecutor,所以super调用的方法就是new ThreadPoolExecutor(),
可以看到核心线程指定了,最大线程为 Integer.MAX_VALUE,空闲等待为0,任务队列为DelayedWorkQueue
DelayedWorkQueue
中的元素第一个元素永远是 delay 时间最小的那个元素,如果 delay 没有到期,take 的时候便会 block 住。
了解了 DelayedWorkQueue
,理解 ScheduledThreadPoolExecutor
就容易了。当执行 schedule 方法是。如果不是重复的任务,那任务从 DelayedWorkQueue
取出之后执行完了就结束了。如果是重复的任务,那在执行结束前会重置执行时间并将自己重新加入到 DelayedWorkQueue
中
实例:
/*
* ScheduledExecutorService比Timer更安全,功能更强大
*/
public static void scheduledThreadPool() {
ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5);
scheduledThreadPool.schedule(new Runnable() {
public void run() {
System.out.println("delay 3 seconds");
}
}, 3, TimeUnit.SECONDS);
scheduledThreadPool.scheduleAtFixedRate(new Runnable() {
public void run() {
System.out.println("delay 1 seconds, and excute every 3 seconds");
}
}, 1, 3, TimeUnit.SECONDS);
}
延时3秒执行任务,每1秒执行一次
4.newSingleThreadPool()
创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行
源码:
/**
* Creates an Executor that uses a single worker thread operating
* off an unbounded queue. (Note however that if this single
* thread terminates due to a failure during execution prior to
* shutdown, a new one will take its place if needed to execute
* subsequent tasks.) Tasks are guaranteed to execute
* sequentially, and no more than one task will be active at any
* given time. Unlike the otherwise equivalent
* {@code newFixedThreadPool(1)} the returned executor is
* guaranteed not to be reconfigurable to use additional threads.
*
* @return the newly created single-threaded Executor
*/
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
可以看到核心线程和最大线程都是1,空闲线程回收为0,阻塞队列为LinkedblockingQueue,队列是无界队列,会将任务放到队列中等待有线程在执行
new ThreadPoolExecutor();
中如果任务队列使用的是有界队列,那么当任务数量小于核心线程数的时候,增加任务不会创建线程,直到等于核心线程数,此时在增加任务,会将任务放到队列中去,直到达到队列的最大值,如果再来任务,就会创建新线程直到达到最大线程数,如果队列满了,并且最大线程数也达到了,此时在增加任务就会执行拒绝策略
中如果使用的是无界队列,,那么当任务数量小于核心线程数的时候,增加任务不会创建线程,直到等于核心线程数,后续再增加任务就会放入到队列中,由于队列是无界的所以就不会去创建线程来达到最大线程数,了,也就是说对于无界队列,最大线程数的配置是无效的,也是无意义的
有界队列的实例:
package com.example.demo.excecutor;
import org.junit.Test;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class Demo {
@Test
public void test() {
ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 2, 0, TimeUnit.SECONDS, new ArrayBlockingQueue<>(3));
executor.submit(new Task("1")); //最先初始
executor.submit(new Task("2")); //以下连着三个都会放入到缓存中去 第三
executor.submit(new Task("3")); //第四
executor.submit(new Task("4")); //第五
executor.submit(new Task("5")); //第二
executor.submit(new Task("6")); //拒绝
/**
* 对输出结果解析:
* 首先我创建的线程池 keepAliveTime 为0,目的是保证线程一执行完就回归到核心线程数
*
* 核心线程数为1,所以第一个任务会被核心线程执行
*
* 第2,3,4个任务会放到缓存队列中去,因为缓存队列最大为3
*
* 第五个任务的时候,缓存队列已经满了,但是最大线程数可以保持2个,所以会创建新线程直接执行他
*
* 第6个任务此时最大线程数也满了,就会执行拒绝策略,默认的拒绝策略是不执行,抛出异常
*
*/
}
class Task implements Runnable{
private String name;
public Task() {
}
public Task(String name) {
this.name = name;
}
@Override
public void run() {
System.out.println("线程"+name + "正在执行...");
}
}
}
输出结果如下:
对于无界队列:
如果在使用无界队列LinkedBlockingQueue是指定了大小的话,结果就和有界队列是一样的了,如下
如果队列不指定大小,那么执行的结果就不会抛出异常,并且执行顺序是依次的了,如下:
package com.example.demo.excecutor;
import org.junit.Test;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class Demo2 {
@Test
public void test() {
ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 2, 0, TimeUnit.SECONDS, new LinkedBlockingDeque<>());
executor.submit(new Task("1")); //第一
executor.submit(new Task("2")); //第二
executor.submit(new Task("3")); //第三
executor.submit(new Task("4")); 第四
executor.submit(new Task("5")); 第五
executor.submit(new Task("6")); //第六
}
class Task implements Runnable{
private String name;
public Task() {
}
public Task(String name) {
this.name = name;
}
@Override
public void run() {
System.out.println("线程"+name + "正在执行...");
}
}
}
执行结果如下:
java中四种拒绝策略:
AbortPolicy策略, CallRetunspolicy策略, DiscardOledestPolicy策略, DiscardPolicy策略
AbortPolicy策略:该策略会直接抛出异常,阻止系统正常工作。
CallerRunsPolicy 策略:只要线程池未关闭,该策略直接在调用者线程中,运行当前的被丢弃的任务。
DiscardOleddestPolicy策略: 该策略将丢弃最老的一个请求,也就是即将被执行的任务,并尝试再次提交当前任务。
DiscardPolicy策略:该策略默默的丢弃无法处理的任务,不予任何处理。
在new ThreadPoolExecutor()中,如果不指定拒绝策略,那么默认的拒绝策略是AbortPolicy策略,
源码如下:
/**
* The default rejected execution handler
*/
private static final RejectedExecutionHandler defaultHandler =
new AbortPolicy();
.......
/**
* Creates a new {@code ThreadPoolExecutor} with the given initial
* parameters and default thread factory and rejected execution handler.
* It may be more convenient to use one of the {@link Executors} factory
* methods instead of this general purpose constructor.
*
* @param corePoolSize the number of threads to keep in the pool, even
* if they are idle, unless {@code allowCoreThreadTimeOut} is set
* @param maximumPoolSize the maximum number of threads to allow in the
* pool
* @param keepAliveTime when the number of threads is greater than
* the core, this is the maximum time that excess idle threads
* will wait for new tasks before terminating.
* @param unit the time unit for the {@code keepAliveTime} argument
* @param workQueue the queue to use for holding tasks before they are
* executed. This queue will hold only the {@code Runnable}
* tasks submitted by the {@code execute} method.
* @throws IllegalArgumentException if one of the following holds:<br>
* {@code corePoolSize < 0}<br>
* {@code keepAliveTime < 0}<br>
* {@code maximumPoolSize <= 0}<br>
* {@code maximumPoolSize < corePoolSize}
* @throws NullPointerException if {@code workQueue} is null
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}