聊一聊Doug Lea的生意经
1.第一阶段:Executor
早期创业时候不想那么多,能做生意就行。所以只有一个方法:execute()
public interface Executor {
void execute(Runnable command);
}
2.第二阶段:ExecutorService
发展才是硬道理,该严格要求一下自己。
public interface ExecutorService extends Executor {
void shutdown(); // 正常关门
List<Runnable> shutdownNow(); //土匪来了,立即关门
boolean isShutdown(); // 判断是否关门
boolean isTerminated(); // 是否破产
boolean awaitTermination(long timeout, TimeUnit unit) // 等待破产
submit() // 服务一个客人
invokeAll() // 服务多个客人并且返回所有人的状态和结果
invokeAny() // 服务多个客人,只返回吃完饭的人的评价
}
.
.
.
上面的部分足够展开我们我们的话题,更为详细的可以参考: 线程池4th卷:大鹏展翅恨天低
线程池的状态管理
人生无常,世事难料,做生意当然是有成功也有失败…总公司为了便于管理,当然需要实时掌握所创建的连锁店的状态(runState)。
Doug Lea的实现在ThreadPoolExecutor,现在我们结合源码看一下线程池的状态:
public class ThreadPoolExecutor extends AbstractExecutorService {
/**
*The main pool control state, ctl, is an atomic integer packing
* two conceptual fields:
* 线程池控制状态:包含两部分内容。
* workerCount(工作线程的数量):indicating the effective number of threads
* runState(线程池的状态):indicating whether running, shutting down etc
*/
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
/**
* In order to pack them into one int, we limit workerCount to
* (2^29)-1 (about 500 million) threads rather than (2^31)-1 (2
* billion) otherwise representable.
* 为了将其限制在一个Int的范围内,限制厨师数量(workerCount)大约为5亿个
*/
private static final int COUNT_BITS = Integer.SIZE - 3;
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
}
1. 谈一下runState 的生命周期状态的含义:
- RUNNING:正常营业:接受新的任务并且处理队列中的任务( Accept new tasks and process queued tasks)
- SHUTDOWN:打烊状态:不再接受新任务,但是处理队列中的任务( Don’t accept new tasks, but process queued tasks)
- STOP:停业状态:所有任务全部暂停,且尝试打断正在执行的任务( Don’t accept new tasks, don’t process queued tasks,and interrupt in-progress tasks)
- TIDYING:清算状态:所有任务全部暂停,辞退全部厨师,调用hook方法进入破产流程(All tasks have terminated, workerCount is zero, the thread transitioning to state TIDYING will run the terminated() hook method)
- TERMINATED:破产状态:过眼云烟…( terminated() has completed)
- (SHUTDOWN and pool and queue empty),处于SHUTDOWN状态且线程池和任务队列都为空
- (STOP and pool empty),处于STOP状态,线程池为空
2. 开店(ThreadPoolExecutor)的生命历程
世事真难料,人生信有涯
3.涉及runState的方法
多说一句,线程池这块的源码还是非常好看的,除了判断语句不加{}让人有点捉急…
3.1 execute()
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
// 第一步:判断workerCount < corePoolSize,则addWorker
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
// 第二步:判断线程池的状态,且可以成功添加任务
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// 由于上面不是原子操作,所以需要double check. 判断不是RUNNING状态且可从任务队列中删除此任务(具体可以参见remove()方法).执行拒绝策略。
if (! isRunning(recheck) && remove(command)){
reject(command);
}else if (workerCountOf(recheck) == 0)
addWorker(null, false);
//第三步: 若不能添加任务,则addWorker,若失败就执行拒绝策略
}else if (!addWorker(command, false)){
reject(command);
}
}
这里提一下remove()方法
public boolean remove(Runnable task) {
boolean removed = workQueue.remove(task);
// 重点是这里的尝试终止操作。
tryTerminate(); // In case SHUTDOWN and now empty
return removed;
}
3.2 shutdown()
代码逻辑都很清晰,不多啰嗦…
public void shutdown() {
// 保证线程安全,使用可重入锁(ReentrantLock),需要帮助的可以评论留言
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 1. 检查权限
checkShutdownAccess();
// 2.将runState状态置为SHUTDOWN,
advanceRunState(SHUTDOWN);
// 3. 中断所有**空闲线程(idle)**
interruptIdleWorkers();
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
// 尝试终止线程池
tryTerminate();
}
3.3 shutdownNow()
注意对比和shutdown()的区别
扫描二维码关注公众号,回复:
9725003 查看本文章
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(STOP);
// 打断所有线程
interruptWorkers();
// 返回未执行的任务
tasks = drainQueue();
} finally {
mainLock.unlock();
}
// 尝试终止线程池
tryTerminate();
return tasks;
}
3.4 awaitTermination()
public boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 这里死循环轮询 线程池的状态 以及 判断等待时间
for (;;) {
if (runStateAtLeast(ctl.get(), TERMINATED))
return true;
if (nanos <= 0)
return false;
nanos = termination.awaitNanos(nanos);
}
} finally {
mainLock.unlock();
}
}
线程池的状态相关的东西大概就讲到这里,其实结合你开店做老板的经验,大概脉络就很清晰啦…
这里还是更推荐RTFSC… Doug Lea的代码逻辑以及注释都是非常好的…