文章目录
一个采用“ThreadPoolExecutor+Future”的方案询价应用方案,如下:
// 创建线程池
ExecutorService executor = Executors.newFixedThreadPool(3);
// 异步向电商S1询价
Future<Integer> f1 =
executor.submit(
()->getPriceByS1());
// 异步向电商S2询价
Future<Integer> f2 =
executor.submit(
()->getPriceByS2());
// 异步向电商S3询价
Future<Integer> f3 =
executor.submit(
()->getPriceByS3());
// 获取电商S1报价并异步保存
executor.execute(
()->save(f1.get()));
// 获取电商S2报价并异步保存
executor.execute(
()->save(f2.get())
// 获取电商S3报价并异步保存
executor.execute(
()->save(f3.get())
问题在于:如果获取电商S1报价的耗时很长,那么即便获取电商S2报价的耗时很短,也无法让保存S2报价的操作先执行,因为这个主线程都阻塞在了 f1.get() 操作上。
解决方法:增加一个阻塞队列,获取到S1、S2、S3的报价都进入阻塞队列,然后在主线程中消费阻塞队列,这样就能保证先获取到的报价先保存到数据库了。
// 创建阻塞队列
BlockingQueue<Integer> bq = new LinkedBlockingQueue<>();
//电商S1报价异步进入阻塞队列
executor.execute(()->
bq.put(f1.get()));
//电商S2报价异步进入阻塞队列
executor.execute(()->
bq.put(f2.get()));
//电商S3报价异步进入阻塞队列
executor.execute(()->
bq.put(f3.get()));
//异步保存所有报价
for (int i=0; i<3; i++) {
Integer r = bq.take();
executor.execute(()->save(r));
}
1. 利用CompletionService实现询价系统
CompletionService的实现原理也是内部维护了一个阻塞队列,把任务执行结果的Future对象加入到阻塞队列中。
1.1 那到底该如何创建CompletionService呢?
CompletionService接口的实现类是ExecutorCompletionService,两个构造方法。
ExecutorCompletionService(Executor executor);
ExecutorCompletionService(Executor executor, BlockingQueue<Future<V>> completionQueue)
那么默认会使用无界的LinkedBlockingQueue。任务执行结果的Future对象就是加入到completionQueue中。
实现询价的代码如下:
// 创建线程池
ExecutorService executor = Executors.newFixedThreadPool(3);
// 创建CompletionService
CompletionService<Integer> cs = new ExecutorCompletionService<>(executor);
// 异步向电商S1询价
cs.submit(()->getPriceByS1());
// 异步向电商S2询价
cs.submit(()->getPriceByS2());
// 异步向电商S3询价
cs.submit(()->getPriceByS3());
// 将询价结果异步保存到数据库
for (int i=0; i<3; i++) {
Integer r = cs.take().get();
executor.execute(()->save(r));
}
1.2 CompletionService接口说明
Future<V> submit(Callable<V> task);
// V result传入的结果引用
Future<V> submit(Runnable task, V result);
// 以下三个都跟阻塞队列有关
// 如果队列为空,调用 take() 方法的线程会被阻塞
Future<V> take() throws InterruptedException;
// 如果队列为空,调用 poll() 方法的线程返回null
Future<V> poll();
Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException;
2. 利用CompletionService实现Dubbo中的Forking Cluster
Dubbo的Forking集群模式,支持并行地调用多个查询服务,只要有一个成功返回结果,整个服务就可以返回。
// 创建线程池
ExecutorService executor = Executors.newFixedThreadPool(3);
// 创建CompletionService
CompletionService<Integer> cs = new ExecutorCompletionService<>(executor);
// 用于保存Future对象
List<Future<Integer>> futures = new ArrayList<>(3);
//提交异步任务,并保存future到futures
futures.add(cs.submit(()->geocoderByS1()));
futures.add(cs.submit(()->geocoderByS2()));
futures.add(cs.submit(()->geocoderByS3()));
// 获取最快返回的任务执行结果
Integer r = 0;
try {
// 只要有一个成功返回,则break
for (int i = 0; i < 3; ++i) {
r = cs.take().get();
//简单地通过判空来检查是否成功返回
if (r != null) {
break;
}
}
} finally {
//取消所有任务
for(Future<Integer> f : futures)
f.cancel(true);
}
// 返回结果
return r;
首先我们创建了一个线程池executor 、一个CompletionService对象cs和一个Future类型的列表 futures,每次通过调用CompletionService的submit()方法提交一个异步任务,会返回一个Future对象,我们把这些Future对象保存在列表futures中。通过调用 cs.take().get(),我们能够拿到最快返回的任务执行结果,只要我们拿到一个正确返回的结果,就可以取消所有任务并且返回最终结果了。