一、FixedThreadPool、CachedThreadPool、SingleThreadExecutor、ScheduledThreadPool 简单使用(不推荐)
public void fixedThreadPool1(){
int size = 5;
ExecutorService threadPool = Executors.newFixedThreadPool(size);
// ExecutorService threadPool = Executors.newCachedThreadPool(); //缓存,同上
// ExecutorService threadPool = Executors.newSingleThreadExecutor(); //单线程
// ExecutorService threadPool = Executors.newScheduledThreadPool(size); //
while (size>0) {
threadPool.execute(new MyRunnable());
size--;
}
threadPool.shutdown();
}
class MyRunnable implements Runnable{
@Override
public void run() {
System.out.println(Thread.currentThread().getName()+"运行了");
}
}
不推荐原因:
1)newFixedThreadPool和newSingleThreadExecutor:
主要问题是堆积的请求处理队列可能会耗费非常大的内存,甚至OOM。
2)newCachedThreadPool和newScheduledThreadPool:
主要问题是线程数最大数是Integer.MAX_VALUE,可能会创建数量非常多的线程,甚至OOM。
二、ExecutorService、ThreadPoolExecutor实例化用法(阿里巴巴规范)【推荐】
原理:使用第三方线程工厂,com.google.guava包
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>27.0.1-jre</version>
</dependency>
public void alibabaThreadPool(){
//线程池核心池的大小
int corePoolSize = 5;
//线程池的最大线程数
int maximumPoolSize = 200;
//当线程数大于核心时,此为终止前多余的空闲线程等待新任务的最长时间
long keepAliveTime = 0L;
//keepAliveTime 的时间单位
TimeUnit unit = TimeUnit.MILLISECONDS;
//用来储存等待执行任务的队列
BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<Runnable>(1024);
//线程工厂,使用com.google.guava包
ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("my-pool-%d").build();
//拒绝策略
RejectedExecutionHandler handler = new ThreadPoolExecutor.AbortPolicy();
ExecutorService threadPool = new ThreadPoolExecutor(corePoolSize,maximumPoolSize,keepAliveTime,unit,workQueue,threadFactory,handler);
//这里为了备注各参数用途,就没有像下面优雅编写
/*ExecutorService threadPool = new ThreadPoolExecutor(5,200,0L,TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>(1024),
new ThreadFactoryBuilder().setNameFormat("my-pool-%d").build(),new ThreadPoolExecutor.AbortPolicy());*/
while (corePoolSize>0) {
threadPool.execute(new MyRunnable());
corePoolSize--;
}
threadPool.shutdown();
}
三、Future、Callable有返回值,get()阻塞串行
原理:Future是ExecutorService 调用submit()返回的结果集(泛型),实现Callable接口,支持线程执行完返回结果。
/***
* 有返回,串行
* @throws Exception
*/
public void alibabaThreadPool2() throws Exception{
int corePoolSize = 5;
ExecutorService threadPool = new ThreadPoolExecutor(corePoolSize,200,0L,TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>(1024),
new ThreadFactoryBuilder().setNameFormat("my-pool-%d").build(),new ThreadPoolExecutor.AbortPolicy());
while (corePoolSize>0) {
try {
Future future = threadPool.submit(new MyCallable());
Map<String, String> map = (Map<String, String>) future.get(); //会阻塞(串行)
System.out.println("返回:" + map.get("back"));
System.out.println("----------------华丽的分割线-------------------");
}catch (InterruptedException e){
e.printStackTrace();
}catch (ExecutionException e){
e.printStackTrace();
}
corePoolSize--;
}
threadPool.shutdown();
}
class MyCallable implements Callable<Object>{
@Override
public Object call() throws Exception {
System.out.println(Thread.currentThread().getName()+"开始时间:" +
DateFormat.getTimeInstance().format(new Date()));
int rand = new Random().nextInt(5)*1000;
System.out.println(Thread.currentThread().getName()+"睡眠"+rand);
Thread.sleep(rand);
Map<String,String> map = new HashMap<>(16);
map.put("back",Thread.currentThread().getName()+"成功了");
System.out.println(Thread.currentThread().getName()+"结束时间:" +
DateFormat.getTimeInstance().format(new Date()));
return map;
}
}
四、Future、Callable支持返回,并行
/***
* 有返回,并行
* @throws Exception
*/
public void alibabaThreadPool3() throws Exception{
//线程池核心池的大小
int corePoolSize = 5;
ExecutorService threadPool = new ThreadPoolExecutor(corePoolSize,200,0L,TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>(1024),
new ThreadFactoryBuilder().setNameFormat("my-pool-%d").build(),new ThreadPoolExecutor.AbortPolicy());
List<Future> list = new ArrayList<>();
while (corePoolSize>0) {
Future future = threadPool.submit(new MyCallable());
list.add(future);
corePoolSize--;
}
for(Future future : list){
Map<String, String> map = (Map<String, String>) future.get();
System.out.println("返回:" + map.get("back"));
}
threadPool.shutdown();
}
五、CyclicBarrier、Runnable同时并行
原理:Runnable实现类添加CyclicBarrier实例做成员变量,实例化是导入。run()方法前置调用cyclicBarrier.await() 等待所有线程准备就绪再同时执行。
/***
* CyclicBarrier同时并行,一般用来测试并发
* @throws Exception
*/
public void alibabaThreadPool4() throws Exception{
//线程池核心池的大小
int corePoolSize = 100;
ExecutorService threadPool = new ThreadPoolExecutor(corePoolSize,200,0L,TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>(1024),
new ThreadFactoryBuilder().setNameFormat("my-pool-%d").build(),new ThreadPoolExecutor.AbortPolicy());
//注意cyclicBarrier对象不能在循环里面多次new,线程会挂,同时平时开发可以共享的尽量不要在循环里去new,影响性能
CyclicBarrier cyclicBarrier = new CyclicBarrier(corePoolSize);
for (int i = 0; i < corePoolSize; i++) {
threadPool.execute(new MyRunnable2(cyclicBarrier));
}
threadPool.shutdown();
}
class MyRunnable2 implements Runnable {
private CyclicBarrier cyclicBarrier;
public MyRunnable2(CyclicBarrier cyclicBarrier) {
this.cyclicBarrier = cyclicBarrier;
}
@Override
public void run() {
try {
// 等待所有任务准备就绪
cyclicBarrier.await();
System.out.println(Thread.currentThread().getName()+"开始时间:" +
DateFormat.getTimeInstance().format(new Date()));
int rand = new Random().nextInt(5)*1000;
System.out.println(Thread.currentThread().getName()+"睡眠"+rand);
Thread.sleep(rand);
// 测试内容
System.out.println(Thread.currentThread().getName());
System.out.println(Thread.currentThread().getName()+"结束时间:" +
DateFormat.getTimeInstance().format(new Date()));
} catch (Exception e) {
e.printStackTrace();
}
}
}
五、CyclicBarrier、Callable同时并行,支持返回
原理:Callable实现类添加CyclicBarrier实例做成员变量,实例化是导入。call()方法前置调用cyclicBarrier.await() 等待所有线程准备就绪再同时执行。
/***
* 有返回,CyclicBarrier同时并行,一般用来测试并发
* @throws Exception
*/
public void alibabaThreadPool5() throws Exception{
//线程池核心池的大小
int corePoolSize = 5;
ExecutorService threadPool = new ThreadPoolExecutor(corePoolSize,200,0L,TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>(1024),
new ThreadFactoryBuilder().setNameFormat("my-pool-%d").build(),new ThreadPoolExecutor.AbortPolicy());
List<Future> list = new ArrayList<>();
//注意cyclicBarrier对象不能在循环里面多次new,线程会挂,同时平时开发可以共享的尽量不要在循环里去new,影响性能
CyclicBarrier cyclicBarrier = new CyclicBarrier(5);
while (corePoolSize>0) {
System.out.println(corePoolSize+"开始:" +
DateFormat.getTimeInstance().format(new Date()));
Future future = threadPool.submit(new MyCallable2(cyclicBarrier));
list.add(future);
System.out.println(corePoolSize+"结束:" +
DateFormat.getTimeInstance().format(new Date()));
corePoolSize--;
}
for(Future future : list){
Map<String, String> map = (Map<String, String>) future.get();
System.out.println("返回:" + map.get("back"));
}
threadPool.shutdown();
}
class MyCallable2 implements Callable<Object>{
private CyclicBarrier cyclicBarrier;
public MyCallable2(CyclicBarrier cyclicBarrier){
this.cyclicBarrier = cyclicBarrier;
}
@Override
public Object call() throws Exception {
this.cyclicBarrier.await();
System.out.println(Thread.currentThread().getName()+"开始时间:" +
DateFormat.getTimeInstance().format(new Date()));
int rand = new Random().nextInt(5)*1000;
System.out.println(Thread.currentThread().getName()+"睡眠"+rand);
Thread.sleep(rand);
Map<String,String> map = new HashMap<>(16);
map.put("back",Thread.currentThread().getName()+"成功了");
System.out.println(Thread.currentThread().getName()+"结束时间:" +
DateFormat.getTimeInstance().format(new Date()));
return map;
}
}
六、ScheduledExecutorService、ScheduledThreadPoolExecutor 支持时间调度,定时任务
原理:使用第三方线程工厂,commons-lang3包
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.8.1</version>
</dependency>
public void scheduledThreadPool(){
//线程池核心池的大小
int corePoolSize = 2;
//线程工厂,使用commons-lang3包
// ThreadFactory threadFactory1 = new BasicThreadFactory.Builder().namingPattern("my-schedule-pool-%d").daemon(true).build(); //设置守护进程,调度有问题
ThreadFactory threadFactory = new BasicThreadFactory.Builder().namingPattern("my-schedule-pool-%d").build();
ScheduledExecutorService threadPool = new ScheduledThreadPoolExecutor(corePoolSize,threadFactory);
// ScheduledExecutorService threadPool = new ScheduledThreadPoolExecutor(corePoolSize);
// scheduleAtFixedRate(threadPool,1000);
// scheduleAtFixedRate(threadPool,5000);
scheduleWithFixedDelay(threadPool,"2019-03-09 14:09:50");
scheduleWithFixedDelay(threadPool,"2019-03-09 14:02:30");
}
private void scheduleAtFixedRate(ScheduledExecutorService service, final int sleepTime){
service.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
long start = System.currentTimeMillis();
System.out.println("scheduleAtFixedRate 开始时间:" +
DateFormat.getTimeInstance().format(new Date()));
try {
Thread.sleep(sleepTime);
} catch (InterruptedException e) {
e.printStackTrace();
}
long end = System.currentTimeMillis();
System.out.println("scheduleAtFixedRate 消耗时间:" + (end -start)/1000 + "m");
System.out.println("scheduleAtFixedRate 完成时间:"
+ DateFormat.getTimeInstance().format(new Date()));
System.out.println("----------------华丽的分割线-------------------");
}
},1000,5000,TimeUnit.MILLISECONDS);
}
private static void scheduleWithFixedDelay(ScheduledExecutorService service,String time){
long delay = 5000;
long initDelay = getTimeMillis(time) - System.currentTimeMillis();
System.out.println(getTimeMillis(time));
System.out.println(System.currentTimeMillis());
System.out.println(initDelay);
initDelay = initDelay > 0 ? initDelay : delay + initDelay;
service.scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName()+":"+DateFormat.getTimeInstance().format(new Date()));
}
},initDelay,delay,TimeUnit.MILLISECONDS);
}
private static long getTimeMillis(String time) {
return (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")).parse(time, new ParsePosition(0)).getTime();
}
七、ThreadPoolExecutor、ScheduledThreadPoolExecutor实例化参数用法详析
new ThreadPoolExecutor(corePoolSize, maximumPoolSize,
keepAliveTime, milliseconds,runnableTaskQueue, threadFactory,handler);
new ScheduledThreadPoolExecutor(int corePoolSize,
ThreadFactory threadFactory);
- corePoolSize - 线程池核心池的大小。
当提交一个任务到线程池时,线程池会创建一个线程来执行任务,即使其他空闲的基本线程能够执行新任务也会创建线程,等到需要执行的任务数大于线程池基本大小时就不再创建。如果调用了线程池的prestartAllCoreThreads方法,线程池会提前创建并启动所有基本线程。 - maximumPoolSize - 线程池的最大线程数。
线程池允许创建的最大线程数。如果队列满了,并且已创建的线程数小于最大线程数,则线程池会再创建新的线程执行任务。值得注意的是如果使用了无界的任务队列这个参数就没什么效果。
线程池大小
线程池有两个线程数的设置,一个为核心池线程数(corePoolSize),一个为最大线程数(maximumPoolSize)。
在创建了线程池后,默认情况下,线程池中并没有任何线程,等到有任务来才创建线程去执行任务,除非调用了prestartAllCoreThreads()或者prestartCoreThread()方法
当创建的线程数等于 corePoolSize 时,会加入设置的阻塞队列。当队列满时,会创建线程执行任务直到线程池中的数量等于maximumPoolSize。 - keepAliveTime当线程数大于核心时,此为终止前多余的空闲线程等待新任务的最长时间。
线程池的工作线程空闲后,保持存活的时间。所以如果任务很多,并且每个任务执行的时间比较短,可以调大这个时间,提高线程的利用率。 - unit - keepAliveTime 的时间单位。
可选的单位有天(DAYS),小时(HOURS),分钟(MINUTES),毫秒(MILLISECONDS),微秒(MICROSECONDS, 千分之一毫秒)和毫微秒(NANOSECONDS, 千分之一微秒)。 - workQueue - 用于保存等待执行的任务的阻塞队列。
阻塞队列:
1.ArrayBlockingQueue:是一个基于数组结构的有界阻塞队列,此队列按 FIFO(先进先出)原则对元素进行排序。
2.LinkedBlockingQueue:一个基于链表结构的阻塞队列,此队列按FIFO (先进先出) 排序元素,吞吐量通常要高于ArrayBlockingQueue。静态工厂方法Executors.newFixedThreadPool()使用了这个队列。
3.SynchronousQueue:一个不存储元素的阻塞队列。每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高
4.PriorityBlockingQueue:一个支持优先级排序的无界阻塞队列。
5.DelayQueue: 一个使用优先级队列实现的无界阻塞队列。 - threadFactory - 线程工厂。
用于设置创建线程的工厂,可以通过线程工厂给每个创建出来的线程设置更有意义的名字,Debug和定位问题时非常有帮助 - RejectedExecutionHandler(饱和策略)-当队列和线程池都满了,说明线程池处于饱和状态,那么必须采取一种策略处理提交的新任务。这个策略默认情况下是AbortPolicy,表示无法处理新任务时抛出异常。以下是JDK1.5提供的四种策略。
1.ThreadPoolExecutor.AbortPolicy: 丢弃任务并抛出RejectedExecutionException异常。 (默认)
2.ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。
3.ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
4.ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务
也可以根据应用场景需要来实现RejectedExecutionHandler接口自定义策略。如记录日志或持久化不能处理的任务。
此部分参照:https://blog.csdn.net/fly910905/article/details/81584675
线程池原理可阅读我另一篇博客
谈谈对 java线程池(Executors、ExecutorService、ScheduledExecutorService)和(Callable、Future、CyclicBarrier)的理解