1. 创建一个线程执行者:
使用Executor framework的第一步就是创建一个ThreadPoolExecutor类的对象。你可以使用这个类提供的4个构造器或Executors工厂类来 创建ThreadPoolExecutor。有了执行者,你就可以提交Runnable或Callable对象给执行者来执行。
一个模拟web服务器的示例:
// 1.首先,实现能被服务器执行的任务。创建实现Runnable接口的Task类。 class Task implements Runnable { //2.声明一个类型为Date,名为initDate的属性,来存储任务创建日期,和一个类型为String,名为name的属性,来存储任务的名称。 private Date initDate; private String name; // 3.实现Task构造器,初始化这两个属性。 public Task(String name) { initDate = new Date(); this.name = name; } // 4.实现run()方法。 @Override public void run() { // 5.首先,将initDate属性和实际日期(这是任务的开始日期)写入到控制台。 System.out.printf("%s: Task %s: Created on: %s\n", Thread.currentThread().getName(), name, initDate); System.out.printf("%s: Task %s: Started on: %s\n", Thread.currentThread().getName(), name, new Date()); // 6.然后,使任务睡眠一个随机时间。 try { Long duration = (long) (Math.random() * 10); System.out.printf("%s: Task %s: Doing a task during %dseconds\n", Thread.currentThread().getName(), name, duration); TimeUnit.SECONDS.sleep(duration); } catch (InterruptedException e) { e.printStackTrace(); } //7.最后,将任务完成时间写入控制台。 System.out.printf("%s: Task %s: Finished on: %s\n", Thread.currentThread().getName(), name, new Date()); } } // 8.现在,实现服务器类,用来执行使用执行者接受的所有任务。创建一个Server类。 class Server { // 9.声明一个类型为ThreadPoolExecutor,名为executor的属性。 private ThreadPoolExecutor executor; // 10.实现Server构造器,使用Executors类初始化ThreadPoolExecutor对象。 public Server(){ executor=(ThreadPoolExecutor)Executors.newCachedThreadPool(); } // 11.实现executeTask()方法,接收Task对象作为参数并将其提交到执行者。首先,写入一条信息到控制台,表明有一个新的任务到达。 public void executeTask(Task task) { System.out.printf("Server: A new task has arrived\n"); //12.然后,调用执行者的execute()方法来提交这个任务。 executor.execute(task); //13.最后,将执行者的数据写入到控制台来看它们的状态。 System.out.printf("Server: Pool Size: %d\n", executor.getPoolSize()); System.out.printf("Server: Active Count: %d\n", executor.getActiveCount()); System.out.printf("Server: Completed Tasks: %d\n", executor.getCompletedTaskCount()); } //14.实现endServer()方法,在这个方法中,调用执行者的shutdown()方法来结束任务执行。 public void endServer() { executor.shutdown(); } } // 15.最后,实现这个示例的主类,创建Main类,并实现main()方法。 class Main3 { public static void main(String[] args) { Server server=new Server(); for (int i=0; i<100; i++){ Task task=new Task("Task "+i); server.executeTask(task); } server.endServer(); } }Server类是这个示例的关键。它创建和使用ThreadPoolExecutor执行任务。
第一个重要点是在Server类的构造器中创建ThreadPoolExecutor。ThreadPoolExecutor有4个不同的构造器,但由于它 们的复杂性,Java并发API提供Executors类来构造执行者和其他相关对象。即使我们可以通过ThreadPoolExecutor类的任意一 个构造器来创建ThreadPoolExecutor,但这里推荐使用Executors类。
在本例中,你已经使用 newCachedThreadPool()方法创建一个缓存线程池。这个方法返回ExecutorService对象,所以它被转换为 ThreadPoolExecutor类型来访问它的所有方法。你已创建的缓存线程池,当需要执行新的任务会创建新的线程,如果它们已经完成运行任务,变成可用状态,会重新使用这些线程。线程重复利用的好处是,它减少线程创建的时间。缓存线程池的缺点是,为新任务不断创建线程, 所以如果你提交过多的任务给执行者,会使系统超载。
注意事项:使用通过newCachedThreadPool()方法创建的执行者,只有当你有一个合理的线程数或任务有一个很短的执行时间。
一旦你创建执行者,你可以使用execute()方法提交Runnable或Callable类型的任务。在本例中,你提交实现Runnable接口的Task类对象。
你也打印了一些关于执行者信息的日志信息。特别地,你可以使用了以下方法:
- getPoolSize():此方法返回线程池实际的线程数。
- getActiveCount():此方法返回在执行者中正在执行任务的线程数。
- getCompletedTaskCount():此方法返回执行者完成的任务数。
ThreadPoolExecutor 类和一般执行者的一个关键方面是,你必须明确地结束它。如果你没有这么做,这个执行者会继续它的执行,并且这个程序不会结束。如果执行者没有任务可执行, 它会继续等待新任务并且不会结束它的执行。一个Java应用程序将不会结束,除非所有的非守护线程完成它们的执行。所以,如果你不结束这个执行者,你的应用程序将不会结束。
当执行者完成所有待处理的任务,你可以使用ThreadPoolExecutor类的shutdown()方法来表明你想要结束执行者。在你调用shutdown()方法之后,如果你试图提交其他任务给执行者,它将会拒绝,并且抛出RejectedExecutionException异常。
ThreadPoolExecutor 类提供了许多获取它状态的方法,我们在这个示例中,使用getPoolSize()、getActiveCount()和 getCompletedTaskCount()方法来获取执行者的池大小、线程数、完成任务数信息。你也可以使用 getLargestPoolSize()方法,返回池中某一时刻最大的线程数。
ThreadPoolExecutor类也提供其他与结束执行者相关的方法,这些方法是:
- shutdownNow():此方法立即关闭执行者。它不会执行待处理的任务,但是它会返回待处理任务的列表。当你调用这个方法时,正在运行的任务继续它们的执行,但这个方法并不会等待它们的结束。
- isTerminated():如果你已经调用shutdown()或shutdownNow()方法,并且执行者完成关闭它的处理时,此方法返回true。
- isShutdown():如果你在执行者中调用shutdown()方法,此方法返回true。
- awaitTermination(long timeout, TimeUnit unit):此方法阻塞调用线程,直到执行者的任务结束或超时。
2. 创建一个大小固定的线程执行者:
当你使用由Executors类的 newCachedThreadPool()方法创建的基本ThreadPoolExecutor,你会有执行者运行在某一时刻的线程数的问题。这个执行者为每个接收到的任务创建一个线程(如果池中没有空闲的线程),所以,如果你提交大量的任务,并且它们有很长的(执行)时间,你会使系统过载和引发应用程序性能不佳的问题。
如果你想要避免这个问题,Executors类提供一个方法newFixedThreadPool()来创建大小固定的线程执行者。这个执行者有最大线程数。 如果你提交超过这个最大线程数的任务,这个执行者将不会创建额外的线程,并且剩下的任务将会阻塞,直到执行者有空闲线程。这种行为,保证执行者不会引发应用程序性能不佳的问题。
3. 执行者执行返回结果的任务:
Executor framework的一个优点是你可以并发执行返回结果的任务。Java并发API使用以下两种接口来实现:
- Callable:此接口有一个call()方法。在这个方法中,你必须实现任务的(处理)逻辑。Callable接口是一个参数化的接口。意味着你必须表明call()方法返回的数据类型。
- Future:此接口有一些方法来保证Callable对象结果的获取和管理它的状态。
Future<Integer> result = executor.submit(calculator);
4. 运行多个任务并处理第一个结果:
在并发编程中的一个常见的问题就是,当有多种并发任务解决一个问题时,你只对这些任务的第一个结果感兴趣。比如,你想要排序一个数组。你有多种排序算法。 你可以全部启用它们,并且获取第一个结果(对于给定数组排序最快的算法的结果)。String result = executor.invokeAny(taskList);
5.运行多个任务并处理所有结果:
ThreadPoolExecutor类提供一个方法,允许你提交任务列表给执行者,并且在这个列表上等待所有任务的完成。
List<Future<Result>> resultList = executor.invokeAll(taskList);
6. 执行者延迟运行一个任务:
执行者框架提供ThreadPoolExecutor类,使用池中的线程来执行Callable和Runnable任务,这样可以避免所有线程的创建操作。当你提交一个任务给执行者,会根据执行者的配置尽快执行它。在有些使用情况下,当你对尽快执行任务不感觉兴趣。你可能想要在一段时间之后执行任务或周期性地执行任务。基于这些目的,执行者框架提供 ScheduledThreadPoolExecutor类。schedule(Callable<V> callable, long delay, TimeUnit unit)
7. 执行者周期性地运行一个任务:
scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit)
8. 执行者取消一个任务:
当你使用执行者工作时,你不得不管理线程。你只实现Runnable或 Callable任务和把它们提交给执行者。执行者负责创建线程,在线程池中管理它们,当它们不需要时,结束它们。有时候,你想要取消已经提交给执行者 的任务。在这种情况下,你可以使用Future的cancel()方法,它允许你做取消操作。
9. 执行者控制一个任务完成:
FutureTask类提供一个 done()方法,允许你在执行者执行任务完成后执行一些代码。你可以用来做一些后处理操作,生成一个报告,通过e-mail发送结果,或释放一些资源。当执行的任务由FutureTask来控制完成,FutureTask会内部调用这个方法。这个方法在任务的结果设置和它的状态变成isDone状态之后被调用,不管任务是否已经被取消或正常完成。10. 执行者分离任务的启动和结果的处理:
通常,当你使用执行者执行并发任务时,你将会提交 Runnable或Callable任务给这个执行者,并获取Future对象控制这个方法。你可以发现这种情况,你需要提交任务给执行者在一个对象中,而处理结果在另一个对象中。基于这种情况,Java并发API提供 CompletionService类。CompletionService 类有一个方法来提交任务给执行者和另一个方法来获取已完成执行的下个任务的Future对象。在内部实现中,它使用Executor对象执行任务。这种行为的优点是共享一个CompletionService对象,并提交任务给执行者,这样其他(对象)可以处理结果。其局限性是,第二个对象只能获取那些已经完成它们的执行的任务的Future对象,所以,这些Future对象只能获取任务的结果。
11. 执行者控制被拒绝的任务:
当你想要结束执行者的执行,你使用shutdown()方法来表明它的结束。执行者等待正在运行或等待它的执行的任务的结束,然后结束它们的执行。如果你在shutdown()方法和执行者结束之间,提交任务给执行者,这个任务将被拒绝,因为执行者不再接收新的任务。ThreadPoolExecutor类提供一种机制,在调用shutdown()后,不接受新的任务。
通过实现 RejectedExecutionHandler,在执行者中管理拒绝任务。
参考资料:《Java 7 Concurrency Cookbook》
《Java 9 Concurrency Cookbook Second Edition》