java.util.Concurrent.Executors 源码

类图

源码

package java.util.concurrent;

import java.util.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.security.AccessControlContext;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.security.PrivilegedExceptionAction;
import java.security.PrivilegedActionException;
import java.security.AccessControlException;
import sun.security.util.SecurityConstants;


public class Executors {

    //构造器私有化(只能通过类去直接调用静态方法,而不允许创建类的实例对象)
    private Executors() {}

    //创建一个固定大小的线程池,以共享的无界队列方式来运行这些线程
    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
    }

    //创建一个固定大小的线程池,以共享的无界队列方式来运行这些线程,在需要时使用提供的 ThreadFactory 创建新线程
    public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>(),threadFactory);
    }


    //创建一个单个线程的线程池,以无界队列方式来运行该线程
    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()));
    }

    //创建一个单个线程的线程池,以无界队列方式来运行该线程,并在需要时使用提供的 ThreadFactory 创建新线程。
    public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
        return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>(),threadFactory));
    }

    //创建一个可按需自动扩容的线程池,但优先重用线程池中空闲可用的线程
    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>());
    }

    //创建一个可按需自动扩容的线程池,但优先重用线程池中空闲可用的线程,并在需要时使用提供的 ThreadFactory 创建新线程
    public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>(),threadFactory);
    }

    //创建一个单线程执行程序,它可安排在给定延迟后运行命令或者定期地执行
    public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
        return new DelegatedScheduledExecutorService
            (new ScheduledThreadPoolExecutor(1));
    }

    //创建一个单线程执行程序,它可安排在给定延迟后运行命令或者定期地执行
    public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory) {
        return new DelegatedScheduledExecutorService(new ScheduledThreadPoolExecutor(1, threadFactory));
    }

    //创建一个在一定延迟时间后调度命令的线程池,或者周期性执行的线程池
    public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
    }

    //创建一个在一定延迟时间后调度命令的线程池,或者周期性执行的线程池
    public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize, ThreadFactory threadFactory) {
        return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
    }

    //利用所有运行的处理器数目来创建一个工作窃取的线程池
    public static ExecutorService newWorkStealingPool() {
        return new ForkJoinPool(Runtime.getRuntime().availableProcessors(),ForkJoinPool.defaultForkJoinWorkerThreadFactory,null, true);
    }

    //根据给定的并行等级,创建一个拥有足够的线程数目的工作窃取的线程池
    public static ExecutorService newWorkStealingPool(int parallelism) {
        return new ForkJoinPool(parallelism,ForkJoinPool.defaultForkJoinWorkerThreadFactory,null, true);
    }

    //返回一个将所有已定义的ExecutorService方法委托给指定执行程序的对象,但是使用强制转换可能无法访问其他方法
    public static ExecutorService unconfigurableExecutorService(ExecutorService executor) {
        if (executor == null)
            throw new NullPointerException();
        return new DelegatedExecutorService(executor);
    }

    //返回一个将所有已定义的ScheduledExecutorService方法委托给指定执行程序的对象,但是使用强制转换可能无法访问其他方法
    public static ScheduledExecutorService unconfigurableScheduledExecutorService(ScheduledExecutorService executor) {
        if (executor == null)
            throw new NullPointerException();
        return new DelegatedScheduledExecutorService(executor);//ScheduledExecutorService继承了ExecutorService
    }

    //返回用于创建新线程的默认线程工厂
    public static ThreadFactory defaultThreadFactory() {
        return new DefaultThreadFactory();
    }

    // 返回用于创建新线程的线程工厂,这些新线程与当前线程具有相同的权限
    public static ThreadFactory privilegedThreadFactory() {
        return new PrivilegedThreadFactory();
    }

    //返回 Callable 对象,调用它时可运行给定的任务并返回 null
    public static Callable<Object> callable(Runnable task) {
        if (task == null)
            throw new NullPointerException();
        return new RunnableAdapter<Object>(task, null);
    }

    //返回 Callable 对象,调用它时可运行给定的任务并返回给定的结果
    public static <T> Callable<T> callable(Runnable task, T result) {
        if (task == null)
            throw new NullPointerException();
        return new RunnableAdapter<T>(task, result);
    }

    //返回 Callable 对象,调用它时可运行给定特权的操作并返回其结果。
    public static Callable<Object> callable(final PrivilegedAction<?> action) {
        if (action == null)
            throw new NullPointerException();
        return new Callable<Object>() {
                  public Object call() { return action.run(); }
               };
    }

    //返回 Callable 对象,调用它时可运行给定特权的异常操作并返回其结果
    public static Callable<Object> callable(final PrivilegedExceptionAction<?> action) {
        if (action == null)
            throw new NullPointerException();
        return new Callable<Object>() {
                  public Object call() throws Exception { return action.run(); }
               };
    }

    //返回 Callable 对象,调用它时可在当前的访问控制上下文中执行给定的 callable 对象
    public static <T> Callable<T> privilegedCallable(Callable<T> callable) {
        if (callable == null)
            throw new NullPointerException();
        return new PrivilegedCallable<T>(callable);
    }

    //返回 Callable 对象,调用它时可在当前的访问控制上下文中,使用当前上下文类加载器作为上下文类加载器来执行给定的 callable 对象
    public static <T> Callable<T> privilegedCallableUsingCurrentClassLoader(Callable<T> callable) {
        if (callable == null)
            throw new NullPointerException();
        return new PrivilegedCallableUsingCurrentClassLoader<T>(callable);
    }

    /***************内部类************/
    //实现了Callable接口(可运行给定的任务并返回给定的结果)
    static final class RunnableAdapter<T> implements Callable<T> {
        final Runnable task;
        final T result;

        //构造器
        RunnableAdapter(Runnable task, T result) {
            this.task = task;
            this.result = result;
        }

        //实现了Callable接口定义的call方法
        public T call() {
            task.run();//运行所定义的Runnable方法
            return result;//返回 T result
        }
    }

    //实现了Callable接口
    static final class PrivilegedCallable<T> implements Callable<T> {
        private final Callable<T> task;
        private final AccessControlContext acc;

        //构造器
        PrivilegedCallable(Callable<T> task) {
            this.task = task;
            this.acc = AccessController.getContext();
        }

        //实现了Callable接口定义的call方法
        public T call() throws Exception {
            try {
                return AccessController.doPrivileged(
                    new PrivilegedExceptionAction<T>() {
                        public T run() throws Exception {
                            return task.call();
                        }
                    }, acc);
            } catch (PrivilegedActionException e) {
                throw e.getException();
            }
        }
    }

    //实现了Callable接口
    static final class PrivilegedCallableUsingCurrentClassLoader<T> implements Callable<T> {
        private final Callable<T> task;
        private final AccessControlContext acc;
        private final ClassLoader ccl;

        //构造器
        PrivilegedCallableUsingCurrentClassLoader(Callable<T> task) {
            SecurityManager sm = System.getSecurityManager();
            if (sm != null) {
                sm.checkPermission(SecurityConstants.GET_CLASSLOADER_PERMISSION);            
                sm.checkPermission(new RuntimePermission("setContextClassLoader"));
            }
            this.task = task;
            this.acc = AccessController.getContext();
            this.ccl = Thread.currentThread().getContextClassLoader();
        }

        //实现了Callable接口定义的call方法
        public T call() throws Exception {
            try {
                return AccessController.doPrivileged(
                    new PrivilegedExceptionAction<T>() {
                        public T run() throws Exception {
                            Thread t = Thread.currentThread();
                            ClassLoader cl = t.getContextClassLoader();
                            if (ccl == cl) {
                                return task.call();
                            } else {
                                t.setContextClassLoader(ccl);
                                try {
                                    return task.call();
                                } finally {
                                    t.setContextClassLoader(cl);
                                }
                            }
                        }
                    }, acc);
            } catch (PrivilegedActionException e) {
                throw e.getException();
            }
        }
    }

    //实现ThreadFactory接口(创建默认线程工厂)
    static class DefaultThreadFactory implements ThreadFactory {
        //线程池大小
        private static final AtomicInteger poolNumber = new AtomicInteger(1);
        //线程组
        private final ThreadGroup group;
        //线程数
        private final AtomicInteger threadNumber = new AtomicInteger(1);
        //线程名称
        private final String namePrefix;

        //构造器
        DefaultThreadFactory() {
            SecurityManager s = System.getSecurityManager();//获取安全管理器对象
            group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();//得到当前线程组
            namePrefix = "pool-" + poolNumber.getAndIncrement() + "-thread-";//线程名称初始化
        }

        //实现ThreadFactory接口:创建新线程的方法
        public Thread newThread(Runnable r) {
            Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0);//创建一个新的线程加入到当前线程组(线程名称加1)
            if (t.isDaemon())//判断是否设置了后台守护标志
                t.setDaemon(false);//设为后台线程
            if (t.getPriority() != Thread.NORM_PRIORITY)//将线程优先级统统设置为5
                t.setPriority(Thread.NORM_PRIORITY);//不为5的统统改成5
            return t;//返回创建的线程对象
        }
    }

    //继承DefaultThreadFactory,增加成员变量,重写newThread方法
    static class PrivilegedThreadFactory extends DefaultThreadFactory {
        private final AccessControlContext acc;

        private final ClassLoader ccl;//类加载器

        PrivilegedThreadFactory() {
            super();
            SecurityManager sm = System.getSecurityManager();
            if (sm != null) {
                sm.checkPermission(SecurityConstants.GET_CLASSLOADER_PERMISSION);
                sm.checkPermission(new RuntimePermission("setContextClassLoader"));
            }
            this.acc = AccessController.getContext();
            this.ccl = Thread.currentThread().getContextClassLoader();
        }

        //重写newThread方法
        public Thread newThread(final Runnable r) {
            return super.newThread(new Runnable() {
                public void run() {
                    AccessController.doPrivileged(new PrivilegedAction<Void>() {
                        public Void run() {
                            Thread.currentThread().setContextClassLoader(ccl);
                            r.run();
                            return null;
                        }
                    }, acc);
                }
            });
        }
    }

    //继承AbstractExecutorService抽象类
    static class DelegatedExecutorService extends AbstractExecutorService {
        private final ExecutorService e;

        //构造器
        DelegatedExecutorService(ExecutorService executor) { e = executor; }

        //具体实现依赖于传入的ExecutorService的实现类中定义的方法
        public void execute(Runnable command) { e.execute(command); }

        //具体实现依赖于传入的ExecutorService的实现类中定义的方法
        public void shutdown() { e.shutdown(); }

        //具体实现依赖于传入的ExecutorService的实现类中定义的方法
        public List<Runnable> shutdownNow() { return e.shutdownNow(); }

       //具体实现依赖于传入的ExecutorService的实现类中定义的方法
        public boolean isShutdown() { return e.isShutdown(); }

       //具体实现依赖于传入的ExecutorService的实现类中定义的方法
        public boolean isTerminated() { return e.isTerminated(); }

       //具体实现依赖于传入的ExecutorService的实现类中定义的方法
        public boolean awaitTermination(long timeout, TimeUnit unit)
            throws InterruptedException {
            return e.awaitTermination(timeout, unit);
        }

       //具体实现依赖于传入的ExecutorService的实现类中定义的方法
        public Future<?> submit(Runnable task) {
            return e.submit(task);
        }

       //具体实现依赖于传入的ExecutorService的实现类中定义的方法
        public <T> Future<T> submit(Callable<T> task) {
            return e.submit(task);
        }

       //具体实现依赖于传入的ExecutorService的实现类中定义的方法
        public <T> Future<T> submit(Runnable task, T result) {
            return e.submit(task, result);
        }

       //具体实现依赖于传入的ExecutorService的实现类中定义的方法
        public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
            throws InterruptedException {
            return e.invokeAll(tasks);
        }

       //具体实现依赖于传入的ExecutorService的实现类中定义的方法
        public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
            throws InterruptedException {
            return e.invokeAll(tasks, timeout, unit);
        }

       //具体实现依赖于传入的ExecutorService的实现类中定义的方法
        public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
            throws InterruptedException, ExecutionException {
            return e.invokeAny(tasks);
        }

       //具体实现依赖于传入的ExecutorService的实现类中定义的方法
        public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException {
            return e.invokeAny(tasks, timeout, unit);
        }
    }

    //继承DelegatedExecutorService类,重写了finalize()回收方法
    static class FinalizableDelegatedExecutorService extends DelegatedExecutorService {
        //构造器
        FinalizableDelegatedExecutorService(ExecutorService executor) {
            super(executor);
        }

        //重写finalize()回收方法
        protected void finalize() {
            super.shutdown();
        }
    }

    //在DelegatedExecutorService的基础上,增加了对ScheduledExecutorService接口的实现
    static class DelegatedScheduledExecutorService extends DelegatedExecutorService implements ScheduledExecutorService {
        private final ScheduledExecutorService e;

        //构造器
        DelegatedScheduledExecutorService(ScheduledExecutorService executor) {
            super(executor);
            e = executor;
        }

        //具体实现依赖于ScheduledExecutorService接口的实现
        public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
            return e.schedule(command, delay, unit);
        }

        //具体实现依赖于ScheduledExecutorService接口的实现
        public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
            return e.schedule(callable, delay, unit);
        }

        //具体实现依赖于ScheduledExecutorService接口的实现
        public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
            return e.scheduleAtFixedRate(command, initialDelay, period, unit);
        }

        //具体实现依赖于ScheduledExecutorService接口的实现
        public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
            return e.scheduleWithFixedDelay(command, initialDelay, delay, unit);
        }
    }
}

实现原理:

    

    1.newFixedThreadPool、newCachedThreadPool方法,最终都调用了ThreadPoolExecutor的构造方法:

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
                          TimeUnit unit, BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory, RejectedExecutionHandler handler)
  • corePoolSize:核心池的大小
  • maxmumPoolSize:池中允许的最大线程数
  • keepAliveTime:表示线程没有任务执行时最多保持多久时间会终止
  • unit:参数keepAliveTime的时间单位,有7种取值,在TimeUnit类中有7种静态属性:

        TimeUnit.DAYS; //天
        TimeUnit.HOURS; //小时
        TimeUnit.MINUTES; //分钟
        TimeUnit.SECONDS; //秒
        TimeUnit.MILLISECONDS; //毫秒
        TimeUnit.MICROSECONDS; //微妙
        TimeUnit.NANOSECONDS; //纳秒

  • workQueue:一个阻塞队列,用来存储等待执行的任务,这个参数的选择也很重要,会对线程池的运行过程产生重大影响,一般来说,这里的阻塞队列有以下几种选择:

        ArrayBlockingQueue;
        LinkedBlockingQueue;
        SynchronousQueue;

  • threadFactory:创建新线程的工厂。
  • handler:(拒绝策略)当线程池的任务缓存队列已满并且线程池中的线程数目达到maximumPoolSize,如果还有任务到来就会采取任务拒绝策略,通常有以下四种策略:

ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。
ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。
ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务

    ThreadPoolExecutor继承了AbstractExecutorService抽象类。

    2.newSingleThreadExecutor()直接调用了FinalizableDelegatedExecutorService的构造方法:

    FinalizableDelegatedExecutorService(ExecutorService executor) {
       super(executor);
    }

    FinalizableDelegatedExecutorService继承了DelegatedExecutorService:

DelegatedExecutorService(ExecutorService executor) {
       e = executor;
    }

    DelegatedExecutorService继承了AbstractExecutorService。

    3.newSingleThreadScheduledExecutor()调用了DelegatedScheduledExecutorService的构造方法:

    DelegatedScheduledExecutorService(ScheduledExecutorService executor) {
       super(executor);
       e = executor;
    }

    DelegatedScheduledExecutorService 继承了 DelegatedExecutorService 实现了 ScheduledExecutorService接口。

    4.newScheduledThreadPool()调用了ScheduledThreadPoolExecutor的构造方法:

    public ScheduledThreadPoolExecutor(int corePoolSize,ThreadFactory threadFactory, RejectedExecutionHandler handler) {
       super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue(), threadFactory, handler);
    }

    ScheduledThreadPoolExecutor 继承了 ThreadPoolExecutor 实现了 ScheduledExecutorService 接口

    因此,最终调用的还是ThreadPoolExecutor的构造方法。

    5.newWorkStealingPool()最终调用了ForkJoinPool类的私有构造方法:

private ForkJoinPool(int parallelism,
                     ForkJoinWorkerThreadFactory factory,
                     UncaughtExceptionHandler handler,
                     int mode,
                     String workerNamePrefix) {
    this.workerNamePrefix = workerNamePrefix;
    this.factory = factory;
    this.ueh = handler;
    this.config = (parallelism & SMASK) | mode;
    long np = (long)(-parallelism); // offset ctl counts
    this.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK);
}

        此处不做具体分析,ForkJoinPool继承了AbstractExecutorService抽象类。

    综合分析,可以发现Executors类,主要提供了创建ExecutorServiceScheduledExecutorServiceThreadFactory 和 Callable 类的工厂和实用方法。

newFixedThreadPool

public static ExecutorService newFixedThreadPool(int nThreads)

    创建一个可重用固定线程数的线程池,以共享的无界队列方式来运行这些线程。在任意点,在大多数 nThreads 线程会处于处理任务的活动状态。如果在所有线程处于活动状态时提交附加任务,则在有可用线程之前,附加任务将在队列中等待。如果在关闭前的执行期间由于失败而导致任何线程终止,那么一个新线程将代替它执行后续的任务(如果需要)。在某个线程被显式地 关闭之前,池中的线程将一直存在。

    参数:

    nThreads - 池中的线程数

    返回:

        新创建的线程池

    抛出:

    IllegalArgumentException - 如果 nThreads <= 0

newFixedThreadPool

public static ExecutorService newFixedThreadPool(int nThreads,
                                                 ThreadFactory threadFactory)

    创建一个可重用固定线程数的线程池,以共享的无界队列方式来运行这些线程,在需要时使用提供的 ThreadFactory 创建新线程。在任意点,在大多数 nThreads 线程会处于处理任务的活动状态。如果在所有线程处于活动状态时提交附加任务,则在有可用线程之前,附加任务将在队列中等待。如果在关闭前的执行期间由于失败而导致任何线程终止,那么一个新线程将代替它执行后续的任务(如果需要)。在某个线程被显式地 关闭之前,池中的线程将一直存在。

    参数:

    nThreads - 池中的线程数

    threadFactory - 创建新线程时使用的工厂

    返回:

        新创建的线程池

    抛出:

    NullPointerException - 如果 threadFactory 为 null

    IllegalArgumentException - 如果 nThreads <= 0

newSingleThreadExecutor

public static ExecutorService newSingleThreadExecutor()

    创建一个使用单个 worker 线程的 Executor,以无界队列方式来运行该线程。(注意,如果因为在关闭前的执行期间出现失败而终止了此单个线程,那么如果需要,一个新线程将代替它执行后续的任务)。可保证顺序地执行各个任务,并且在任意给定的时间不会有多个线程是活动的。与其他等效的 newFixedThreadPool(1) 不同,可保证无需重新配置此方法所返回的执行程序即可使用其他的线程。

    返回:

        新创建的单线程 Executor

newSingleThreadExecutor

public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory)

    创建一个使用单个 worker 线程的 Executor,以无界队列方式来运行该线程,并在需要时使用提供的 ThreadFactory 创建新线程。与其他等效的 newFixedThreadPool(1, threadFactory) 不同,可保证无需重新配置此方法所返回的执行程序即可使用其他的线程。

    参数:

    threadFactory - 创建新线程时使用的工厂

    返回:

        新创建的单线程 Executor

    抛出:

    NullPointerException - 如果 threadFactory 为 null

newCachedThreadPool

public static ExecutorService newCachedThreadPool()

    创建一个可根据需要创建新线程的线程池,但是在以前构造的线程可用时将重用它们。对于执行很多短期异步任务的程序而言,这些线程池通常可提高程序性能。调用 execute 将重用以前构造的线程(如果线程可用)。如果现有线程没有可用的,则创建一个新线程并添加到池中。终止并从缓存中移除那些已有 60 秒钟未被使用的线程。因此,长时间保持空闲的线程池不会使用任何资源。注意,可以使用ThreadPoolExecutor 构造方法创建具有类似属性但细节不同(例如超时参数)的线程池。

    返回:

        新创建的线程池

newCachedThreadPool

public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory)

    创建一个可根据需要创建新线程的线程池,但是在以前构造的线程可用时将重用它们,并在需要时使用提供的 ThreadFactory 创建新线程。

    参数:

    threadFactory - 创建新线程时使用的工厂

    返回:

        新创建的线程池

    抛出:

    NullPointerException - 如果 threadFactory 为 null

newSingleThreadScheduledExecutor

public static ScheduledExecutorService newSingleThreadScheduledExecutor()

    创建一个单线程执行程序,它可安排在给定延迟后运行命令或者定期地执行。(注意,如果因为在关闭前的执行期间出现失败而终止了此单个线程,那么如果需要,一个新线程会代替它执行后续的任务)。可保证顺序地执行各个任务,并且在任意给定的时间不会有多个线程是活动的。与其他等效的 newScheduledThreadPool(1) 不同,可保证无需重新配置此方法所返回的执行程序即可使用其他的线程。

    返回:

        新创建的安排执行程序

newSingleThreadScheduledExecutor

public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory)

    创建一个单线程执行程序,它可安排在给定延迟后运行命令或者定期地执行。(注意,如果因为在关闭前的执行期间出现失败而终止了此单个线程,那么如果需要,一个新线程会代替它执行后续的任务)。可保证顺序地执行各个任务,并且在任意给定的时间不会有多个线程是活动的。与其他等效的 newScheduledThreadPool(1, threadFactory) 不同,可保证无需重新配置此方法所返回的执行程序即可使用其他的线程。

    参数:

    threadFactory - 创建新线程时使用的工厂

    返回:

        新创建的安排执行程序

    抛出:

    NullPointerException - 如果 threadFactory 为 null

newScheduledThreadPool

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)

    创建一个线程池,它可安排在给定延迟后运行命令或者定期地执行。

    参数:

    corePoolSize - 池中所保存的线程数,即使线程是空闲的也包括在内。

    返回:

        新创建的安排线程池

    抛出:

    NullPointerException - 如果 threadFactory 为 null

newScheduledThreadPool

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize, ThreadFactory threadFactory)

    创建一个线程池,它可安排在给定延迟后运行命令或者定期地执行。

    参数:

    corePoolSize - 池中所保存的线程数,即使线程是空闲的也包括在内

    threadFactory - 执行程序创建新线程时使用的工厂

    返回:

        新创建的安排线程池

    抛出:

    IllegalArgumentException - 如果 corePoolSize < 0

    NullPointerException - 如果 threadFactory 为 null

unconfigurableExecutorService

public static ExecutorService unconfigurableExecutorService(ExecutorService executor)

    返回一个将所有已定义的 ExecutorService 方法委托给指定执行程序的对象,但是使用强制转换可能无法访问其他方法。这提供了一种可安全地“冻结”配置并且不允许调整给定具体实现的方法。

    参数:

    executor - 底层实现

    返回:

        一个 ExecutorService 实例

    抛出:

    NullPointerException - 如果 executor 为 null

unconfigurableScheduledExecutorService

public static ScheduledExecutorService unconfigurableScheduledExecutorService(ScheduledExecutorService executor)

    返回一个将所有已定义的 ExecutorService 方法委托给指定执行程序的对象,但是使用强制转换可能无法访问其他方法。这提供了一种可安全地“冻结”配置并且不允许调整给定具体实现的方法。

    参数:

    executor - 底层实现

    返回:

        一个 ScheduledExecutorService 实例

    抛出:

    NullPointerException - 如果 executor 为 null

defaultThreadFactory

public static ThreadFactory defaultThreadFactory()

    返回用于创建新线程的默认线程工厂。此工厂创建同一 ThreadGroup 中 Executor 使用的所有新线程。如果有 SecurityManager,则它使用 System.getSecurityManager() 组来调用此 defaultThreadFactory 方法,其他情况则使用线程组。每个新线程都作为非守护程序而创建,并且具有设置为 Thread.NORM_PRIORITY 中较小者的优先级以及线程组中允许的最大优先级。新线程具有可通过 pool-N-thread-M 的 Thread.getName() 来访问的名称,其中 N 是此工厂的序列号, M 是此工厂所创建线程的序列号。

    返回:

        线程工厂

privilegedThreadFactory

public static ThreadFactory privilegedThreadFactory()

    返回用于创建新线程的线程工厂,这些新线程与当前线程具有相同的权限。此工厂创建具有与 defaultThreadFactory() 相同设置的线程,新线程的 AccessControlContext 和 contextClassLoader 的其他设置与调用此 privilegedThreadFactory 方法的线程相同。可以在 AccessController.doPrivileged(java.security.PrivilegedAction ) 操作中创建一个新 privilegedThreadFactory,设置当前线程的访问控制上下文,以便创建具有该操作中保持的所选权限的线程。

    注意,虽然运行在此类线程中的任务具有与当前线程相同的访问控制和类加载器,但是它们无需具有相同的 ThreadLocal 或 InheritableThreadLocal 值。如有必要,使用 ThreadPoolExecutor.beforeExecute(java.lang.Thread, java.lang.Runnable) 在 ThreadPoolExecutor 子类中运行任何任务前,可以设置或重置线程局部变量的特定值。另外,如果必须初始化 worker 线程,以具有与某些其他指定线程相同的 InheritableThreadLocal 设置,则可以在线程等待和服务创建请求的环境中创建自定义的 ThreadFactory,而不是继承其值。

    返回:

        线程工厂

    抛出:

    AccessControlException - 如果当前访问控制上下文没有获取和设置上下文类加载器的权限。

callable

public static <T> Callable<T> callable(Runnable task,T result)

    返回 Callable 对象,调用它时可运行给定的任务并返回给定的结果。这在把需要 Callable 的方法应用到其他无结果的操作时很有用。

    参数:

    task - 要运行的任务

    result - 返回的结果

    返回:

        一个 callable 对象

    抛出:

    NullPointerException - 如果 task 为 null

callable

public static Callable<Object> callable(Runnable task)

    返回 Callable 对象,调用它时可运行给定的任务并返回 null。

    参数:

    task - 要运行的任务

    返回:

        一个 callable 对象

    抛出:

    NullPointerException - 如果 task 为 null

callable

public static Callable<Object> callable(PrivilegedAction<?> action)

    返回 Callable 对象,调用它时可运行给定特权的操作并返回其结果。

    参数:

    action - 要运行的特权操作

    返回:

        一个 callable 对象

    抛出:

    NullPointerException - 如果 action 为 null

callable

public static Callable<Object> callable(PrivilegedExceptionAction<?> action)

    返回 Callable 对象,调用它时可运行给定特权的异常操作并返回其结果。

    参数:

    action - 要运行的特权异常操作

    返回:

        一个 callable 对象

    抛出:

    NullPointerException - 如果 action 为 null

privilegedCallable

public static <T> Callable<T> privilegedCallable(Callable<T> callable)

    返回 Callable 对象,调用它时可在当前的访问控制上下文中执行给定的 callable 对象。通常应该在 AccessController.doPrivileged(java.security.PrivilegedAction ) 操作中调用此方法,以便创建 callable 对象,并且如有可能,则在该操作中保持的所选权限设置下执行此对象;如果无法调用,则抛出相关的 AccessControlException

    参数:

    callable - 底层任务

    返回:

        一个 callable 对象

    抛出:

    NullPointerException - 如果 callable 为 null

privilegedCallableUsingCurrentClassLoader

public static <T> Callable<T> privilegedCallableUsingCurrentClassLoader(Callable<T> callable)

    返回 Callable 对象,调用它时可在当前的访问控制上下文中,使用当前上下文类加载器作为上下文类加载器来执行给定的 callable 对象。通常应该在 AccessController.doPrivileged(java.security.PrivilegedAction )操作中调用此方法,以创建 callable 对象,并且如有可能,则在该操作中保持的所选权限设置下执行此对象;如果无法调用,则抛出相关的 AccessControlException

    参数:

    callable - 底层任务

    返回:

        一个 callable 对象

    抛出:

    NullPointerException - 如果 callable 为 null

    AccessControlException - 如果当前的访问控制上下文没有设置和获得上下文类加载器的权限。

常用的4种线程池的使用:

    通过Executors提供四种线程池:newFixedThreadPool、newCachedThreadPool、newSingleThreadExecutor、newScheduledThreadPool。

1.public static ExecutorService newFixedThreadPool(int nThreads) 
创建固定数目线程的线程池。

2.public static ExecutorService newCachedThreadPool() 
创建一个可缓存的线程池,调用execute将重用以前构造的线程(如果线程可用)。如果现有线程没有可用的,则创建一个新线程并添加到池中。终止并从缓存中移除那些已有 60 秒钟未被使用的线程。

3.public static ExecutorService newSingleThreadExecutor() 
创建一个单线程化的Executor。

4.public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) 
创建一个支持定时及周期性的任务执行的线程池,多数情况下可用来替代Timer类。

    newFixedThreadPool创建一个指定工作线程数量的线程池。每当提交一个任务就创建一个工作线程,如果工作线程数量达到线程池初始的最大数,提交的任务则进入队列等待,等着有闲置的线程来执行这些任务。它具有线程池提高程序效率和节省创建线程时所耗的开销的优点。但是,在线程池空闲时,即线程池中没有可运行任务时,它不会释放工作线程,还会占用一定的系统资源。

        ExecutorService executorService = Executors.newFixedThreadPool(5);
        for (int i = 0; i < 20; i++) {
            Runnable syncRunnable = new Runnable() {
                public void run() {
                   //...
                }
            };
            executorService.execute(syncRunnable);
        }

    newCachedThreadPool创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。

  • 工作线程的创建数量几乎没有限制(其实也有限制的,数目为Interger. MAX_VALUE), 这样可灵活的往线程池中添加线程。
  • 在创建任务时,若有空闲的线程时则复用空闲的线程,若没有则新建线程。
  • 如果存在某一线程持续一段时间没有工作(默认为1分钟),则该线程就会销毁回收。
        ExecutorService executorService = Executors.newCachedThreadPool();
        for (int i = 0; i < 100; i++) {
            Runnable syncRunnable = new Runnable() {
                public void run() {
                    //...
                }
            };
            executorService.execute(syncRunnable);
        }

    newSingleThreadExecutor创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。特点:有且仅有一个工作线程执行任务,所有任务按照指定顺序执行,即遵循队列的入队出队规则。

        ExecutorService executorService = Executors.newSingleThreadExecutor();
        for (int i = 0; i < 20; i++) {
            Runnable syncRunnable = new Runnable() {
                @Override
                public void run() {
                   //...
                }
            };
            executorService.execute(syncRunnable);
        }

    newScheduledThreadPool创建一个定长的线程池,而且支持定时或周期性的任务执行。

        ScheduledExecutorService executorService = Executors.newScheduledThreadPool(5);
        for (int i = 0; i < 20; i++) {
            Runnable syncRunnable = new Runnable() {
                public void run() {
                   //...
                }
            };
            executorService.schedule(syncRunnable, 5000, TimeUnit.MILLISECONDS);//延迟5秒执行
        }
  • 1.延迟执行:
executorService.schedule(new Runnable(), 3, TimeUnit.SECONDS);//延迟3秒执行
  • 2. 周期执行:
executorService.scheduleAtFixedRate(new Runnable(),3, 7, TimeUnit.SECONDS);//延迟3秒后执行任务,从开始执行任务开始计时,每7秒执行一次不管执行任务需要多长的时间
executorService.scheduleWithFixedDelay(new Runnable(),3, 7, TimeUnit.SECONDS);//延迟3秒后执行任务,从任务完成时开始计时,每7秒执行一次需要等到任务执行完成才开始计时

        注意:scheduleAtFixedRate()和scheduleWithFixedDelay()两者之间有细微的不同。

猜你喜欢

转载自my.oschina.net/u/3858564/blog/2882235