1.概述
因为需要,我在代码中写了一个程序一秒调度一次,执行一些不是很短的任务,但是具体耗时不知道,但是这个服务在现场出问题了,然后代码审核的时候,觉得这里每秒一次速度太快了。会不会出现一个还没执行完毕。另外一个调度又开始了?然后现场叠加了,导致内存中很多线程。
我于是做了个实验
package com.java.thread.demo.schedule;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
/**
* @author: chuanchuan.lcc
* @date: 2020-12-22 10:52
* @modifiedBy: chuanchuan.lcc
* @version: 1.0
* @description: 定时任务
*/
public class ScheduleTask {
private static ScheduledExecutorService scheduledExecutorService;
private static AtomicLong atomicLong = new AtomicLong(0L);
public static void main(String[] args) throws InterruptedException {
scheduledExecutorService = Executors.newScheduledThreadPool(3);
scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
atomicLong.incrementAndGet();
try {
long start = System.currentTimeMillis();
for (long i = 0; i < 10000000000L; i++) {
}
long end = System.currentTimeMillis();
System.out.println((end -start)/1000);
} catch (Exception e) {
e.printStackTrace();
}
long count = atomicLong.decrementAndGet();
System.out.println("当前线程个数:" + count);
}
}, 0, 1, TimeUnit.SECONDS);
Thread.sleep(Integer.MAX_VALUE);
}
}
执行如下
4
当前线程个数:0
3
当前线程个数:0
3
当前线程个数:0
3
发现并不是想象中的那样,我设置运行时间大概是3秒,调度时间是1秒一次,按理说应该在计算的时候,就一直产生新的线程,但是事实不是这样的,只有一个线程执行完毕了,下一个线程才开始调度。
2.源码
线程创建
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
可以看到最终调用的是ThreadPoolExecutor,但是传参却是Integer.MAX_VALUE,
线程无限大,这个是很不安全的,可能会导致OOM
这里的队列是DelayedWorkQueue,这个队列是阻塞队列
/**
* Specialized delay queue. To mesh with TPE declarations, this
* class must be declared as a BlockingQueue<Runnable> even though
* it can only hold RunnableScheduledFutures.
*/
static class DelayedWorkQueue extends AbstractQueue<Runnable>
implements BlockingQueue<Runnable> {
其他解释请参考:添加链接描述