CompletionService源码分析
本文假设你已经会使用线程池以及了解FutureTask了,不熟悉的源码强烈建议看下之前的博文Java线程池源码分析,读完在读本片博文轻松加愉快。
CompletionService作用
将线程池中完成的任务集中管理,方便、高效
的让获取每个线程执行的结果。
代码使用
public static void main(String[] args) throws InterruptedException, ExecutionException {
Random random = new Random();
ExecutorService pool = Executors.newFixedThreadPool(3);
CompletionService<String> cs = new ExecutorCompletionService<>(pool);
for(int i = 0; i<4; i++) {
cs.submit(() -> {
Thread.sleep(random.nextInt(1000));
System.out.println(Thread.currentThread().getName()+"|完成任务");
return "data"+random.nextInt(10);
});
}
for(int j = 0; j < 4; j++) {
Future<String> take = cs.take(); //这一行没有完成的任务就阻塞
String result = take.get(); // 这一行在这里不会阻塞,引入放入队列中的都是已经完成的任务
System.out.println("获取到结果:"+result);
}
}
运行结果:
pool-1-thread-2|完成任务
获取到结果:data7
pool-1-thread-2|完成任务
获取到结果:data8
pool-1-thread-3|完成任务
获取到结果:data7
pool-1-thread-1|完成任务
获取到结果:data4
程序很简单,我们还是先把源码看下,再来说有优缺点。
重要属性
private final Executor executor;
private final AbstractExecutorService aes;
private final BlockingQueue<Future<V>> completionQueue;
三个属性:
Executor是通过构造方法传进来的线程池,实质上是提交任务还是交由了线程池来执行,典型的静态代理。
BlockingQueue< Future< V>> 放入已经完成的
执行结果
ps:我们知道提交的任务执行的结果就是在FutureTask.outcome存在的,所以拿到FutureTask就能获取到结果。
上面代码从CompletionService中获取执行结果的时候,有两行注释。
我们知道BlockingQueue.take()和FutureTask.get()都会阻塞。但是这里要强调的使用这两行在这里只有第一行BlockingQueue.take()会阻塞,FutureTask.get()不会阻塞。
原因也解释了BlockQueue放入的都是已经完成的任务所以FutureTask.get()不会阻塞,阻塞的是任务没完成就从BlockingQueue中获取
构造方法
public ExecutorCompletionService(Executor executor) {
if (executor == null)
throw new NullPointerException();
this.executor = executor;
this.aes = (executor instanceof AbstractExecutorService) ?
(AbstractExecutorService) executor : null;
this.completionQueue = new LinkedBlockingQueue<Future<V>>();
}
就是传入了一个Executor和给初始化无界阻塞队列,以便于后续能够存储线程执行的结果FutureTask。
CompletionService的提交方法
submit(task)
public Future<V> submit(Callable<V> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<V> f = newTaskFor(task);
executor.execute(new QueueingFuture(f));
return f;
}
private RunnableFuture<V> newTaskFor(Runnable task, V result) {
if (aes == null)
return new FutureTask<V>(task, result);
else
return aes.newTaskFor(task, result);
}
跟线程池的submit很相似,都是讲task包装成了FutureTask
这里真正调用线程池的execute(task)时候又将FutureTask包装成了QueueingFuture,来看下:
QueueingFuture
private class QueueingFuture extends FutureTask<Void> {
QueueingFuture(RunnableFuture<V> task) {
super(task, null);
this.task = task;
}
protected void done() { completionQueue.add(task); }
private final Future<V> task;
}
它是ExecutorCompletionService的内部类,重写了FuturTask的done()方法
将FutureTask放入到BlockingQueue中。
剩下的就是线程池的execute的执行逻辑,不清楚的看下我之前的博文Java线程池源码分析
这里只是坐下简单的任务的提交流程:
1)submit(taks) task为Callable或者Runnable。
2)任务包装成FutureTask,执行executor(futureTask)。
3)addWorker(futureTask),将futureTask设置为Worker的firstTask,其他的task从Queue中取。
4)runWorker(woker) futureTask执行run,调用callable.call() set结果给FutureTask的成员outcome赋值,这样futureTask就拿到了执行结果。
FutureTask的run方法
public void run() {
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex);
}
if (ran)
set(result);
}
} finally {
runner = null;
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
//set(result)
protected void set(V v) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = v;//拿到结果
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
finishCompletion();
}
}
//finishCompletion(); 调用futureTask.get()线程在任务没完成时调用正在队列等待,这里执行完了要唤醒等待它
private void finishCompletion() {
// assert state > COMPLETING;
for (WaitNode q; (q = waiters) != null;) {
if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
for (;;) {
Thread t = q.thread;
if (t != null) {
q.thread = null;
LockSupport.unpark(t);
}
WaitNode next = q.next;
if (next == null)
break;
q.next = null; // unlink to help gc
q = next;
}
break;
}
}
done();//留给子类拓展的
callable = null; // to reduce footprint
}
代码看起来稍微有点长,但实际逻辑很简单:
1、调用result = callable.call()
2、给futurTask.outcome赋值为result
3、唤醒调用futureTask.get()的线程
4、调用需要子类实现的拓展方法done();//这个很有用在本博文中
都看到了,执行任务获取到执行结果给FutureTask.outcome赋值了之后,调用了FutureTask的拓展方法done()
而正好CompletionService的内部类QueueingFuture重新了该方法,将带有执行结果的FutureTask放入到了CompletionService的BlockingQueue中,这样一来就可以直接从BlockingQueue中获取执行结果了。
进阶部分
整个的CompletionService的源码本身是很简答的,简单一句话就是利用线程池提交任务,执行完获取到执行结果,将带有执行结果的FutureTask放入到自己的无界阻塞队列中。
你有没有考虑过或者之前见过这样的代码:
public static void main(String[] args) throws InterruptedException, ExecutionException {
Random random = new Random();
ExecutorService pool = Executors.newFixedThreadPool(5);
List<Future<String>> resultFuture = new ArrayList<>();
for(int i = 0; i<4; i++) {
final int tmp = i;
Future<String> future = pool.submit(() -> {
Thread.sleep(1000+10*tmp);
System.out.println(Thread.currentThread().getName()+"|完成任务");
return "data"+random.nextInt(10);
});
resultFuture.add(future);
}
System.out.println("--------------");
for(Future<String> future:resultFuture) {
String result = future.get();
System.out.println("执行结果"+result);
}
}
之前我在上一家公司有个同事也是这样写的,当时我觉得我跟他说你怎么不用CompletionService,他反问我有什么区别吗?当时我竟然无言以对!!!
区别在于:
1)执行结果放入容器时机不同
后者是在任务没有完成的时候就放入到了ArrayList中
CompletionService是在线程执行完拿到结果之后放入到了BlockingQueue中的2)阻塞的方法不同
后者是FutureTask.get()在任务没有执行完毕,它是阻塞的
CompletionService是BlockQueue.take()队列都拿完了,或者没有放入阻塞3)线程池虽然提交任务都是异步的,此两种写法
取出执行结果
的效率或者响应时间不同后者效率低,如果放入的第一个任务执行时间最长,后边的任务执行很短,那么这个方法的时间将会延长。
CompletionService 取出所消耗的时间是哪个最后完成任务的时间
再说两个坑,别踩上了哈!!!
坑一
使用它异步提交,在收集结果会导致乱序
项目中你有可能使用CompletionService批量(分页 每页是一个任务)去查询数据库(order排序了等),然后在汇总那结果add到一个list中。
此时用这种方式,由于不确定那个任务先返回,就add到了list中,会导致数据库中每页排好序的结果,由于汇总不按照找顺序汇总就乱序了。
解决方式有两种:
一种是再排序
一种是上面我们效率第一点的例子
坑二
在Spring项目中你的CompletionService的构建应当是方到跟使用线程的方法内部new出来,而不应该放到Controller中作为成员变量存在
CompletionService如果作为一个成员变量,
来一个A用户请求方法,方法执行完结果放入BlockingQueue中执行的结果被另外一个用户获取到了,A用户的方法执行到取结果的地方。
此时来了B用户请求方法,方法也执行到了取结果的地方,此时B可能取到A执行的结果,A也有可能B执行的结果,就是因为CompletionService成了共享对象,其内部成员BlockingQueue也成了共享对象。