如果我们有以下需求:
- 从数据库的User表获取所有用户的id
- 根据用户的id查询所有用户相关联的信息
- 获取信息后,处理信息
- 等待所有用户的信息处理完成后,完成下一步动作
- 在处理过程中主线程无需等待整个信息的处理完成
通过Future来处理:
public class CompletableFutureTest {
public static final String USER_MSG_FORMAT = "用户信息%d";
public static final String USER_MSG_START_FORMAT = "正在获取用户信息%d的信息";
public static final String USER_MSG_END_FORMAT = "获取结束";
private static ExecutorService executor = Executors.newFixedThreadPool(10);
public static void main(String[] args) throws Exception {
Future<List<Integer>> userIdListFuture = executor.submit(CompletableFutureTest::getUserIdList);
mainThreadDo();
List<Integer> userIdList = userIdListFuture.get();
List<Future<String>> userMsg = new ArrayList<>();
userIdList.forEach(userId -> userMsg.add(executor.submit(() -> getUserMsg(userId))));
mainThreadDo();
for (Future<String> res : userMsg) {
System.out.println(Thread.currentThread().getName() + " " + res.get());
}
mainThreadDo();
executor.shutdown();
executor.awaitTermination(1, TimeUnit.DAYS);
System.out.println(Thread.currentThread().getName() + " 业务处理完毕");
}
public static void mainThreadDo() {
System.out.println(Thread.currentThread().getName() + " 主线程开始执行别的业务逻辑");
sleep();
System.out.println(Thread.currentThread().getName() + " 主线程结束执行别的业务逻辑");
}
public static String getUserMsg(Integer userId) {
System.out.println(Thread.currentThread().getName() + " " + String.format(USER_MSG_START_FORMAT, userId));
sleep();
System.out.println(Thread.currentThread().getName() + " " + USER_MSG_END_FORMAT);
return String.format(USER_MSG_FORMAT, userId);
}
public static List<Integer> getUserIdList() {
return Lists.newArrayList(1, 2, 3, 4, 5);
}
//模拟数据库延时操作
public static void sleep() {
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
复制代码
打印结果:
main 主线程开始执行别的业务逻辑
main 主线程结束执行别的业务逻辑
main 主线程开始执行别的业务逻辑
pool-1-thread-4 正在获取用户信息3的信息
pool-1-thread-3 正在获取用户信息2的信息
pool-1-thread-2 正在获取用户信息1的信息
pool-1-thread-5 正在获取用户信息4的信息
pool-1-thread-6 正在获取用户信息5的信息
main 主线程结束执行别的业务逻辑
pool-1-thread-4 获取结束
pool-1-thread-5 获取结束
pool-1-thread-6 获取结束
pool-1-thread-3 获取结束
pool-1-thread-2 获取结束
main 用户信息1
main 用户信息2
main 用户信息3
main 用户信息4
main 用户信息5
main 主线程开始执行别的业务逻辑
main 主线程结束执行别的业务逻辑
main 业务处理完毕
复制代码
通过 CompletableFuture 来处理:
public static void main(String[] args) throws Exception {
System.out.println(Thread.currentThread().getName() + " 业务开始完毕");
List<CompletableFuture<Void>> completableFutureList = CompletableFuture.supplyAsync(CompletableFutureTest2::getUserIdList)//将getUserIdList封装成Callable放入ForkJoinPool中执行
.get()//等待执行结果
.stream()
.map(userId -> CompletableFuture.supplyAsync(() -> getUserMsg(userId)).thenAccept(System.out::println))//将stream List<UserId> 转换为List<CompletableFuture>
.collect(Collectors.toList());
mainThreadDo();
//将所有的List<CompletableFuture>包装成一个 CompletableFuture
CompletableFuture<Void> completableFuture = CompletableFuture.allOf(completableFutureList.toArray(new CompletableFuture[0]));
completableFuture.thenRun(() -> mainThreadDo());//将封装的一个CompletableFuture安装一个执行结束后的回调动作mainThreadDo()
completableFuture.get();//等待completableFuture执行完成
System.out.println(Thread.currentThread().getName() + " 业务处理完毕");
}
复制代码
CompletableFuture执行回调示例图