版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/qq_30038111/article/details/83987486
在利用org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor
实现请求的慢处理时,我的配置中核心线程数为5,最大线程数为10,但是代码中创建了8个队列放在线程池中,假如我用id
去hash
取模,得到队列集合下标,从队列集合中队列,此时就会出现8个队列中,每个队列都有数据,但只有前5个队列中的请求被处理,后3个队列的请求被搁置。
因此,配置线程池的线程最少数量时,一定要大于等于自己创建的队列数量。
<bean id="deviceBindExecutor" name="deviceBindExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
<!-- 线程池维护线程的最少数量 -->
<property name="corePoolSize" value="5" />
<!-- 允许的空闲时间 -->
<property name="keepAliveSeconds" value="200" />
<!-- 线程池维护线程的最大数量 -->
<property name="maxPoolSize" value="10" />
<!-- 缓存队列 -->
<property name="queueCapacity" value="10" />
<!-- 对拒绝task的处理策略 -->
<property name="rejectedExecutionHandler">
<bean class="java.util.concurrent.ThreadPoolExecutor$CallerRunsPolicy" />
</property>
</bean>
代码实现如下:
/**
* 懒汉式单例可以实现传参的单例初始化
*/
public class RequestProcessorThreadPool {
private volatile static RequestProcessorThreadPool instance = null;
private RequestProcessorThreadPool() {
}
private RequestProcessorThreadPool(ApplicationContext applicationContext) {
RequestQueue requestQueue = RequestQueue.getInstance();
ThreadPoolTaskExecutor threadPool = (ThreadPoolTaskExecutor) applicationContext.getBean("deviceBindExecutor");
/**
* 给线程池中加入任务队列
*/
for (int i = 0; i < 8; i++) {
ArrayBlockingQueue<Request> queue = new ArrayBlockingQueue<Request>(100);
requestQueue.addQueue(queue);
threadPool.submit(new RequestProcessorThread(queue));
}
}
public static RequestProcessorThreadPool getInstance(ApplicationContext applicationContext) {
if (instance == null) {
synchronized (RequestProcessorThreadPool.class) {
if (instance == null) {
instance = new RequestProcessorThreadPool(applicationContext);
}
}
}
return instance;
}
}
/**
* 请求队列
*/
public class RequestQueue {
private List<ArrayBlockingQueue<Request>> queues = new ArrayList<ArrayBlockingQueue<Request>>();
private RequestQueue() {
}
private static class Singleton {
private static RequestQueue requestQueue = null;
static {
requestQueue = new RequestQueue();
}
public static RequestQueue getInstance() {
return requestQueue;
}
}
public static RequestQueue getInstance() {
return Singleton.getInstance();
}
/**
* 添加队列到集合
*/
public void addQueue(ArrayBlockingQueue<Request> queue) {
queues.add(queue);
}
/**
* 队列集合的大小
*/
public int queueSize() {
return queues.size();
}
/**
* 根据集合下表获取队列
*/
public ArrayBlockingQueue<Request> getQueue(int index) {
return queues.get(index);
}
}