FutureTask用来解决什么问题的?
FutureTask可看作对异步任务的封装,异步任务在它的封装下,可灵活进行阻塞获取结果或者中断。
FutureTask类结构关系怎么样的?
通过类图我们不难看出,它继承了Runable以及Future接口,所以它可以灵活的作为Runnable给thread执行,也可作为Future得到callable的计算结果。
既然兼容,那么我们就可以从这个类的构造方法开始看起:
通过以callable作为参数的构造方法我们可以看到,FutureTask有两个成员属性分别是callable,state。通过注释我们不难看出,这个构造方法做的就是将异步任务赋给成员变量,并将该任务状态置为new(这个状态的转换下文会进行详解,这里理解为初始化即可)
/**
* Creates a {@code FutureTask} that will, upon running, execute the
* given {@code Callable}.
*
* @param callable the callable task
* @throws NullPointerException if the callable is null
*/
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW; // ensure visibility of callable
}
复制代码
重点来了,我们不妨看看FutureTask是如何兼容runnable的。
代码如下所示,通过Executors.callable(),将runnable封装成一个callable并初始化任务状态,并赋值一个result这个result干啥用的?我们不妨再进一步看看源码。
/**
* Creates a {@code FutureTask} that will, upon running, execute the
* given {@code Runnable}, and arrange that {@code get} will return the
* given result on successful completion.
*
* @param runnable the runnable task
* @param result the result to return on successful completion. If
* you don't need a particular result, consider using
* constructions of the form:
* {@code Future<?> f = new FutureTask<Void>(runnable, null)}
* @throws NullPointerException if the runnable is null
*/
public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result);
this.state = NEW; // ensure visibility of callable
}
复制代码
我们更进一步看看这个Executors.callable()到底做了什么,如下所示对任务进行判空即开始使用RunnableAdapter转换为Callable。不难猜出这个RunnableAdapter一定用了适配器模式,我们不妨再前进一步看看。
public static <T> Callable<T> callable(Runnable task, T result) {
if (task == null)
throw new NullPointerException();
return new RunnableAdapter<T>(task, result);
}
复制代码
可以看到RunnableAdapter是Executors的内部类,他将上文的runnable赋给task,将上文的result赋值给result。
观察源码的同时我们看到了call方法,很明显这是让异步任务运行的方法,我们看到他让task跑起来,然后将作为参数传入的result作为结果返回以实现与callable逻辑保持一致。
真相了,原来带runnable参数的result,是作为成功后的返回结果,开发者可以根据这个返回值判断任务是否完成。
所以我们又开始好奇,FutureTask做了什么使得可以得到runnable的结果并结果作为参数传给这个适配器呢?我们不妨看看FutureTask的run源码
/**
* A callable that runs given task and returns given result
*/
static final class RunnableAdapter<T> implements Callable<T> {
final Runnable task;
final T result;
RunnableAdapter(Runnable task, T result) {
this.task = task;
this.result = result;
}
public T call() {
task.run();
return result;
}
}
复制代码
因为run是runnable接口提供的一个方法,那么我们就从FutureTask查看他是如何完成任务并将结果赋值给result的。看到这里,我们不妨再深入看看set(result)到底做了哪些事.
public void run() {
//可能会执行CAS替换runner为当前线程
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
//拿到成员变量callable 假设他是用Runnable创建的callable
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
//跑起来
result = c.call();
//运行正常,ran(run的过去式)设置为true,代表完美运行且完成
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex);
}
if (ran)
//重点,设置结果,若是runnable则是将传入的结果设置到result中
set(result);
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
复制代码
如下所示,重点就是outcome,这个outcome我们可以看源码中的注释看到介绍The result to return or exception to throw from get() ;non-volatile, protected by state reads/writes
。 再有一个利用cas将当前任务状态设置为COMPLETING,这个COMPLETING代表什么呢?笔者后文会详细介绍,这里大概解释一些就是这个线程运行完成,但是结果还未知,有可能崩了,也有可能有结果但我们还不知道。这种状态我们统统称为COMPLETING。
然后在调用finishCompletion完成一些收尾逻辑。我们可以继续点进去看看。
protected void set(V v) {
// 利用cas将当前任务状态设置为COMPLETING
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
//将result赋给outcome
outcome = v;
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
finishCompletion();
}
}
复制代码
从注释我们就可出这个方法的功能,唤醒等待线程并移除空的callable移除
/**
* Removes and signals all waiting threads, invokes done(), and
* nulls out callable.
*/
private void finishCompletion() {
// assert state > COMPLETING;
for (WaitNode q; (q = waiters) != null;) {
if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
for (;;) {
Thread t = q.thread;
if (t != null) {
q.thread = null;
//唤醒等待线程
LockSupport.unpark(t);
}
WaitNode next = q.next;
//将空的线程移除,这里这个next关键字不由得使我们想起链表的操作,所以无需继续解释
if (next == null)
break;
q.next = null; // unlink to help gc
q = next;
}
break;
}
}
done();
callable = null; // to reduce footprint
}
复制代码
FutureTask的线程安全是由什么保证的?
cas,如上文所示,我们可以看到大量的cas操作。例如set(result)UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)
就是一个cas操作。这里有一个技巧,我们看到所有CompareAndSxxx就是cas操作。
如下就是cas执行的简介,感兴趣的读者可以自行百度。它的底层是c++实现的。
public native boolean compareAndSwapObject(Object obj, long offset, Object expect, Object update);
obj :包含要修改的字段对象;
offset :字段在对象内的偏移量;
expect : 字段的期望值;
update :如果该字段的值等于字段的期望值,用于更新字段的新值;
复制代码
FutureTask结果返回机制?
回答这个问题,我们不妨看看这段源码,逻辑很简单,我们再往前关注一些,如果任务没完成如何进行等待完成。完成后result要怎么处理一下返回。
/**
* @throws CancellationException {@inheritDoc}
*/
public V get() throws InterruptedException, ExecutionException {
int s = state;
//当小于COMPLETING即任务没完成执行等待方法
if (s <= COMPLETING)
s = awaitDone(false, 0L);
//执行完成返回指向report方法结果
return report(s);
}
复制代码
通过源码我们不难看出,任务就是通过无线自旋等待完成并返回的。
/**
* Awaits completion or aborts on interrupt or timeout.
*
* @param timed true if use timed waits
* @param nanos time to wait, if timed
* @return state upon completion
*/
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
final long deadline = timed ? System.nanoTime() + nanos : 0L;
WaitNode q = null;
boolean queued = false;
//自旋
for (;;) {
//获取并清除中断状态
if (Thread.interrupted()) {
//移除等待WaitNode
removeWaiter(q);
throw new InterruptedException();
}
//如果完成了 清空等待节点线程 返回state
int s = state;
if (s > COMPLETING) {
if (q != null)
q.thread = null;
return s;
}
//完成了还没结果,则让出cpu时间片
else if (s == COMPLETING) // cannot time out yet
Thread.yield();
else if (q == null)
q = new WaitNode();
//还没入队就用cas入队
else if (!queued)
queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
q.next = waiters, q);
else if (timed) {
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
removeWaiter(q);
return state;
}
//阻塞当前线程
LockSupport.parkNanos(this, nanos);
}
else
//阻塞当前线程
LockSupport.park(this);
}
}
复制代码
再来看看report
@SuppressWarnings("unchecked")
private V report(int s) throws ExecutionException {
//将输出结果赋值给x
Object x = outcome;
//通过上面传入的state判断这个状态是正常的完成还是异常完成,如果正常则返回x 否则抛异常
if (s == NORMAL)
return (V)x;
if (s >= CANCELLED)
throw new CancellationException();
throw new ExecutionException((Throwable)x);
}
复制代码
FutureTask内部运行状态的转变?
* Possible state transitions:
* NEW -> COMPLETING -> NORMAL
* NEW -> COMPLETING -> EXCEPTIONAL
* NEW -> CANCELLED
* NEW -> INTERRUPTING -> INTERRUPTED
*/
private volatile int state;
private static final int NEW = 0; //表示新任务,初始状态
private static final int COMPLETING = 1; //任务跑完但是还没结果,或者任务跑崩了正准备存异常原因的时候,COMPLETING就是来形容这个状态的
private static final int NORMAL = 2; //任务顺利完成,outcome也成功存了正常的结果
private static final int EXCEPTIONAL = 3; //任务跑崩了,outcome记录的错误的结果
private static final int CANCELLED = 4; //任务还没开始或者跑一半没结束,就被用户调用cancel(false),使得任务取消但是任务自己继续跑着
private static final int INTERRUPTING = 5; //任务还没开始或者跑一半没结束,就被用户调用cancel(true),使得任务处于一个打断ing的中间态
private static final int INTERRUPTED = 6; //从打断ing到完成被打断
复制代码
可以看到FutureTask的精妙之处,对于数值的编排,所有大于1的都可以代表线程已经完成。
FutureTask如何安全取消任务
public boolean cancel(boolean mayInterruptIfRunning) {
//如果当前Future状态为NEW,根据参数修改Future状态为INTERRUPTING或CANCELLED
if (!(state == NEW &&
UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
return false;
try { // in case call to interrupt throws exception
if (mayInterruptIfRunning) {//可以在运行时中断
try {
Thread t = runner;
if (t != null)
t.interrupt();
} finally { // final state
UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
}
}
} finally {
finishCompletion();//移除并唤醒所有等待线程
}
return true;
}
复制代码
FutureTask通常会怎么用?
package com.shark.wiki;
import java.util.concurrent.*;
public class CallDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// 第一种方式:Future + ExecutorService
//// 创建缓存线程池
// ExecutorService service = Executors.newCachedThreadPool();
//// 创建任务
// Task task = new Task();
// Future<Integer> future = service.submit(task);
// service.shutdown();
// 第二种方式: FutureTask + ExecutorService
// ExecutorService executor = Executors.newCachedThreadPool();
// Task task = new Task();
// FutureTask<Integer> futureTask = new FutureTask<>(task);
// executor.submit(futureTask);
// executor.shutdown();
/**
* 第三种方式:FutureTask + Thread
*/
// 2. 新建FutureTask,需要一个实现了Callable接口的类的实例作为构造函数参数
FutureTask<Integer> futureTask = new FutureTask<>(new Task());
// 3. 新建Thread对象并启动
Thread thread = new Thread(futureTask);
thread.setName("Task thread");
thread.start();
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Thread [" + Thread.currentThread().getName() + "] is running");
// 4. 调用isDone()判断任务是否结束
if(!futureTask.isDone()) {
System.out.println("Task is not done");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
int result = 0;
try {
// 5. 调用get()方法获取任务结果,如果任务没有执行完成则阻塞等待
result = futureTask.get();
} catch (Exception e) {
e.printStackTrace();
}
System.out.println("result is " + result);
}
// 1. 继承Callable接口,实现call()方法,泛型参数为要返回的类型
static class Task implements Callable<Integer> {
@Override
public Integer call() throws Exception {
System.out.println("Thread [" + Thread.currentThread().getName() + "] is running");
int result = 0;
for (int i = 1; i <= 100; ++i) {
result += i;
}
Thread.sleep(3000);
System.out.println(result);
return result;
}
}
}
复制代码