多线程在现在工作中出现越来越频繁、需要我们熟记并且能熟练的使用之、对相关线程池的一些配置需要我们非常熟悉。
1. 参数详解
1.1 corePoolSize
corePoolSize 核心线程数
- 核心线程会一直存活,即使没有任务需要执行
- 当线程数小于核心线程数时,即使有线程空闲,线程池也会优先创建新线程处理
- 设置allowCoreThreadTimeOut=true(默认false)时,核心线程会超时关闭
1.2 queueCapacity
queueCapacity 任务队列容量(阻塞队列)
- 当核心线程数达到最大时,新任务会放在队列中排队等待
1.3 maxPoolSize
maxPoolSize 最大线程数
- 当线程数 >= corePoolSize 且任务队列已满时,线程池会创建新线程来处理任务
- 当线程数 = maxPoolSize且任务队列已满时,线程池会拒绝处理任务而抛出异常
1.4 keepAliveTime
keepAliveTime 线程空闲时间
- 当线程空闲时间达到keepAliveTime时,线程会退出,直到线程数量=coolPoolSize
- 如果allowCoreThreadTimeOut=true时,则会直到线程数量=0
1.5 allowCoreThreadTimeOut
allowCoreThreadTimeOut 允许核心线程超时
1.6 rejectedExecutionHandler
rejectedExecutionHandler 任务拒绝处理器
-
两种情况会拒绝处理任务
- 当线程数已经达到maxPoolSize且任务队列已满,就会拒绝新的任务
- 当线程池被调运shutdown() 后,会等待线程池里的任务执行完毕在shutdown。如果在调运shutdown() 和线程池真正shutdown之间提交任务,会拒绝新的任务
-
线程池会调运rejectedExecutionHandler来处理这个任务,如果没有设置默认是AbortPolicy,会抛出异常
- ThreadPoolExecutor类有几个内部实现类来处理这类情况
1. **AbortPolicy** 丢弃任务、抛运行时异常
1. **CallerRunsPolicy** 执行任务
1. **DisCardPolicy** 忽视,什么都不会发生
1. **DisCardOldestPolicy** 从队列中提出最先 进入队列(最后一个执行)的任务
- 实现RejectedExecutionHandler接口,可自定义处理器
2. ThreadPoolExecutor 执行顺序
线程池按以下行为执行任务
1. 当线程数 < 核心线程数时,创建线程。
2. 当线程数 >= 核心线程数,且任务队列未满时,将任务放入任务队列。
3. 当线程数 >= 核心线程数,且任务队列已满
- 若线程数 < 最大线程数,创建线程
- 若线程数 = 最大线程数,抛出异常,拒绝任务
3. 如何设置参数
3.1 默认值
corePoolSize = 1
queueCapacity = Integer.MAX_VALUE
maxPoolSize = Integer.MAX_VALUE
keepAliveTime = 60s
allowCoreThreadTimeOut = false
rejectedExecutionHandler = AbortPolicy
3.2 如何来设置
- 需要根据几个值来决定
tasks : 每秒的任务数,假设为 500 - 1000
taskcost : 每个任务花费时间,假设为0.1s
responsetime : 系统允许容忍的最大响应时间,假设为1s
- 计算方式
corePoolSize = 每秒需要多少个线程处理
* threadcount = tasks/(1/taskcost) =tasks*taskcout = (500~1000)*0.1 = 50~100 个线程。corePoolSize设置应该大于50
* 根据8020原则,如果80%的每秒任务数小于800,那么corePoolSize设置为80即可
queueCapacity = (coreSizePool/taskcost)*responsetime
* 计算可得 queueCapacity = 80/0.1*1 = 80。意思是队列里的线程可以等待1s,超过了的需要新开线程来执行
* 切记不能设置为Integer.MAX_VALUE,这样队列会很大,线程数只会保持在corePoolSize大小,当任务陡增时,不能新开线程来执行,响应时间会随之陡增。
maxPoolSize = (max(tasks)- queueCapacity)/(1/taskcost)
* 计算可得 maxPoolSize = (1000-80)/10 = 92
* (最大任务数-队列容量)/每个线程每秒处理能力 = 最大线程数
rejectedExecutionHandler:根据具体情况来决定,任务不重要可丢弃,任务重要则要利用一些缓冲机制来处理
keepAliveTime和allowCoreThreadTimeout采用默认通常能满足
4. 使用案例
@Component
public class KafkaReceiver {
@Autowired
private BroadbandService broadbandService;
private ThreadFactory namedThreadFactory = new ThreadFactoryBuilder()
.setNameFormat("deal-pool-%d")
.build();
private ExecutorService threadPool = new ThreadPoolExecutor(10, 20, 5, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(10), namedThreadFactory, new ThreadPoolExecutor.CallerRunsPolicy());
/**
* 监听kafka消息受理业务
*/
@KafkaListener(topics = "${kafka.topics[1]}")
public void listenerKafkaProductCommit(ConsumerRecord<String, String> record) {
threadPool.execute(() -> {
try {
BusinessOrder businessOrder = JsonUtil.jsonToObject(record.value(), BusinessOrder.class);
log.info("消息队列正在工作,订单号: {}", businessOrder.getOrderNo());
//尝试多线程处理
broadbandService.asynSchoolBroadbandRemoveService(businessOrder);
} catch (Exception e) {
log.error("消息队列正在工作异常", e);
}
});
}
}