手撕源码系列之线程池 -- ThreadPoolExecutor(一)

简介:ThreadPoolExecutor是线程池类,可以通俗的将它理解为存放一定数量线程的一个线程集合。
线程池允许若个线程同时运行,同时运行的线程数量就是线程池的容量;
当添加的到线程池中的线程超过它的容量时,会有一部分线程阻塞等待。
线程池会通过相应的调度策略和拒绝策略,对添加到线程池中的线程进行管理。

ThreadPoolExecutor

ThreadPoolExecutor作为比较常用的线程池。我们可以将其理解为一个存放一定数量线程的线程集合。
线程池允许多个线程同时运行,同时也按照构造参数内的核心参数,实现一定规则的任务执行和拒绝策略。

参数详解

workers

    //Set containing all worker threads in pool. Accessed only when
      holding mainLock.
    //set内包含线程池内所有的工作线程,仅在持有mainLock的时候才可以进行操作
     
    private final HashSet<Worker> workers = new HashSet<Worker>();
    

workers在ThreadPoolExecutor中是一个相当重要的参数。
线程池通过wokers实现了允许多个线程同时运行。workers是HashSet类型,即它是一个装载Worker类型实例的集合。一个Worker对应一个线程,也就是说线程池通过workers包含了一个线程集合。当Worker对应的线程启动时,它会执行线程池中的任务;当执行完一个任务后,它会从线程池的阻塞队列中取出一个阻塞的任务来继续运行。

ctl的作用和线程池状态详解

/**ctl维护两个概念上的参数:runState和workCount。因为int范围是二进制的32位。
	这里的ctl设计为32位的高三位作为线程池的运行状态
	低29位作为工作线程的总量(一般工作线程的总量也不会超过2的29次方 - 1)
*/
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

// 用于计算runState的二进制移位数,即32 - 3 = 29
private static final int COUNT_BITS = Integer.SIZE - 3;

// 容量是1 << 29 - 1,即低29位用于表示最大容量(最大只能是2^29 - 1)
private static final int CAPACITY = (1 << COUNT_BITS) - 1;

// 高3位用于表示运行状态
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS; // 高三位为111
private static final int SHUTDOWN = 0 << COUNT_BITS; // 高三位为000
private static final int STOP = 1 << COUNT_BITS; // 高三位为001
private static final int TIDYING = 2 << COUNT_BITS; // 高三位为010
private static final int TERMINATED = 3 << COUNT_BITS; // 高三位为011

线程池各个状态的转换并不是随意转,各个状态的转换关系

  1. RUNNING -> SHUTDOWN:在调用shutdown()时,可能隐含在finalize()方法。
  2. RUNNING or SHUTDOWN -> STOP:调用shutdownNow()。
  3. SHUTDOWN -> TIDYING:当队列和线程池都是空的时。
  4. STOP -> TIDYING:当线程池为空时。
  5. TIDYING -> TERMINATED:当terminate()方法完成时
    在这里插入图片描述

各个状态的内容详解

  1. RUNNING :
    状态转换:ThreadPoolExecutor在初始化的时候会默认生成RUNNING状态
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    状态说明:当前状态可正常接受任务,处理任务;

  2. SHUTDOWN:
    状态转换:由RUNNING调用shutdown()接口转换
    状态说明:当前状态不可接受新任务,但是可以执行已接收的任务。

  3. STOP:
    状态转换:由RUNNING 或者 SHUTDOWN 状态调用shutdownNow()接口转换
    状态说明:当前状态不可接受新任务,不处理已接收的任务,中断正在处理中的任务。

  4. TIDYING:
    状态转换:当 SHUTDOWN 状态下,缓存队列和执行队列都为空的情况下转成 TIDYING 执行完所有任务。 当 STOP 状态下,执行任务为空的时候转换为 TIDYING
    状态说明:所有任务已中止,工作线程为0。当线程池变为TIDYING状态时,会执行钩子函数terminated()。terminated()在ThreadPoolExecutor类中是空的,若用户想在线程池变为TIDYING时,进行相应的处理;可以通过重载terminated()函数来实现。

  5. TERMINATED :
    状态切换:线程池处在TIDYING状态时,执行完terminated()之后,就会由TIDYING转变为TERMINATED。
    状态说明:线程池彻底终止,就变成TERMINATED状态。

构造方法

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) {
    
    
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
    }


    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory) {
    
    
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             threadFactory, defaultHandler);
    }

 
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              RejectedExecutionHandler handler) {
    
    
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), handler);
    }

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
    
    
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.acc = System.getSecurityManager() == null ?
                null :
                AccessController.getContext();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

构造方法内一共有六大核心参数
1.corePoolSize:核心线程数
2.maximumPoolSize:最大线程数
3.keepAliveTime,TimeUnit :线程在无工作的状态下的存活时间,keepAliveTime是时间数量,TimeUnit 是时间单位
4.workQueue:缓存队列(这里会使用用到java提供的队列,即queue
5.threadFactory:线程工厂,生成线程的规则
6.rejectedExecutionHandler:拒绝策略

核心参数

其中1,2,3会在后面的流程中讲解。这里主要讲解一下

  • QUEUE 缓存队列:

    queue具体可以看另一篇手撕源码系列之queue
    这里讲解一下线程池可用的几种缓存队列:
    1. ArrayBlockingQueue:
        基于数组的FIFO(FIFO)队列,是有界的,创建时必须指定大小。
    2. LinkedBlockingQueue:
        基于链表的FIFO队列,是无界的,默认大小是Integer.MAX_VALUE,也可以指定初始大小
    3. SynchronizedQueue:
        一个比较特殊的队列,虽然它是无界的,但它不会保存任务,每一个新增任务的线程必须等待另一个线程取出任务,也可以把它看成容量为0的队列。
    4. 或者其他自己实现的缓存队列,只需要去实现BlockingQueue接口即可,泛型是Runnable

  • RejectedExecutionHandler 拒绝策略

    简介:拒绝策略是在corePoolSize,maximumPoolSize ,workQueue 均达到设置上线时候触发的策略。一般由以下四种,如果需要实现自定义的拒绝策略可以去实现RejectedExecutionHandler接口。
    1. AbortPolicy:
        丢弃任务并抛出RejectedExecutionException异常。(如果不传拒绝策略的话,ThreadPoolExecutor默认的是 AbortPolicy 策略)

       private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();
  1. DiscardPolicy:
        和AbortPolicy差不多,都是丢弃任务但是不抛出异常。
  2. DiscardOldestPolicy:
       抛弃缓存队列最前面的任务,并尝试执行任务(重复执行)
  3. CallerRunsPolicy:
       由调用线程执行此任务(一般是主线程)
  • ThreadFactory 线程工厂

    简介: 线程工厂用于创建线程 , 默认使用的是Executors.defaultThreadFactory(),一般情况下不需要改动,如果特殊需要的话,可以自定义(实现ThreadFactory接口即可)

猜你喜欢

转载自blog.csdn.net/pontuss/article/details/114069337