ExecutorService接口定义: http://donald-draper.iteye.com/blog/2365738
Future接口定义: http://donald-draper.iteye.com/blog/2365798
FutureTask解析: http://donald-draper.iteye.com/blog/2365980
CompletionService接口定义: http://donald-draper.iteye.com/blog/2366239
package java.util.concurrent; /** * A {@link CompletionService} that uses a supplied {@link Executor} * to execute tasks. This class arranges that submitted tasks are, * upon completion, placed on a queue accessible using {@code take}. * The class is lightweight enough to be suitable for transient use * when processing groups of tasks. * ExecutorCompletionService是执行器的补充或者说是辅助,用于执行任务。 任务提交到ExecutorCompletionService执行后,如果任务执行完,则添加到完成 队列,我们可以通过take,取回完成任务的结果。ExecutorCompletionService 是处理集合任务的一个轻量级的实现。 * <p> * * <b>Usage Examples.</b> * 实例: * Suppose you have a set of solvers for a certain problem, each * returning a value of some type {@code Result}, and would like to * run them concurrently, processing the results of each of them that * return a non-null value, in some method {@code use(Result r)}. You * could write this as: * 假设将一个确定的问题,分成n个部分,这n个部分可以并发执行,每个部分返回一个非null,结果 在其他一些方法中,使用这些结果。 * <pre> {@code * void solve(Executor e, * Collection<Callable<Result>> solvers) * throws InterruptedException, ExecutionException { * CompletionService<Result> ecs * = new ExecutorCompletionService<Result>(e); * for (Callable<Result> s : solvers) * ecs.submit(s); * int n = solvers.size(); * for (int i = 0; i < n; ++i) { * Result r = ecs.take().get(); * if (r != null) * use(r); * } * }}</pre> * * Suppose instead that you would like to use the first non-null result * of the set of tasks, ignoring any that encounter exceptions, * and cancelling all other tasks when the first one is ready: * 假设我们仅想用任务集合中第一个完成任务的结果,并忽略其他任务遇到的异常, 当第一个完成任务的结果可用时,取消其他任务。 * <pre> {@code * void solve(Executor e, * Collection<Callable<Result>> solvers) * throws InterruptedException { * CompletionService<Result> ecs * = new ExecutorCompletionService<Result>(e); * int n = solvers.size(); * List<Future<Result>> futures * = new ArrayList<Future<Result>>(n); * Result result = null; * try { * for (Callable<Result> s : solvers) * futures.add(ecs.submit(s)); * for (int i = 0; i < n; ++i) { * try { * Result r = ecs.take().get(); * if (r != null) { * result = r; * break; * } * } catch (ExecutionException ignore) {} * } * } * finally { * for (Future<Result> f : futures) * f.cancel(true); * } * * if (result != null) * use(result); * }}</pre> */ public class ExecutorCompletionService<V> implements CompletionService<V> { private final Executor executor;//执行器 private final AbstractExecutorService aes;//执行器服务 private final BlockingQueue<Future<V>> completionQueue;//任务完成队列 /** * FutureTask extension to enqueue upon completion 扩展FutureTask的队列完成任务 */ private class QueueingFuture extends FutureTask<Void> { QueueingFuture(RunnableFuture<V> task) { super(task, null); this.task = task; } //这个是关键,在FutureTask那篇文章中,我们有讲,及当任务完成时调用done方法, //done方法为抽象方法,待子类扩展 protected void done() { //当任务任务执行结束时,添加到完成队列 completionQueue.add(task); } private final Future<V> task; } //根据Callable,创建 private RunnableFuture<V> newTaskFor(Callable<V> task) { if (aes == null) return new FutureTask<V>(task); else return aes.newTaskFor(task); } private RunnableFuture<V> newTaskFor(Runnable task, V result) { if (aes == null) return new FutureTask<V>(task, result); else return aes.newTaskFor(task, result); } /** * Creates an ExecutorCompletionService using the supplied * executor for base task execution and a * {@link LinkedBlockingQueue} as a completion queue. * 根据执行器构建ExecutorCompletionService,完成队列默认为LinkedBlockingQueue * @param executor the executor to use * @throws NullPointerException if executor is {@code null} */ public ExecutorCompletionService(Executor executor) { if (executor == null) throw new NullPointerException(); this.executor = executor; this.aes = (executor instanceof AbstractExecutorService) ? (AbstractExecutorService) executor : null; //将完成任务放在LinkedBlockingQueue中 this.completionQueue = new LinkedBlockingQueue<Future<V>>(); } /** * Creates an ExecutorCompletionService using the supplied * executor for base task execution and the supplied queue as its * completion queue. * 根据执行器和完成队列构建ExecutorCompletionService * @param executor the executor to use * @param completionQueue the queue to use as the completion queue * normally one dedicated for use by this service. This * queue is treated as unbounded -- failed attempted * {@code Queue.add} operations for completed taskes cause * them not to be retrievable. * @throws NullPointerException if executor or completionQueue are {@code null} */ public ExecutorCompletionService(Executor executor, BlockingQueue<Future<V>> completionQueue) { if (executor == null || completionQueue == null) throw new NullPointerException(); this.executor = executor; this.aes = (executor instanceof AbstractExecutorService) ? (AbstractExecutorService) executor : null; this.completionQueue = completionQueue; } //提交执行Callable任务 public Future<V> submit(Callable<V> task) { if (task == null) throw new NullPointerException(); RunnableFuture<V> f = newTaskFor(task); executor.execute(new QueueingFuture(f)); return f; } //提交执行Runnable任务 public Future<V> submit(Runnable task, V result) { if (task == null) throw new NullPointerException(); RunnableFuture<V> f = newTaskFor(task, result); executor.execute(new QueueingFuture(f)); return f; } //从完成队列take完成任务的结果,没有则阻塞,直到有任务完成 public Future<V> take() throws InterruptedException { return completionQueue.take(); } //从完成队列获取完成任务的结果,没有则返回null public Future<V> poll() { return completionQueue.poll(); } //从完成队列获取完成任务的结果,没有则超时等待,如果超时等待期间还没有完成任务,返回为null public Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException { return completionQueue.poll(timeout, unit); } }
总结:
ExecutorCompletionService内部关联一个执行器AbstractExecutorService和
一个阻塞的任务完成队列,默认为LinkedBlockingQueue。当提交任务,则包装成QueueingFuture,QueueingFuture
扩展了FutureTask,重写done方法,即在任务执行结束时,添加任务执行结果到完成队列。
而take,poll,超时poll直接委托给完成队列。