一、开始
多任务并行执行阻塞等待全部执行完毕,通过线程池控制并发数。 前半部分为代码(按照执行流程罗列),后半部分为相关注释。
二、创建带有返回数据的Callable
public Callable<String> get(String key, Map<String, Object> params) {
return () -> {
System.out.println("开始查询=======" + params.toString());
Thread.sleep(500);
long currentTimeMillis1 = System.currentTimeMillis();
System.out.println("get任务耗时:" + (currentTimeMillis1 - currentTimeMillis) + "ms");
return key + "哈哈哈哈哈哈";
};
}
复制代码
三、循环调用创建Callable<String>
List<Callable<String>> tasks = new ArrayList<>();
for (int i = 0; i < 30; i++) {
// ... params
Callable<String> callable = mSmsStatisticsService.get(i + "key", params);
tasks.add(callable);
}
复制代码
四、创建线程池
/**
* corePoolSize:核心线程数。
* maximumPoolSize:线程池允许创建的最大线程数。
* keepAliveTime:非核心线程闲置的超时时间。超过这个时间则回收。
* TimeUnit:keepAliveTime参数的时间单位。
* workQueue:任务队列。
* ThreadFactory:线程工厂,用于创建线程。
* RejectedExecutionHandler:饱和策略。
*
* ThreadPoolExecutor(int corePoolSize,
* int maximumPoolSize,
* long keepAliveTime,
* TimeUnit unit,
* BlockingQueue<Runnable> workQueue,
* ThreadFactory threadFactory,
* RejectedExecutionHandler handler)
*/
ExecutorService executorService = new ThreadPoolExecutor(5, 32,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(1024),
r -> {
Thread thread = new Thread(r);
// thread.setName("sdwfqin");
return thread;
}, new ThreadPoolExecutor.AbortPolicy());
复制代码
五、启动线程并获取返回数据
List<String> resultList = new ArrayList<>();
// 启动线程并获取返回数据
List<Future<String>> futures = executorService.invokeAll(tasks);
for (Future<String> future : futures) {
resultList.add(future.get());
}
复制代码
六、关闭线程池
executorService.shutdown();
复制代码
七、上述提及API相关解释
-
ExecutorService
-
基本方法
- submit: 提交的是
Callable
方法,返回Future
,说明submit是有返回值的 - execute: 执行的是
Runnable
方法,没有返回值 - invokeAny: 接收一个包含
Callable
对象的集合作为参数。调用该方法不会返回Future
对象,而是返回集合中某一个Callable
对象的结果,而且无法保证调用之后返回的结果是一个Callable
,只知道它是这些Callable
中一个执行结束的Callable
对象。 - invokeAll: 调用存在于参数集合中的所有
Callable
对象,并且返回一个包含Future对象的集合,可以通过这个返回的集合来管理每个Callable
的执行结果。 - shutdown: 平滑的关闭
ExecutorService
,当此方法被调用时,ExecutorService
停止接收新的任务并且等待已经提交的任务(包含提交正在执行和提交未执行)执行完成。当所有提交任务执行完毕,线程池即被关闭。
- submit: 提交的是
-
拒绝策略
- ThreadPoolExecutor.AbortPolicy: 丢弃任务并抛出
RejectedExecutionException
异常。 - ThreadPoolExecutor.DiscardPolicy: 也是丢弃任务,但是不抛出异常。
- ThreadPoolExecutor.DiscardOldestPolicy: 丢弃队列最前面的任务,执行后面的任务
- ThreadPoolExecutor.CallerRunsPolicy: 由调用线程处理该任务
- ThreadPoolExecutor.AbortPolicy: 丢弃任务并抛出
-
阻塞队列
- ArrayBlockingQueue: 基于数组的先进先出队列,有界
- LinkedBlockingQueue: 基于链表的先进先出队列,无界
- SynchronousQueue: 无缓冲的等待队列,无界
- 什么是无界: 如果不指定容量,默认为
Integer.MAX_VALUE
-
常用线程池
- Executors.newCachedThreadPool(): 可缓存线程池
- Executors.newSingleThreadExecutor(): 单线程池
- Executors.newFixedThreadPool(3): 固定线程数线程池
- Executors.newScheduledThreadPool(5): 固定线程数,支持定时和周期性任务
- ThreadPoolExecutor(): 手动创建
-
-
Future
-
基本方法
- get: 当任务结束后返回一个结果,如果调用时,工作还没有结束,则会阻塞线程,直到任务执行完毕
- get(long timeout,TimeUnit unit): 最多等待timeout的时间就会返回结果
- cancel(boolean mayInterruptIfRunning): 可以用来停止一个任务,如果任务可以停止(通过
mayInterruptIfRunning
来进行判断),则可以返回true,如果任务已经完成或者已经停止,或者这个任务无法停止,则会返回false. - isDone(): 判断当前方法是否完成
- isCancel(): 判断当前方法是否取消
-