ThreadPoolExecutor-可暂停的线程池

ThreadPoolExecutor可以通过子类实现一些扩展功能。

protected void beforeExecute(Thread t, Runnable r);
protected void afterExecute(Runnable r, Throwable t);

这两个方法并不是在调用execute方法的线程中执行,而是由线程池的工作线程分别在每个任务执行之前和之后调用。子类可以重写这两个方法,对线程池进行扩展。

对beforeExecute方法进行重写,实现一个可暂停/恢复执行的线程池。

/**
 * A {@link ThreadPoolExecutor} whose workers can be held back before they start a task.
 *
 * <p>While the pool is paused, every worker that is about to run a task blocks in
 * {@link #beforeExecute(Thread, Runnable)} until {@link #resume()} is called. Tasks that are
 * already running when {@link #pause()} is called are not affected; they run to completion.
 */
public class PausableThreadPoolExecutor extends ThreadPoolExecutor {

    /** True while the pool is paused. Read and written only while holding {@link #lock}. */
    boolean isPause = false;
    /** Guards {@link #isPause}. */
    final ReentrantLock lock = new ReentrantLock();
    /** Signalled by {@link #resume()} to release workers waiting in {@code beforeExecute}. */
    final Condition condition = lock.newCondition();

    /**
     * Creates a pausable pool; all arguments are passed unchanged to the matching
     * {@link ThreadPoolExecutor} constructor.
     */
    public PausableThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }

    /**
     * Creates a pausable pool with a custom rejection handler; all arguments are passed
     * unchanged to the matching {@link ThreadPoolExecutor} constructor.
     */
    public PausableThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              RejectedExecutionHandler handler){
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
    }

    /**
     * Blocks the worker thread for as long as the pool is paused, then lets the task run.
     *
     * <p>If the worker is interrupted while waiting, its interrupt flag is restored and the
     * method returns, so the task (and the pool's shutdown logic) can observe the interrupt.
     *
     * @param t the worker thread that is about to run {@code r}
     * @param r the task that is about to be executed
     */
    @Override
    public void beforeExecute(Thread t, Runnable r) {
        // Call the super hook before taking the lock so an exception there cannot leak it.
        super.beforeExecute(t, r);
        lock.lock();
        try {
            // Loop to guard against spurious wake-ups and a pause() that follows a resume().
            while (isPause) {
                System.out.printf("pausing, %s\n", t.getName());
                // await() releases the lock while waiting, so pause()/resume() are never blocked.
                condition.await();
            }
        } catch (InterruptedException e) {
            // Do not swallow the interrupt: put the flag back for the task / pool to see.
            t.interrupt();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Pauses the pool: workers finish their current task and then wait before starting
     * the next one.
     */
    public void pause(){
        lock.lock();
        try {
            System.out.println("exe pause");
            isPause = true;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Resumes the pool and wakes every worker that is waiting in {@code beforeExecute}.
     */
    public void resume(){
        lock.lock();
        try {
            System.out.println("un pause");
            isPause = false;
            condition.signalAll();
        } finally {
            lock.unlock();
        }
    }
}

通过Timer定时执行pause、resume操作,模拟进行暂停、恢复。

/**
 * Demo: submits 120 tasks to a small {@code PausableThreadPoolExecutor} while two timers
 * periodically pause and resume it, printing the pool state after every submission, then
 * shuts the pool down and blocks until it has terminated.
 */
private static void simple2(){
        ThreadFactory namedThreadFactory = new MyThreadFactory();
        int queueCapacity = 3, corePoolSize=2, maximumPoolSize=4;
        ArrayBlockingQueue<Runnable> arrayBlockingQueue = new ArrayBlockingQueue<>(queueCapacity);
        PausableThreadPoolExecutor threadPoolExecutor = new PausableThreadPoolExecutor(corePoolSize,maximumPoolSize,10,
                TimeUnit.SECONDS,arrayBlockingQueue,new MyRejectedExecutionHandler());
        // Let the pool create its workers through the named factory. Previously the factory
        // only built Thread objects that were never started and served merely as Runnables.
        // NOTE(review): assumes MyThreadFactory is an ordinary ThreadFactory - confirm.
        threadPoolExecutor.setThreadFactory(namedThreadFactory);
        timerPause(threadPoolExecutor);
        timerResume(threadPoolExecutor);
        for(int i=1;i<=120;i++){
            threadPoolExecutor.execute(new Task(i));
            System.out.printf("i:%d, queueSize:%d, poolSize:%d, coreSize:%d, maxSize:%d\n",
                    i, arrayBlockingQueue.size(), threadPoolExecutor.getPoolSize(),
                    threadPoolExecutor.getCorePoolSize(), threadPoolExecutor.getMaximumPoolSize());
        }
        threadPoolExecutor.shutdown();
        try {
            // Block until the pool has drained instead of spinning on isTerminated().
            if (threadPoolExecutor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS)) {
                System.out.println("over");
            }
        } catch (InterruptedException e) {
            // Stop waiting, but keep the interrupt visible to the caller.
            Thread.currentThread().interrupt();
        }
        System.out.println( );
    }

    /**
     * Starts a timer that calls {@code executor.pause()} after 10 seconds and then every
     * 10 seconds. The timer cancels itself once the executor has terminated, so its
     * background thread does not outlive the pool and keep the JVM running.
     *
     * @param executor the pool to pause periodically
     */
    public static void timerPause(PausableThreadPoolExecutor executor){
        Timer timer = new Timer();
        timer.scheduleAtFixedRate(new TimerTask() {
            @Override
            public void run() {
                if (executor.isTerminated()) {
                    // Nothing left to pause: stop the timer thread.
                    timer.cancel();
                    return;
                }
                executor.pause();
            }
        },10000L,10000L);
    }

    /**
     * Starts a timer that calls {@code executor.resume()} after 10 seconds and then every
     * 15 seconds. The timer cancels itself once the executor has terminated, so its
     * background thread does not outlive the pool and keep the JVM running.
     *
     * @param executor the pool to resume periodically
     */
    public static void timerResume(PausableThreadPoolExecutor executor){
        Timer timer = new Timer();
        timer.scheduleAtFixedRate(new TimerTask() {
            @Override
            public void run() {
                if (executor.isTerminated()) {
                    // Nothing left to resume: stop the timer thread.
                    timer.cancel();
                    return;
                }
                executor.resume();
            }
        },10000L,15000L);
    }
发布了57 篇原创文章 · 获赞 39 · 访问量 20万+

猜你喜欢

转载自blog.csdn.net/SJZYLC/article/details/99835107