版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/danielzhou888/article/details/84036900
使用Semaphore控制线程池任务提交的速率
欢迎关注作者博客
简书传送门
介绍
当线程池的工作队列被填满后,如果没有预定义的饱和策略来阻塞任务的执行,则可以通过信号量Semaphore来限制任务的到达率。Semaphore是一个同步工具类,用来控制同时访问某个特定资源的操作数量。它的acquire方法返回一个虚拟的许可,如果没有可用的许可,则阻塞该方法的调用线程直到有可用许可为止。如果线程池使用无界队列缓冲任务时,如果任务在某一时间增长数量过快,容易导致内存耗尽。
无界队列和Semaphore搭配使用,通过设置信号量的上界,来控制任务的提交速率。
四种饱和策略
- static class ThreadPoolExecutor.AbortPolicy
用于被拒绝任务的处理程序,它将抛出 RejectedExecutionException. - static class ThreadPoolExecutor.CallerRunsPolicy
用于被拒绝任务的处理程序,它直接在 execute 方法的调用线程中运行被拒绝的任务;如果执行程序已关闭,则会丢弃该任务。 - static class ThreadPoolExecutor.DiscardOldestPolicy
用于被拒绝任务的处理程序,它放弃最旧的未处理请求,然后重试 execute;如果执行程序已关闭,则会丢弃该任务。 - static class ThreadPoolExecutor.DiscardPolicy
用于被拒绝任务的处理程序,默认情况下它将丢弃被拒绝的任务。
源码
下面使用Semaphore来控制线程池任务提交的速率:
/**
* @program:
* @description: 使用Semaphore控制线程池任务提交速率
* @author: zhouzhixiang
* @create: 2018-11-13 20:48
*/
@ThreadSafe
public class BoundedExecutor {
private final ExecutorService executor;
private final Semaphore semaphore;
public BoundedExecutor(ExecutorService executor, int bound) {
this.executor = executor;
this.semaphore = new Semaphore(bound);
}
public void submitTask(final Runnable command) {
try {
semaphore.acquire();
executor.execute(new Runnable() {
@Override
public void run() {
try {
command.run();
}finally {
semaphore.release();
}
}
});
} catch (InterruptedException e) {
semaphore.release();
}
}
public void stop(){
this.executor.shutdown();
}
static class MyThread extends Thread {
public String name;
public MyThread(String name) {
this.name = name;
}
@Override
public void run() {
System.out.println("Thread-"+name+" is running....");
try {
Thread.sleep(new Random().nextInt(10000));
}catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) {
ExecutorService executorService = new ThreadPoolExecutor(2,2,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>(5));
BoundedExecutor executor = new BoundedExecutor(executorService, 5);
for (int i = 0; i < 100; i++) {
executor.submitTask(new MyThread(""+i));
}
executor.stop();
}
}
欢迎加入Java猿社区