java多线程处理 CompletableFuture异步编排(二)
一、线程池(无返回)
直接使用new Thread 的方式在代码中新建线程,虽然方便,但却不能对计算机资源进行合理管理,在高并发的情况下就容易出现问题。因此引入线程池十分有必要。
- 代码
为了测试效果,新建定长线程池,传入核心线程数为2,超出的线程会在队列中等待。
这种线程池可以控制计算机资源,防止线程资源耗尽,实际使用较多。
ExecutorService pool= Executors.newFixedThreadPool(2);
在for循环中新建10个线程,每个睡五秒,然后用 submit 方法提交到线程池。
package com.springboot;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
public class Test2 {
public static void main(String[] args) {
System.out.println("主线程开始。。。");
ExecutorService pool= Executors.newFixedThreadPool(2);
for(int i=0;i<10;i++) {
Thread thread=new Thread(()-> {
System.out.println("当前线程开始"+Thread.currentThread());
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
System.out.println("当前线程结束"+Thread.currentThread());
});
pool.submit(thread);
}
System.out.println("主线程结束。。。");
}
}
- 运行结果:
主线程开始。。。
主线程结束。。。
当前线程开始Thread[pool-1-thread-1,5,main]
当前线程开始Thread[pool-1-thread-2,5,main]
当前线程结束Thread[pool-1-thread-1,5,main]
当前线程结束Thread[pool-1-thread-2,5,main]
当前线程开始Thread[pool-1-thread-1,5,main]
当前线程开始Thread[pool-1-thread-2,5,main]
当前线程结束Thread[pool-1-thread-1,5,main]
当前线程开始Thread[pool-1-thread-1,5,main]
当前线程结束Thread[pool-1-thread-2,5,main]
当前线程开始Thread[pool-1-thread-2,5,main]
当前线程结束Thread[pool-1-thread-1,5,main]
当前线程结束Thread[pool-1-thread-2,5,main]
当前线程开始Thread[pool-1-thread-1,5,main]
当前线程开始Thread[pool-1-thread-2,5,main]
当前线程结束Thread[pool-1-thread-1,5,main]
当前线程开始Thread[pool-1-thread-1,5,main]
当前线程结束Thread[pool-1-thread-2,5,main]
当前线程开始Thread[pool-1-thread-2,5,main]
当前线程结束Thread[pool-1-thread-1,5,main]
当前线程结束Thread[pool-1-thread-2,5,main]
由运行结果发现它一次只开两个线程,执行完了才会开新的线程,这就达到了管理计算机资源的目的。
二、线程池(有返回)
- 代码
package com.springboot;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class Test3 {
public static void main(String[] args) throws InterruptedException, ExecutionException {
System.out.println("主线程开始。。。");
ExecutorService pool= Executors.newFixedThreadPool(2);
CompletableFuture<Integer> future=CompletableFuture.supplyAsync(()->{
System.out.println("当前线程开始"+Thread.currentThread());
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
int j=10;
System.out.println("当前线程结束"+Thread.currentThread());
return j;
},pool);
System.out.println("主线程结束。。。");
System.out.println("返回值:"+future.get());
}
}
- 运行结果
线程结束后可以用get方法获取返回值。future.get()是一个阻塞方法,在线程结束后才会得到值。
System.out.println("主线程结束。。。返回值:"+future.get());
主线程开始。。。
主线程结束。。。
当前线程开始Thread[pool-1-thread-1,5,main]
当前线程结束Thread[pool-1-thread-1,5,main]
返回值:10
三、线程池扩展
当人为制造一个错误,如在线程中做了个计算题
int j=10/0;
代码如下:
package com.springboot;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
public class Test3 {
public static void main(String[] args) {
System.out.println("主线程开始。。。");
ExecutorService pool= Executors.newFixedThreadPool(2);
CompletableFuture<Integer> future=CompletableFuture.supplyAsync(()->{
System.out.println("当前线程开始"+Thread.currentThread());
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
int j=10/0;
System.out.println("当前线程结束"+Thread.currentThread());
return j;
},pool) ;
System.out.println("主线程结束。。。");
}
}
运行结果:
主线程开始。。。
主线程结束。。。
当前线程开始Thread[pool-1-thread-1,5,main]
由运行结果发现线程只有开始没有结束,却没有抛出异常,这显然不是我们想要的。
当我们用get() 方法获取运行结果时这个异常才出来了
System.out.println("返回值:"+future.get());
异常:
主线程开始。。。
主线程结束。。。
当前线程开始Thread[pool-1-thread-1,5,main]
Exception in thread “main” java.util.concurrent.ExecutionException: java.lang.ArithmeticException: / by zero
at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
at com.springboot.Test3.main(Test3.java:72)
Caused by: java.lang.ArithmeticException: / by zero
at com.springboot.Test3.lambda AsyncSupply.run(CompletableFuture.java:1590)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)
除此以外,我们可以也可以用whenComplete方法获取线程运行结果和异常信息
CompletableFuture<Integer> future=CompletableFuture.supplyAsync(()->{
System.out.println("当前线程开始"+Thread.currentThread());
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
int j=10/0;
System.out.println("当前线程结束"+Thread.currentThread());
return j;
},pool).whenComplete((r,e)->{
System.out.println("结果是"+r);
System.out.println("异常是"+e);
});
这个时候的运行结果:
主线程开始。。。
当前线程开始Thread[pool-1-thread-1,5,main]
主线程结束。。。
结果是null
异常是java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero
可以在控制台看到异常已经被打印出来了。
有时候我们需要用上一个线程的运行结果继续进行下一步操作,我们可以用thenApply方法。
CompletableFuture<Integer> future=CompletableFuture.supplyAsync(()->{
System.out.println("当前线程开始"+Thread.currentThread());
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
int j=10;
System.out.println("当前线程结束"+Thread.currentThread());
return j;
},pool).thenApply((r)->{
System.out.println("上一步的结果是"+r);
return r;
});
运行结果:
主线程开始。。。
当前线程开始Thread[pool-1-thread-1,5,main]
主线程结束。。。
当前线程结束Thread[pool-1-thread-1,5,main]
上一步的结果是10
同时,这个方法还可以返回值,如我返回 r+1。
package com.springboot;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
public class Test3 {
public static void main(String[] args) throws InterruptedException, ExecutionException {
System.out.println("主线程开始。。。");
ExecutorService pool= Executors.newFixedThreadPool(2);
CompletableFuture<Integer> future=CompletableFuture.supplyAsync(()->{
System.out.println("当前线程开始"+Thread.currentThread());
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
int j=10;
System.out.println("当前线程结束"+Thread.currentThread());
return j;
},pool).thenApply((r)->{
System.out.println("上一步的结果是"+r);
return r+1;
});
System.out.println("主线程结束。。。");
System.out.println("主线程结束。。。返回值:"+future.get());
}
}
运行结果:
主线程开始。。。
当前线程开始Thread[pool-1-thread-1,5,main]
主线程结束。。。
当前线程结束Thread[pool-1-thread-1,5,main]
上一步的结果是10
主线程结束。。。返回值:11
那么我们是否可以写多个thenApply,来实现对上一步运行结果的使用。答案是肯定的。
CompletableFuture<Integer> future=CompletableFuture.supplyAsync(()->{
System.out.println("当前线程开始"+Thread.currentThread());
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
int j=10;
System.out.println("当前线程结束"+Thread.currentThread());
return j;
},pool).thenApply((r)->{
System.out.println("上一步的结果是"+r);
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
return r+1;
}).thenApply((r)->{
System.out.println("结果是"+r);
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
return r+1;
});
运行结果:
主线程开始。。。
当前线程开始Thread[pool-1-thread-1,5,main]
主线程结束。。。
当前线程结束Thread[pool-1-thread-1,5,main]
上一步的结果是10
结果是11
返回值:12
既然如此,我们当然还可以写出一些花来。
package com.springboot;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class Test3 {
public static void main(String[] args) throws InterruptedException, ExecutionException {
System.out.println("主线程开始。。。");
ExecutorService pool= Executors.newFixedThreadPool(2);
CompletableFuture<Integer> future=CompletableFuture.supplyAsync(()->{
System.out.println("当前线程开始"+Thread.currentThread());
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
int j=10;
System.out.println("当前线程结束"+Thread.currentThread());
return j;
},pool).thenApply((r)->{
System.out.println("上一步的结果是"+r);
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
return r+1;
}).whenComplete((r,e)->{
System.out.println("结果是"+r);
System.out.println("异常是"+e);
});
System.out.println("主线程结束。。。");
System.out.println("返回值:"+future.get());
}
}
运行结果:
主线程开始。。。
当前线程开始Thread[pool-1-thread-1,5,main]
主线程结束。。。
当前线程结束Thread[pool-1-thread-1,5,main]
上一步的结果是10
结果是11
异常是null
返回值:11
观察程序运行顺序,发现它们确实是异步的。这样看起来果然比较有逼格。
除了thenApply 和 whenComplete 两个列子外,其他的方法也还有许多,总有一款适合你。
下面模拟一段简单的业务代码。
比如说在淘宝买一条哈士奇,现在需要查询该订单的详情,比如就包括商品信息和物流信息吧。
public static void main(String[] args) throws InterruptedException, ExecutionException {
System.out.println("主线程开始。。。");
ExecutorService pool= Executors.newFixedThreadPool(2);
CompletableFuture<String> future1=CompletableFuture.supplyAsync(()->{
System.out.println("当前线程开始"+Thread.currentThread());
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
System.out.println("当前线程结束"+Thread.currentThread());
return "哈士奇的商品信息";
},pool);
CompletableFuture<String> future2=CompletableFuture.supplyAsync(()->{
System.out.println("当前线程开始"+Thread.currentThread());
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
System.out.println("当前线程结束"+Thread.currentThread());
return "哈士奇的物流信息";
},pool);
Void void1=CompletableFuture.allOf(future1,future2).get();
System.out.println("主线程结束。。。");
System.out.println("返回查询详细结果。。。");
}
运行结果:
主线程开始。。。
当前线程开始Thread[pool-1-thread-1,5,main]
当前线程开始Thread[pool-1-thread-2,5,main]
当前线程结束Thread[pool-1-thread-2,5,main]
当前线程结束Thread[pool-1-thread-1,5,main]
主线程结束。。。
返回查询详细结果。。。
线程后面的这一句
Void void1=CompletableFuture.allOf(future1,future2).get();
这个 get()方法只相当于一个阻塞。目的是当所有线程都运算完时一起返回查询信息。