简介
Eexecutor作为灵活且强大的异步执行框架,其支持多种不同类型的任务执行策略,提供了一种标准的方法将任务的提交过程和执行过程解耦开发,基于生产者-消费者模式,其提交任务的线程相当于生产者,执行任务的线程相当于消费者,并用Runnable来表示任务,Executor的实现还提供了对生命周期的支持,以及统计信息收集,应用程序管理机制和性能监视等机制。
Executors:提供了一系列静态工厂方法用于创建各种线程池
Executors:通过ThreadFactory创建工作线程
ThreadFactory
接口ThreadFactory为产生线程的工厂
public interface ThreadFactory {
Thread newThread(Runnable r);
}
具体实现类
DefaultThreadFactory
static class DefaultThreadFactory implements ThreadFactory {
private static final AtomicInteger poolNumber = new AtomicInteger(1);
private final ThreadGroup group;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;
DefaultThreadFactory() {
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
//线程Name前缀
namePrefix = "pool-" +
poolNumber.getAndIncrement() +
"-thread-";
}
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r,
namePrefix + threadNumber.getAndIncrement(),
0);
//设置为非守护线程
if (t.isDaemon())
t.setDaemon(false);
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
}
PrivilegedThreadFactory
//创建的新线程与当前线程具有相同的权限
static class PrivilegedThreadFactory extends DefaultThreadFactory {
private final AccessControlContext acc;
private final ClassLoader ccl;
PrivilegedThreadFactory() {
super();
SecurityManager sm = System.getSecurityManager();
if (sm != null) {
// Calls to getContextClassLoader from this class
// never trigger a security check, but we check
// whether our callers have this permission anyways.
sm.checkPermission(SecurityConstants.GET_CLASSLOADER_PERMISSION);
// Fail fast
sm.checkPermission(new RuntimePermission("setContextClassLoader"));
}
this.acc = AccessController.getContext();
this.ccl = Thread.currentThread().getContextClassLoader();
}
public Thread newThread(final Runnable r) {
return super.newThread(new Runnable() {
public void run() {
AccessController.doPrivileged(new PrivilegedAction<Void>() {
public Void run() {
Thread.currentThread().setContextClassLoader(ccl);
r.run();
return null;
}
}, acc);
}
});
}
}
RejectedExecutionHandler
在使用线程池并且使用有界队列的时候,如果队列满了,任务添加到线程池的时候就会有问题,可以指定处理策略。
public interface RejectedExecutionHandler {
void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}
针对这些问题java线程池提供了以下几种策略:
- AbortPolicy
该策略是线程池的默认策略。使用该策略时,如果线程池队列满了丢掉这个任务并且抛出RejectedExecutionException异常。
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
//不做任何处理,直接抛出异常
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
- DiscardPolicy
这个策略和AbortPolicy的slient版本,如果线程池队列满了,会直接丢掉这个任务并且不会有任何异常。
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
//就是一个空的方法
}
- DiscardOldestPolicy
这个策略从字面上也很好理解,丢弃最老的。也就是说如果队列满了,会将最早进入队列的任务删掉腾出空间,再尝试加入队列。
因为队列是队尾进,队头出,所以队头元素是最老的,因此每次都是移除对头元素后再尝试入队。
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
//移除队头元素
e.getQueue().poll();
//再尝试入队
e.execute(r);
}
}
- CallerRunsPolicy
使用此策略,如果添加到线程池失败,那么主线程会自己去执行该任务,不会等待线程池中的线程去执行。就像是个急脾气的人,我等不到别人来做这件事就干脆自己干。
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
//直接执行run方法
r.run();
}
}
- 自定义
如果以上策略都不符合业务场景,那么可以自己定义一个拒绝策略,只要实现RejectedExecutionHandler接口,并且实现rejectedExecution方法就可以了。具体的逻辑就在rejectedExecution方法里去定义就OK了。
线程池创建
newFixedThreadPool
创建可重用且固定线程数的线程池,如果线程池中的所有线程都处于活动状态,此时再提交任务就在队列中等待,直到有可用线程;如果线程池中的某个线程由于异常而结束时,线程池就会再补充一条新线程。
//corePoolSize==maximumPoolSize
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(
nThreads,
nThreads,
0L,
TimeUnit.MILLISECONDS,
//使用一个基于FIFO排序的阻塞队列,在所有corePoolSize线程都忙时新任务将在队列中等待
new LinkedBlockingQueue<Runnable>()
);
}
newSingleThreadExecutor
创建一个单线程的Executor,如果该线程因为异常而结束就新建一条线程来继续执行后续的任务
//corePoolSize和maximumPoolSize都等于,表示固定线程池大小为1
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
//corePoolSize和maximumPoolSize都等于,表示固定线程池大小为1
(new ThreadPoolExecutor(1,
1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
newScheduledThreadPool
创建一个可延迟执行或定期执行的线程池
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
newCachedThreadPool
创建可缓存的线程池,如果线程池中的线程在60秒未被使用就将被移除,在执行新的任务时,当线程池中有之前创建的可用线程就重 用可用线程,否则就新建一条线程
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
//使用同步队列,将任务直接提交给线程
new SynchronousQueue<Runnable>());
}
扩展ExecutorService
DelegatedExecutorService
代理模式,封装其他ExecutorService,仅封装需要的方法
static class DelegatedExecutorService extends AbstractExecutorService {
private final ExecutorService e;
DelegatedExecutorService(ExecutorService executor) { e = executor; }
public void execute(Runnable command) { e.execute(command); }
public void shutdown() { e.shutdown(); }
public List<Runnable> shutdownNow() { return e.shutdownNow(); }
public boolean isShutdown() { return e.isShutdown(); }
public boolean isTerminated() { return e.isTerminated(); }
public boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException {
return e.awaitTermination(timeout, unit);
}
public Future<?> submit(Runnable task) {
return e.submit(task);
}
public <T> Future<T> submit(Callable<T> task) {
return e.submit(task);
}
public <T> Future<T> submit(Runnable task, T result) {
return e.submit(task, result);
}
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException {
return e.invokeAll(tasks);
}
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException {
return e.invokeAll(tasks, timeout, unit);
}
public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException {
return e.invokeAny(tasks);
}
public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
return e.invokeAny(tasks, timeout, unit);
}
}
FinalizableDelegatedExecutorService
DelegatedExecutorService的子类,实现finalize方法。
static class FinalizableDelegatedExecutorService
extends DelegatedExecutorService {
FinalizableDelegatedExecutorService(ExecutorService executor) {
super(executor);
}
protected void finalize() {
super.shutdown();
}
}
DelegatedScheduledExecutorService
DelegatedExecutorService的子类,实现schedule方法。
static class DelegatedScheduledExecutorService
extends DelegatedExecutorService
implements ScheduledExecutorService {
private final ScheduledExecutorService e;
DelegatedScheduledExecutorService(ScheduledExecutorService executor) {
super(executor);
e = executor;
}
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
return e.schedule(command, delay, unit);
}
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
return e.schedule(callable, delay, unit);
}
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
return e.scheduleAtFixedRate(command, initialDelay, period, unit);
}
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
return e.scheduleWithFixedDelay(command, initialDelay, delay, unit);
}
}