多线程编程可以很大程度上的运用cpu资源。但是线程数量过大时,频繁的创建和销毁线程,会占用很多cpu和内存资源,从而影响系统稳定性,降低代码效率。为了避免频繁的创建和销毁线程,可以让现场进行复用,线程池就满足了这一需求。线程池中存在着创建好的很多线程,这些线程处于空闲状态,当程序需要使用线程时,从线程池中取几个空闲线程。当任务完成后,线程并不立即关闭,而是将线程退还给线程池,供给其他程序使用。线程池就相当于一个工具箱,里面有若干把螺丝刀,每当一个工人需要用螺丝刀的时候,就从中拿一把出来,用完再还回去。过了不久,又有工人来了,再随机从工具箱里拿把螺丝刀用完再还回去。但是如果工具箱里只有10把螺丝刀,但是20个工人同时都要使用,那咋办呢,只能随机十个工人先用,用完了把螺丝刀换回去了,剩下的人再按随机顺序拿来用。等没有人需要螺丝刀,也就是没有程序需要使用到线程,或者程序执行完毕了,就可以关闭线程池了。
一、线程池的创建
1、固定大小的线程池
newFixedThreadPool()创建了固定大小的线程池,示例如下。
public class FixedThreadPool {
static class MyThread implements Runnable{
@Override
public void run() {
System.err.println(Thread.currentThread().getName());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
public static void main(String[] args) {
ExecutorService exec = Executors.newFixedThreadPool(5);
for(int i = 0;i<20;i++) {
exec.submit(new MyThread());
}
exec.shutdown();
}
}
newFixedThreadPool(5)线程池中有5个线程,向线程池提交了20个任务,每个任务暂停1秒,线程池分4次处理,每次运行5个线程,一共花了4秒完成任务。
2、计划任务
newScheduledThreadPool方法返回一个ScheduledExecutorService对象,可以根据时间需要对线程进行调度。它的主要方法如下
public ScheduledFuture<?> schedule(Runnable command,long delay,Timeout unit);
public ScheduledFuture<?> schedule(Runnable command,
long initialDelay,
long period,
TimeUnit unit);
publicd ScheduleFuture(?) SchedleWithFixedDelay(
Runnable command,
long initialDelay,
long delay,
Timeout unit);
与其他几个线程不同,ScheduledExecutorService并不一定会立即执行任务,它起到了定时任务的作用,它会在指定的时间对任务进行调度。方法schedule()会在给定的时间对任务进行一次调度,方法scheduleAtFixedRate()和scheduleWithFixedRate()会对任务进行周期性的调用,但是两者有细微的差别。
ScheduleAtFixedRate()在指定的initialDelay时间后开始执行任务,它将规定时间长短的period作为周期。每个任务在一个周期中进行,比如intialDelay为1s,period为2s,那么产生效果为任务等待1s之后开始运行,之后每2s时间内执行一次任务。如果任务在period规定的时间内执行,那么下一次执行任务就要等待period周期结束在开始,如果任务在period时间内没有执行完,则下次任务执行需要等待本次任务执行完成后执行
ScheduleWithFixedRate()则是在指定的initialDelay时间之后开始执行任务,每次任务完成等待规定时间长短之后再开始执行下一次任务,无论每一次任务执行时间的长短,两次任务之间间隔的时间为不变的delay,设delay为2s,整个流程就如下图展示。
下面是scheduleAtFixedRate()的一个简单调用
public class ScheduleThreadPoolTest {
static ScheduledExecutorService ses = Executors.newScheduledThreadPool(5);
static class MyThread implements Runnable{
@Override
public void run() {
try {
Thread.sleep(1000); //模拟每个任务消耗时间为1s
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
System.err.println(Thread.currentThread().getName()+System.currentTimeMillis()); //输出系统时间
}
}
public static void main(String[] args) {
Thread m = new Thread(new MyThread());
ses.scheduleAtFixedRate(m, 0, 2, TimeUnit.SECONDS); //周期为2s
}
}
3、单线程化线程池
newSignleThreadExecutor创建一个单线程化的线程池,它只会用唯一的工作线程 来执行任务,保证所有任务按照指定顺序先进先出(FIFO)
public class SingleThreadPoolTest {
public static void main(String[] args) {
ExecutorService exec = Executors.newSingleThreadExecutor();
for(int i = 0;i<20;i++) {
final int index = i;
exec.execute(new Runnable(){
public void run() {
System.err.println(index);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}finally{
exec.shutdown();
}
});
}
}
}
输出为0-19的顺序输出。
4、可缓存线程池
newCachedThreadPool可以创建一个可缓存的线程池,如果线程池的长度超过任务所需要的线程数,它可以回收空闲线程。若线程池的长度不足以完成任务所需,则会新建线程。但是该线程池很容易造成堆内存溢出。因此实际很少用。
public class CachedThreadPoolTest {
public static void main(String[] args) {
Executor exec = Executors.newCachedThreadPool();
for(int i = 0;i<10;i++) {
final int index = i;
try {
Thread.sleep(index*100);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
exec.execute(new Runnable() {
public void run() {
System.err.println(index+"当前线程为"+Thread.currentThread().getName());
}
});
}
}
}
线程池虽然为无限大,但是因为在执行第二个任务之前第一个任务已经完成,所以第二个任务会复用第一个任务用到的线程,以此类推,在没有任务需要用到线程池的时候,该线程池会在一分钟之后自动关闭。
二、核心线程池的内部实现
无论是newFixedThreadPool(),还是newCachedThreadPool(),还是newScheduleThreadPool(),等虽然创建的线程池有着不同的功能,但是内部实现都是用到了ThreadPoolExecutor()
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory);
}
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService(
new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
这三个方法都是对ThreadPoolExecutor类的封装,下面是ThreadPoolExecutor最重要构造函数;
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandle handle
)
函数的参数含义如下
1.corePoolSize:指定线程池的大小;
2.maximumPoolSize:指定线程池的最大线程数量
3.keepAliveTime:当线程池线程数量超过corePoolSizeshi是,多余的空闲线程的存货时间,即超过corePoolSize的空闲线程在多久的时间内被销毁。
4.unit:keepAliveTime的单位
5.workQueue:任务队列,被提交但尚未被执行的任务。
6.ThreadFactory:线程工厂,用于创建线程,一般用默认的即可
7.handle:拒绝策略,当任务太多来不及处理,如何拒绝任务。
这里着重学习一下workQueue(面试题常问)和handle这两个参数。
workQueue指被提交但未执行的任务队列,它是一个BlockingQueue接口的对象,仅用于存放Runnable对象,根据队列功能分类,在ThreadPoolExecutor的构造函数中可使用一下几种BlockingQueue
1.直接提交队列:该功能有SynchronousQueue对象提供。它是一个特殊的BlockingQueue,它没有容量,每一个插入操作都要等待一个相应的删除操作,反之,每一个删除操作都要等待对应的插入操作。SynchronousQueue队列并没有真是的将任务保存,它总是将任务交给线程执行,当没有空闲的线程,则尝试创建新的线程,如果线程数量已经达到最大,则会执行拒绝策略。因此使用SynchronousQueue时通常需要设置很大的maximumPoolSize.
2.有界的任务队列:有界的任务队列可以使用ArrayBlockingQueue实现,ArrayBlockingQueue的构造函数必须带一个容量参数,表示该队列的最大容量。当使用有界队列的时候,线程池的线程数量小于corePoolSize,则新建线程,若大于corePoolSize则会将新任务加入等待队列,如果等待队列已满,如果总线程数小于maximumPoolSize,则创建新的进程执行任务,若大于maximumPoolSize,则执行拒绝策略。因此,ArrayBlockingQueue仅当任务队列装满是,才会将线程数提升到corePoolSize以上。
3.无界的任务队列:无界任务队列LinkedBlockingQueue不存在加入队列失败的情况,除非将系统资源耗尽。当有新的任务加入,如果线程数小于corePoolSize,则创建新的线程,若大于corePoolSize则加入无界队列。如果任务加入的效率远大于线程池线程处理的效率,则无界队列会一直增长,直到耗尽系统内存。
4.优先任务队列:优先任务队列是特殊的无界队列,它通过PriorityBlockingQueeu实现,可以根据任务自身的优先级顺序执行。在确保系统性能的同时,也有很好地质量保证。
三、拒绝策略
当任务数量超过了系统实际承载能力时,就需要用到拒绝策略。也就是说,线程池的线程已经用完了,无法继续为新任务服务。同时等待队列中也已经排满了,这个时候就需要用拒绝策略。
JDK内置了四种拒绝策略。
1、AbortPolicy策略:该策略直接抛出异常,阻止系统继续工作
2、CallerOledestPolicy策略:只要线程池未关闭,该策略直接在调用者线程中,运行当前被丢弃的任务,这样不会真正的丢弃任务,但是有可能会大大降低向性能
3、DiscardOledestPolicy策略:该策略抛弃一个即将被执行的任务,并尝试再次提交当前的任务
4、DiscardPolicy策略:该策略抛弃无法处理的任务,如果任务允许被弃置,这种方法是最好的拒绝策略
public class RejectThreadPoolTest {
public static class MyThread implements Runnable{
@Override
public void run() {
System.err.println(System.currentTimeMillis()+"Thread ID:"+Thread.currentThread().getId());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
public static void main(String[] args) throws InterruptedException {
MyThread m = new MyThread();
ExecutorService es = new ThreadPoolExecutor(5, 5, 0L,
TimeUnit.SECONDS,
new LinkedBlockingDeque<Runnable>(10),
Executors.defaultThreadFactory()
, new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
//打印丢弃的任务
System.out.println(r.toString() + " is discard");
}
});
for(int i = 0;i<Integer.MAX_VALUE;i++) {
es.submit(m);
Thread.sleep(100);
}
}
}
该线程池大小为5,线程池的最大线程数量为5,等待队列容量为10,由于任务数量太过巨大,必然会导致许多任务被抛弃。设置的拒绝策略在抛弃的同时生效。
四、自定义线程创建:ThreadFactory
线程池的创建是为了避免频繁的创建和销毁线程,节省系统资源。线程池的构造函数中指定了该线程池创建线程的策略,即ThreadFactory。ThreadFactory是一个接口,它只有一个方法,用来创建线程。
Thread newThread(Runnable r);
可以通过自定义线程池更好的控制线程池中的线程,比如可以跟踪线程池在什么时候创建了线程池,也可以自定义线程的优先级,名称等信息,也可以把线程池中的线程全部设为守护线程,用户线程全部结束之后,如果线程池中的线程全部为守护线程那么线程池会被强制关闭。
public class ThreadFactoryTest {
public static class MyThread implements Runnable{
@Override
public void run() {
System.err.println(System.currentTimeMillis()+"使用的线程为:"+Thread.currentThread().getName());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
public static void main(String[] args) throws InterruptedException {
MyThread m = new MyThread();
ExecutorService exec = new ThreadPoolExecutor(5,5,0L,TimeUnit.MILLISECONDS,
new SynchronousQueue<Runnable>(),
new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setDaemon(true);
System.err.println("create"+t);
return t;
}
});
for(int i = 0;i<20;i++) {
exec.submit(m);
Thread.sleep(2000);
}
}
}
上述代码把线程池中的线程都自定义为了守护线程,在主线程main执行完毕之后,线程池虽然没有shutdown,但是也自动销毁了,然后程序运行结束。
五、扩展线程池
ThreadPoolExecutor线程池是一个可扩展的线程池,它提供了beforeExecute()、afterExecute()和terminated()三个接口对线程池进行控制。通过这三个接口可以自定义一些增强功能。比如记录一个任务的开始,结束和整个线程池的退出
public class ExtThreadPoolTest {
public static class MyThread implements Runnable {
static String name;
public MyThread(String name) {
this.name = name;
}
@Override
public void run() {
System.err.println(name+"正在执行");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
public static void main(String[] args) {
ExecutorService exec = new ThreadPoolExecutor(5,5,0L,TimeUnit.SECONDS,new LinkedBlockingQueue<Runnable>()) {
@Override
protected void beforeExecute(Thread t,Runnable r) {
System.err.println(Thread.currentThread().getName()+"任务开始");
}
@Override
protected void afterExecute(Runnable r,Throwable t) {
System.err.println(Thread.currentThread().getName()+"任务结束");
}
@Override
protected void terminated() {
System.err.println("线程池关闭");
}
};
for(int i = 0;i<10;i++) {
exec.execute(new MyThread("线程"+i));
}
exec.shutdown();
}
}
beforeExecute()记录一个任务的开始,afterExecute()记录着一个任务的结束,terminated记录着线程池的关闭。上述代码将10个任务提交给线程池。当线程池中的任务分配线程并开始执行时,每一个线程开始前都会执行beforeExecute()方法,打印线程名称,线程执行完成时调用afterExecute()方法,等10个任务全部完成之后,线程池关闭,关闭前调用terminated()方法。这三个方法可以在开发时获取调用线程的时间等信息,为处理程序问题提供有效的帮助。
六、优化线程池线程数量
线程池过大或者过小都会对系统的性能有一定的影响。在确定线程池的大小时,虽然不必很精确,但是线程池的大小需要避免极大或者极小两种情况。线程池的估算公式为
在java中取得可用cpu数量的方法为
Runtime.getRuntime().availableProcessors();