Runnable、Callable、Future和FutureTask
Java中创建线程的3种方式,一种是直接继承Thread类,一种就是实现Runnable接口,最后一种是实现Callable接口:
1、继承Thread方式,重写run方法:
package thread;
public class ThreadTest extends Thread {
public void run() {
System.out.println(Thread.currentThread().getName());
}
public static void main(String[] args) {
new ThreadTest().start();
new ThreadTest().start();
}
}
输出:
Thread-0
Thread-1
2、实现Runnable接口
package thread;
public class RunnableTest implements Runnable{
@Override
public void run() {
System.out.println(Thread.currentThread().getName());
}
public static void main(String[] args) {
Thread t1 = new Thread(new RunnableTest(),"一号口");
Thread t2 = new Thread(new RunnableTest(),"二号口");
t1.start();
t2.start();
}
}
输出:
一号口
二号口
两种方式共同的缺陷:执行完任务之后无法获取执行结果;
如果需要获取执行结果,就必须通过共享变量或者使用线程通信的方式来达到效果,这样使用起来就比较麻烦。自从Java 1.5开始,提供了Callable和Future,通过它们可以在任务执行完毕之后得到任务执行结果。
3、实现Callable接口,重写call方法:
package thread;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
public class Callable01 implements Callable<String> {
@Override
public String call() throws Exception {
return "xxxxxxxxxxxx";
}
public static void main(String[] args) throws ExecutionException, InterruptedException {
FutureTask task = new FutureTask(new Callable01());
new Thread(task).start();
System.out.println(task.get());
ExecutorService threadPool = Executors.newSingleThreadExecutor();
Future<String> future = threadPool.submit(new Callable01());
System.out.println(future.get());
}
}
输出:
xxxxxxxxxxxx
xxxxxxxxxxxx
Callable与Runnable的功能大致相似,Callable中有一个call()函数,但是call()函数有返回值,而Runnable的run()函数不能将结果返回给客户程序。
Callable接口实际上是属于Executor框架中的功能类,Callable接口与Runnable接口的功能类似,但提供了比Runnable更加强大的功能。
- Callable可以在任务结束的时候提供一个返回值,Runnable无法提供这个功能;
- Callable的call方法分可以抛出异常,而Runnable的run方法不能抛出异常。
看到上面,可能会觉得疑问,为什么两种方式都可以实现call方法的调用,接下来进行分析。
继承关系图
FutureTask类关系图:
ThreadPoolExecutor类关系图:
ExecutorService
在ExecutorService的接口中,有这样几个submit的版本:
<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
Future<?> submit(Runnable task);
一般实现Callable接口是和ExecutorService搭配使用的,在ExecutorService接口中,常用的submit方法是上面第一和第三种,第二种就很少用了。
AbstractExecutorService
AbstractExecutorService是一个实现了ExecutorService接口的抽象类,ExecutorService接口中的submit方法的实现均在AbstractExecutorService抽象类中实现:
public abstract class AbstractExecutorService implements ExecutorService {
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}
public <T> Future<T> submit(Runnable task, T result) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task, result);
execute(ftask);
return ftask;
}
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
}
再来看newTaskFor()方法,这里是重载了两个个方法:
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
return new FutureTask<T>(runnable, value);
}
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return new FutureTask<T>(callable);
}
也就是在这个地方,体现了submit方法和execute方法的区别,submit方法有返回值,execute没有,submit方法里参数可以是Callable类型也可以是Runnable类型,而execute方法里的参数只能是Runnable类型。还有newTaskFor方法返回值类型是一个FutureTask对象,为什么也可以是可以用RunnableFuture去接收呢?请往下看它们的关系。
ThreadPoolExecutor
从上面可以看出,submit方法最终还是要调用execute方法的,我在AbstractExecutorService中一直没有找到execute方法,那么execute方法在哪里呢?通过继承关系,我发现了它的子类ThreadPoolExecutor,从名字上来说,感觉像是找对了,看一下源码:
public class ThreadPoolExecutor extends AbstractExecutorService {
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
分三步进行:
1、如果正在运行的线程数少于corePoolSize,请尝试使用给定的命令启动一个新线程作为其第一个任务。
对addWorker的调用原子性地检查运行状态和workerCount,因此通过返回false来防止错误警报,这些错误
警报会在不应该添加线程的情况下添加线程
2、如果一个任务可以成功地排队,那么我们仍然需要再次检查是否应该添加一个线程(因为已有的线程在上
次检查之后就死了),还是池在进入这个方法后关闭了。因此,我们重新检查状态,如果需要,如果停止排
队,则回滚排队;如果没有排队,则启动一个新线程。
3、如果不能对任务进行排队,则尝试添加一个新线程。如果失败,我们知道我们已经关闭或饱和,因此拒绝
任务。
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
}
这里只要知道执行execute方法主要是执行其子类ThreadPoolExecutor中的execute方法就可以了,这就是我们所说的线程池,关于线程池中execute方法以及一些参数下一篇文章继续跟进。
Future
Executor就是Runnable和Callable的调度容器,Future就是对于具体的Runnable或者Callable任务的执行结果进行取消、查询是否完成、获取结果、设置结果操作。get方法会阻塞,直到任务返回结果。
Future类位于java.util.concurrent包下,它是一个接口,方法如下:
public interface Future<V> {
boolean cancel(boolean mayInterruptIfRunning);
boolean isCancelled();
boolean isDone();
V get() throws InterruptedException, ExecutionException;
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
在Future接口中声明了5个方法,下面依次解释每个方法的作用:
- cancel方法用来取消任务,如果取消任务成功则返回true,如果取消任务失败则返回false。参数mayInterruptIfRunning表示是否允许取消正在执行却没有执行完毕的任务,如果设置true,则表示可以取消正在执行过程中的任务。如果任务已经完成,则无论mayInterruptIfRunning为true还是false,此方法肯定返回false,即如果取消已经完成的任务会返回false;如果任务正在执行,若mayInterruptIfRunning设置为true,则返回true,若mayInterruptIfRunning设置为false,则返回false;如果任务还没有执行,则无论mayInterruptIfRunning为true还是false,肯定返回true。
- isCancelled方法表示任务是否被取消成功,如果在任务正常完成前被取消成功,则返回 true。
- isDone方法表示任务是否已经完成,若任务完成,则返回true;
- get()方法用来获取执行结果,这个方法会产生阻塞,会一直等到任务执行完毕才返回;
- get(long timeout, TimeUnit unit)用来获取执行结果,如果在指定时间内,还没获取到结果,就直接返回null。
FutureTask
public class FutureTask<V> implements RunnableFuture<V>{
// 线程的执行状态
private volatile int state;
private static final int NEW = 0;
private static final int COMPLETING = 1;
private static final int NORMAL = 2;
private static final int EXCEPTIONAL = 3;
private static final int CANCELLED = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED = 6;
// FutureTask内部封装了Callable接口
private Callable<V> callable;
// 返回值
private Object outcome; // non-volatile, protected by state reads/writes
/** The thread running the callable; CASed during run() */
private volatile Thread runner;
/** Treiber stack of waiting threads */
private volatile WaitNode waiters;
}
public interface RunnableFuture<V> extends Runnable, Future<V> {
void run();
}
FutureTask实现了RunnableFuture接口,而RunnableFuture继承了Runnbale和Futrue这两个接口,另外它还可以包装Runnable和Callable, 由构造函数注入依赖。
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW; // ensure visibility of callable
}
public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result);
this.state = NEW; // ensure visibility of callable
}
而且FutureTask是Future接口的一个唯一实现类。
可以看到,Runnable注入会被Executors.callable()函数转换为Callable类型,即FutureTask最终都是执行Callable类型的任务。该适配函数的实现如下 :
public static <T> Callable<T> callable(Runnable task, T result) {
if (task == null)
throw new NullPointerException();
return new RunnableAdapter<T>(task, result);
}
RunnableAdapter适配器
/**
* A callable that runs given task and returns given result
*/
static final class RunnableAdapter<T> implements Callable<T> {
final Runnable task;
final T result;
RunnableAdapter(Runnable task, T result) {
this.task = task;
this.result = result;
}
public T call() {
task.run();
return result;
}
}
由于FutureTask实现了Runnable,因此它既可以通过Thread包装来直接执行,也可以提交给ExecuteService来执行(这就是上述例子中为什么可以用Thread类进行调用)。
并且还可以直接通过get()函数获取执行结果,该函数会阻塞,直到结果返回。因此FutureTask既是Future、Runnable,又是包装了Callable( 如果是Runnable最终也会被转换为Callable ), 它是这两者的合体(通过构造函数无论传入是Runnable类型还是Callable类型最终都会赋值给其内部Callable的成员变量)。
FutureTask既然实现了Runnable,必定会有一个run方法,看一下run方法都干了啥:
public void run() {
// 先判断state状态,如果不是NEW说明执行完毕,直接return
// 后面使用CAS操作,判断这个任务是否已经执行,这里FutureTask有个全局的volatile runner字段,这里通过cas将当前线程指定给runner。
// 这里可以防止callable被执行多次。
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
// 定义返回值
V result;
boolean ran;
try {
// 获取返回值
result = c.call();
ran = true;
} catch (Throwable ex) {
// 发生异常,先将result至空,然后通过setException方法将异常信息赋值给outcome
result = null;
ran = false;
setException(ex);
}
// 执行成功后通过set方法将返回值赋值给outcome
if (ran)
set(result);
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
通过run方法知道了返回值,接下来就需要用get方法去获取:
public V get() throws InterruptedException, ExecutionException {
int s = state;
if (s <= COMPLETING)
s = awaitDone(false, 0L);
return report(s);
}
public V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
if (unit == null)
throw new NullPointerException();
int s = state;
if (s <= COMPLETING &&
(s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
throw new TimeoutException();
return report(s);
}
private V report(int s) throws ExecutionException {
Object x = outcome;
if (s == NORMAL)
return (V)x;
if (s >= CANCELLED)
throw new CancellationException();
throw new ExecutionException((Throwable)x);
}
get() 方法会进行自旋操作等待,直到FutureTask中的state状态大于NORMAL(表示自行完成),然后才会通过FutureTask的outcome获取返回值。
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
// 等待时长
final long deadline = timed ? System.nanoTime() + nanos : 0L;
WaitNode q = null;
boolean queued = false;
// 采用死循环的方式->自旋
for (;;) {
// = while(true)
if (Thread.interrupted()) {
removeWaiter(q);
throw new InterruptedException();
}
// 第一次进来,state = NEW,会走第三个if分支,创建一个WaitNode节点
int s = state;
if (s > COMPLETING) {
// 已经完成call操作,直接返回当前state
if (q != null)
q.thread = null;
return s;
}
// 自旋的过程中,state = COMPLETING此时还没有完成赋值操作,NORMAL才代表完成,交出当前线程的时间片
else if (s == COMPLETING) // cannot time out yet
Thread.yield();
else if (q == null)
// 创建等待节点
q = new WaitNode();
// 其他线程来执行get操作,将WaitNode的next指向新线程
else if (!queued)
// 若未加入等待链表时,将 q 的 next 指向 waiters , 然后将 waiters 移动到 q
queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
q.next = waiters, q);
else if (timed) {
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
// 超过等待时长 将等待节点移除
removeWaiter(q);
return state;
}
LockSupport.parkNanos(this, nanos);
}
else
// 挂起调用者线程
// 当任务执行完 执行 finishCompletion 是会被唤醒
LockSupport.park(this);
}
}
总结
结合上述分析可得 FutureTask 执行活动图如下:
同时也可以看出,在 FutureTask 中内部维护了一个单向链表 waiters , 在执行 get 的时候会向其中添加节点:
从上可以看出 FutureTask 可以用于当一个线程需要等待另外一个线程执行完某个任务后才能继续执行的场景下。