在spark中有异步多线程的需求,需要阻塞主线程,等所有子线程都执行完成后,主线程继续执行。
如果直接用Thread,不太好实现;用Callable+FutureTask结合线程池,可以快速实现:
// Demonstrates blocking the calling thread until a batch of pooled tasks has
// finished: each Callable is wrapped in a FutureTask, handed to the executor,
// and then awaited with FutureTask.get().
final AtomicInteger messagesReceived = new AtomicInteger(0);
// ThreadedListenerAdapter is the class that I'm testing
// It's not germane to the question other than as a target for a thread pool.
final ThreadedListenerAdapter<Integer> adapter =
    new ThreadedListenerAdapter<Integer>(listener);
int taskCount = 10;
List<FutureTask<Integer>> taskList = new ArrayList<FutureTask<Integer>>();
// Build every task first so they can all be submitted before any is awaited.
for (int whichTask = 0; whichTask < taskCount; whichTask++) {
    FutureTask<Integer> futureTask =
        new FutureTask<Integer>(new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {
                // Does useful work that affects messagesSent
                // NOTE(review): the local `messagesSent` declared at the end of this
                // snippet is not in scope here, so this name must resolve to something
                // declared outside the snippet (e.g. a field) - confirm.
                return messagesSent;
            }
        });
    taskList.add(futureTask);
}
for (FutureTask<Integer> task : taskList) {
    LocalExecutorService.getExecutorService().submit(task);
}
for (FutureTask<Integer> task : taskList) {
    final int result;
    try {
        // This is the key step: get() blocks the calling thread until the task has completed.
        result = task.get();
    } catch (InterruptedException ex) {
        // Restore the interrupt flag and stop waiting. Falling through would compare a
        // default result of 0 against maxMessages and report a misleading mismatch, and
        // every later get() would be interrupted again immediately.
        Thread.currentThread().interrupt();
        throw new RuntimeException("Interrupted while waiting for task " + task, ex);
    } catch (ExecutionException ex) {
        throw new RuntimeException("ExecutionException in task " + task, ex);
    }
    assertEquals(maxMessages, result);
}
// Once every task has been awaited, the aggregate counter must match the expected total.
int messagesSent = taskCount * maxMessages;
assertEquals(messagesSent, messagesReceived.intValue());