Java线程池:ThreadPoolExecutor的使用
通过ThreadPoolExecutor构建BIO伪异步I/O通信的例子深入理解Java中线程池的概念。
- Thread vs Exector
首先,捋清楚ThreadPoolExecutor的来龙去脉!!!
Java中创建线程的方法如下:
new Thread(new Runnable() {
@Override
public void run() {
// TODO Auto-generated method stub
}
}).starrt();
但是存在如下弊端:
1.一个任务对应于一个线程,当并发任务较多时,不停的创建和销毁线程对系统性能影响较大;
2.thread类不便于线程的管理,无定时执行、线程终止等功能。
为了适应于多并发的情况,减少系统资源的开销,减少因竞争资源而导致堵塞的情况,可以自动或手动创建一定数量的复用线程。当已配置的线程使用完后,即没有控制线程,任务队列中的其他任务处于等待状态;当一个线程的执行完当前任务时,继续从任务队列中取出任务,若队列中没有任务,该线程处于空闲状态。线程池的概念应运而生。
- 四类静态线程池
Exector作为线程池的顶级接口,Java的Exector接口提供了四种静态工厂分别对应于四类线程池,且都继承了ExectorService接口。
1.Executors.newCachedThreadPool(),创建一个可缓存线程池,缓冲池容量大小为Integer.MAX_VALUE,如果线程池长度超过处理需要,可灵活回收空闲线程,无可回收线程,则新建线程。
2.Executors.newFixedThreadPool(int) 创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。
3、Executors.newScheduledThreadPool 创建一个定长线程池,支持定时及周期性任务执行。
4、Executors.newSingleThreadExecutor() 创建一个单线程化(容量为1)的线程池,用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。
其中,newCachedThreadPool()、newFixedThreadPool()、newSingleThreadExecutor()三种静态方法均返回ThreadPoolExecutor实例,newScheduledThreadPool ()则返回ScheduledExecutorService实例。
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
综上,Exector族谱图如下:
(此图借用他人博客,在此谢谢版主)
- Executor
Executor接口内部定义了用于执行任务的execute方法,Runnable用于表示任务;该接口提供了一种生产者-消费者解耦的机制,提交任务相当于生产者,执行任务相当于消费者;
public interface Executor {
/**
* Executes the given command at some time in the future. The command
* may execute in a new thread, in a pooled thread, or in the calling
* thread, at the discretion of the {@code Executor} implementation.
*
* @param command the runnable task
* @throws RejectedExecutionException if this task cannot be
* accepted for execution
* @throws NullPointerException if command is null
*/
void execute(Runnable command);
}
- ExecutorService
ExecutorService接口继承于Executor接口,是Executor借口的扩展,提供了终止任务和跟踪一个或者多个异步任务的方法。提供了shutdown和shutdownNow两种方法关闭ExectorService,拒绝新任务。
shutdown() 方法在终止前允许执行以前提交的任务;
shutdownNow阻止等待任务并停止现阶段执行的任务。
通过Exector.execycute(Runnable)方法创建并返回可悲取消执行或者等待完成的FUTURE任务,invokeAny或invokeAll调用任务队列中的任务,并等待至少一个会议哦在全部任务完成。
应用举例:
class NetworkService implements Runnable {
private final ServerSocket serverSocket;
private final ExecutorService pool;
public NetworkService(int port, int poolSize)
throws IOException {
serverSocket = new ServerSocket(port);
pool = Executors.newFixedThreadPool(poolSize);
}
public void run() { // run the service
try {
for (;;) {
pool.execute(new Handler(serverSocket.accept()));
}
} catch (IOException ex) {
pool.shutdown();
}
}
}
class Handler implements Runnable {
private final Socket socket;
Handler(Socket socket) { this.socket = socket; }
public void run() {
// read and service request on socket
}
}}
- ThreadPoolExecutor构造函数说明
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
threadFactory, defaultHandler);
}
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), handler);
}
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
. corePoolSize
核心线程数,默认情况下核心线程会一直存活,即使处于闲置状态也不会受存keepAliveTime限制。除非将allowCoreThreadTimeOut设置为true。
. maximumPoolSize
线程池所能容纳的最大线程数。超过这个数的线程将被阻塞。当任务队列为没有设置大小的LinkedBlockingDeque时,这个值无效。
. keepAliveTime
非核心线程的闲置超时时间,超过这个时间就会被回收。
. unit
指定keepAliveTime的单位,如TimeUnit.SECONDS。当将allowCoreThreadTimeOut设置为true时对corePoolSize生效。
- workQueue
线程池中的任务队列.
常用的有三种队列,SynchronousQueue,LinkedBlockingDeque,ArrayBlockingQueue。
线程池的线程执行规则
下面都假设任务队列没有大小限制:
1.如果ThreadSize<=corePoolSize,那么直接启动一个核心线程来执行任务,不会放入队列中。
2.如果ThreadSize>corePoolSize,但<=maximumPoolSize,并且任务队列是LinkedBlockingDeque的时候,超过核心线程数量的任务会放在任务队列中排队。
3.如果ThreadSize>corePoolSize,但<=maximumPoolSize,并且任务队列是SynchronousQueue的时候,线程池会创建新线程执行任务,这些任务也不会被放在任务队列中。这些线程属于非核心线程,在任务完成后,闲置时间达到了超时时间就会被清除。
4.如果ThreadSize>corePoolSize,并且>maximumPoolSize,当任务队列是LinkedBlockingDeque,会将超过核心线程的任务放在任务队列中排队。也就是当任务队列是LinkedBlockingDeque并且没有大小限制时,线程池的最大线程数设置是无效的,他的线程数最多不会超过核心线程数。
5.如果ThreadSize>corePoolSize,并且>maximumPoolSize,当任务队列是SynchronousQueue的时候,会因为线程池拒绝添加任务而抛出异常。
任务队列大小有限时
当LinkedBlockingDeque塞满时,新增的任务会直接创建新线程来执行,当创建的线程数量超过最大线程数量时会抛异常。
SynchronousQueue没有数量限制。因为他根本不保持这些任务,而是直接交给线程池去执行。当任务数量超过最大线程数时会直接抛异常。