线程与线程池笔记(三)线程池

目录

一、什么是线程池
二、使用方法
三、原理分析

一、什么是线程池?

线程池就是是一种多线程处理形式,里边可以有很多线程的池子,通过线程池可以对多条线程进行维护管理,进行有效合理的调度和复用等,节省系统和cpu资源,提升性能。

二、使用

一个非常非常简单的例子,容易理解,为后面做铺垫。

public class MyClass {
    public static void main(String[] args) {
        ExecutorService es = Executors.newFixedThreadPool(5);
        for (int i = 0; i < 10; i++){
            es.submit(new MyRunnable());
        }
        es.shutdown();
    }
}
class MyRunnable implements Runnable{
    @Override
    public void run() {
        while (true){
            System.out.println("thread id is: " + Thread.currentThread().getId());
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
thread id is: 13
thread id is: 12
thread id is: 14
thread id is: 15
thread id is: 16
thread id is: 16
thread id is: 15
thread id is: 13
thread id is: 12
thread id is: 14
thread id is: 16
...

newFixedThreadPool:该方法返回一个固定线程数量的线程池。
我们虽然创建了10条线程丢到线程池里,但我们发现线程id只是从12-16共五条线程,这是因为我们创建的FixedThreadPool设置了最大同时运行线程数目5,而且当这5个运行完才会运行其他线程
我们来改一下代码:

public class MyClass {
    public static void main(String[] args) {
        ExecutorService es = Executors.newFixedThreadPool(5);
        for (int i = 0; i < 10; i++){
            es.submit(new MyRunnable());
        }
        es.shutdown();
    }
}
class MyRunnable implements Runnable{
    public boolean isStop = false;
    int i = 0;
    @Override
    public void run() {
        while (!isStop){
            if (i++ == 3){
                isStop = true;
            }else {
                System.out.println("thread name is: "+ Thread.currentThread().getName()+"---thread id is: " + Thread.currentThread().getId());

                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }

        }
    }
}
thread name is: pool-1-thread-1---thread id is: 12
thread name is: pool-1-thread-4---thread id is: 15
thread name is: pool-1-thread-5---thread id is: 16
thread name is: pool-1-thread-2---thread id is: 13
thread name is: pool-1-thread-3---thread id is: 14
thread name is: pool-1-thread-1---thread id is: 12
thread name is: pool-1-thread-3---thread id is: 14
thread name is: pool-1-thread-4---thread id is: 15
thread name is: pool-1-thread-5---thread id is: 16
thread name is: pool-1-thread-2---thread id is: 13
thread name is: pool-1-thread-4---thread id is: 15
thread name is: pool-1-thread-5---thread id is: 16
thread name is: pool-1-thread-1---thread id is: 12
thread name is: pool-1-thread-3---thread id is: 14
thread name is: pool-1-thread-2---thread id is: 13
thread name is: pool-1-thread-2---thread id is: 13
thread name is: pool-1-thread-3---thread id is: 14
thread name is: pool-1-thread-1---thread id is: 12
thread name is: pool-1-thread-4---thread id is: 15
thread name is: pool-1-thread-5---thread id is: 16
thread name is: pool-1-thread-2---thread id is: 13
thread name is: pool-1-thread-3---thread id is: 14
thread name is: pool-1-thread-5---thread id is: 16
thread name is: pool-1-thread-4---thread id is: 15
thread name is: pool-1-thread-1---thread id is: 12
thread name is: pool-1-thread-4---thread id is: 15
thread name is: pool-1-thread-3---thread id is: 14
thread name is: pool-1-thread-5---thread id is: 16
thread name is: pool-1-thread-1---thread id is: 12
thread name is: pool-1-thread-2---thread id is: 13

一条线程运行3次,但我们发现如果有线程完成了,下一条线程进来时线程id和名字都是一样的,这是因为:
直接看总结去吧哈哈

三、原理分析

ExecutorService es = Executors.newFixedThreadPool(5);

先看看这个句子涉及到了那几个类:

public interface ExecutorService extends Executor {
    void shutdown();

    List<Runnable> shutdownNow();

    boolean isShutdown();

    boolean isTerminated();

    boolean awaitTermination(long var1, TimeUnit var3) throws InterruptedException;

    <T> Future<T> submit(Callable<T> var1);

    <T> Future<T> submit(Runnable var1, T var2);

    Future<?> submit(Runnable var1);

    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> var1) throws InterruptedException;

    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> var1, long var2, TimeUnit var4) throws InterruptedException;

    <T> T invokeAny(Collection<? extends Callable<T>> var1) throws InterruptedException, ExecutionException;

    <T> T invokeAny(Collection<? extends Callable<T>> var1, long var2, TimeUnit var4) throws InterruptedException, ExecutionException, TimeoutException;
}

public interface Executor {
    void execute(Runnable var1);
}
public class Executors {

ExecutorService就是Executor 的加强版,看上一篇可以了解Future、Runnable、Callable和有无返回值,和各种方法的区别,不多说。
ExecutorService里还定义了以下常用的方法如
shutdown() invokeAny()
其中shutdown()和shutdownNow()的区别在于前者会等待线程的执行完毕在shutdown。
然后就是Executors,其是一个工厂类,里边提供很多工厂方法来创建不同类型的线程池。

public static ExecutorService newFixedThreadPool(int var0) {
        return new ThreadPoolExecutor(var0, var0, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue());
    }

    public static ExecutorService newWorkStealingPool(int var0) {
        return new ForkJoinPool(var0, ForkJoinPool.defaultForkJoinWorkerThreadFactory, (UncaughtExceptionHandler)null, true);
    }
    public static ExecutorService newWorkStealingPool() {
        return new ForkJoinPool(Runtime.getRuntime().availableProcessors(), ForkJoinPool.defaultForkJoinWorkerThreadFactory, (UncaughtExceptionHandler)null, true);
    }

    public static ExecutorService newFixedThreadPool(int var0, ThreadFactory var1) {
        return new ThreadPoolExecutor(var0, var0, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue(), var1);
    }

    public static ExecutorService newSingleThreadExecutor() {
        return new Executors.FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue()));
    }

    public static ExecutorService newSingleThreadExecutor(ThreadFactory var0) {
        return new Executors.FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue(), var0));
    }

    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, 2147483647, 60L, TimeUnit.SECONDS, new SynchronousQueue());
    }

    public static ExecutorService newCachedThreadPool(ThreadFactory var0) {
        return new ThreadPoolExecutor(0, 2147483647, 60L, TimeUnit.SECONDS, new SynchronousQueue(), var0);
    }

    public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
        return new Executors.DelegatedScheduledExecutorService(new ScheduledThreadPoolExecutor(1));
    }

    public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory var0) {
        return new Executors.DelegatedScheduledExecutorService(new ScheduledThreadPoolExecutor(1, var0));
    }

    public static ScheduledExecutorService newScheduledThreadPool(int var0) {
        return new ScheduledThreadPoolExecutor(var0);
    }

    public static ScheduledExecutorService newScheduledThreadPool(int var0, ThreadFactory var1) {
        return new ScheduledThreadPoolExecutor(var0, var1);
    }
常用线程池:
//创建单核心的线程池
newSingleThreadExecutor();
//创建固定核心数的线程池
newFixedThreadPool(5);
//创建一个按照计划规定执行的线程池
newScheduledThreadPool(2);
//创建一个自动增长的线程池
newCachedThreadPool();
//创建一个具有抢占式操作的线程池
newWorkStealingPool();
...
都可见名之意

那为什么创建完成的线程池都能被ExecutorService接受呢
看个类和接口的关系图就懂啦:
在这里插入图片描述
现在我们拿出个创建线程池的方法来分析一下吧

public static ExecutorService newFixedThreadPool(int var0) {
        return new ThreadPoolExecutor(var0, var0, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue());
    }

执行了ThreadPoolExecutor的构造方法:

public ThreadPoolExecutor(int var1, int var2, long var3, TimeUnit var5, BlockingQueue<Runnable> var6, ThreadFactory var7, RejectedExecutionHandler var8) {
        this.ctl = new AtomicInteger(ctlOf(-536870912, 0));
        this.mainLock = new ReentrantLock();
        this.workers = new HashSet();
        this.termination = this.mainLock.newCondition();
        if (var1 >= 0 && var2 > 0 && var2 >= var1 && var3 >= 0L) {
            if (var6 != null && var7 != null && var8 != null) {
                this.acc = System.getSecurityManager() == null ? null : AccessController.getContext();
                this.corePoolSize = var1;
                this.maximumPoolSize = var2;
                this.workQueue = var6;
                this.keepAliveTime = var5.toNanos(var3);
                this.threadFactory = var7;
                this.handler = var8;
            } else {
                throw new NullPointerException();
            }
        } else {
            throw new IllegalArgumentException();
        }
    }

可以发现用传进来的参数进行对线程池的实例化
这些参数都非常重要:

  • corePoolSize:核心线程数
  • maximumPoolSize:最大线程数
  • workQueue:等待队列
  • keepAliveTime:多于线程的生命
  • threadFactory :线程工厂,用于创建线程
  • handler:拒绝策略,即当线程池和队列都满时,对继续进来的任务采取的措施。

我们现在已经构造好一个线程池了,接下来就是我们的submit()提交任务了:

for (int i = 0; i < 10; i++){
            es.submit(new MyRunnable());
        }

看看究竟做了什么事:
跟进去来到了

public Future<?> submit(Runnable var1) {
        if (var1 == null) {
            throw new NullPointerException();
        } else {
            RunnableFuture var2 = this.newTaskFor(var1, (Object)null);
            this.execute(var2);
            return var2;
        }
    }

由于我们传进来的时Runnable所以它帮我们封装成了RunnableFuture,RunnableFuture继承了Future,所以这里调用了用ThreadPoolExecutor的execute()方法后就返回,并用Future接收
接下来再看ThreadPoolExecutor的execute()方法:

public void execute(Runnable var1) {
        if (var1 == null) {//如何任务为空,抛异常
            throw new NullPointerException();
        } else {
            int var2 = this.ctl.get();//获取线程池状态
            if (workerCountOf(var2) < this.corePoolSize) {//如果当前工作的线程小于核心线程,尝试调用addWorker()开启新线程运行任务
                if (this.addWorker(var1, true)) {
                    return;
                }

                var2 = this.ctl.get();//获取新的状态
            }

            if (isRunning(var2) && this.workQueue.offer(var1)) {//如果正在运行,则添加任务到workQueue阻塞队列
                int var3 = this.ctl.get();//获取新状态二次校验
                if (!isRunning(var3) && this.remove(var1)) {//如果当前线程池状态不是RUNNING则从队列删除任务,并执行拒绝策略
                    this.reject(var1);
                } else if (workerCountOf(var3) == 0) {//线程池为空,添加空线程
                    this.addWorker((Runnable)null, false);
                }
            } else if (!this.addWorker(var1, false)) {//满了则新增空线程,新增失败则执行拒绝策略
                this.reject(var1);
            }

        }
    }

这里很明显重要方法是addWorkder ():

private boolean addWorker(Runnable var1, boolean var2) {
        while(true) {
            int var3 = this.ctl.get();
            int var4 = runStateOf(var3);
            if (var4 >= 0 && (var4 != 0 || var1 != null || this.workQueue.isEmpty())) {
                return false;
            }

            while(true) {
                int var5 = workerCountOf(var3);
                if (var5 >= 536870911 || var5 >= (var2 ? this.corePoolSize : this.maximumPoolSize)) {
                    return false;
                }

                if (this.compareAndIncrementWorkerCount(var3)) {
                    boolean var18 = false;
                    boolean var19 = false;
                    ThreadPoolExecutor.Worker var20 = null;

                    try {
                        var20 = new ThreadPoolExecutor.Worker(var1);
                        Thread var6 = var20.thread;
                        if (var6 != null) {
                            ReentrantLock var7 = this.mainLock;
                            var7.lock();

                            try {
                                int var8 = runStateOf(this.ctl.get());
                                if (var8 < 0 || var8 == 0 && var1 == null) {
                                    if (var6.isAlive()) {
                                        throw new IllegalThreadStateException();
                                    }

                                    this.workers.add(var20);
                                    int var9 = this.workers.size();
                                    if (var9 > this.largestPoolSize) {
                                        this.largestPoolSize = var9;
                                    }

                                    var19 = true;
                                }
                            } finally {
                                var7.unlock();
                            }

                            if (var19) {
                                var6.start();
                                var18 = true;
                            }
                        }
                    } finally {
                        if (!var18) {
                            this.addWorkerFailed(var20);
                        }

                    }

                    return var18;
                }

                var3 = this.ctl.get();
                if (runStateOf(var3) != var4) {
                    break;
                }
            }
        }
    }

这里各种名字都是var n 我贴大佬的吧:
https://www.cnblogs.com/huangjuncong/p/10031525.html

private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        //(6) 检查队列是否只在必要时为空
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;

        //(7)循环cas增加线程个数
        for (;;) {
            int wc = workerCountOf(c);

            //(7.1)如果线程个数超限则返回false
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            //(7.2)cas增加线程个数,同时只有一个线程成功
            if (compareAndIncrementWorkerCount(c))
                break retry;
            //(7.3)cas失败了,则看线程池状态是否变化了,变化则跳到外层循环重试重新获取线程池状态,否者内层循环重新cas。
            c = ctl.get();  // Re-read ctl
            if (runStateOf(c) != rs)
                continue retry;
        }
    }

    //(8)到这里说明cas成功了
    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        //(8.1)创建worker
        final ReentrantLock mainLock = this.mainLock;
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {

            //(8.2)加独占锁,为了workers同步,因为可能多个线程调用了线程池的execute方法。
            mainLock.lock();
            try {

                //(8.3)重新检查线程池状态,为了避免在获取锁前调用了shutdown接口
                int c = ctl.get();
                int rs = runStateOf(c);

                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    //(8.4)添加任务
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            //(8.5)添加成功则启动任务
            if (workerAdded) {
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

都差不多,
进行一系列逻辑后最重要的是执行了
w = new Worker(firstTask)和t.start() 这了两个方法
第一个:Worker是一个内部类,实现了Runnable接口:

 private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
        private static final long serialVersionUID = 6138294804551838833L;
        final Thread thread;
        Runnable firstTask;
        volatile long completedTasks;

        Worker(Runnable var2) {  //构造方法拿到我们传进来的任务
            this.setState(-1);
            this.firstTask = var2;
            this.thread = ThreadPoolExecutor.this.getThreadFactory().newThread(this);
        }
		...
    }

构造方法先把我们的任务变成自己的firstTask,之后通过ThreadFactory创建了线程,赋值给了thread,然后回去,t拿到了就t.start(),执行什么呢?其实就是run()方法
我们知道一个任务重要的在run()方法,这个实现了Runnable的Worke,也是个可执行任务哦,所以我们从它的run()方法看

public void run() {
            ThreadPoolExecutor.this.runWorker(this);
        }

其调用了runWorker()

final void runWorker(ThreadPoolExecutor.Worker var1) {
        Thread var2 = Thread.currentThread();
        Runnable var3 = var1.firstTask;
        var1.firstTask = null;
        var1.unlock();
        boolean var4 = true;

        try {
            while(var3 != null || (var3 = this.getTask()) != null) {//取等待队列的任务或者添加自己携带的任务 , 当task == null就会阻塞。
                var1.lock();
                if ((runStateAtLeast(this.ctl.get(), 536870912) || Thread.interrupted() && runStateAtLeast(this.ctl.get(), 536870912)) && !var2.isInterrupted()) {
                    var2.interrupt();
                }

                try {
                    this.beforeExecute(var2, var3);
                    Object var5 = null;

                    try {
                        var3.run();//执行我们自己写的run方法
                    } catch (RuntimeException var28) {
                        var5 = var28;
                        throw var28;
                    } catch (Error var29) {
                        var5 = var29;
                        throw var29;
                    } catch (Throwable var30) {
                        var5 = var30;
                        throw new Error(var30);
                    } finally {
                        this.afterExecute(var3, (Throwable)var5);
                    }
                } finally {
                    var3 = null;//执行完后设为空,
                    ++var1.completedTasks;
                    var1.unlock();
                }
            }

            var4 = false;
        } finally {
            this.processWorkerExit(var1, var4);
        }

    }

当我们的task不为空的时后进行循环,为空的或就用getTask()去workQueue获取,如果又为空,那就阻塞了。所以说啊,复用就在这体现了。每个线程执行完任务后就会去取任务,而不是销毁重创
getTask():

private Runnable getTask() {
        boolean var1 = false;

        while(true) {
            int var2 = this.ctl.get();
            int var3 = runStateOf(var2);
            if (var3 >= 0 && (var3 >= 536870912 || this.workQueue.isEmpty())) {
                this.decrementWorkerCount();
                return null;
            }

            int var4 = workerCountOf(var2);
            boolean var5 = this.allowCoreThreadTimeOut || var4 > this.corePoolSize;
            if (var4 <= this.maximumPoolSize && (!var5 || !var1) || var4 <= 1 && !this.workQueue.isEmpty()) {
                try {
                    Runnable var6 = var5 ? (Runnable)this.workQueue.poll(this.keepAliveTime, TimeUnit.NANOSECONDS) : (Runnable)this.workQueue.take();
                    if (var6 != null) {
                        return var6;
                    }
                    var1 = true;
              	} catch (InterruptedException var7) {
                    var1 = false;
                }
            } else if (this.compareAndDecrementWorkerCount(var2)) {
                return null;
            }
        }
    }

总结

简单的例子:
比如说我建了个线程池为5个核心大小的固定线程池
线程池一开始丢进来5个任务时,会创建线程并执行这个任务,这时候已经达到核心线程数,如果再往里丢任务,通常会直接进入workQueue等待队列
当这5个线程中的某个执行完后会不停往workQueue取任务来执行,并没有创建新线程!取不到这个线程就阻塞,while循环一直getTask()我们新丢进去任务。getTask()还会判断等待时间是否超时来回收这个线程可以看下runWorker()processWorkerExit()就知道了。

发布了16 篇原创文章 · 获赞 0 · 访问量 249

猜你喜欢

转载自blog.csdn.net/weixin_43860530/article/details/105326115