对于一个线程池如果使用shutdownNow() or shutdown() 方法关闭线程池的话可能会造成不必要的开销,因为有时候我们需要关闭线程池的线程但是之后可能还会继续使用线程池,
因此,最好的办法是自己写一个终止线程的方法。一下就是我给出的一种方法:
拿到线程池源码作为自己包下的类后,直接修改源码,添加一下内容,
// TODO: 2017/5/15 线程管理类 private volatile boolean exit=false; private BlockingQueue<Runnable> remainingTasks; class ThreadManager{ public int getThreadCount(){return workers.size();} public BlockingQueue<Runnable> stopAllThreads(){//结束所有线程 exit=true; return remainingTasks; } } private ThreadManager poolManager; public ThreadManager getThreadPoolManager(){ return poolManager; }
//修改线程池的这个方法
private Runnable getTask() { boolean timedOut = false; // Did the last poll() time out? for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null; } int wc = workerCountOf(c); // Are workers subject to culling? boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null; continue; } try { if (exit) { remainingTasks=new ArrayBlockingQueue<Runnable>(workQueue.size()); while (!workQueue.isEmpty()){ remainingTasks.add(workQueue.take()); } exit=false; } Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; timedOut = true; } catch (InterruptedException retry) { timedOut = false; } } }字体变大部分为添加内容,其余不需要修改。
首先来看一下为什么要在getTask()方法中添加代码,getTask()方法是线程用来获取任务的方法;如果该方法获取不到任务线程池就会关闭,这个方法本身就在线程中运行所以不必担心线程阻塞问题;
看一下添加的代码:
if (exit) {
remainingTasks=new ArrayBlockingQueue<Runnable>(workQueue.size());
while (!workQueue.isEmpty()){
remainingTasks.add(workQueue.take());
}
exit=false;
}
意思是说当线程池的
stopAllThreads()方法执行后exit变为true,则取走任务队列中所有任务,然后getTask()方法会返回null,这样线程就会自动关闭,从而就达到了目的,同时也能拿到剩余的任务;
当然由于在线程中进行所以返回的任务列表可能为空,解决办法是延时获取,或者在一个线程中获取。
还有一种方法直接修改下面的方法就能直接得到剩余任务:
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try { while (task != null || (task = getTask()) != null) {
if(exit)
break; // If pool is stopping, ensure thread is interrupted; // if not, ensure thread is not interrupted. This // requires a recheck in second case to deal with // shutdownNow race while clearing interrupt if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task);// TODO: 2017/5/13 此方法并没有实现,但是子类可以实现该方法;注意子类实现该方法的时候,实现内容应在super()之前; Throwable thrown = null; try { mRunningTasks.add(task); task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally { mRunningTasks.remove(task); task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); } }
然后将上面的方法改为:
public BlockingQueue<Runnable> stopAllThreads(){//结束所有线程
return workerQueue;
}
再到execute()方法中添加,
if(exit)
exit=false;
即可。
源码下载