版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/zl_momomo/article/details/81871600
介绍
从Runnable开始
Runnable
@FunctionalInterface
public interface Runnable {
public abstract void run();
}
Runnable接口封装一个异步的任务,但缺点是,方法不能返回任务执行结果以及向外抛出异常,而Callable接口解决了这两个问题,可以使用Callable封装异步任务代替Runnable
Callable
@FunctionalInterface
public interface Callable<V> {
V call() throws Exception;
}
Thread构造函数中并没有类型为Callable的参数,这里我们引入FutureTask类,它实现了Runnable接口和Future接口。我们可以使用FutureTask另起线程执行异步任务。
FutureTask task = new FutureTask(...);
Thread thread = new Thread(task);
thread.start();
观察FutureTask构造函数,传递包装异步任务的接口Callable或Runnable
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW; // ensure visibility of callable
}
public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result);
this.state = NEW; // ensure visibility of callable
}
前面提到,我们能够获取异步任务执行结果,类比js中的异步任务回调函数。
使用get方法获得计算结果
if(task.isDone()){
Object obj = task.get(); //会阻塞当前线程,直到获取任务结果
}
关于任务指定相关的操作封装在Future接口中
public interface Future<V> {
//试图取消对此任务的执行
boolean cancel(boolean mayInterruptIfRunning);
//如果在任务正常完成前将其取消,则返回 true。
boolean isCancelled();
//如果任务已完成,则返回 true。
boolean isDone();
//如有必要,等待计算完成,然后获取其结果。
V get() throws InterruptedException, ExecutionException;
//如有必要,最多等待为使计算完成所给定的时间之后,获取其结果(如果结果可用)。
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
实例
任务:实现一个并发可伸缩的缓存管理器
原理:将FutureTask保存为Map的value,而不是计算结果保存为value,因为当多个线程并发执行某一方法时,当某一线程开销了大量时间,而其他线程不知道时,可能会重复进行计算,无法到达缓存器效果。
public class Memorizer<A,V>{
private Map<A,Future<V>> cache = new ConcurrentHashMap<>();
private final Computable<A, V> c;
public Memorizer(Computable<A, V> c) {
this.c = c;
}
public V compute(A arg) throws InterruptedException {
while(true){
Future<V> f = cache.get(arg);
if(f == null){
FutureTask<V> ft = new FutureTask<V>(()->{
return c.compute(arg);
});
f = cache.putIfAbsent(arg,ft);
if(f == null){
f = ft;
ft.run();
}
}
try {
return f.get();
} catch(CancellationException e){
cache.remove(arg,f);
} catch(ExecutionException e){
e.printStackTrace();
}
}
}
public static void main(String[] args) throws Exception{
ExpensiveFunction function = new ExpensiveFunction();
Memorizer<String,BigInteger> m = new Memorizer<>(function);
ExecutorService pool = Executors.newCachedThreadPool();
//开10个线程去执行
for(int i =0;i<10;i++){
Future<BigInteger> future = pool.submit(()->{
return m.compute("100");
});
}
pool.shutdown();
}
}
public class ExpensiveFunction implements Computable<String, BigInteger> {
@Override
public BigInteger compute(String arg) throws InterruptedException {
System.out.println("正在进行计算....");
// 在经过长时向的计算后
return new BigInteger(arg);
}
}
实例参考java并发编程实战(5.6小节)
小结
我们将每个线程运行的结果保存在FutureTask对象中,通过FutureTask暴露的接口来操作任务完成状态,因此我们在主线程中有了对子线程的控制权。