本文导读
- 承接《线程池理论 1》,本文主要开始结束线程池的基本创建以及使用
- 本文所指的线程池就是指"ThreadPoolExecutor"
- ThreadPoolExecutor类继承AbstractExecutorService抽象类, 抽象类AbstractExecutorService实现ExecutorService接口,ExcutorService接口继承Executor接口
创建线程池
构造器创建
- 直接使用ThreadPoolExecutor的构造器创建线程池
- ThreadPoolExecutor类提供了四个构造器,常用的如下所示:
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), handler); }
- @param corePoolSize :核心线程数,池中所保存的线程数,包括空闲线程。活动线程小于corePoolSize时则直接创建,大于等于时则添加到workQueue(工作队列)中等待执行。
- @param maximumPoolSize : 池中允许的最大线程数。每个线程都有自己约定大小的workQueue(工作队列),如果corePoolSize已经满了,而且所有核心线程的workQueue也已经排满了,当前线程池活动线程总数又小于maximumPoolSize时,则创建新的线程执行任务。如果线程池中活动线程数已经超过maximumPoolSize了,则执行饱和策略(handler)
- @param keepAliveTime :大于核心线程数的线程,是因为corePoolSize不够,临时新建的线程,所以当线程执行任务完成后,就要关闭多余的线程(它们此时处于空闲状态),直到线程池中线程的数量恢复到corePoolSize的大小。为了提高线程的利用效率,可约定空闲线程保持存活的时间,大小为keepAliveTime。对于数量多,时间短的任务,可以适当将keepAliveTime放大一点。
- @param unit : keepAliveTime 参数的时间单位。
- @param workQueue :此工作队列仅保持由 execute 方法提交的 Runnable 任务。每个活动(工作)线程都有一个工作队列,然后从这里获取任务依次进行执行
- @param handler:当线程池中线程总数已经超过了maximumPoolSize,且每个线程的workQueue(工作队列)也已经排满时,则执行饱和策略。通俗的说是线程池中的任务已经爆满了,此时对于新加的任务应该如何处理?可以参考《线程池理论》中的饱和策略部分。
Executors创建
- Excutors类支持以下各种简洁高效的用法:
- 创建并返回设置有常用配置字符串的 ExecutorService 的方法,而线程池ThreadPoolExecutor的父类就是实现了ExecutorService接口。
- 创建并返回设置有常用配置字符串的 ScheduledExecutorService 的方法。
- 创建并返回“包装的”ExecutorService 方法,它通过使特定于实现的方法不可访问来禁用重新配置。
- 创建并返回 ThreadFactory 的方法,它可将新创建的线程设置为已知的状态。
- 创建并返回非闭包形式的 Callable 的方法,这样可将其用于需要 Callable 的执行方法中。
- Excutors创建线程池的优点是简单,它的方法内部仍然使用的是ThreadPoolExecutor构造器进行创建的,只是采用了一些默认值来代替而已,常用方法的如下:
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }
- 创建一个可缓存(大小可伸缩)线程池
- 掌握了上面的ThreadPoolExecutor的构造器创建线程池之后,它的这些默认值也就一目了然了:核心线程数为0、线程池最大线程数为2147483647,空闲线程保留时间为60秒,SynchronousQueue是直接提交排队策略。
- 它没有指定线程饱和策略参数(RejectedExecutionHandler),此时默认使用AbortPolicy(即在任务超过时抛出异常),因为线程总数为无穷大,所以用不到包含策略
- 这其实和我们直接使用构造器创建其实是一样的,线程池为无限大,执行效率高,因为核心线程数为0,所以会及时回收空闲线程,适合处理数量多时间短的任务
- 注意:超时时间默认为60秒,所以当线程闲置时间达到1分钟时,整个线程池中的线程就会关闭,线程池销毁
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }
- 创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待
- 可以看出核心线程数和线程池总数一致,因为LinkedBlockingQueue是无界排队策略,maximumPoolSize设置是无效的
- 没有指定线程饱和策略时,默认使用AbortPolicy(任务总数超过时抛出异常),因为是无界排队策略,所以也用不到饱和策略
public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); }
- 创建一个线程的线程池------虽然池中只有一个线程,也要好于用Thread于Runnable的两种方式
- 核心线程数与最大线程数都为1,空闲线程等待时间为0s,使用的是无界排队策略,即工作队列没有上限,因为没有上限
- 因为采用无界排队策略,所以用不到饱和策略
- 它只会用唯一的工作线程来执行任务,保证所有任务按照先进先出的顺序执行。
- 现行大多数GUI程序都是单线程的,特别是Android应用
向线程池提交任务
- 可以使用两个方法向线程池提交任务,分别为execute()和submit()方法。
- execute()方法用于提交不需要返回值的任务,所以无法判断任务是否被线程池执行成功,submit()方法用于提交需要返回值的任务。
execute()
- public void execute(Runnable command)--只接受Runnable参数,而Runnale的run方法是没返回值的,所以execute方法同样没有返回值。
/** * Created by Administrator on 2018/6/8 0008. * 线程池任务执行类 */ public class ThreadPoolTask implements Runnable, Serializable { /** * threadPoolTaskData:模拟调用线程传入的数据 * threadPool:为了统计信息而传入进来的 */ private Integer threadPoolTaskData; private ThreadPoolExecutor threadPool; ThreadPoolTask(Integer tasks, ThreadPoolExecutor threadPool) { this.threadPoolTaskData = tasks; this.threadPool = threadPool; } public void run() { for (int i = 0; i < 2; i++) { System.out.println("--------线程名:" + Thread.currentThread().getName() + ":" + (threadPoolTaskData++)); try { /**用延时来模拟线程在操作*/ Thread.sleep(2000); } catch (Exception e) { System.out.println("支线程:"+e.getMessage()); e.printStackTrace(); } System.out.println("/线程池内线程数量为:" + threadPool.getPoolSize()); } } }
/** * Created by Administrator on 2018/6/8 0008. * 线程池测试类 */ public class ThreadPoolExecutorTest { public static void main(String[] args) { /** 构造器方式构造线程池*/ ThreadPoolExecutor threadPool = new ThreadPoolExecutor(3, 10, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(6), new ThreadPoolExecutor.DiscardOldestPolicy()); /**循环产生多个任务,并将其加入到线程池去执行*/ for (int i = 0; i <= 3; i++) { try { threadPool.execute(new ThreadPoolTask(1, threadPool)); /**便于观察,延时*/ Thread.sleep(500); } catch (Exception e) { System.out.println("异常:"+e.getMessage()); e.printStackTrace(); } } System.out.println("主线程已完毕"); } }
-----------结果输出---------
--------线程名:pool-1-thread-1:1
--------线程名:pool-1-thread-2:1
--------线程名:pool-1-thread-3:1
/线程池内线程数量为:3
--------线程名:pool-1-thread-1:2
主线程已完毕
/线程池内线程数量为:3
--------线程名:pool-1-thread-2:2
/线程池内线程数量为:3
--------线程名:pool-1-thread-3:2
/线程池内线程数量为:3
--------线程名:pool-1-thread-1:1
/线程池内线程数量为:3
/线程池内线程数量为:3
/线程池内线程数量为:3
--------线程名:pool-1-thread-1:2
/线程池内线程数量为:3
submit()
提交Runnable任务
- public Future<?> submit(Runnable task)
- submit方法照样接受Runnable参数,Future的get用于获取新线程的返回值,而Runnable的run方法本身并无返回,所以任务完成时,Future的get方法返回null
- 单纯的submit()方法并不会阻塞当前线程,而是调用Future的get方法时才阻塞当前线程,类似TCP。所以如果不使用Future的get方法而导致阻塞线程时,此方法与executr(Runnable command)方法一样
- 开发中如果新开的线程不需要有返回值,那么使用executr(Runnable command)、submit(Runnable task)即可
/** * Created by Administrator on 2018/6/8 0008. * 线程池任务执行类 */ public class ThreadPoolTask implements Runnable, Serializable { /** * threadPoolTaskData:模拟调用线程传入的数据 * threadPool:为了统计信息而传入进来的 */ private Integer threadPoolTaskData; private ThreadPoolExecutor threadPool; ThreadPoolTask(Integer tasks, ThreadPoolExecutor threadPool) { this.threadPoolTaskData = tasks; this.threadPool = threadPool; } public void run() { for (int i = 0; i < 2; i++) { System.out.println("--------线程名:" + Thread.currentThread().getName() + ":" + (threadPoolTaskData++)); try { /**用延时来模拟线程在操作*/ Thread.sleep(2000); } catch (Exception e) { System.out.println("支线程:"+e.getMessage()); e.printStackTrace(); } System.out.println("/线程池内线程数量为:" + threadPool.getPoolSize()); } } }
/** * Created by Administrator on 2018/6/8 0008. * 线程池测试类 */ public class ThreadPoolExecutorTest { public static void main(String[] args) { /** 构造器方式构造线程池*/ ThreadPoolExecutor threadPool = new ThreadPoolExecutor(3, 10, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(6), new ThreadPoolExecutor.DiscardOldestPolicy()); /**循环产生多个任务,并将其加入到线程池去执行*/ for (int i = 0; i <= 3; i++) { try { /**submit方法提交线程任务 * 单纯的submit方法以及返回Future对象引用,是不会阻塞main主线程的*/ Future future = threadPool.submit(new ThreadPoolTask(1, threadPool)); /**只有当Future调用get方法获取线程返回值时,main主线程就会阻塞 * 会一直等到线程执行完成main线程才会继续执行,Runnable的run方法无返回值,所以下面输出null * 实际开发中提交Runnable任务时,很少去接收它的null返回值,所以基本不做get阻塞操作*/ System.out.println(future.get()); /**便于观察,延时*/ Thread.sleep(500); } catch (Exception e) { System.out.println("异常:"+e.getMessage()); e.printStackTrace(); } } System.out.println("主线程已完毕"); } }
-------------结果输出----------------------
--------线程名:pool-1-thread-1:1
/线程池内线程数量为:1
--------线程名:pool-1-thread-1:2
/线程池内线程数量为:1
null
--------线程名:pool-1-thread-2:1
/线程池内线程数量为:2
--------线程名:pool-1-thread-2:2
/线程池内线程数量为:2
null
--------线程名:pool-1-thread-3:1
/线程池内线程数量为:3
--------线程名:pool-1-thread-3:2
/线程池内线程数量为:3
null
--------线程名:pool-1-thread-1:1
/线程池内线程数量为:3
--------线程名:pool-1-thread-1:2
/线程池内线程数量为:3
null
主线程已完毕
提交Callable任务
- public <T> Future<T> submit(Callable<T> task)
- Callable接口类似Runnable接口,都是为了执行新线程的。区别一:Runnable的run方法没有返回值,Callable的call返回有返回值;区别二:Runable接口的子类Thread有start方法,所以Runnable接口借助Thread类也可以开线程,而Callable得借助线程池submit方法
/** * Created by Administrator on 2018/6/11 0011. * 使用方式与Runnable接口是一样的 */ public class ThreadPoolCall implements Callable { /** * threadPoolTaskData:模拟调用线程传入的数据 * threadPool:为了统计信息而传入进来的 */ private Integer threadPoolTaskData; private ThreadPoolExecutor threadPool; ThreadPoolCall(Integer tasks, ThreadPoolExecutor threadPool) { this.threadPoolTaskData = tasks; this.threadPool = threadPool; } @Override public Object call() throws Exception { for (int i = 0; i < 2; i++) { System.out.println("--------线程名:" + Thread.currentThread().getName() + ":" + (threadPoolTaskData++)); try { /**用延时来模拟线程在操作*/ Thread.sleep(2000); } catch (Exception e) { System.out.println("支线程:"+e.getMessage()); e.printStackTrace(); } System.out.println("/线程池内线程数量为:" + threadPool.getPoolSize()); } /**call方法返回参数,会由Future的get方法获取*/ return Thread.currentThread().getName()+"执行完成..."; } }
/** * Created by Administrator on 2018/6/8 0008. * 线程池测试类 */ public class ThreadPoolExecutorTest { public static void main(String[] args) { /** 构造器方式构造线程池*/ ThreadPoolExecutor threadPool = new ThreadPoolExecutor(3, 10, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(6), new ThreadPoolExecutor.DiscardOldestPolicy()); /**循环产生多个任务,并将其加入到线程池去执行*/ for (int i = 0; i <= 3; i++) { try { /**单纯的submit方法以及返回Future对象引用,是不会阻塞main主线程的*/ Future future = threadPool.submit(new ThreadPoolCall(1, threadPool)); /**Future调用get方法获取线程返回值时,main主线程会被阻塞 * 会一直等到线程执行完成main线程才会继续执行,get方法会接收Callable中call方法的返回值*/ System.out.println("future.get():"+future.get()); /**便于观察,延时*/ Thread.sleep(500); } catch (Exception e) { System.out.println("异常:"+e.getMessage()); e.printStackTrace(); } } System.out.println("主线程已完毕"); } }
------------结果输出-------------
--------线程名:pool-1-thread-1:1
/线程池内线程数量为:1
--------线程名:pool-1-thread-1:2
/线程池内线程数量为:1
future.get():pool-1-thread-1执行完成...
--------线程名:pool-1-thread-2:1
/线程池内线程数量为:2
--------线程名:pool-1-thread-2:2
/线程池内线程数量为:2
future.get():pool-1-thread-2执行完成...
--------线程名:pool-1-thread-3:1
/线程池内线程数量为:3
--------线程名:pool-1-thread-3:2
/线程池内线程数量为:3
future.get():pool-1-thread-3执行完成...
--------线程名:pool-1-thread-1:1
/线程池内线程数量为:3
--------线程名:pool-1-thread-1:2
/线程池内线程数量为:3
future.get():pool-1-thread-1执行完成...
主线程已完毕
Future get方法定时阻塞
- V get(long timeout,TimeUnit unit) throws InterruptedException,ExecutionException,TimeoutException
- Future的get()方法会无限期的阻塞到线程执行完毕,类似TCP连接超时,这里也提供了重载的方法进行超时设置,timeout为超时时间,unit为时间单位
/** * Created by Administrator on 2018/6/8 0008. * 线程池测试类 */ public class ThreadPoolExecutorTest { public static void main(String[] args) { /** 构造器方式构造线程池*/ ThreadPoolExecutor threadPool = new ThreadPoolExecutor(3, 10, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(6), new ThreadPoolExecutor.DiscardOldestPolicy()); /**循环产生多个任务,并将其加入到线程池去执行*/ for (int i = 0; i <= 3; i++) { try { /**单纯的submit方法以及返回Future对象引用,是不会阻塞main主线程的*/ Future future = threadPool.submit(new ThreadPoolCall(1, threadPool)); /**Future调用get方法获取线程返回值时,main主线程会被阻塞 * 但现在只会阻塞3秒,超过3秒线程如果没有执行完毕返回时,这里直接抛超时异常*/ System.out.println("future.get():"+future.get(3,TimeUnit.SECONDS)); /**便于观察,延时*/ Thread.sleep(500); } catch (Exception e) { System.out.println("异常:"+e.getMessage()); e.printStackTrace(); } } System.out.println("主线程已完毕"); } }
-----------------结果输出-----------------------
--------线程名:pool-1-thread-1:1
/线程池内线程数量为:1
--------线程名:pool-1-thread-1:2
异常:null
java.util.concurrent.TimeoutException
--------线程名:pool-1-thread-2:1
at java.util.concurrent.FutureTask.get(FutureTask.java:201)
at test.threadPool.ThreadPoolExecutorTest.main(ThreadPoolExecutorTest.java:27)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)
/线程池内线程数量为:2
/线程池内线程数量为:2
--------线程名:pool-1-thread-2:2
java.util.concurrent.TimeoutException
异常:null
at java.util.concurrent.FutureTask.get(FutureTask.java:201)
--------线程名:pool-1-thread-3:1
/线程池内线程数量为:3
/线程池内线程数量为:3
--------线程名:pool-1-thread-3:2
java.util.concurrent.TimeoutException
异常:null
at java.util.concurrent.FutureTask.get(FutureTask.java:201)
--------线程名:pool-1-thread-1:1
/线程池内线程数量为:3
/线程池内线程数量为:3
--------线程名:pool-1-thread-1:2
异常:null
java.util.concurrent.TimeoutException
主线程已完毕
/线程池内线程数量为:3