/**
* 并发请求接口 将数据整理到一起处理
*/
@SuppressWarnings("unchecked")
private void parallelReqBigData(SegmentUVArgs segmentUVArgs, Map<Integer, Tag> tagMap, AreaTag areaTag) {
List<SegmentTagsReply> list = Collections.synchronizedList(new ArrayList<>(16));
List<CompletableFuture<Void>> cfList = new ArrayList<>();
//存储 请求uv的uv值
Map<Integer, Integer> mapping = new HashMap<>();
// 单独处理请求uv
CompletableFuture<Void> cfUv =
CompletableFuture.runAsync(() -> areaTag.setUv(DppUtils.processErrorAndReturnData(bdsClient.execute(segmentUVArgs)).getUv()), executorService);
cfList.add(cfUv);
Arrays.stream(TagCategoryEnum.values()).forEach(type -> {
//异步请求
CompletableFuture<Void> cf = CompletableFuture.runAsync(() -> {
SegmentTagsArgs segmentTagsArgs = new SegmentTagsArgs();
BeanUtils.copyProperties(segmentUVArgs, segmentTagsArgs);
segmentTagsArgs.setTagCategory(type.getCode());
SegmentTagsReply segmentTagsReply = DppUtils.processErrorAndReturnData(bdsClient.execute(segmentTagsArgs));
list.add(segmentTagsReply);
}, executorService);
cfList.add(cf);
});
CompletableFuture<Void>[] completableFutures = cfList.toArray(new CompletableFuture[0]);
//异步请求完成
CompletableFuture<Void> c4 =
CompletableFuture.allOf(completableFutures).whenComplete((ret, err) -> {
if (err != null) {
log.error("failed to select to big data", err);
throw new OpenApiException(ErrorCode.COMMON_ERROR, ImmutableMap.of("err", err.toString()));
} else {
for (SegmentTagsReply segmentTagsReply : list) {
if (CollectionUtil.isNotEmpty(segmentTagsReply.getResidenceList())) {
areaTag.setResidenceList(segmentTagsReply.getResidenceList());
}
if (CollectionUtil.isNotEmpty(segmentTagsReply.getShoppingMallList())) {
areaTag.setShoppingMallList(segmentTagsReply.getShoppingMallList());
}
if (CollectionUtil.isNotEmpty(segmentTagsReply.getWorkPlaceList())) {
areaTag.setWorkPlaceList(segmentTagsReply.getWorkPlaceList());
}
mapping.putAll(segmentTagsReply.getTagCountMap());
mapping.forEach((k, v) -> {
Tag tag = tagMap.get(k);
if (null != tag) {
tag.setCount((long) v);
}
});
}
}
});
// join()和get()方法都是阻塞调用它们的线程(通常为主线程)来获取CompletableFuture异步之后的返回值。
c4.join();
}
java8 CompletableFuture使用示例
猜你喜欢
转载自blog.csdn.net/songkai558919/article/details/122046085
今日推荐
周排行