定义:通过将停止线程这个动作分解为准备阶段和执行阶段两个阶段,提供了一种通用的用于优雅地停止线程的方法
准备阶段:“通知”目标线程(欲停止的线程)准备进行停止,会设置一个标志变量用于指示目标线程可以准备停止了
停止阶段:检查准备阶段所设置的线程停止标志和信号,在此基础上决定线程停止的时机,并进行适当的“清理”操作
ThreadOwner: 目标线程的拥有者。一般讲目标线程的创建者视为该线程的拥有者,并假定其“知道”目标线程的工作内容,可以安全地停止目标线程
Terminatable: 可停止线程的抽象
terminate: 请求目标线程终止
AbstractTerminatableThread: 可停止的线程
terminate: 设置线程停止标志,并发送停止“信号”给目标线程
doTerminate: 留给子类实现线程停止时所需的一些额外操作,如目标线程代码中包含Socket I/O,子类可以在该方法中关闭socket以达到快速停止线程,而不会使目标线程等待I/O完成才能侦测到线程停止标记
doRun: 线程处理逻辑方法。留给子类实现线程的处理逻辑,相当于Thread.run(),只不过该方法中无须关心停止线程的逻辑,因为这个逻辑已经被封装在TerminatableThread的run方法中
doCleanup: 留给子类实现线程停止后可能需要的一些清理动作
TerminationToken: 线程停止标志。toShutdown用于指示目标线程可以停止了。reservations可用于反映目标线程还有多少数量未完成的任务,以支持等目标线程处理完任务后再进行停止
ConcreteTerminatableThread: 由应用自己实现的AbstractTerminatableThread参与者的实现类。该类需要实现其父类的doRun()抽象方法,在其中实现线程的处理逻辑,并根据应用的实际需要覆盖其父类的doTerminate方法、doCleanup方法
下列代码中告警发送线程停止需要解决两个问题,一个是将缓存队列中的消息发送出去,一个是用于缓存消息的阻塞队列为空时发送线程会处于等待状态,无法响应关闭线程的请求,因此此处使用两阶段终止模式来解决这个问题。设置一个计数器reservations,消息缓存到队列是加1,消息发送成功是减1,如果值为0,则可以确定线程无未处理任务,可以关闭。执行终止方法AbstractTerminatableThread#terminate()时先将标记toShutdown设置为true,此为准备阶段,然后执行终止逻辑,如果缓存队列中无待处理的任务,则强制终止线程(即调用super.interrupt()方法)。执行阶段AbstractTerminatableThread#run()会判断toShutdown和resvations的值进行终止线程,然后还会执行一些清理。
package com.bruce.twoPhaseTermination;
/**
* @Author: Bruce
* @Date: 2019/5/31 12:31
* @Version 1.0
*/
public interface Terminatable {
void terminate();
}
package com.bruce.twoPhaseTermination;
import java.lang.ref.WeakReference;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.atomic.AtomicInteger;
/**
* @Author: Bruce
* @Date: 2019/5/31 12:55
* @Version 1.0
*/
public class TerminationToken {
protected volatile boolean toShutdown = false;
public final AtomicInteger reservations = new AtomicInteger(0);
private final Queue<WeakReference<Terminatable>> coordinatedThreads;
public TerminationToken() {
coordinatedThreads = new ConcurrentLinkedDeque<WeakReference<Terminatable>>();
}
public boolean isToShutdown() {
return toShutdown;
}
protected void setToShutdown(boolean toShutdown) {
this.toShutdown = true;
}
public void register(Terminatable thread) {
coordinatedThreads.add(new WeakReference<Terminatable>(thread));
}
protected void notifyThreadTermination(Terminatable thread) {
WeakReference<Terminatable> wrThread;
Terminatable otherThread;
while (null != (wrThread = coordinatedThreads.poll())) {
otherThread = wrThread.get();
if (null != otherThread && otherThread != thread) {
otherThread.terminate();
}
}
}
}
package com.bruce.twoPhaseTermination;
import com.bruce.guardedSuspension.AlarmAgent;
import com.bruce.guardedSuspension.AlarmInfo;
import com.bruce.guardedSuspension.AlarmType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
/**
* @Author: Bruce
* @Date: 2019/5/31 0:07
* @Version 1.0
*/
public class AlarmSendingThread extends AbstractTerminatableThread {
private Logger LOG = LoggerFactory.getLogger(AlarmSendingThread.class);
private final AlarmAgent alarmAgent = new AlarmAgent();
private final BlockingQueue<AlarmInfo> alarmQueue;
private final ConcurrentMap<String, AtomicInteger> submittedAlarmRegistry;
public AlarmSendingThread() {
alarmQueue = new ArrayBlockingQueue<AlarmInfo>(100);
submittedAlarmRegistry = new ConcurrentHashMap<String, AtomicInteger>();
alarmAgent.init();
}
@Override
protected void doRun() throws Exception {
AlarmInfo alarm;
alarm = alarmQueue.take();
terminationToken.reservations.decrementAndGet();
try {
alarmAgent.sendAlarm(alarm);
} catch (Exception e) {
e.printStackTrace();
}
if (AlarmType.RESUME == alarm.type) {
String key = AlarmType.FAULT.toString() + ':' + alarm.getId() + '@'
+ alarm.getExtraInfo();
submittedAlarmRegistry.remove(key);
key = AlarmType.RESUME.toString() + ':' + alarm.getId() + '@'
+ alarm.getExtraInfo();
submittedAlarmRegistry.remove(key);
}
}
public int sendAlarm(final AlarmInfo alarmInfo) {
AlarmType type = alarmInfo.type;
String id = alarmInfo.getId();
String extraInfo = alarmInfo.getExtraInfo();
if (terminationToken.isToShutdown()) {
LOG.error("rejected alarm:" + id + "," + extraInfo);
return -1;
}
int duplicateSubmissionCount = 0;
try {
AtomicInteger prevSubmittedCounter;
prevSubmittedCounter = submittedAlarmRegistry.putIfAbsent(type.toString()
+ ':' + id + '@' + extraInfo, new AtomicInteger(0));
if (null == prevSubmittedCounter) {
alarmQueue.put(alarmInfo);
terminationToken.reservations.incrementAndGet();
} else {
duplicateSubmissionCount = prevSubmittedCounter.incrementAndGet();
}
} catch (Throwable t) {
t.printStackTrace();
}
return duplicateSubmissionCount;
}
@Override
protected void doCleanup(Exception exp) {
if (null != exp && !(exp instanceof InterruptedException)) {
exp.printStackTrace();
}
alarmAgent.disconnect();
}
}
package com.bruce.twoPhaseTermination;
import com.bruce.guardedSuspension.AlarmInfo;
import com.bruce.guardedSuspension.AlarmType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @Author: Bruce
* @Date: 2019/5/31 0:05
* @Version 1.0
*/
public class AlarmMgr {
private Logger LOG = LoggerFactory.getLogger(AlarmMgr.class);
private static final AlarmMgr INSTANCE = new AlarmMgr();
private volatile boolean shutdownRequested = false;
private final AlarmSendingThread alarmSendingThread;
private AlarmMgr() {
alarmSendingThread = new AlarmSendingThread();
}
public static AlarmMgr getInstance() {
return INSTANCE;
}
public int sendAlarm(AlarmType type,String id, String extraInfo) {
LOG.info("Trigger alarm " + type + "," + id + ',' + extraInfo);
int duplicateSubmissionCount = 0;
try {
AlarmInfo alarmInfo = new AlarmInfo(id, type);
alarmInfo.setExtraInfo(extraInfo);
duplicateSubmissionCount = alarmSendingThread.sendAlarm(alarmInfo);
} catch (Throwable t) {
t.printStackTrace();
}
return duplicateSubmissionCount;
}
public void init() {
alarmSendingThread.start();
}
public synchronized void shutdown() {
if (shutdownRequested) {
throw new IllegalStateException("shutdown already requested");
}
alarmSendingThread.terminate();
shutdownRequested = true;
}
}
package com.bruce.twoPhaseTermination;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @Author: Bruce
* @Date: 2019/5/31 0:08
* @Version 1.0
*/
public abstract class AbstractTerminatableThread extends Thread implements Terminatable {
final static Logger LOG = LoggerFactory.getLogger(AbstractTerminatableThread.class);
private final boolean DEBUG = true;
public final TerminationToken terminationToken;
public AbstractTerminatableThread() {
this(new TerminationToken());
}
public AbstractTerminatableThread(TerminationToken terminationToken) {
this.terminationToken = terminationToken;
terminationToken.register(this);
}
protected abstract void doRun() throws Exception;
protected void doCleanup(Exception cause) {
}
protected void doTerminate() {}
@Override
public void run() {
Exception ex = null;
try {
for (;;) {
if (terminationToken.isToShutdown()
&& terminationToken.reservations.get() <= 0) {
break;
}
doRun();
}
} catch (Exception e) {
ex = e;
if (e instanceof InterruptedException) {
if (DEBUG) {
LOG.debug("", e);
}
} else {
LOG.error("", e);
}
} finally {
try {
doCleanup(ex);
} finally {
terminationToken.notifyThreadTermination(this);
}
}
}
@Override
public void interrupt() {
terminate();
}
@Override
public void terminate() {
terminationToken.setToShutdown(true);
try {
doTerminate();
} finally {
if (terminationToken.reservations.get() <= 0) {
super.interrupt();
}
}
}
public void terminate(boolean waitUtilThreadTerminate) {
terminate();
if (waitUtilThreadTerminate) {
try {
this.join();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
}
两阶段终止模式可以对各种形式的目标线程进行优雅的停止,如目标线程调用了能够对interrupt方法调用做出响应的阻塞方法、目标线程调用了不能对interrupt方法调用做出响应的阻塞方法、目标线程作为消费者处理其他线程产生的“产品”,在其停止前需要处理完现有“产品”。两阶段终止模式在大量的中间件中有应用,例如在RocketMQ,org.apache.rocketmq.client.impl.consumer.PullMessageService#shutdown()方法中会调用如下两个方法来优雅地终止线程,非常值得学习和借鉴。
@Override
public void shutdown(boolean interrupt) {
super.shutdown(interrupt);
ThreadUtils.shutdownGracefully(this.scheduledExecutorService, 1000, TimeUnit.MILLISECONDS);
}
public void shutdown(final boolean interrupt) {
log.info("Try to shutdown service thread:{} started:{} lastThread:{}", getServiceName(), started.get(), thread);
if (!started.compareAndSet(true, false)) {
return;
}
this.stopped = true;
log.info("shutdown thread " + this.getServiceName() + " interrupt " + interrupt);
if (hasNotified.compareAndSet(false, true)) {
waitPoint.countDown(); // notify
}
try {
if (interrupt) {
this.thread.interrupt();
}
long beginTime = System.currentTimeMillis();
if (!this.thread.isDaemon()) {
this.thread.join(this.getJointime());
}
long eclipseTime = System.currentTimeMillis() - beginTime;
log.info("join thread " + this.getServiceName() + " eclipse time(ms) " + eclipseTime + " "
+ this.getJointime());
} catch (InterruptedException e) {
log.error("Interrupted", e);
}
}
public static void shutdownGracefully(ExecutorService executor, long timeout, TimeUnit timeUnit) {
// Disable new tasks from being submitted.
executor.shutdown();
try {
// Wait a while for existing tasks to terminate.
if (!executor.awaitTermination(timeout, timeUnit)) {
executor.shutdownNow();
// Wait a while for tasks to respond to being cancelled.
if (!executor.awaitTermination(timeout, timeUnit)) {
log.warn(String.format("%s didn't terminate!", executor));
}
}
} catch (InterruptedException ie) {
// (Re-)Cancel if current thread also interrupted.
executor.shutdownNow();
// Preserve interrupt status.
Thread.currentThread().interrupt();
}
}
参考资料