上一篇文章主要介绍了线程池相关知识点,接下来结合实例来理解其用法
实例
ExecutorService
ExecutorService是Java中对线程池定义的一个接口,它java.util.concurrent包中。Java API对ExecutorService接口的实现有两个(ThreadPoolExecutor和ScheduledThreadPoolExecutor),所以这两个即是Java线程池具体实现类。除此之外,ExecutorService还继承了Executor接口(注意区分Executor接口和Executors工厂类),这个接口只有一个execute()方法。下面是继承树。
Executors只是一个工厂类,它所有的方法返回的都是ThreadPoolExecutor
、ScheduledThreadPoolExecutor
这两个类的实例。一共可以创建四种线程池。
自定义线程池
public class CustomizeThreadPool implements Runnable {
private int taskId;
private String taskName;
public CustomizeThreadPool(int taskId, String taskName){
this.taskId = taskId;
this.taskName = taskName;
}
@Override
public void run() {
System.out.println("任务id:" + taskId + ",任务名称:" + taskName);
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
// 1.定义阻塞队列
LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<>(2);
// 2.定义拒绝策略
RejectedExecutionHandler handler = new ThreadPoolExecutor.AbortPolicy();
// 3.创建线程池(核心线程2,最大线程3,阻塞2,超出抛异常)
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(2, 3, 60, TimeUnit.SECONDS, queue, Executors.defaultThreadFactory(), handler);
// 4.启用线程
for (int i = 0; i < 6; i++){
threadPool.execute(new CustomizeThreadPool(i, "任务" + i));
System.out.println("活跃的线程数:"+threadPool.getActiveCount() + ",核心线程数:" + threadPool.getCorePoolSize()
+ ",线程池大小:" + threadPool.getPoolSize() + ",队列的大小:" + threadPool.getQueue().size());
if (queue.size() > 0) {
System.out.println("----------------队列中阻塞的线程数" + queue.size());
}
}
// 5.关闭线程
threadPool.shutdown();
}
public int getTaskId() {
return taskId;
}
public void setTaskId(int taskId) {
this.taskId = taskId;
}
public String getTaskName() {
return taskName;
}
public void setTaskName(String taskName) {
this.taskName = taskName;
}
}
结果如下:
任务id:0,任务名称:任务0
活跃的线程数:1,核心线程数:2,线程池大小:1,队列的大小:0
活跃的线程数:2,核心线程数:2,线程池大小:2,队列的大小:0
活跃的线程数:2,核心线程数:2,线程池大小:2,队列的大小:1
----------------队列中阻塞的线程数1
活跃的线程数:2,核心线程数:2,线程池大小:2,队列的大小:2
----------------队列中阻塞的线程数2
活跃的线程数:3,核心线程数:2,线程池大小:3,队列的大小:2
----------------队列中阻塞的线程数2
任务id:1,任务名称:任务1
任务id:4,任务名称:任务4
Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task com.mydeveloping.thread.threadpool.example.CustomizeThreadPool@123a439b rejected from java.util.concurrent.ThreadPoolExecutor@7de26db8[Running, pool size = 3, active threads = 3, queued tasks = 2, completed tasks = 0]
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047)
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369)
at com.mydeveloping.thread.threadpool.example.CustomizeThreadPool.main(CustomizeThreadPool.java:42)
任务id:2,任务名称:任务2
任务id:3,任务名称:任务3
结果分析:先创建线程,直到达到核心数2,接下来放入阻塞队列,直到阻塞队列2,再继续创建线程,直到最大线程数3,超出3后直接抛出RejectedExecutionException异常
自定义拒绝策略
原生的拒绝策略都不太实用,比如互联网任务,我们定义一个线程池,当线程池满了,我们要合理的处理后续的任务,比如记录下来下次再去执行,或者告知责任人那些任务没有处理等等,个人任务这个应该自己定义,当线程满了,我们可以自由控制。下面定义一个拒绝策略。
/**
* 自定义拒绝策略
* @author weimr
*/
public class CustomizeRejected implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
CustomizeThreadPool ctp = (CustomizeThreadPool)r;
System.out.println("报警信息:"+ctp.getTaskName()+" 被线程池拒绝,没有被执行");
}
}
/**
* 自定义线程池实例
* @author weimr
*/
public class CustomizeThreadPool implements Runnable {
private int taskId;
private String taskName;
public CustomizeThreadPool(int taskId, String taskName){
this.taskId = taskId;
this.taskName = taskName;
}
@Override
public void run() {
System.out.println("任务id:" + taskId + ",任务名称:" + taskName);
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
// 1.定义阻塞队列
LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<>(2);
// 2.定义拒绝策略
RejectedExecutionHandler handler = new CustomizeRejected();
// 3.创建线程池(核心线程2,最大线程3,阻塞2,超出抛异常)
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(2, 3, 60, TimeUnit.SECONDS, queue, Executors.defaultThreadFactory(), handler);
// 4.启用线程
for (int i = 0; i < 6; i++){
threadPool.execute(new CustomizeThreadPool(i, "任务" + i));
System.out.println("活跃的线程数:"+threadPool.getActiveCount() + ",核心线程数:" + threadPool.getCorePoolSize()
+ ",线程池大小:" + threadPool.getPoolSize() + ",队列的大小:" + threadPool.getQueue().size());
if (queue.size() > 0) {
System.out.println("----------------队列中阻塞的线程数" + queue.size());
}
}
// 5.关闭线程
threadPool.shutdown();
}
public int getTaskId() {
return taskId;
}
public void setTaskId(int taskId) {
this.taskId = taskId;
}
public String getTaskName() {
return taskName;
}
public void setTaskName(String taskName) {
this.taskName = taskName;
}
}
结果如下:
任务id:0,任务名称:任务0
活跃的线程数:1,核心线程数:2,线程池大小:1,队列的大小:0
活跃的线程数:2,核心线程数:2,线程池大小:2,队列的大小:0
活跃的线程数:2,核心线程数:2,线程池大小:2,队列的大小:1
----------------队列中阻塞的线程数1
活跃的线程数:2,核心线程数:2,线程池大小:2,队列的大小:2
----------------队列中阻塞的线程数2
活跃的线程数:3,核心线程数:2,线程池大小:3,队列的大小:2
----------------队列中阻塞的线程数2
报警信息:任务5 被线程池拒绝,没有被执行
活跃的线程数:3,核心线程数:2,线程池大小:3,队列的大小:2
----------------队列中阻塞的线程数2
任务id:1,任务名称:任务1
任务id:4,任务名称:任务4
任务id:2,任务名称:任务2
任务id:3,任务名称:任务3
CachedThreadPool
public class CachedThreadPoolTest implements Runnable {
private int taskId;
private String taskName;
public CachedThreadPoolTest(int taskId, String taskName){
this.taskId = taskId;
this.taskName = taskName;
}
@Override
public void run() {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss");
System.out.println("运行时间:" + sdf.format(new Date()) + ",任务id:" + taskId + ",任务名称:" + taskName);
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
// 1.初始化CachedThreadPool
ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
for (int i = 0; i < 6; i++) {
cachedThreadPool.submit(new CachedThreadPoolTest(i, "任务" + i)); //启用
}
cachedThreadPool.shutdown();
}
执行结果:
运行时间:2018-08-18 11:25:37,任务id:0,任务名称:任务0
运行时间:2018-08-18 11:25:37,任务id:2,任务名称:任务2
运行时间:2018-08-18 11:25:37,任务id:3,任务名称:任务3
运行时间:2018-08-18 11:25:37,任务id:5,任务名称:任务5
运行时间:2018-08-18 11:25:37,任务id:1,任务名称:任务1
运行时间:2018-08-18 11:25:37,任务id:4,任务名称:任务4
FixedThreadPool
public class FixedThreadPoolTest implements Runnable {
private int taskId;
private String taskName;
public FixedThreadPoolTest(int taskId, String taskName) {
this.taskId = taskId;
this.taskName = taskName;
}
@Override
public void run() {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss");
System.out.println("运行时间:" + sdf.format(new Date()) + ",任务id:" + taskId + ",任务名称:" + taskName);
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
// 1.初始化FixedThreadPool
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(5);
for (int i = 0; i < 6; i++) {
fixedThreadPool.submit(new FixedThreadPoolTest(i, "任务" + i)); // 启用
}
fixedThreadPool.shutdown();
}
}
执行结果:
运行时间:2018-08-18 11:28:36,任务id:4,任务名称:任务4
运行时间:2018-08-18 11:28:36,任务id:2,任务名称:任务2
运行时间:2018-08-18 11:28:36,任务id:3,任务名称:任务3
运行时间:2018-08-18 11:28:36,任务id:0,任务名称:任务0
运行时间:2018-08-18 11:28:36,任务id:1,任务名称:任务1
运行时间:2018-08-18 11:28:41,任务id:5,任务名称:任务5
SingleThreadExecutor
public class SingleThreadExecutorTest implements Runnable {
private int taskId;
private String taskName;
public SingleThreadExecutorTest(int taskId, String taskName) {
this.taskId = taskId;
this.taskName = taskName;
}
@Override
public void run() {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss");
System.out.println("运行时间:" + sdf.format(new Date()) + ",任务id:" + taskId + ",任务名称:" + taskName);
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
// 1.初始化SingleThreadExecutor
ExecutorService singleThreadPool = Executors.newSingleThreadExecutor();
for (int i = 0; i < 6; i++) {
singleThreadPool.submit(new SingleThreadExecutorTest(i, "任务" + i)); // 启用
}
singleThreadPool.shutdown();
}
}
执行结果:
运行时间:2018-08-18 11:33:35,任务id:0,任务名称:任务0
运行时间:2018-08-18 11:33:40,任务id:1,任务名称:任务1
运行时间:2018-08-18 11:33:45,任务id:2,任务名称:任务2
运行时间:2018-08-18 11:33:50,任务id:3,任务名称:任务3
运行时间:2018-08-18 11:33:55,任务id:4,任务名称:任务4
运行时间:2018-08-18 11:34:00,任务id:5,任务名称:任务5
ScheduledThreadPoolExecutor
1. 当线程执行时间大于时间间隔
public class ScheduledThreadPoolExecutorTest {
final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(5);
public ScheduledThreadPoolExecutorTest(SimpleDateFormat sdf) {
System.out.println("运行时间:" + sdf.format(new Date()) + ",Thread:" + Thread.currentThread().getName());
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
// 1.初始化ScheduledThreadPool
ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5);
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss");
System.out.println("当前时间:" + sdf.format(new Date()));
// 循环周期执行
scheduledThreadPool.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
new ScheduledThreadPoolExecutorTest(sdf);
System.out.println("时间:" + sdf.format(new Date()) + ",delay 5 seconds,excute every 3 seconds,活动线程数:" + Thread.activeCount());
if(Thread.activeCount() == 6) { //活跃线程数为6时关闭线程池
scheduledThreadPool.shutdown();
}
}
},5 , 3, TimeUnit.SECONDS); //延迟5秒,每3秒执行一次
}
}
执行结果:
当前时间:2018-08-18 03:29:15
运行时间:2018-08-18 03:29:20,Thread:pool-1-thread-1
时间:2018-08-18 03:29:30,delay 5 seconds,excute every 3 seconds,活动线程数:2
运行时间:2018-08-18 03:29:30,Thread:pool-1-thread-1
时间:2018-08-18 03:29:40,delay 5 seconds,excute every 3 seconds,活动线程数:3
运行时间:2018-08-18 03:29:40,Thread:pool-1-thread-2
时间:2018-08-18 03:29:50,delay 5 seconds,excute every 3 seconds,活动线程数:4
运行时间:2018-08-18 03:29:50,Thread:pool-1-thread-1
时间:2018-08-18 03:30:00,delay 5 seconds,excute every 3 seconds,活动线程数:5
运行时间:2018-08-18 03:30:00,Thread:pool-1-thread-3
时间:2018-08-18 03:30:10,delay 5 seconds,excute every 3 seconds,活动线程数:6
分析:当执行任务时间大于间隔时间,此方法不会重新开启一个新的任务进行执行,而是等待原有任务执行完成,马上开启下一个任务进行执行。此时,执行间隔时间已经被打乱。
2. 当线程执行时间小于时间间隔
public class ScheduledThreadPoolExecutorTest {
final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(5);
public ScheduledThreadPoolExecutorTest(SimpleDateFormat sdf) {
System.out.println("运行时间:" + sdf.format(new Date()) + ",Thread:" + Thread.currentThread().getName());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
// 1.初始化ScheduledThreadPool
ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5);
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss");
System.out.println("当前时间:" + sdf.format(new Date()));
// 循环周期执行
scheduledThreadPool.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
new ScheduledThreadPoolExecutorTest(sdf);
System.out.println("时间:" + sdf.format(new Date()) + ",delay 5 seconds,excute every 3 seconds,活动线程数:" + Thread.activeCount());
if(Thread.activeCount() == 6) {//活跃线程数为6时关闭线程池
scheduledThreadPool.shutdown();
}
}
},5 , 3, TimeUnit.SECONDS); //延迟5秒,每3秒执行一次
}
}
执行结果:
当前时间:2018-08-18 03:31:56
运行时间:2018-08-18 03:32:01,Thread:pool-1-thread-1
时间:2018-08-18 03:32:02,delay 5 seconds,excute every 3 seconds,活动线程数:2
运行时间:2018-08-18 03:32:04,Thread:pool-1-thread-1
时间:2018-08-18 03:32:05,delay 5 seconds,excute every 3 seconds,活动线程数:3
运行时间:2018-08-18 03:32:07,Thread:pool-1-thread-2
时间:2018-08-18 03:32:08,delay 5 seconds,excute every 3 seconds,活动线程数:4
运行时间:2018-08-18 03:32:10,Thread:pool-1-thread-1
时间:2018-08-18 03:32:11,delay 5 seconds,excute every 3 seconds,活动线程数:5
运行时间:2018-08-18 03:32:13,Thread:pool-1-thread-3
时间:2018-08-18 03:32:14,delay 5 seconds,excute every 3 seconds,活动线程数:6
分析:在任务执行时间小于间隔时间的情况下,程序以起始时间为准则,每隔指定时间执行一次,不受任务执行时间影响。