线程池 详解 吐血整理
数哥:你们用多线程吗?
我:用啊
数哥:我们根本不需要,厉害吧
我:。。。。。。厉害
数哥:你们怎么用的啊?
我:一般都是用线程池,不会直接创建线程
数哥:线程池??? 干什么的?
我:。。。。。。 !!!!!!
1、使用线程池的好处
-
我们知道创建
-
减少了创建和销毁线程的次数,每个工作线程都可以被重复利用,可执行多个任务。
-
可以根据系统的承受能力,调整线程池中工作线线程的数目,防止因为消耗过多的内存,而把服务器累趴下
2、线程池创建、执行
2.1线程池的执行流程
2.2线程池创建及使用
2.2.1task源码
class Task implements Callable {
private int num1;
private int num2;
public Task(){}
public Task(int num1,int num2){
this.num1=num1;
this.num2=num2;
}
@Override
public Integer call() throws Exception {
int sum=0;
for(int i=num1;i<=num2;i++){
sum+=i;
}
return sum;
}
}
2.2.2线程池使用
/**
* 计算 400 数 和
* 使用线程池
*/
@Test
public void pooltest() throws ExecutionException, InterruptedException {
int corePoolSize = 4;
int maximumPoolSize = 4;
long keepAliveTime = 1000;
TimeUnit unit = TimeUnit.MICROSECONDS;
BlockingQueue a = new SynchronousQueue();
// ArrayBlockingQueue DelayQueue LinkedBlockingQueue PriorityBlockingQueue
ThreadFactory threadFactory = Executors.defaultThreadFactory();
RejectedExecutionHandler handler = new ThreadPoolExecutor.AbortPolicy();
创建线程池
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, a, threadFactory, handler);
// 创建 任务
Task t1=new Task(1,100);
Task t2=new Task(101,200);
Task t3=new Task(201,300);
Task t4=new Task(301,400);
// 让线程池自主选择一条线程执行线程任务
Future<Integer> f1=threadPoolExecutor.submit(t1);
Future<Integer> f2=threadPoolExecutor.submit(t2);
Future<Integer> f3=threadPoolExecutor.submit(t3);
Future<Integer> f4=threadPoolExecutor.submit(t4);
threadPoolExecutor.shutdown();
int sum1=f1.get();
int sum2=f2.get();
int sum3=f3.get();
int sum4=f4.get();
System.out.println(sum1+sum2+sum3+sum4);
}
3、线程池各参数 详解
/**
* Creates a new {@code ThreadPoolExecutor} with the given initial
* parameters.
*
* @param corePoolSize the number of threads to keep in the pool, even
* if they are idle, unless {@code allowCoreThreadTimeOut} is set
* @param corePoolSize 是一直存活在线程池中的线程数,即使这些线程被闲置,除非设置参数
* {@code allowCoreThreadTimeOut}
*
* @param maximumPoolSize the maximum number of threads to allow in the
* pool
* @param maximumPoolSize 在线程池中能存活的 最大线程数
*
* @param keepAliveTime when the number of threads is greater than
* the core, this is the maximum time that excess idle threads
* will wait for new tasks before terminating.
* @param keepAliveTime 如果线程池中的 线程数比 核心线程数(corePoolSize)大
* 此参数是 闲置线程 等待新任务执行的 最长 时间
*
* @param unit the time unit for the {@code keepAliveTime} argument
* @param unit {@code keepAliveTime} 参数的 时间单位
*
* @param workQueue the queue to use for holding tasks before they are
* executed. This queue will hold only the {@code Runnable}
* tasks submitted by the {@code execute} method.
* @param workQueue 此队列用于存储未执行的任务(任务实现了Runnable接口,并且通过excute方法提交)
*
* @param threadFactory the factory to use when the executor
* creates a new thread
* @param threadFactory 线程池用来创建线程的 工厂
*
* @param handler the handler to use when execution is blocked
* because the thread bounds and queue capacities are reached
* @param handler 处理 因为达到了线程边界和队列容量而导致 执行阻塞的 策略
*
* @throws IllegalArgumentException if one of the following holds:<br>
* {@code corePoolSize < 0}<br>
* {@code keepAliveTime < 0}<br>
* {@code maximumPoolSize <= 0}<br>
* {@code maximumPoolSize < corePoolSize}
* @throws NullPointerException if {@code workQueue}
* or {@code threadFactory} or {@code handler} is null
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
3.1阻塞队列:BlockingQueue
ArrayBlockingQueue`,
`DelayQueue`,
`LinkedBlockingDeque`,
`LinkedBlockingQueue`,
`LinkedTransferQueue`,
`PriorityBlockingQueue`,
`SynchronousQueue
3.1.1无界队列
队列大小无限制,常用的为无界的LinkedBlockingQueue,使用该队列做为阻塞队列时要尤其当心,当任务耗时较长时可能会导致大量新任务在队列中堆积最终导致OOM。当QPS很高,发送数据很大,大量的任务被添加到这个无界LinkedBlockingQueue 中,导致内存飙升服务器挂掉。
3.1.2有界队列
常用的有两类,一类是遵循FIFO原则的队列如ArrayBlockingQueue,另一类是优先级队列如PriorityBlockingQueue。PriorityBlockingQueue中的优先级由任务的Comparator决定。
使用有界队列时队列大小需和线程池大小互相配合,线程池较小有界队列较大时可减少内存消耗,降低cpu使用率和上下文切换,但是可能会限制系统吞吐量。
3.1.3同步移交队列
如果不希望任务在队列中等待而是希望将任务直接移交给工作线程,可使用SynchronousQueue作为等待队列。SynchronousQueue不是一个真正的队列,而是一种线程之间移交的机制。要将一个元素放入SynchronousQueue中,必须有另一个线程正在等待接收这个元素。只有在使用无界线程池或者有饱和策略时才建议使用该队列。
3.2饱和策略
3.2.1ThreadPoolExecutor.AbortPolicy
描述
直接抛异常,不处理
源码
/**
* A handler for rejected tasks that throws a
* {@link RejectedExecutionException}.
*
* This is the default handler for {@link ThreadPoolExecutor} and
* {@link ScheduledThreadPoolExecutor}.
*/
public static class AbortPolicy implements RejectedExecutionHandler {
/**
* Creates an {@code AbortPolicy}.
*/
public AbortPolicy() { }
/**
* Always throws RejectedExecutionException.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
* @throws RejectedExecutionException always
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
}
3.2.2ThreadPoolExecutor.CallerRunsPolicy
描述
它直接在{@code execute}方法的调用线程中运行被拒绝的任务,除非执行程序已经关闭,在这种情况下任务将被丢弃
源码
/**
* A handler for rejected tasks that runs the rejected task
* directly in the calling thread of the {@code execute} method,
* unless the executor has been shut down, in which case the task
* is discarded.
*/
public static class CallerRunsPolicy implements RejectedExecutionHandler {
/**
* Creates a {@code CallerRunsPolicy}.
*/
public CallerRunsPolicy() { }
/**
* Executes task r in the caller's thread, unless the executor
* has been shut down, in which case the task is discarded.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run();
}
}
}
3.2.3ThreadPoolExecutor.DiscardOldestPolicy
描述
丢弃队列最前面的任务,然后重新尝试执行任务(不适合工作队列为优先队列场景)
源码
/**
* A handler for rejected tasks that discards the oldest unhandled
* request and then retries {@code execute}, unless the executor
* is shut down, in which case the task is discarded.
*/
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
/**
* Creates a {@code DiscardOldestPolicy} for the given executor.
*/
public DiscardOldestPolicy() { }
/**
* Obtains and ignores the next task that the executor
* would otherwise execute, if one is immediately available,
* and then retries execution of task r, unless the executor
* is shut down, in which case task r is instead discarded.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
e.getQueue().poll();
e.execute(r);
}
}
}
3.2.4ThreadPoolExecutor.DiscardPolicy
描述
新提交的任务被抛弃,但是不抛出异常
源码
/**
* A handler for rejected tasks that silently discards the
* rejected task.
*/
public static class DiscardPolicy implements RejectedExecutionHandler {
/**
* Creates a {@code DiscardPolicy}.
*/
public DiscardPolicy() { }
/**
* Does nothing, which has the effect of discarding task r.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}
}
4、JDK提供线程池
4.1Executors.newScheduledThreadPool
/**
* Creates a thread pool that can schedule commands to run after a
* given delay, or to execute periodically.
* @param corePoolSize the number of threads to keep in the pool,
* even if they are idle
* @param threadFactory the factory to use when the executor
* creates a new thread
* @return the newly created scheduled thread pool
* @throws IllegalArgumentException if {@code corePoolSize < 0}
* @throws NullPointerException if threadFactory is null
*/
public static ScheduledExecutorService newScheduledThreadPool(
int corePoolSize, ThreadFactory threadFactory) {
return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
}
--------------------------------------------------------------------------------------------------
/**
* Creates a new {@code ScheduledThreadPoolExecutor} with the
* given initial parameters.
*
* @param corePoolSize the number of threads to keep in the pool, even
* if they are idle, unless {@code allowCoreThreadTimeOut} is set
* @param threadFactory the factory to use when the executor
* creates a new thread
* @throws IllegalArgumentException if {@code corePoolSize < 0}
* @throws NullPointerException if {@code threadFactory} is null
*/
public ScheduledThreadPoolExecutor(int corePoolSize,
ThreadFactory threadFactory) {
super(corePoolSize, Integer.MAX_VALUE,
DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
new DelayedWorkQueue(), threadFactory);
}
4.2Executors.newCachedThreadPool
/**
* Creates a thread pool that creates new threads as needed, but
* will reuse previously constructed threads when they are
* available, and uses the provided
* ThreadFactory to create new threads when needed.
*
* @param threadFactory the factory to use when creating new threads
* @return the newly created thread pool
* @throws NullPointerException if threadFactory is null
*/
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
threadFactory);
}
4.3Executors.newFixedThreadPool
/**
* Creates a thread pool that reuses a fixed number of threads
* operating off a shared unbounded queue, using the provided
* ThreadFactory to create new threads when needed. At any point,
* at most {@code nThreads} threads will be active processing
* tasks. If additional tasks are submitted when all threads are
* active, they will wait in the queue until a thread is
* available. If any thread terminates due to a failure during
* execution prior to shutdown, a new one will take its place if
* needed to execute subsequent tasks. The threads in the pool will
* exist until it is explicitly {@link ExecutorService#shutdown
* shutdown}.
*
* @param nThreads the number of threads in the pool
* @param threadFactory the factory to use when creating new threads
* @return the newly created thread pool
* @throws NullPointerException if threadFactory is null
* @throws IllegalArgumentException if {@code nThreads <= 0}
*/
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory);
}
4.4Executors.newSingleThreadExecutor
/**
* Creates an Executor that uses a single worker thread operating
* off an unbounded queue, and uses the provided ThreadFactory to
* create a new thread when needed. Unlike the otherwise
* equivalent {@code newFixedThreadPool(1, threadFactory)} the
* returned executor is guaranteed not to be reconfigurable to use
* additional threads.
*
* @param threadFactory the factory to use when creating new threads
* @return the newly created single-threaded Executor
* @throws NullPointerException if threadFactory is null
*/
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory));
}
4.5Executors.newSingleThreadScheduledExecutor
/**
* Creates a single-threaded executor that can schedule commands
* to run after a given delay, or to execute periodically. (Note
* however that if this single thread terminates due to a failure
* during execution prior to shutdown, a new one will take its
* place if needed to execute subsequent tasks.) Tasks are
* guaranteed to execute sequentially, and no more than one task
* will be active at any given time. Unlike the otherwise
* equivalent {@code newScheduledThreadPool(1, threadFactory)}
* the returned executor is guaranteed not to be reconfigurable to
* use additional threads.
*
* @param threadFactory the factory to use when creating new threads
* @return the newly created scheduled executor
* @throws NullPointerException if threadFactory is null
*/
public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory) {
return new DelegatedScheduledExecutorService
(new ScheduledThreadPoolExecutor(1, threadFactory));
}
4.6Executors.newWorkStealingPool
/**
* Creates a thread pool that maintains enough threads to support
* the given parallelism level, and may use multiple queues to
* reduce contention. The parallelism level corresponds to the
* maximum number of threads actively engaged in, or available to
* engage in, task processing. The actual number of threads may
* grow and shrink dynamically. A work-stealing pool makes no
* guarantees about the order in which submitted tasks are
* executed.
*
* @param parallelism the targeted parallelism level
* @return the newly created thread pool
* @throws IllegalArgumentException if {@code parallelism <= 0}
* @since 1.8
*/
public static ExecutorService newWorkStealingPool(int parallelism) {
return new ForkJoinPool
(parallelism,
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, true);
}
/**
* Creates a work-stealing thread pool using the number of
* {@linkplain Runtime#availableProcessors available processors}
* as its target parallelism level.
*
* @return the newly created thread pool
* @see #newWorkStealingPool(int)
* @since 1.8
*/
public static ExecutorService newWorkStealingPool() {
return new ForkJoinPool
(Runtime.getRuntime().availableProcessors(),
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, true);
}
Alibaba 编码规约
一 -(六)- 4
【强制】 线程池不允许使用 Executors 去创建,而是通过 ThreadPoolExecutor 的方式,这
样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。
说明: Executors 返回的线程池对象的弊端如下:
1) FixedThreadPool 和 SingleThreadPool:
允许的请求队列长度为 Integer.MAX_VALUE,可能会堆积大量的请求,从而导致 OOM。
2) CachedThreadPool:
允许的创建线程数量为 Integer.MAX_VALUE, 可能会创建大量的线程,从而导致 OOM。
如有问题,欢迎大家留言指正,会虚心接纳,补充学习
请大家,点赞 收藏,谢谢