写个简单使用:
Task类、MyThreadFactory类参见:https://blog.csdn.net/SJZYLC/article/details/98099977
private static void simple(){
ThreadFactory namedThreadFactory = new MyThreadFactory();
int queueCapacity = 3, corePoolSize=2, maximumPoolSize=4;
ArrayBlockingQueue arrayBlockingQueue = new ArrayBlockingQueue(queueCapacity);
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(corePoolSize,maximumPoolSize,10,
TimeUnit.SECONDS,arrayBlockingQueue);
for(int i=1;i<=120;i++){
Thread thread = namedThreadFactory.newThread(new Task(i));
threadPoolExecutor.execute(thread);
System.out.printf("i:%d, queueSize:%d, poolSize:%d, coreSize:%d, maxSize:%d\n",
i, arrayBlockingQueue.size(), threadPoolExecutor.getPoolSize(),
threadPoolExecutor.getCorePoolSize(), threadPoolExecutor.getMaximumPoolSize());
}
threadPoolExecutor.shutdown();
while (true){
if(threadPoolExecutor.isTerminated()){
System.out.println("over");
break;
}
}
System.out.println( );
}
for循环执行120次,也就是向线程池添加120个任务,结果报了异常:
Exception in thread "main" java.util.concurrent.RejectedExecutionException
因为队列大小为3,线程池最大为4,小于要添加的任务,所以会报这个拒绝执行异常。
如何避免异常呢?
任务数不确定,即使确定,也不能创建一个特别大容量的线程池。
由于线程池会不断从队列中取线程任务进行执行,而队列是阻塞的,所以我们可以做一个判断:当线程池中的线程数等于当前总容量的时候,就向阻塞队列添加线程。队列满了,就会阻塞,直到有新的空间,就会继续向队列里面添加新元素。
private static void simple(){
ThreadFactory namedThreadFactory = new MyThreadFactory();
int queueCapacity = 3, corePoolSize=2, maximumPoolSize=4;
ArrayBlockingQueue arrayBlockingQueue = new ArrayBlockingQueue(queueCapacity);
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(corePoolSize,maximumPoolSize,10,
TimeUnit.SECONDS,arrayBlockingQueue);
for(int i=1;i<=120;i++){
Thread thread = namedThreadFactory.newThread(new Task(i));
////判断当前线程池是否已满
if((threadPoolExecutor.getPoolSize()+arrayBlockingQueue.size())>=(queueCapacity+maximumPoolSize)){
try {
arrayBlockingQueue.put(thread);
System.out.printf("队列中添加线程 i:%d, queueSize:%d, poolSize:%d, coreSize:%d, maxSize:%d\n",
i, arrayBlockingQueue.size(), threadPoolExecutor.getPoolSize(),
threadPoolExecutor.getCorePoolSize(), threadPoolExecutor.getMaximumPoolSize());
} catch (InterruptedException e) {
e.printStackTrace();
}
}else {
threadPoolExecutor.execute(thread);
System.out.printf("i:%d, queueSize:%d, poolSize:%d, coreSize:%d, maxSize:%d\n",
i, arrayBlockingQueue.size(), threadPoolExecutor.getPoolSize(),
threadPoolExecutor.getCorePoolSize(), threadPoolExecutor.getMaximumPoolSize());
}
}
threadPoolExecutor.shutdown();
while (true){
if(threadPoolExecutor.isTerminated()){
System.out.println("over");
break;
}
}
System.out.println( );
}
还有一种优雅的方式,使用RejectedExecutionHandler处理。
当execute()方法不能接受某个任务时,会调用rejectedExecution()方法。RejectedExecutionHandler是个接口,我们在实现里面处理,当调用这个方法时,我们想阻塞队列添加元素。
public class MyRejectedExecutionHandler implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
try {
//阻塞
System.out.format("handler处理,向阻塞队列排队,%s \n", r);
executor.getQueue().put(r);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
之前的方法为:
private static void simple(){
ThreadFactory namedThreadFactory = new MyThreadFactory();
int queueCapacity = 3, corePoolSize=2, maximumPoolSize=4;
ArrayBlockingQueue arrayBlockingQueue = new ArrayBlockingQueue(queueCapacity);
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(corePoolSize,maximumPoolSize,10,
TimeUnit.SECONDS,arrayBlockingQueue,new MyRejectedExecutionHandler());
for(int i=1;i<=120;i++){
Thread thread = namedThreadFactory.newThread(new Task(i));
threadPoolExecutor.execute(thread);
System.out.printf("i:%d, queueSize:%d, poolSize:%d, coreSize:%d, maxSize:%d\n",
i, arrayBlockingQueue.size(), threadPoolExecutor.getPoolSize(),
threadPoolExecutor.getCorePoolSize(), threadPoolExecutor.getMaximumPoolSize());
}
threadPoolExecutor.shutdown();
while (true){
if(threadPoolExecutor.isTerminated()){
System.out.println("over");
break;
}
}
System.out.println( );
}
也就是构造ThreadPoolExecutor时候,传入RejectedExecutionHandler对象。