ThreadPoolExecutor可以通过子类实现一些扩展功能。
protected void beforeExecute(Thread t, Runnable r);
protected void afterExecute(Runnable r, Throwable t);
这两个方法是在execute方法中执行线程前后执行。子类可以实现这两个方法,对线程池进行扩展。
对beforeExecute方法进行重写,实现一个可暂停/恢复执行的线程池。
public class PausableThreadPoolExecutor extends ThreadPoolExecutor {
public PausableThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
}
public PausableThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler){
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
}
boolean isPause = false;
ReentrantLock lock = new ReentrantLock();
Condition condition = lock.newCondition();
@Override
public void beforeExecute(Thread t, Runnable r) {
lock.lock();
super.beforeExecute(t,r);
try {
while (isPause){
long ms = 10L;
System.out.printf("pausing, %s\n", t.getName());
Thread.sleep(ms);
condition.await();
}
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
lock.unlock();
}
}
/**
* 暂停
*/
public void pause(){
lock.lock();
System.out.println("exe pause");
isPause = true;
lock.unlock();
}
/**
* 继续执行
*/
public void resume(){
lock.lock();
System.out.println("un pause");
isPause = false;
condition.signalAll();
lock.unlock();
}
}
通过Timer定时执行pause、resume操作,模拟进行暂停、恢复。
private static void simple2(){
ThreadFactory namedThreadFactory = new MyThreadFactory();
int queueCapacity = 3, corePoolSize=2, maximumPoolSize=4;
ArrayBlockingQueue arrayBlockingQueue = new ArrayBlockingQueue(queueCapacity);
PausableThreadPoolExecutor threadPoolExecutor = new PausableThreadPoolExecutor(corePoolSize,maximumPoolSize,10,
TimeUnit.SECONDS,arrayBlockingQueue,new MyRejectedExecutionHandler());
timerPause(threadPoolExecutor);
timerResume(threadPoolExecutor);
for(int i=1;i<=120;i++){
Thread thread = namedThreadFactory.newThread(new Task(i));
threadPoolExecutor.execute(thread);
System.out.printf("i:%d, queueSize:%d, poolSize:%d, coreSize:%d, maxSize:%d\n",
i, arrayBlockingQueue.size(), threadPoolExecutor.getPoolSize(),
threadPoolExecutor.getCorePoolSize(), threadPoolExecutor.getMaximumPoolSize());
}
threadPoolExecutor.shutdown();
while (true){
if(threadPoolExecutor.isTerminated()){
System.out.println("over");
break;
}
}
System.out.println( );
}
public static void timerPause(PausableThreadPoolExecutor executor){
Timer timer = new Timer();
timer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
executor.pause();
}
},10000L,10000L);
}
public static void timerResume(PausableThreadPoolExecutor executor){
Timer timer = new Timer();
timer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
executor.resume();
}
},10000L,15000L);
}