线程池(转)

线程池的优点

1、线程是稀缺资源,使用线程池可以减少创建和销毁线程的次数,每个工作线程都可以重复使用。

2、可以根据系统的承受能力,调整线程池中工作线程的数量,防止因为消耗过多内存导致服务器崩溃。

线程池的创建


    
    
  1. public ThreadPoolExecutor(int corePoolSize,
  2. int maximumPoolSize,
  3. long keepAliveTime,
  4. TimeUnit unit,
  5. BlockingQueue <Runnable> workQueue,
  6. RejectedExecutionHandler handler)

corePoolSize:线程池核心线程数量

maximumPoolSize:线程池最大线程数量

keepAliverTime:当活跃线程数大于核心线程数时,空闲的多余线程最大存活时间

unit:存活时间的单位

workQueue:存放任务的队列

handler:超出线程范围和队列容量的任务的处理程序

线程池的实现原理

提交一个任务到线程池中,线程池的处理流程如下:

1、判断线程池里的核心线程是否都在执行任务,如果不是(核心线程空闲或者还有核心线程没有被创建)则创建一个新的工作线程来执行任务。如果核心线程都在执行任务,则进入下个流程。

2、线程池判断工作队列是否已满,如果工作队列没有满,则将新提交的任务存储在这个工作队列里。如果工作队列满了,则进入下个流程。

3、判断线程池里的线程是否都处于工作状态,如果没有,则创建一个新的工作线程来执行任务。如果已经满了,则交给饱和策略来处理这个任务。

  这里写图片描述

线程池的源码解读

1、ThreadPoolExecutor的execute()方法


    
    
  1. 1 public void execute(Runnable command) {
  2. 2 if (command == null)
  3. 3 throw new NullPointerException();
  4.        //如果线程数大于等于基本线程数或者线程创建失败,将任务加入队列
  5. 4 if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {
  6.           //线程池处于运行状态并且加入队列成功
  7. 5 if (runState == RUNNING && workQueue.offer(command)) {
  8. 6 if (runState != RUNNING || poolSize == 0)
  9. 7 ensureQueuedTaskHandled(command);
  10. 8 }
  11.          //线程池不处于运行状态或者加入队列失败,则创建线程(创建的是非核心线程)
  12. 9 else if (!addIfUnderMaximumPoolSize(command))
  13.            //创建线程失败,则采取阻塞处理的方式
  14. 10 reject(command); // is shutdown or saturated
  15. 11 }
  16. 12 }

2、创建线程的方法:addIfUnderCorePoolSize(command)


    
    
  1. 1 private boolean addIfUnderCorePoolSize(Runnable firstTask) {
  2. 2 Thread t = null;
  3. 3 final ReentrantLock mainLock = this.mainLock;
  4. 4 mainLock.lock();
  5. 5 try {
  6. 6 if (poolSize < corePoolSize && runState == RUNNING)
  7. 7 t = addThread(firstTask);
  8. 8 } finally {
  9. 9 mainLock.unlock();
  10. 10 }
  11. 11 if ( t == null)
  12. 12 return false;
  13. 13 t.start();
  14. 14 return true;
  15. 15 }

我们重点来看第7行:


    
    
  1. 1 private Thread addThread(Runnable firstTask) {
  2. 2 Worker w = new Worker(firstTask);
  3. 3 Thread t = threadFactory.newThread(w);
  4. 4 if (t != null) {
  5. 5 w.thread = t;
  6. 6 workers.add(w);
  7. 7 int nt = ++poolSize;
  8. 8 if (nt > largestPoolSize)
  9. 9 largestPoolSize = nt;
  10. 10 }
  11. 11 return t;
  12. 12 }

这里将线程封装成工作线程worker,并放入工作线程组里,worker类的方法run方法:


    
    
  1. public void run() {
  2. try {
  3. Runnable task = firstTask;
  4. firstTask = null;
  5. while (task != null || (task = getTask()) != null) {
  6. runTask(task);
  7. task = null;
  8. }
  9. } finally {
  10. workerDone(this);
  11. }
  12. }

worker在执行完任务后,还会通过getTask方法循环获取工作队里里的任务来执行。

我们通过一个程序来观察线程池的工作原理:

1、创建一个线程


    
    
  1. 1 public class ThreadPoolTest implements Runnable
  2. 2 {
  3. 3 @Override
  4. 4 public void run()
  5. 5 {
  6. 6 try
  7. 7 {
  8. 8 Thread.sleep(300);
  9. 9 }
  10. 10 catch (InterruptedException e)
  11. 11 {
  12. 12 e.printStackTrace();
  13. 13 }
  14. 14 }
  15. 15 }

2、线程池循环运行16个线程:


    
    
  1. 1 public static void main(String[] args)
  2. 2 {
  3. 3 LinkedBlockingQueue <Runnable> queue =
  4. 4 new LinkedBlockingQueue <Runnable>(5);
  5. 5 ThreadPoolExecutor threadPool = new ThreadPoolExecutor(5, 10, 60, TimeUnit.SECONDS, queue);
  6. 6 for (int i = 0; i < 16 ; i++)
  7. 7 {
  8. 8 threadPool.execute(
  9. 9 new Thread( new ThreadPoolTest(), " Thread" .concat( i + "")));
  10. 10 System.out.println("线程池中活跃的线程数: " + threadPool.getPoolSize());
  11. 11 if ( queue.size() > 0)
  12. 12 {
  13. 13 System.out.println("----------------队列中阻塞的线程数" + queue.size());
  14. 14 }
  15. 15 }
  16. 16 threadPool.shutdown();
  17. 17 }

执行结果:


    
    
  1. 线程池中活跃的线程数: 1
  2. 线程池中活跃的线程数: 2
  3. 线程池中活跃的线程数: 3
  4. 线程池中活跃的线程数: 4
  5. 线程池中活跃的线程数: 5
  6. 线程池中活跃的线程数: 5
  7. ----------------队列中阻塞的线程数1
  8. 线程池中活跃的线程数: 5
  9. ----------------队列中阻塞的线程数2
  10. 线程池中活跃的线程数: 5
  11. ----------------队列中阻塞的线程数3
  12. 线程池中活跃的线程数: 5
  13. ----------------队列中阻塞的线程数4
  14. 线程池中活跃的线程数: 5
  15. ----------------队列中阻塞的线程数5
  16. 线程池中活跃的线程数: 6
  17. ----------------队列中阻塞的线程数5
  18. 线程池中活跃的线程数: 7
  19. ----------------队列中阻塞的线程数5
  20. 线程池中活跃的线程数: 8
  21. ----------------队列中阻塞的线程数5
  22. 线程池中活跃的线程数: 9
  23. ----------------队列中阻塞的线程数5
  24. 线程池中活跃的线程数: 10
  25. ----------------队列中阻塞的线程数5
  26. Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task Thread[Thread15,5,main] rejected from java.util.concurrent.ThreadPoolExecutor@232204a1[Running, pool size = 10, active threads = 10, queued tasks = 5, completed tasks = 0]
  27. at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047)
  28. at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823)
  29. at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369)
  30. at test.ThreadTest.main(ThreadTest.java:17)

从结果可以观察出:

1、创建的线程池具体配置为:核心线程数量为5个;全部线程数量为10个;工作队列的长度为5。

2、我们通过queue.size()的方法来获取工作队列中的任务数。

3、运行原理:

      刚开始都是在创建新的线程,达到核心线程数量5个后,新的任务进来后不再创建新的线程,而是将任务加入工作队列,任务队列到达上线5个后,新的任务又会创建新的普通线程,直到达到线程池最大的线程数量10个,后面的任务则根据配置的饱和策略来处理。我们这里没有具体配置,使用的是默认的配置AbortPolicy:直接抛出异常。

  当然,为了达到我需要的效果,上述线程处理的任务都是利用休眠导致线程没有释放!!!

RejectedExecutionHandler:饱和策略

当队列和线程池都满了,说明线程池处于饱和状态,那么必须对新提交的任务采用一种特殊的策略来进行处理。这个策略默认配置是AbortPolicy,表示无法处理新的任务而抛出异常。JAVA提供了4中策略:

1、AbortPolicy:直接抛出异常

2、CallerRunsPolicy:只用调用所在的线程运行任务

3、DiscardOldestPolicy:丢弃队列里最近的一个任务,并执行当前任务。

4、DiscardPolicy:不处理,丢弃掉。

我们现在用第四种策略来处理上面的程序:


    
    
  1. 1 public static void main(String[] args)
  2. 2 {
  3. 3 LinkedBlockingQueue <Runnable> queue =
  4. 4 new LinkedBlockingQueue <Runnable>(3);
  5. 5 RejectedExecutionHandler handler = new ThreadPoolExecutor.DiscardPolicy();
  6. 6
  7. 7 ThreadPoolExecutor threadPool = new ThreadPoolExecutor(2, 5, 60, TimeUnit.SECONDS, queue,handler);
  8. 8 for (int i = 0; i < 9 ; i++)
  9. 9 {
  10. 10 threadPool.execute(
  11. 11 new Thread( new ThreadPoolTest(), " Thread" .concat( i + "")));
  12. 12 System.out.println("线程池中活跃的线程数: " + threadPool.getPoolSize());
  13. 13 if ( queue.size() > 0)
  14. 14 {
  15. 15 System.out.println("----------------队列中阻塞的线程数" + queue.size());
  16. 16 }
  17. 17 }
  18. 18 threadPool.shutdown();
  19. 19 }

执行结果:


    
    
  1. 线程池中活跃的线程数: 1
  2. 线程池中活跃的线程数: 2
  3. 线程池中活跃的线程数: 2
  4. ----------------队列中阻塞的线程数1
  5. 线程池中活跃的线程数: 2
  6. ----------------队列中阻塞的线程数2
  7. 线程池中活跃的线程数: 2
  8. ----------------队列中阻塞的线程数3
  9. 线程池中活跃的线程数: 3
  10. ----------------队列中阻塞的线程数3
  11. 线程池中活跃的线程数: 4
  12. ----------------队列中阻塞的线程数3
  13. 线程池中活跃的线程数: 5
  14. ----------------队列中阻塞的线程数3
  15. 线程池中活跃的线程数: 5
  16. ----------------队列中阻塞的线程数3

复制代码

这里采用了丢弃策略后,就没有再抛出异常,而是直接丢弃。在某些重要的场景下,可以采用记录日志或者存储到数据库中,而不应该直接丢弃。

设置策略有两种方式:

1、


    
    
  1. RejectedExecutionHandler handler = new ThreadPoolExecutor.DiscardPolicy();
  2. ThreadPoolExecutor threadPool = new ThreadPoolExecutor(2, 5, 60, TimeUnit.SECONDS, queue,handler);

2、


    
    
  1. ThreadPoolExecutor threadPool = new ThreadPoolExecutor(2, 5, 60, TimeUnit.SECONDS, queue);
  2. threadPool.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());

Executor框架的两级调度模型

在HotSpot VM的模型中,JAVA线程被一对一映射为本地操作系统线程。JAVA线程启动时会创建一个本地操作系统线程,当JAVA线程终止时,对应的操作系统线程也被销毁回收,而操作系统会调度所有线程并将它们分配给可用的CPU。

在上层,JAVA程序会将应用分解为多个任务,然后使用应用级的调度器(Executor)将这些任务映射成固定数量的线程;在底层,操作系统内核将这些线程映射到硬件处理器上。

Executor框架类图

在前面介绍的JAVA线程既是工作单元,也是执行机制。而在Executor框架中,我们将工作单元与执行机制分离开来。Runnable和Callable是工作单元(也就是俗称的任务),而执行机制由Executor来提供。这样一来Executor是基于生产者消费者模式的,提交任务的操作相当于生成者,执行任务的线程相当于消费者。

1、从类图上看,Executor接口是异步任务执行框架的基础,该框架能够支持多种不同类型的任务执行策略。


    
    
  1. public interface Executor {
  2. void execute(Runnable command);
  3. }

Executor接口就提供了一个执行方法,任务是Runnbale类型,不支持Callable类型。


2、ExecutorService接口实现了Executor接口,主要提供了关闭线程池和submit方法:


    
    
  1. public interface ExecutorService extends Executor {
  2. List <Runnable> shutdownNow();
  3. boolean isTerminated();
  4. <T> Future <T> submit(Callable <T> task);
  5. }

另外该接口有两个重要的实现类:ThreadPoolExecutor与ScheduledThreadPoolExecutor。

其中ThreadPoolExecutor是线程池的核心实现类,用来执行被提交的任务;而ScheduledThreadPoolExecutor是一个实现类,可以在给定的延迟后运行任务,或者定期执行命令。

在上一篇文章中,我是使用ThreadPoolExecutor来通过给定不同的参数从而创建自己所需的线程池,但是在后面的工作中不建议这种方式,推荐使用Exectuors工厂方法来创建线程池

这里先来区别线程池和线程组(ThreadGroup与ThreadPoolExecutor)这两个概念:

a、线程组就表示一个线程的集合。

b、线程池是为线程的生命周期开销问题和资源不足问题提供解决方案,主要是用来管理线程。

Executors可以创建3种类型的ThreadPoolExecutor:SingleThreadExecutor、FixedThreadExecutor和CachedThreadPool

a、SingleThreadExecutor:单线程线程池

ExecutorService threadPool = Executors.newSingleThreadExecutor();
    
    

    
    
  1. public static ExecutorService newSingleThreadExecutor() {
  2. return new FinalizableDelegatedExecutorService
  3. (new ThreadPoolExecutor(1, 1,
  4. 0L, TimeUnit.MILLISECONDS,
  5. new LinkedBlockingQueue <Runnable>()));
  6. }

我们从源码来看可以知道,单线程线程池的创建也是通过ThreadPoolExecutor,里面的核心线程数和线程数都是1,并且工作队列使用的是无界队列。由于是单线程工作,每次只能处理一个任务,所以后面所有的任务都被阻塞在工作队列中,只能一个个任务执行。

b、FixedThreadExecutor:固定大小线程池

ExecutorService threadPool = Executors.newFixedThreadPool(5);
    
    

    
    
  1. public static ExecutorService newFixedThreadPool(int nThreads) {
  2. return new ThreadPoolExecutor(nThreads, nThreads,
  3. 0L, TimeUnit.MILLISECONDS,
  4. new LinkedBlockingQueue <Runnable>());
  5. }

这个与单线程类似,只是创建了固定大小的线程数量。

c、CachedThreadPool:无界线程池

ExecutorService threadPool = Executors.newCachedThreadPool();
    
    

    
    
  1. public static ExecutorService newCachedThreadPool() {
  2. return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
  3. 60L, TimeUnit.SECONDS,
  4. new SynchronousQueue <Runnable>());
  5. }

无界线程池意味着没有工作队列,任务进来就执行,线程数量不够就创建,与前面两个的区别是:空闲的线程会被回收掉,空闲的时间是60s。这个适用于执行很多短期异步的小程序或者负载较轻的服务器。

Callable、Future、FutureTash详解

Callable与Future是在JAVA的后续版本中引入进来的,Callable类似于Runnable接口,实现Callable接口的类与实现Runnable的类都是可以被线程执行的任务。

三者之间的关系:

Callable是Runnable封装的异步运算任务。

Future用来保存Callable异步运算的结果

FutureTask封装Future的实体类

1、Callable与Runnbale的区别

a、Callable定义的方法是call,而Runnable定义的方法是run。

b、call方法有返回值,而run方法是没有返回值的。

c、call方法可以抛出异常,而run方法不能抛出异常。

2、Future

Future表示异步计算的结果,提供了以下方法,主要是判断任务是否完成、中断任务、获取任务执行结果


    
    
  1. 1 public interface Future <V> {
  2. 2
  3. 3 boolean cancel(boolean mayInterruptIfRunning);
  4. 4
  5. 5 boolean isCancelled();
  6. 6
  7. 7 boolean isDone();
  8. 8
  9. 9 V get() throws InterruptedException, ExecutionException;
  10. 10
  11. 11 V get(long timeout, TimeUnit unit)
  12. 12 throws InterruptedException, ExecutionException, TimeoutException;
  13. 13 }

3、FutureTask<V>

可取消的异步计算,此类提供了对Future的基本实现,仅在计算完成时才能获取结果,如果计算尚未完成,则阻塞get方法。

public class FutureTask<V> implements RunnableFuture<V>
    
    
public interface RunnableFuture<V> extends Runnable, Future<V>
    
    

FutureTask不仅实现了Future接口,还实现了Runnable接口,所以不仅可以将FutureTask当成一个任务交给Executor来执行,还可以通过Thread来创建一个线程。

Callable与FutureTask

定义一个callable的任务:


    
    
  1. 1 public class MyCallableTask implements Callable <Integer>
  2. 2 {
  3. 3 @Override
  4. 4 public Integer call()
  5. 5 throws Exception
  6. 6 {
  7. 7 System.out.println("callable do somothing");
  8. 8 Thread.sleep(5000);
  9. 9 return new Random().nextInt(100);
  10. 10 }
  11. 11 }

    
    
  1. 1 public class CallableTest
  2. 2 {
  3. 3 public static void main(String[] args) throws Exception
  4. 4 {
  5. 5 Callable <Integer> callable = new MyCallableTask();
  6. 6 FutureTask <Integer> future = new FutureTask <Integer>(callable);
  7. 7 Thread thread = new Thread(future);
  8. 8 thread.start();
  9. 9 Thread.sleep(100);
  10. 10 //尝试取消对此任务的执行
  11. 11 future.cancel(true);
  12. 12 //判断是否在任务正常完成前取消
  13. 13 System.out.println("future is cancel:" + future.isCancelled());
  14. 14 if(!future.isCancelled())
  15. 15 {
  16. 16 System.out.println("future is cancelled");
  17. 17 }
  18. 18 //判断任务是否已完成
  19. 19 System.out.println("future is done:" + future.isDone());
  20. 20 if(!future.isDone())
  21. 21 {
  22. 22 System.out.println("future get=" + future.get());
  23. 23 }
  24. 24 else
  25. 25 {
  26. 26 //任务已完成
  27. 27 System.out.println("task is done");
  28. 28 }
  29. 29 }
  30. 30 }

执行结果:


    
    
  1. callable do somothing
  2. future is cancel:true
  3. future is done:true
  4. task is done

这个DEMO主要是通过调用FutureTask的状态设置的方法,演示了状态的变迁。

a、第11行,尝试取消对任务的执行,该方法如果由于任务已完成、已取消则返回false,如果能够取消还未完成的任务,则返回true,该DEMO中由于任务还在休眠状态,所以可以取消成功。

future.cancel(true);
    
    

b、第13行,判断任务取消是否成功:如果在任务正常完成前将其取消,则返回true

System.out.println("future is cancel:" + future.isCancelled());
    
    

c、第19行,判断任务是否完成:如果任务完成,则返回true,以下几种情况都属于任务完成:正常终止、异常或者取消而完成。

    我们的DEMO中,任务是由于取消而导致完成。

 System.out.println("future is done:" + future.isDone());
    
    

d、在第22行,获取异步线程执行的结果,我这个DEMO中没有执行到这里,需要注意的是,future.get方法会阻塞当前线程, 直到任务执行完成返回结果为止。

System.out.println("future get=" + future.get());
    
    

Callable与Future


    
    
  1. public class CallableThread implements Callable <String>
  2. {
  3. @Override
  4. public String call()
  5. throws Exception
  6. {
  7. System.out.println("进入Call方法,开始休眠,休眠时间为:" + System.currentTimeMillis());
  8. Thread.sleep(10000);
  9. return "今天停电";
  10. }
  11. public static void main(String[] args) throws Exception
  12. {
  13. ExecutorService es = Executors.newSingleThreadExecutor();
  14. Callable <String> call = new CallableThread();
  15. Future <String> fu = es.submit(call);
  16. es.shutdown();
  17. Thread.sleep(5000);
  18. System.out.println("主线程休眠5秒,当前时间" + System.currentTimeMillis());
  19. String str = fu.get();
  20. System.out.println("Future已拿到数据,str=" + str + ";当前时间为:" + System.currentTimeMillis());
  21. }
  22. }

执行结果:


    
    
  1. 进入Call方法,开始休眠,休眠时间为:1478606602676
  2. 主线程休眠5秒,当前时间1478606608676
  3. Future已拿到数据,str=今天停电;当前时间为:1478606612677

这里的future是直接扔到线程池里面去执行的。由于要打印任务的执行结果,所以从执行结果来看,主线程虽然休眠了5s,但是从Call方法执行到拿到任务的结果,这中间的时间差正好是10s,说明get方法会阻塞当前线程直到任务完成。

通过FutureTask也可以达到同样的效果:


    
    
  1. public static void main(String[] args) throws Exception
  2. {
  3. ExecutorService es = Executors.newSingleThreadExecutor();
  4. Callable <String> call = new CallableThread();
  5. FutureTask <String> task = new FutureTask <String>(call);
  6. es.submit(task);
  7. es.shutdown();
  8. Thread.sleep(5000);
  9. System.out.println("主线程等待5秒,当前时间为:" + System.currentTimeMillis());
  10. String str = task.get();
  11. System.out.println("Future已拿到数据,str=" + str + ";当前时间为:" + System.currentTimeMillis());
  12. }

以上的组合可以给我们带来这样的一些变化:

如有一种场景中,方法A返回一个数据需要10s,A方法后面的代码运行需要20s,但是这20s的执行过程中,只有后面10s依赖于方法A执行的结果。如果与以往一样采用同步的方式,势必会有10s的时间被浪费,如果采用前面两种组合,则效率会提高:

1、先把A方法的内容放到Callable实现类的call()方法中

2、在主线程中通过线程池执行A任务

3、执行后面方法中10秒不依赖方法A运行结果的代码

4、获取方法A的运行结果,执行后面方法中10秒依赖方法A运行结果的代码

这样代码执行效率一下子就提高了,程序不必卡在A方法处。

            </div>

线程池的优点

1、线程是稀缺资源,使用线程池可以减少创建和销毁线程的次数,每个工作线程都可以重复使用。

2、可以根据系统的承受能力,调整线程池中工作线程的数量,防止因为消耗过多内存导致服务器崩溃。

线程池的创建


  
  
  1. public ThreadPoolExecutor(int corePoolSize,
  2. int maximumPoolSize,
  3. long keepAliveTime,
  4. TimeUnit unit,
  5. BlockingQueue <Runnable> workQueue,
  6. RejectedExecutionHandler handler)

corePoolSize:线程池核心线程数量

maximumPoolSize:线程池最大线程数量

keepAliverTime:当活跃线程数大于核心线程数时,空闲的多余线程最大存活时间

unit:存活时间的单位

workQueue:存放任务的队列

handler:超出线程范围和队列容量的任务的处理程序

线程池的实现原理

提交一个任务到线程池中,线程池的处理流程如下:

1、判断线程池里的核心线程是否都在执行任务,如果不是(核心线程空闲或者还有核心线程没有被创建)则创建一个新的工作线程来执行任务。如果核心线程都在执行任务,则进入下个流程。

2、线程池判断工作队列是否已满,如果工作队列没有满,则将新提交的任务存储在这个工作队列里。如果工作队列满了,则进入下个流程。

3、判断线程池里的线程是否都处于工作状态,如果没有,则创建一个新的工作线程来执行任务。如果已经满了,则交给饱和策略来处理这个任务。

  这里写图片描述

线程池的源码解读

1、ThreadPoolExecutor的execute()方法


  
  
  1. 1 public void execute(Runnable command) {
  2. 2 if (command == null)
  3. 3 throw new NullPointerException();
  4.        //如果线程数大于等于基本线程数或者线程创建失败,将任务加入队列
  5. 4 if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {
  6.           //线程池处于运行状态并且加入队列成功
  7. 5 if (runState == RUNNING && workQueue.offer(command)) {
  8. 6 if (runState != RUNNING || poolSize == 0)
  9. 7 ensureQueuedTaskHandled(command);
  10. 8 }
  11.          //线程池不处于运行状态或者加入队列失败,则创建线程(创建的是非核心线程)
  12. 9 else if (!addIfUnderMaximumPoolSize(command))
  13.            //创建线程失败,则采取阻塞处理的方式
  14. 10 reject(command); // is shutdown or saturated
  15. 11 }
  16. 12 }

2、创建线程的方法:addIfUnderCorePoolSize(command)


  
  
  1. 1 private boolean addIfUnderCorePoolSize(Runnable firstTask) {
  2. 2 Thread t = null;
  3. 3 final ReentrantLock mainLock = this.mainLock;
  4. 4 mainLock.lock();
  5. 5 try {
  6. 6 if (poolSize < corePoolSize && runState == RUNNING)
  7. 7 t = addThread(firstTask);
  8. 8 } finally {
  9. 9 mainLock.unlock();
  10. 10 }
  11. 11 if ( t == null)
  12. 12 return false;
  13. 13 t.start();
  14. 14 return true;
  15. 15 }

我们重点来看第7行:


  
  
  1. 1 private Thread addThread(Runnable firstTask) {
  2. 2 Worker w = new Worker(firstTask);
  3. 3 Thread t = threadFactory.newThread(w);
  4. 4 if (t != null) {
  5. 5 w.thread = t;
  6. 6 workers.add(w);
  7. 7 int nt = ++poolSize;
  8. 8 if (nt > largestPoolSize)
  9. 9 largestPoolSize = nt;
  10. 10 }
  11. 11 return t;
  12. 12 }

这里将线程封装成工作线程worker,并放入工作线程组里,worker类的方法run方法:


  
  
  1. public void run() {
  2. try {
  3. Runnable task = firstTask;
  4. firstTask = null;
  5. while (task != null || (task = getTask()) != null) {
  6. runTask(task);
  7. task = null;
  8. }
  9. } finally {
  10. workerDone(this);
  11. }
  12. }

worker在执行完任务后,还会通过getTask方法循环获取工作队里里的任务来执行。

我们通过一个程序来观察线程池的工作原理:

1、创建一个线程


  
  
  1. 1 public class ThreadPoolTest implements Runnable
  2. 2 {
  3. 3 @Override
  4. 4 public void run()
  5. 5 {
  6. 6 try
  7. 7 {
  8. 8 Thread.sleep(300);
  9. 9 }
  10. 10 catch (InterruptedException e)
  11. 11 {
  12. 12 e.printStackTrace();
  13. 13 }
  14. 14 }
  15. 15 }

2、线程池循环运行16个线程:


  
  
  1. 1 public static void main(String[] args)
  2. 2 {
  3. 3 LinkedBlockingQueue <Runnable> queue =
  4. 4 new LinkedBlockingQueue <Runnable>(5);
  5. 5 ThreadPoolExecutor threadPool = new ThreadPoolExecutor(5, 10, 60, TimeUnit.SECONDS, queue);
  6. 6 for (int i = 0; i < 16 ; i++)
  7. 7 {
  8. 8 threadPool.execute(
  9. 9 new Thread( new ThreadPoolTest(), " Thread" .concat( i + "")));
  10. 10 System.out.println("线程池中活跃的线程数: " + threadPool.getPoolSize());
  11. 11 if ( queue.size() > 0)
  12. 12 {
  13. 13 System.out.println("----------------队列中阻塞的线程数" + queue.size());
  14. 14 }
  15. 15 }
  16. 16 threadPool.shutdown();
  17. 17 }

执行结果:


  
  
  1. 线程池中活跃的线程数: 1
  2. 线程池中活跃的线程数: 2
  3. 线程池中活跃的线程数: 3
  4. 线程池中活跃的线程数: 4
  5. 线程池中活跃的线程数: 5
  6. 线程池中活跃的线程数: 5
  7. ----------------队列中阻塞的线程数1
  8. 线程池中活跃的线程数: 5
  9. ----------------队列中阻塞的线程数2
  10. 线程池中活跃的线程数: 5
  11. ----------------队列中阻塞的线程数3
  12. 线程池中活跃的线程数: 5
  13. ----------------队列中阻塞的线程数4
  14. 线程池中活跃的线程数: 5
  15. ----------------队列中阻塞的线程数5
  16. 线程池中活跃的线程数: 6
  17. ----------------队列中阻塞的线程数5
  18. 线程池中活跃的线程数: 7
  19. ----------------队列中阻塞的线程数5
  20. 线程池中活跃的线程数: 8
  21. ----------------队列中阻塞的线程数5
  22. 线程池中活跃的线程数: 9
  23. ----------------队列中阻塞的线程数5
  24. 线程池中活跃的线程数: 10
  25. ----------------队列中阻塞的线程数5
  26. Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task Thread[Thread15,5,main] rejected from java.util.concurrent.ThreadPoolExecutor@232204a1[Running, pool size = 10, active threads = 10, queued tasks = 5, completed tasks = 0]
  27. at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047)
  28. at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823)
  29. at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369)
  30. at test.ThreadTest.main(ThreadTest.java:17)

从结果可以观察出:

1、创建的线程池具体配置为:核心线程数量为5个;全部线程数量为10个;工作队列的长度为5。

2、我们通过queue.size()的方法来获取工作队列中的任务数。

3、运行原理:

      刚开始都是在创建新的线程,达到核心线程数量5个后,新的任务进来后不再创建新的线程,而是将任务加入工作队列,任务队列到达上线5个后,新的任务又会创建新的普通线程,直到达到线程池最大的线程数量10个,后面的任务则根据配置的饱和策略来处理。我们这里没有具体配置,使用的是默认的配置AbortPolicy:直接抛出异常。

  当然,为了达到我需要的效果,上述线程处理的任务都是利用休眠导致线程没有释放!!!

RejectedExecutionHandler:饱和策略

当队列和线程池都满了,说明线程池处于饱和状态,那么必须对新提交的任务采用一种特殊的策略来进行处理。这个策略默认配置是AbortPolicy,表示无法处理新的任务而抛出异常。JAVA提供了4中策略:

1、AbortPolicy:直接抛出异常

2、CallerRunsPolicy:只用调用所在的线程运行任务

3、DiscardOldestPolicy:丢弃队列里最近的一个任务,并执行当前任务。

4、DiscardPolicy:不处理,丢弃掉。

我们现在用第四种策略来处理上面的程序:


  
  
  1. 1 public static void main(String[] args)
  2. 2 {
  3. 3 LinkedBlockingQueue <Runnable> queue =
  4. 4 new LinkedBlockingQueue <Runnable>(3);
  5. 5 RejectedExecutionHandler handler = new ThreadPoolExecutor.DiscardPolicy();
  6. 6
  7. 7 ThreadPoolExecutor threadPool = new ThreadPoolExecutor(2, 5, 60, TimeUnit.SECONDS, queue,handler);
  8. 8 for (int i = 0; i < 9 ; i++)
  9. 9 {
  10. 10 threadPool.execute(
  11. 11 new Thread( new ThreadPoolTest(), " Thread" .concat( i + "")));
  12. 12 System.out.println("线程池中活跃的线程数: " + threadPool.getPoolSize());
  13. 13 if ( queue.size() > 0)
  14. 14 {
  15. 15 System.out.println("----------------队列中阻塞的线程数" + queue.size());
  16. 16 }
  17. 17 }
  18. 18 threadPool.shutdown();
  19. 19 }

执行结果:


  
  
  1. 线程池中活跃的线程数: 1
  2. 线程池中活跃的线程数: 2
  3. 线程池中活跃的线程数: 2
  4. ----------------队列中阻塞的线程数1
  5. 线程池中活跃的线程数: 2
  6. ----------------队列中阻塞的线程数2
  7. 线程池中活跃的线程数: 2
  8. ----------------队列中阻塞的线程数3
  9. 线程池中活跃的线程数: 3
  10. ----------------队列中阻塞的线程数3
  11. 线程池中活跃的线程数: 4
  12. ----------------队列中阻塞的线程数3
  13. 线程池中活跃的线程数: 5
  14. ----------------队列中阻塞的线程数3
  15. 线程池中活跃的线程数: 5
  16. ----------------队列中阻塞的线程数3

复制代码

这里采用了丢弃策略后,就没有再抛出异常,而是直接丢弃。在某些重要的场景下,可以采用记录日志或者存储到数据库中,而不应该直接丢弃。

设置策略有两种方式:

1、


  
  
  1. RejectedExecutionHandler handler = new ThreadPoolExecutor.DiscardPolicy();
  2. ThreadPoolExecutor threadPool = new ThreadPoolExecutor(2, 5, 60, TimeUnit.SECONDS, queue,handler);

2、


  
  
  1. ThreadPoolExecutor threadPool = new ThreadPoolExecutor(2, 5, 60, TimeUnit.SECONDS, queue);
  2. threadPool.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());

Executor框架的两级调度模型

在HotSpot VM的模型中,JAVA线程被一对一映射为本地操作系统线程。JAVA线程启动时会创建一个本地操作系统线程,当JAVA线程终止时,对应的操作系统线程也被销毁回收,而操作系统会调度所有线程并将它们分配给可用的CPU。

在上层,JAVA程序会将应用分解为多个任务,然后使用应用级的调度器(Executor)将这些任务映射成固定数量的线程;在底层,操作系统内核将这些线程映射到硬件处理器上。

Executor框架类图

在前面介绍的JAVA线程既是工作单元,也是执行机制。而在Executor框架中,我们将工作单元与执行机制分离开来。Runnable和Callable是工作单元(也就是俗称的任务),而执行机制由Executor来提供。这样一来Executor是基于生产者消费者模式的,提交任务的操作相当于生成者,执行任务的线程相当于消费者。

1、从类图上看,Executor接口是异步任务执行框架的基础,该框架能够支持多种不同类型的任务执行策略。


  
  
  1. public interface Executor {
  2. void execute(Runnable command);
  3. }

Executor接口就提供了一个执行方法,任务是Runnbale类型,不支持Callable类型。


2、ExecutorService接口实现了Executor接口,主要提供了关闭线程池和submit方法:


  
  
  1. public interface ExecutorService extends Executor {
  2. List <Runnable> shutdownNow();
  3. boolean isTerminated();
  4. <T> Future <T> submit(Callable <T> task);
  5. }

另外该接口有两个重要的实现类:ThreadPoolExecutor与ScheduledThreadPoolExecutor。

其中ThreadPoolExecutor是线程池的核心实现类,用来执行被提交的任务;而ScheduledThreadPoolExecutor是一个实现类,可以在给定的延迟后运行任务,或者定期执行命令。

在上一篇文章中,我是使用ThreadPoolExecutor来通过给定不同的参数从而创建自己所需的线程池,但是在后面的工作中不建议这种方式,推荐使用Exectuors工厂方法来创建线程池

这里先来区别线程池和线程组(ThreadGroup与ThreadPoolExecutor)这两个概念:

a、线程组就表示一个线程的集合。

b、线程池是为线程的生命周期开销问题和资源不足问题提供解决方案,主要是用来管理线程。

Executors可以创建3种类型的ThreadPoolExecutor:SingleThreadExecutor、FixedThreadExecutor和CachedThreadPool

a、SingleThreadExecutor:单线程线程池

ExecutorService threadPool = Executors.newSingleThreadExecutor();
  
  

  
  
  1. public static ExecutorService newSingleThreadExecutor() {
  2. return new FinalizableDelegatedExecutorService
  3. (new ThreadPoolExecutor(1, 1,
  4. 0L, TimeUnit.MILLISECONDS,
  5. new LinkedBlockingQueue <Runnable>()));
  6. }

我们从源码来看可以知道,单线程线程池的创建也是通过ThreadPoolExecutor,里面的核心线程数和线程数都是1,并且工作队列使用的是无界队列。由于是单线程工作,每次只能处理一个任务,所以后面所有的任务都被阻塞在工作队列中,只能一个个任务执行。

b、FixedThreadExecutor:固定大小线程池

ExecutorService threadPool = Executors.newFixedThreadPool(5);
  
  

  
  
  1. public static ExecutorService newFixedThreadPool(int nThreads) {
  2. return new ThreadPoolExecutor(nThreads, nThreads,
  3. 0L, TimeUnit.MILLISECONDS,
  4. new LinkedBlockingQueue <Runnable>());
  5. }

这个与单线程类似,只是创建了固定大小的线程数量。

c、CachedThreadPool:无界线程池

ExecutorService threadPool = Executors.newCachedThreadPool();
  
  

  
  
  1. public static ExecutorService newCachedThreadPool() {
  2. return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
  3. 60L, TimeUnit.SECONDS,
  4. new SynchronousQueue <Runnable>());
  5. }

无界线程池意味着没有工作队列,任务进来就执行,线程数量不够就创建,与前面两个的区别是:空闲的线程会被回收掉,空闲的时间是60s。这个适用于执行很多短期异步的小程序或者负载较轻的服务器。

Callable、Future、FutureTash详解

Callable与Future是在JAVA的后续版本中引入进来的,Callable类似于Runnable接口,实现Callable接口的类与实现Runnable的类都是可以被线程执行的任务。

三者之间的关系:

Callable是Runnable封装的异步运算任务。

Future用来保存Callable异步运算的结果

FutureTask封装Future的实体类

1、Callable与Runnbale的区别

a、Callable定义的方法是call,而Runnable定义的方法是run。

b、call方法有返回值,而run方法是没有返回值的。

c、call方法可以抛出异常,而run方法不能抛出异常。

2、Future

Future表示异步计算的结果,提供了以下方法,主要是判断任务是否完成、中断任务、获取任务执行结果


  
  
  1. 1 public interface Future <V> {
  2. 2
  3. 3 boolean cancel(boolean mayInterruptIfRunning);
  4. 4
  5. 5 boolean isCancelled();
  6. 6
  7. 7 boolean isDone();
  8. 8
  9. 9 V get() throws InterruptedException, ExecutionException;
  10. 10
  11. 11 V get(long timeout, TimeUnit unit)
  12. 12 throws InterruptedException, ExecutionException, TimeoutException;
  13. 13 }

3、FutureTask<V>

可取消的异步计算,此类提供了对Future的基本实现,仅在计算完成时才能获取结果,如果计算尚未完成,则阻塞get方法。

public class FutureTask<V> implements RunnableFuture<V>
  
  
public interface RunnableFuture<V> extends Runnable, Future<V>
  
  

FutureTask不仅实现了Future接口,还实现了Runnable接口,所以不仅可以将FutureTask当成一个任务交给Executor来执行,还可以通过Thread来创建一个线程。

Callable与FutureTask

定义一个callable的任务:


  
  
  1. 1 public class MyCallableTask implements Callable <Integer>
  2. 2 {
  3. 3 @Override
  4. 4 public Integer call()
  5. 5 throws Exception
  6. 6 {
  7. 7 System.out.println("callable do somothing");
  8. 8 Thread.sleep(5000);
  9. 9 return new Random().nextInt(100);
  10. 10 }
  11. 11 }

  
  
  1. 1 public class CallableTest
  2. 2 {
  3. 3 public static void main(String[] args) throws Exception
  4. 4 {
  5. 5 Callable <Integer> callable = new MyCallableTask();
  6. 6 FutureTask <Integer> future = new FutureTask <Integer>(callable);
  7. 7 Thread thread = new Thread(future);
  8. 8 thread.start();
  9. 9 Thread.sleep(100);
  10. 10 //尝试取消对此任务的执行
  11. 11 future.cancel(true);
  12. 12 //判断是否在任务正常完成前取消
  13. 13 System.out.println("future is cancel:" + future.isCancelled());
  14. 14 if(!future.isCancelled())
  15. 15 {
  16. 16 System.out.println("future is cancelled");
  17. 17 }
  18. 18 //判断任务是否已完成
  19. 19 System.out.println("future is done:" + future.isDone());
  20. 20 if(!future.isDone())
  21. 21 {
  22. 22 System.out.println("future get=" + future.get());
  23. 23 }
  24. 24 else
  25. 25 {
  26. 26 //任务已完成
  27. 27 System.out.println("task is done");
  28. 28 }
  29. 29 }
  30. 30 }

执行结果:


  
  
  1. callable do somothing
  2. future is cancel:true
  3. future is done:true
  4. task is done

这个DEMO主要是通过调用FutureTask的状态设置的方法,演示了状态的变迁。

a、第11行,尝试取消对任务的执行,该方法如果由于任务已完成、已取消则返回false,如果能够取消还未完成的任务,则返回true,该DEMO中由于任务还在休眠状态,所以可以取消成功。

future.cancel(true);
  
  

b、第13行,判断任务取消是否成功:如果在任务正常完成前将其取消,则返回true

System.out.println("future is cancel:" + future.isCancelled());
  
  

c、第19行,判断任务是否完成:如果任务完成,则返回true,以下几种情况都属于任务完成:正常终止、异常或者取消而完成。

    我们的DEMO中,任务是由于取消而导致完成。

 System.out.println("future is done:" + future.isDone());
  
  

d、在第22行,获取异步线程执行的结果,我这个DEMO中没有执行到这里,需要注意的是,future.get方法会阻塞当前线程, 直到任务执行完成返回结果为止。

System.out.println("future get=" + future.get());
  
  

Callable与Future


  
  
  1. public class CallableThread implements Callable <String>
  2. {
  3. @Override
  4. public String call()
  5. throws Exception
  6. {
  7. System.out.println("进入Call方法,开始休眠,休眠时间为:" + System.currentTimeMillis());
  8. Thread.sleep(10000);
  9. return "今天停电";
  10. }
  11. public static void main(String[] args) throws Exception
  12. {
  13. ExecutorService es = Executors.newSingleThreadExecutor();
  14. Callable <String> call = new CallableThread();
  15. Future <String> fu = es.submit(call);
  16. es.shutdown();
  17. Thread.sleep(5000);
  18. System.out.println("主线程休眠5秒,当前时间" + System.currentTimeMillis());
  19. String str = fu.get();
  20. System.out.println("Future已拿到数据,str=" + str + ";当前时间为:" + System.currentTimeMillis());
  21. }
  22. }

执行结果:


  
  
  1. 进入Call方法,开始休眠,休眠时间为:1478606602676
  2. 主线程休眠5秒,当前时间1478606608676
  3. Future已拿到数据,str=今天停电;当前时间为:1478606612677

这里的future是直接扔到线程池里面去执行的。由于要打印任务的执行结果,所以从执行结果来看,主线程虽然休眠了5s,但是从Call方法执行到拿到任务的结果,这中间的时间差正好是10s,说明get方法会阻塞当前线程直到任务完成。

通过FutureTask也可以达到同样的效果:


  
  
  1. public static void main(String[] args) throws Exception
  2. {
  3. ExecutorService es = Executors.newSingleThreadExecutor();
  4. Callable <String> call = new CallableThread();
  5. FutureTask <String> task = new FutureTask <String>(call);
  6. es.submit(task);
  7. es.shutdown();
  8. Thread.sleep(5000);
  9. System.out.println("主线程等待5秒,当前时间为:" + System.currentTimeMillis());
  10. String str = task.get();
  11. System.out.println("Future已拿到数据,str=" + str + ";当前时间为:" + System.currentTimeMillis());
  12. }

以上的组合可以给我们带来这样的一些变化:

如有一种场景中,方法A返回一个数据需要10s,A方法后面的代码运行需要20s,但是这20s的执行过程中,只有后面10s依赖于方法A执行的结果。如果与以往一样采用同步的方式,势必会有10s的时间被浪费,如果采用前面两种组合,则效率会提高:

1、先把A方法的内容放到Callable实现类的call()方法中

2、在主线程中通过线程池执行A任务

3、执行后面方法中10秒不依赖方法A运行结果的代码

4、获取方法A的运行结果,执行后面方法中10秒依赖方法A运行结果的代码

这样代码执行效率一下子就提高了,程序不必卡在A方法处。

            </div>

猜你喜欢

转载自blog.csdn.net/qq_37812691/article/details/82379503