前提:
日常开发,有很多场景会使用到多线程,比如,我们解析Excel,如果解析出一个3万条数据的Excel表格,需要两部:
1.我们需要先异步解析出所有的数据,前面写过了如何异步处理任务(Spring Boot---(4)SpringBoot异步处理任务);
2.然后再多线程去处理业务或者插入到数据库;
这里,讲解一下,如何使用多线程,如何调用回调函数。
1、ThreadPoolExecutor使用
我们以最后一个构造方法(参数最多的那个),对其参数进行解释:
public ThreadPoolExecutor(int corePoolSize, // 1
int maximumPoolSize, // 2
long keepAliveTime, // 3
TimeUnit unit, // 4
BlockingQueue<Runnable> workQueue, // 5
ThreadFactory threadFactory, // 6
RejectedExecutionHandler handler ) { //7
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
序号 | 名称 | 类型 | 含义 |
---|---|---|---|
1 | corePoolSize | int | 核心线程池大小 |
2 | maximumPoolSize | int | 最大线程池大小 |
3 | keepAliveTime | long | 线程最大空闲时间 |
4 | unit | TimeUnit | 时间单位 |
5 | workQueue | BlockingQueue<Runnable> | 线程等待队列 |
6 | threadFactory | ThreadFactory | 线程创建工厂 |
7 | handler | RejectedExecutionHandler | 拒绝策略 |
如果对这些参数作用有疑惑的请看 ThreadPoolExecutor概述。
2、引入pom.xml的依赖
由于后面要用到Guava包下的东西,所以这里先引入一下。
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>28.2-jre</version>
</dependency>
maven地址:https://mvnrepository.com/artifact/com.google.guava/guava
扫描二维码关注公众号,回复:
9832191 查看本文章
示例:
public static void main(String[] args) {
ListeningExecutorService executorService = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool());
// 执行任务
final ListenableFuture<Integer> listenableFuture = executorService.submit(new Callable<Integer>() {
public Integer call() throws Exception {
System.out.println("新任务。。。");
TimeUnit.SECONDS.sleep(1);
return 7;
}
});
// 任务完成回掉函数
final FutureCallback<Integer> futureCallback = new FutureCallback<Integer>() {
@Override
public void onSuccess(Integer result) {
System.out.println("任务执行成功,对任务进行操作。");
}
@Override
public void onFailure(Throwable t) {
System.out.println("任务执行失败。");
}
};
// 绑定任务以及回调函数
Futures.addCallback(listenableFuture, futureCallback);
}
2、controller
这里模拟10万条数据,然后多线程处理。
package com.jd.concurrent;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;
import java.util.*;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
* author:lightClouds917
* date:2018/1/22
* description:模拟多线程处理
*/
@RestController
@RequestMapping("con")
public class ConController {
private final static Logger logger = LoggerFactory.getLogger(Logger.class);
// 线程池
// ExecutorService executorService = Executors.newFixedThreadPool(8);
private static final ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(10, 50, 500,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(200),
new ThreadPoolExecutor.CallerRunsPolicy()
);
// 使用Guava的ListeningExecutorService装饰线程池
ListeningExecutorService listeningExecutorService = MoreExecutors.listeningDecorator(executorService);
@RequestMapping(value = "test1", method = RequestMethod.GET)
public String test1() {
try {
//10万条数据
List<String> list = new ArrayList<>();
List<String> list2 = new ArrayList<>();
for (int i = 1; i <= 100000; i++) {
list.add("test:" + i);
}
//每条线程处理的数据尺寸
int size = 250;
int count = list.size() / size; //根据size得出线程数量
if (count * size != list.size()) {
count++; //如果已有线程要处理数据的总数,不等于list数据集合的总数,线程+1
}
int countNum = 0;//统计处理数据
final CountDownLatch countDownLatch = new CountDownLatch(count);//线程计数器
while (countNum < list.size()) {
countNum += size;
//创建一个对象,此对象继承Callable,下面会有源代码
ConCallable callable = new ConCallable();
//截取list的数据,分给不同线程处理
/*这段代码写的很好,我喜欢这段,根据集合的下标,形成多线程,每个线程处理固定的数量,当最后一个线程要处理的数据大于总数的时候,
则从上一个线程处理的末尾,到数据总数。真正意义上的多线程,本来多线程这块儿我是写死的,手动分配几个线程,代码效率低;
这段儿代码,根据size可以随时对线程调优,仅需修改size,即可找到适合自己业务的线程数。*/
callable.setList(ImmutableList.copyOf(list.subList(countNum - size, countNum < list.size() ? countNum : list.size())));
//执行线程
ListenableFuture listenableFuture = listeningExecutorService.submit(callable);
//异步回调操作,原作者仅仅是展示下如何使用回调,
/*如果有这种需求:多个线程执行,
都执行完毕,进行回调,则需要调用Futures.allAsList(futureList),多线程同时回调的代码会在文章末尾 单独贴出来。*/
Futures.addCallback(listenableFuture, new FutureCallback<List<String>>() {
@Override
public void onSuccess(List<String> list1) {
countDownLatch.countDown();//计数器-1
list2.addAll(list1);//将线程执行结果放入结果集
}
@Override
public void onFailure(Throwable throwable) {
countDownLatch.countDown();
logger.info("处理出错:", throwable);
}
});
}
//主线程阻塞,我直接这么用的countDownLatch.await();
// 原作者这个应该是个超时策略,超过这个时间的线程,直接舍弃。
countDownLatch.await(30, TimeUnit.MINUTES);
logger.info("符合条件的返回数据个数为:" + list2.size());
logger.info("回调函数:" + list2.toString());
} catch (Exception ex) {
ex.printStackTrace();
}
return "正在处理......";
}
}
例子2:
package
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
/**
* The unit test for ListenableFuture/CompletableFuture.
* Created by yiqun01.lin
* on 2018/5/3.
*/
public class TestFutures {
//线程池中线程个数
private static final int POOL_SIZE = 50;
//带有回调机制的线程池
private static final ListeningExecutorService service = MoreExecutors
.listeningDecorator(Executors.newFixedThreadPool(POOL_SIZE));
private static Logger LOG = LoggerFactory.getLogger(TestFutures.class);
@Test
public void testListenableFuture() {
final List<String> value = Collections
.synchronizedList(new ArrayList<String>());
try {
List<ListenableFuture<String>> futures = new ArrayList<ListenableFuture<String>>();
// 将实现了callable的任务放入到线程池中,得到一个带有回调机制的ListenableFuture实例,
// 通过Futures.addCallback方法对得到的ListenableFuture实例进行监听,一旦得到结果就进入到onSuccess方法中,
// 在onSuccess方法中将查询的结果存入到集合中
for (int i = 0; i < 1; i++) {
final int index = i;
if (i == 9) {
Thread.sleep(500 * i);
}
ListenableFuture<String> sfuture = service
.submit(new Callable<String>() {
@Override
public String call() throws Exception {
long time = System.currentTimeMillis();
LOG.info("Finishing sleeping task{}: {}", index, time);
return String.valueOf(time);
}
});
sfuture.addListener(new Runnable() {
@Override
public void run() {
LOG.info("Listener be triggered for task{}.", index);
}
}, service);
Futures.addCallback(sfuture, new FutureCallback<String>() {
public void onSuccess(String result) {
LOG.info("Add result value into value list {}.", result);
value.add(result);
}
public void onFailure(Throwable t) {
LOG.info("Add result value into value list error.", t);
throw new RuntimeException(t);
}
});
// 将每一次查询得到的ListenableFuture放入到集合中
futures.add(sfuture);
}
// 这里将集合中的若干ListenableFuture形成一个新的ListenableFuture
// 目的是为了异步阻塞,直到所有的ListenableFuture都得到结果才继续当前线程
// 这里的时间取的是所有任务中用时最长的一个
ListenableFuture<List<String>> allAsList = Futures.allAsList(futures);
allAsList.get();
LOG.info("All sub-task are finished.");
} catch (Exception ignored) {
}
}
@Test
public void testCompletableFuture() throws Exception {
...
}
}
3、线程任务处理类
package com.jd.concurrent;
import java.util.ArrayList;
import java.util.concurrent.Callable;
import java.util.List;
/**
* author:lightClouds917
* date:2018/1/22
* description:业务处理
*/
public class ConCallable implements Callable {
private List<String> list;
@Override
public Object call() throws Exception {
List<String> listRe = new ArrayList<>();
for(int i = 0;i < list.size();i++){
//含有‘4599’的字符串都返回
if(list.get(i).contains("4599")){
listRe.add(list.get(i));
}
}
return listRe;
}
public void setList(List<String> list) {
this.list = list;
}
}
4、返回结果:
符合条件的返回数据个数为:20
回调函数:[test:4599, test:14599, test:24599, test:34599, test:44599, test:45990, test:45991, test:45992, test:45993, test:45994, test:45995, test:45996, test:45997, test:45998, test:45999, test:54599, test:64599, test:74599, test:84599, test:94599]
5、Futures.allAsList()
如果希望的是多线程执行完毕,进行统一回调,为了达到这个目的,对代码进行了如下改造:需要调用Futures.allAsList()
//在循环外创建一个list数组,用于存放线程
List futureList = new ArrayList<ListenableFuture>();
while (countNum < list.size()) {
countNum += size;
//创建一个对象,此对象继承Callable
ConCallable callable = new ConCallable();
//截取list的数据,分给不同线程处理
callable.setList(ImmutableList.copyOf(list.subList(countNum - size, countNum < list.size() ? countNum : list.size())));
//执行线程
ListenableFuture listenableFuture = listeningExecutorService.submit(callable);
//将子线程添加至线程集合
futureList.add(listenableFuture);
}
/*都执行完毕,进行回调,则需要调用Futures.allAsList(futureList),多线程同时回调.
* 由于是所有的线程一起回调,线程的返回结果自动存放在一个list中,
* 因此需要将上面的List<String> 改为:List<List<String>>*/
Futures.allAsList(listenableFuture, new FutureCallback<List<List<String>>>() {
@Override
public void onSuccess(List<List<String>> list1) {
for (List<String> list : list1) {
countDownLatch.countDown();//计数器-1
list2.addAll(list);//将线程执行结果放入结果集
}
}
@Override
public void onFailure(Throwable throwable) {
countDownLatch.countDown();
logger.info("处理出错:", throwable);
}
});
//主线程阻塞
countDownLatch.await();
System.out.println("OK!");
例子2:
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
public class ListenableTest {
public static void main(String[] args) {
ListeningExecutorService executorService = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool());
final List<Long> value = new ArrayList<Long>();
List<ListenableFuture<Long>> futures = new ArrayList<ListenableFuture<Long>>();
for(long i=1;i<=3;i++){
// 处理线程逻辑
final ListenableFuture<Long> listenableFuture = executorService.submit(new AddCallable(1000000*(i-1)+1,i*1000000));
// 回调方法
Futures.addCallback(listenableFuture, new FutureCallback<Long>() {
@Override
public void onSuccess(Long result) {
value.add(result);
}
@Override
public void onFailure(Throwable t) {
t.printStackTrace();
}
});
futures.add(listenableFuture);
}
// 阻塞三个线程执行完成
Futures.allAsList(futures);
long result = 0 ;
for(int i=0,n=value.size();i<n;i++){
result += value.get(i);
}
System.out.println("sum:"+result);
executorService.shutdownNow();
}
}
/**
* 累加线程
* @author
* @time
*/
class AddCallable implements Callable<Long>{
private long begin ;
private long end ;
public AddCallable(long begin,long end){
this.begin = begin ;
this.end = end ;
}
@Override
public Long call() throws Exception {
long result = 0;
for(long i=begin;i<=end;i++){
result += i;
}
return result ;
}
}
大功告成!继续码代码!
参考:
https://blog.csdn.net/weixin_39800144/article/details/79133974
https://blog.csdn.net/weixin_39800144/article/details/79046237