线程池
单独操作线程时的一些问题:
- availability:线程数量是受限的,如果线程过多会OOM
- cost:如果任务过多,超过线程数,需要在一批任务完成之后再创建一批任务,耗时费力
为其,引入了线程池。
首先,会初始化创建的线程数,当一个线程池内的任务完成之后,会从左边的队列中取任务再进行执行。
一个Demo
import java.util.concurrent.Executors;
public class ExecutorsDemo {
public static void show() {
var threadPool = Executors.newFixedThreadPool(3);
try {
for (int i = 0; i < 10; i++) {
threadPool.submit(() -> System.out.println(Thread.currentThread().getName()));
}
} finally {
threadPool.shutdown();
}
}
}
这里将线程池执行的任务放在了try块中,关闭线程池放在了finally中,防止出现异常后无法及时关闭线程池。
Callable、Future
阻塞线程
public class ExecutorsDemo {
public static void show() {
var threadPool = Executors.newFixedThreadPool(3);
var start = System.currentTimeMillis();
try {
for (int i = 0; i < 10; i++) {
int finalI = i;
var submit = threadPool.submit(
() -> {
LongTask.simulate();
return finalI;
}
);
System.out.println("Done work" + i);
System.out.println(submit.get());
}
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
} finally {
var end = System.currentTimeMillis();
threadPool.shutdown();
System.out.println("耗时:" + (end - start));
}
}
}
使用CompletableFuture进行异步程序的创建
public class CompletableFuturesDemo {
public static void show(){
Runnable runnable = ()-> System.out.println("runnable");
Supplier<Integer> supplier=()->1;
CompletableFuture.runAsync(runnable);
var future = CompletableFuture.supplyAsync(supplier);
try {
future.get();
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
}
一个异步发送邮件的例子
MailService.java
public class MailService {
public void send(){
LongTask.simulate();
System.out.println("邮件被发送了");
}
public CompletableFuture<Void> sendAsync(){
return CompletableFuture.runAsync(()->send());
}
}
Main.java
public static void main(String[] args) {
var mailService = new MailService();
mailService.sendAsync();
System.out.println("Main进程停止");
try {
//如果不加这个,会直接退出主函数,需要让其等待一会儿
Thread.sleep(4000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
如何在Async task下继续工作
以一个异步获取歌单的demo为例(主要是thenCompose的使用)
public class CompletableFuturesDemo {
//user->email
private static CompletableFuture<String> getUserEmail() {
return CompletableFuture.supplyAsync(() -> "email");
}
//email->playlist
private static CompletableFuture<String> getPlayList(String email){
return CompletableFuture.supplyAsync(()-> "playlist");
}
public static void show() {
//一个读取歌单的操作
getUserEmail()
.thenCompose(CompletableFuturesDemo::getPlayList)
.thenAccept(System.out::println);
}
}
一个计算商品价格的例子(主要以thenCombine为例)
public class CompletableFuturesDemo {
public static void show() {
var first = CompletableFuture.supplyAsync(() -> "¥20")
.thenApply(s -> {
var price = s.replace("¥", "");
return Integer.parseInt(price);
});
var second = CompletableFuture.supplyAsync(() -> 0.9);
first
.thenCombine(second, (price, exchangeRate) -> price * exchangeRate)
.thenAccept(System.out::println);
}
}
首先进行价格标签中的原价提取,之后和折扣做运算。
还有allOf,anyOf,orTimeout,completeOrTimeout分别处理多个任务和超时,这里不赘述
项目:价格搜索器
定义查询对象
public class Quote {
private final String site;
private final int price;
public Quote(String site, int price) {
this.site = site;
this.price = price;
}
@Override
public String toString() {
return "Quote{" +
"site='" + site + '\'' +
", price=" + price +
'}';
}
}
定义异步查询方法
public class FlightService {
/*创建一个异步的查询飞机票价的服务*/
public CompletableFuture<Quote> getQuote(String site) {
return CompletableFuture.supplyAsync(() -> {
System.out.println(site + "初始化...");
var random = new Random();
LongTask.simulate(1000+random.nextInt(2000)); //创建延迟在1~3s
return new Quote(site, 100 + random.nextInt(10));//价格初始化在100~110
}
);
}
public List<CompletableFuture<Quote>> getQuotes() {
var sites = List.of("site01", "site02", "site03");
return sites.stream()
.map(this::getQuote)
.collect(Collectors.toList());
}
}
进行异步查询
public class CompletableFuturesDemo {
public static void show() {
var flightService = new FlightService();
//单个站点访问
// flightService.getQuote("site01")
// .thenAccept(System.out::println);
//多个站点访问
var futureList = flightService.getQuotes().stream()
.map(future -> future.thenAccept(System.out::println))
.collect(Collectors.toList());
//如果要加入总的时间
var start = LocalTime.now();
CompletableFuture.allOf(futureList.toArray(new CompletableFuture[0]))
.thenRun(() -> {
var end = LocalTime.now();
System.out.println("共计耗时:" + Duration.between(start, end).toMillis() + "ms");
});
try {
Thread.sleep(10_000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
期间对 LongTask进行了重构。