Thread
线程的基本概念我们已经在深入理解Java并发机制(1)–理论基础中总结过了,这里不再赘述。
线程的状态
Java线程在其生命周期里处于以下6种状态,这些状态定义在Thread类内部枚举中。
状态 | 说明 |
---|---|
NEW | 初始态,还未调用start()方法 |
RUNNABLE | 运行态,将操作系统中的就绪态和运行态统一定义为RUNNABLE |
BLOCKED | 阻塞态,线程等待获取锁 |
WAITING | 等待态,线程等待其他线程做出一些特定状态(通知或中断) |
TIME_WAITING | 超时等待态,不同于等待态,超时后自行返回 |
TERMINATED | 终止态,线程执行完毕 |
状态转移图如下:
从本质上讲,BLOCKED/WAITING/TIME_WAITING这三种状态都是等待状态,只不过分为三类便于管理。BLOCKED状态是获取锁而不得,WAITING状态是自己放弃执行等待某条件成熟。在需要竞争锁的场景下,这两者分别是阻塞队列和等待队列,阻塞队列竞争锁执行,等待队列等待条件成熟移至阻塞队列竞争锁继续执行。在不需要竞争锁的情况下,WAITING状态可以直接转为RUNNABLE。
阻塞与中断
当线程阻塞时,它通常处于BLOCKED/WAITING/TIME_WAITING状态,等待某个事件的发生(如:IO/锁的获取/sleep中醒来等)。线程阻塞和运行时间长的程序区别在于,阻塞的线程必须等待一个不受它自己控制的事件发生以后才能继续执行。
当一个方法抛出InterruptedException时,表示该方法是一个阻塞方法,如果这个方法被中断,那么它将尽力提前结束阻塞状态。
中断是线程的标志位属性,表示一个运行中的线程是否被其他线程进行了中断操作。中断是一种协作机制,当线程A中断线程B时,仅仅是要求B在执行到可以暂停的地方停止正在执行的工作—这经常用于取消某个操作,方法对中断的响应度越高,就越容易取消那些执行时间很长的操作。
当在代码中调用了一个将抛出InterruptedException的方法时,调用者也就变成了一个阻塞方法,重点是必须要对中断做相应的处理,一般有两种处理方式:
- 传递InterruptedException:不做任何处理将中断情况传递给高层调用者,交由调用者处理
- 恢复中断:当不能抛出InterruptException时,如Runnable中的run方法,捕获到了InterruptException后,调用线程的interrupt方法重新设置中断标志位,以便保持中断信息,供其他代码使用。
这两种方式对于中断的处理都是一个原则:不能无缘无故吃掉中断信息,保证中断信息可靠有效的传递给需要的代码。
取消与关闭
当我们启动一个线程后,如果我们对它失去控制将是一件很可怕的事情,想象一下当一个写文件的线程启动后,我们无法终止它,即使我们不想它继续写了,这很尴尬,在线程中加入控制手段是非常有必要的。
Java没有提供任何机制来安全的终止线程,但提供了线程协作机制—中断,通过中断,一个线程可以终止另外一个线程。
这里说终止,并不意味着线程A可以直接关闭线程B,而是指的是A可以通过interrupt(Thread t)方法去设置线程B的中断标志位。在线程B内有轮询代码(可中断的阻塞方法或者是外层循环代码)不断检查其中断标志位,如果发现已经被线程A中断,是抛出Interrupt-Exception,还是执行其他业务逻辑,还是执行相应的清理工作提前结束线程工作,取决于具体的逻辑需要。
Runnable与Callable
Java线程执行内容的定义有三种方式:
- 继承Thread类,重写run()方法
- 实现Runnable接口
- 实现Callable接口
虽然有三种定义方式,但是本质上线程的启动都是通过Thread实例中的start方法来启动的(线程池的部分我们后面再讲)。
Runnable和Callable的区别和联系:
- 区别:主要表现在两点:1、对异常的捕捉 2、线程执行结果的获取
- 联系:Callable是对Runnable的补充,通过FutureTask类封装为Runnable类型
区别很明显,直接看代码:
Runnable:
public
interface Runnable {
/**
* When an object implementing interface <code>Runnable</code> is used
* to create a thread, starting the thread causes the object's
* <code>run</code> method to be called in that separately executing
* thread.
* <p>
* The general contract of the method <code>run</code> is that it may
* take any action whatsoever.
*
* @see java.lang.Thread#run()
*/
public abstract void run();
}
Callable:
public interface Callable<V> {
/**
* Computes a result, or throws an exception if unable to do so.
*
* @return computed result
* @throws Exception if unable to compute a result
*/
V call() throws Exception;
}
简要说明一下Callable的使用逻辑:
首先定义了一个类实现了Callable接口,在Callable对象在submit到线程池执行时,会先被封装为一个FutureTask对象,Callable提供的多于Runnable的对异常的捕捉和结果的返回就封装在FutureTask类里,可以说FutureTask就是Callable的一个装饰器类。
FutureTask类实现了RunnableFuture接口(即Runnable和Future接口的结合接口),定义了7种线程执行的状态如下:
/**
* The run state of this task, initially NEW. The run state
* transitions to a terminal state only in methods set,
* setException, and cancel. During completion, state may take on
* transient values of COMPLETING (while outcome is being set) or
* INTERRUPTING (only while interrupting the runner to satisfy a
* cancel(true)). Transitions from these intermediate to final
* states use cheaper ordered/lazy writes because values are unique
* and cannot be further modified.
*
* Possible state transitions:
* NEW -> COMPLETING -> NORMAL
* NEW -> COMPLETING -> EXCEPTIONAL
* NEW -> CANCELLED
* NEW -> INTERRUPTING -> INTERRUPTED
*/
private volatile int state;
private static final int NEW = 0;
private static final int COMPLETING = 1;
private static final int NORMAL = 2;
private static final int EXCEPTIONAL = 3;
private static final int CANCELLED = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED = 6;
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW; // ensure visibility of callable
}
public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result);
this.state = NEW; // ensure visibility of callable
}
从构造器可以看出它对callable和runnable都可以进行包装。
FutureTask的run方法实现增强了对callable的调用,在线程执行后持有了线程异常或者线程执行结果。
public void run() {
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
//这里就是对callable定义的方法进行了直接调用
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
// 捕获发生的异常并持有
setException(ex);
}
if (ran)
//没有发生异常则持有结果
set(result);
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
再来看其如何实现结果和异常的返回,主要是Future接口的get()方法:
public V get() throws InterruptedException, ExecutionException {
int s = state;
if (s <= COMPLETING)
//当线程未执行完毕时,进入awaitDone方法
s = awaitDone(false, 0L);
// 通过report方法返回结果
return report(s);
}
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
final long deadline = timed ? System.nanoTime() + nanos : 0L;
WaitNode q = null;
boolean queued = false;
//轮询
for (;;) {
//检测中断
if (Thread.interrupted()) {
removeWaiter(q);
throw new InterruptedException();
}
int s = state;
//如果线程执行完毕,或者被取消,或者抛出异常,或者被中断,即从轮询中返回
if (s > COMPLETING) {
if (q != null)
q.thread = null;
return s;
}
else if (s == COMPLETING) // cannot time out yet
Thread.yield();
else if (q == null)
q = new WaitNode();
else if (!queued)
queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
q.next = waiters, q);
//在轮询中等待线程状态的切换
else if (timed) {
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
removeWaiter(q);
return state;
}
LockSupport.parkNanos(this, nanos);
}
else
LockSupport.park(this);
}
}
// report方法可以视线程执行情况返回异常或结果
private V report(int s) throws ExecutionException {
Object x = outcome;
// 如果线程没有抛出异常,则返回结果
if (s == NORMAL)
return (V)x;
if (s >= CANCELLED)
//如果被取消了则抛出CancellationException()
throw new CancellationException();
//如果线程抛出了其他异常,则将异常包装为ExecutionException继续向上抛出
throw new ExecutionException((Throwable)x);
}
FutureTask还提供了取消的方法:
public boolean cancel(boolean mayInterruptIfRunning) {
if (state != NEW)
return false;
// 线程已经开始执行,在允许中断的情况下,通过中断通知被取消的线程
if (mayInterruptIfRunning) {
if (!UNSAFE.compareAndSwapInt(this, stateOffset, NEW, INTERRUPTING))
return false;
Thread t = runner;
if (t != null)
t.interrupt();
UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED); // final state
}
//线程尚未开始执行,直接取消
else if (!UNSAFE.compareAndSwapInt(this, stateOffset, NEW, CANCELLED))
return false;
finishCompletion();
return true;
}
FutureTask类通过对callable和runnable的包装增强,是我们可以对线程生命周期做到良好控制和有效监控,并给之后线程池技术的实现打下了基础。