阿里规约中 并发处理 章节提到。
3. 【强制】线程资源必须通过线程池提供,不允许在应用中自行显式创建线程。 说明:线程池的好处是减少在创建和销毁线程上所消耗的时间以及系统资源的开销,解决资源不足的问题。如果不使用线程池,有可能造成系统创建大量同类线程而导致消耗完内存或者“过度切换”的问题。
4. 【强制】线程池不允许使用Executors去创建,而是通过ThreadPoolExecutor的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。
说明:Executors返回的线程池对象的弊端如下: 1) FixedThreadPool和SingleThreadPool:
允许的请求队列长度为Integer.MAX_VALUE,可能会堆积大量的请求,从而导致OOM。 2) CachedThreadPool:
允许的创建线程数量为Integer.MAX_VALUE,可能会创建大量的线程,从而导致OOM。
以下为工具类Executors,提供了简便创建线程池的方法。
Executors类创建线程池的方法,实际是调用ThreadPoolExecutor的构造方法。
public class Executors {
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory);
}
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory));
}
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
threadFactory);
}
public static ThreadFactory defaultThreadFactory() {
return new DefaultThreadFactory();
}
// 默认的线程池工厂,当不指定明确的时候,使用此类
private static class DefaultThreadFactory implements ThreadFactory {
private static final AtomicInteger poolNumber = new AtomicInteger(1);
private final ThreadGroup group;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;
DefaultThreadFactory() {
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
namePrefix = "pool-" +
poolNumber.getAndIncrement() +
"-thread-";
}
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r,
namePrefix + threadNumber.getAndIncrement(),
0);
if (t.isDaemon())
t.setDaemon(false);
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
}
LinkedBlockingQueue是默认使用的阻塞队列实现,LinkedBlockingQueue初始化的部分代码
public class LinkedBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
private final int capacity;
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
last = head = new Node<E>(null);
}
}
阻塞队列初始化的容量过大,为 2的31次幂-1
public final class Integer extends Number implements Comparable<Integer> {
//@Native注解修饰成员变量,则表示这个变量可以被‘本地代码‘引用,常常被代码生成工具使用
@Native public static final int MAX_VALUE = 0x7fffffff;
}
Java 线程池和多线程编程 ——线程池理解与创建
JDK1.5 引入了 Executor框架 ,对任务提交和执行进行解耦 , 定义任务后交由线程池执行。
线程池是由java.util.concurrent 包中Executors类的工厂方法创建线程池。
阿里巴巴规范:
- 线程资源必须通过线程池提供,不允许在应用中自行显式创建线程。
- 说明:使用线程池的好处是减少在创建和销毁线程上所花的时间以及系统资源的开销,解决资源不足的问题。
- 如果不使用线程池,有可能造成系统创建大量同类线程而导致消耗完内存或者“过度切换”的问题。
线程池的优势:
- 线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。
- 线程池不仅能够保证内核的充分利用,还能防止过分调度。
- 可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。
阿里巴巴规范:
-
线程池不允许使用Executors创建,而是通过ThreadPoolExecutor的方式,这样可以明确的运行规则,规避资源耗尽的风险
-
newFixedThreadPool和newSingleThreadExecutor 的弊端:
都是无界队列,允许的队列长度都是Integer.MAX_VALUE, 请求可能大量堆积,最后造成JVM的OOM内存溢出
- newCachedThreadPool 的弊端:
允许的创建线程数量为Integer.MAX_VALUE, 可能创建出大量线程,最后造成JVM的OOM内存溢出
创建一个可重用固定线程数的线程池,以共享的无界队列方式来运行这些线程。
public static ExecutorService newFixedThreadPool(int nThreads) nThreads
- 池中的线程数
ExecutorService pool = Executors.newFixedThreadPool(6);
创建一个使用单个 worker 线程的 Executor,以无界队列方式来运行该线程。
public static ExecutorService newSingleThreadExecutor()
ExecutorService threadPool = Executors.newSingleThreadExecutor();
创建一个安排在给定延迟后运行命令或者定期地执行的拥有3个线程的线程池。
初始延迟为6秒 , 间隔定时为2秒 的定时任务。
Executors.newScheduledThreadPool(3).scheduleAtFixedRate( ()-> {
System.out.println("bombing!");
}, 6, 2, TimeUnit.SECONDS);
}
创建一个使用单个 worker 线程的 Executor,以无界队列方式来运行该线程。public static ExecutorService newSingleThreadExecutor()
自定义线程池创建:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
-
corePoolSize 核心线程数 (最开始的线程数)
-
maximumPoolSize 最大线程数 (可以扩展的最大线程数)
-
keepAliveTime 线程存活时间
-
TimeUnit unit 存活时间单位
-
BlockingQueue workQueue 任务队列
-
ThreadFactory threadFactory 线程工厂 (BlockingQueue)
-
RejectedExecutionHandler handler (线程池繁忙,任务队列满了这种情况下,需要执行拒绝策略2)
默认拒绝策略(① Abort :抛异常 ② Discard: 扔掉,不抛异常 ③ DiscardOldest ④ CallerRuns:调用者处理服务)
可以自定义拒绝策略 , 实现RejectedExecutionHandler接口,重写rejectedExecution方法比如线程池处理订单业务,任务队列满了,实现自己的拒绝策略将 订单数据保存至 kafka 、mysql 、redis 等
ThreadFactory namedThreadFactory = new ThreadFactoryBuilder()
.setNameFormat("redisTest-pool-%d").build();
ExecutorService ThreadPool = new ThreadPoolExecutor(4, 4,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(1024), namedThreadFactory, new ThreadPoolExecutor.AbortPolicy());
线程池框架提供了两种方式提交任务,根据不同的业务需求选择不同的方式。
① Executor.execute() void execute(Runnable command)
在未来某个时间执行给定的命令。该命令可能在新的线程、已入池的线程或者正调用的线程中执行,这由 Executor
实现决定。
② ExecutorService.submit()
Future |
submit(**Callable** task) 提交一个返回值的任务用于执行,返回一个表示任务的未决结Future。 |
---|---|
Future |
submit(**Runnable** task) 提交一个 Runnable 任务用于执行,并返回一个表示该任务的 Future。 |
Future |
submit(**Runnable task, T result**) 提交一个 Runnable 任务用于执行,并返回一个表示该任务的 Future。 |
两个方法都可以向线程池提交任务,execute()方法的返回类型是void,它定义在Executor接口中,
而submit()方法可以返回持有计算结果的Future对象,它定义在ExecutorService接口中