java线程池相关类图
Executor接口
public interface Executor {
void execute(Runnable command);
}
接口定义很简单,就一个execute()方法,用来执行提交的任务。接口的注释中说到,该接口的主要作用是将任务的提交与任务的执行以及线程的调度进行解耦,我们不用再像下面这样明确的创建一个线性,然后启动:
new Thread(new RunnableTask()).start()
现在我们可以这样去执行一个任务:
Executor executor = new Executor();
executor.executor(new RunnableTask1());
executor.executor(new RunnableTask2());
当然,Executor不要求每个任务都异步执行,我们可以在主线程中立即执行我们提交的任务,像这样:
class DirectExecutor implements Executor {
public void executor(Runnable r) {
r.run();
}
}
我们也可以另起一个线性去执行提交的任务:
class ThreadPerTaskExecutor implements Executor {
public void executor(Runnable r) {
new Thread(r).start();
}
}
ExecutorService接口
继承自Executor接口,相较于Executor,提供了更多对线程池的操作。
-
线程池可以被关闭
void shutdown(); List<Runnable> shutdownNow(); boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;
shutdown()有序关闭先前已经提交的任务,同时不会再接受新的任务,如果线程池早已关闭不会产生额外的效果,该方法不会去等待先前提交的任务执行完毕;shutdownNow()试图停止正在执行或等待执行的任务,并返回等待执行的任务列表,该方法不会等待每一个任务终止。
awaitTermination()阻塞直到任一情况发生:①所有的任务在调用shutdown后执行完成;②超时;③当前线程被中断。
-
提交一个任务
<T> Future<T> submit(Callable<T> task); <T> Future<T> submit(Runnable task, T result); Future<?> submit(Runnable task);
submit()方法提交一个任务到线程池,任务执行完成将返回一个Future,可以通过Future.get()方法拿到执行结果。第二个submit()方法多了一个result参数,我们也可以将返回结果放到result上。
-
批量执行任务
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException; <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException; <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException; <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
invokeAll()提交一个任务集合,执行完成后将返回执行结果list,可以设置个超时时间,到时间没有执行完的任务将会被取消;invokeAny()是只要有一个执行完成就返回,剩余的任务全部取消执行,同时也可以设置一个超时时间,如果超过这个时间没有一个任务执行完就抛出TimeoutException。
ScheduledExecutorService 定时执行任务
public interface ScheduledExecutorService extends ExecutorService {
public ScheduledFuture<?> schedule(Runnable command,
long delay, TimeUnit unit);
public <V> ScheduledFuture<V> schedule(Callable<V> callable,
long delay, TimeUnit unit);
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit);
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
long initialDelay,
long delay,
TimeUnit unit);
}
schedule()方法在给定的延时后执行给定的任务;scheduleAtFixedRate()在给定延时initialDelay后任务以固定速率执行,如果任务的执行时间小于period,那么后面任务分别在initialDelay + period、initialDelay + 2period、initialDelay + 3period …开始执行,如果任务的执行时间大于period,那么下一个任务就会被延迟执行,也就是任务不会并发的去执行;scheduleWithFixedDelay()在给定的延时initialDelay后任务以给定的延时执行,也就是一个任务执行完成后,等待delay时间,然后开始下一个任务。
ThreadPoolExecutor 线程池
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = (System.getSecurityManager() == null)
? null
: AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
真正的线程池实现类,可以看到构造函数的参数真的是…多,所以真正使用的时候我们并不自己去new ThreadPoolExecutor,而是利用工具类Executors来创建线程池。(但是最好自己去new,这样可以更加理解线程池的创建规则,避免资源耗尽)。参数解释:
- corePoolSize:核心线程大小,当新任务来时,如果池中的线程数小于corePoolSize,就创建新的线程来执行这个任务,即使当前池中有空闲的线程,直到线程数达到corePoolSize;
- maximumPoolSize:池中最大线程数,当运行的线程达到corePoolSize,此时还有任务来时,先把任务放到队列workQueue中,如果队列也满了,就继续创建线程,直到线程中达到maximumPoolSize;
- keepAliveTime:超过corePoolSize的线程如果空闲,最多存活的时间;
- unit:时间的单位;
- workQueue:等待执行的任务队列,常用的任务队列有:
- SynchronousQueue
- LinkedBlockingQueue
- ArrayBlockingQueue
- threadFactory:创建线程的工厂
- handler:当池中线程达到最大且任务队列也满了的时候对新来的任务的处理策略,策略有如下:
- AbortPolicy:默认策略,表示无法处理新来的任务,并抛出个RejectedExecutionException异常
- CallerRunsPolicy:使用调用者自己的线程处理新任务,这样也减慢了任务提交的速度
- DiscardPolicy:处理不了了,丢掉。。。
- DiscardOldestPolicy:丢弃任务队列中对头的任务,尝试重新执行当前任务
Executors线程池工具类
-
创建固定大小的线程池
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }
-
创建一个具有足够线程来支持给定并行级别的线程池,这里用的ForkJoinPool来new,这个我后面学习
public static ExecutorService newWorkStealingPool(int parallelism) { return new ForkJoinPool (parallelism, ForkJoinPool.defaultForkJoinWorkerThreadFactory, null, true); }
-
具有单个线程的线程池
public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); }
-
创建一个可缓存的线程池,新任务到来时先看池中有没有空闲线程,有就用,没有就新建,线程空闲超过60秒就会被销毁
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }
-
单个定时执行的线程池
public static ScheduledExecutorService newSingleThreadScheduledExecutor() { return new DelegatedScheduledExecutorService (new ScheduledThreadPoolExecutor(1)); }
-
定时执行线程池
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { return new ScheduledThreadPoolExecutor(corePoolSize); }