Java基础 --- 线程池
什么是线程池
- 线程池包括许多个可以随时运行的idle线程
- 将 Runnable 放入线程池, 线程池中的一个线程会执行 Runnable中的run方法.
- 当 run 方法退出, 这个线程不会结束, 而是变为idle线程继续呆在线程池里
为什么需要线程池
- 创建线程的代价很大, 因为需要和操作系统交互.
- 如果需要大量的运行时间不长的线程时, 应该使用线程池
- 还有时候使用线程池只是为了方便对task分组并控制
Java中的线程池
Executor
类包含了几个静态工厂方法用来创建线程池
newCachedThreadPool
方法创建一个线程池, 当新的任务进来时, 如果有idle线程则使用idle线程, 没有会创建新的newFixedThreadPool
方法创建一个固定大小的线程池. 如果任务数量超出了idle线程的数量, 任务则会被放进queuenewSingleThreadExecutor
方法创建一个大小为1的线程池- 以上三个方法, 会返回一个实现了
ExecutorService
的ThreadPoolExecutor
对象
- 然后通过返回的对象的以下三个方法, 将Runnable或者Callable放进线程池
Future<?> submit(Runnable task)
Future<T> submit(Runnable task, T result)
Future<T> submit(Callable<T> task)
- submit方法会返回一个Future对象, 用来查询任务状态
- 使用完线程池之后, 使用
shutdown
方法可以关闭线程池. 线程池将不会接受新的任务, 当所有任务执行完毕后, 线程池关闭- 也可以使用
shutdownNow
方法, 线程池将取消所有没有执行的任务, 并且会中断正在运行的线程
Summary: 使用线程池的步骤:
- Call the static newCachedThreadPool or newFixedThreadPool method of the Executors class.
- Call submit to submit Runnable or Callable objects.
- If you want to be able to cancel a task, or if you submit Callable objects, hang
on to the returned Future objects.
- Call shutdown when you no longer want to submit any tasks.
Example code
- 下面代码的主要功能是统计一个目录下所有文件keyword出现的次数(包括sub directories), keyword是用户自定义输入
- 对于每个directory都会使用一个线程, 因为用到的数量很多, 并且持续时间很短, 所以用到了线程池
package threadPool;
import java.io.*;
import java.util.*;
import java.util.concurrent.*;
/**
* @version 1.02 2015-06-21
* @author Cay Horstmann
*/
public class ThreadPoolTest {
public static void main(String[] args) throws Exception {
try (Scanner in = new Scanner(System.in)) {
System.out.print("Enter base directory (e.g. /usr/local/jdk5.0/src): ");
String directory = in.nextLine();
System.out.print("Enter keyword (e.g. volatile): ");
String keyword = in.nextLine();
//创建线程池
ExecutorService pool = Executors.newCachedThreadPool();
//实现了Callable接口的任务
MatchCounter counter = new MatchCounter(new File(directory), keyword, pool);
//提交任务
Future<Integer> result = pool.submit(counter);
try {
System.out.println(result.get() + " matching files.");
}
catch (ExecutionException e) {
e.printStackTrace();
}
catch (InterruptedException e) {
}
//关闭线程池
pool.shutdown();
int largestPoolSize = ((ThreadPoolExecutor) pool).getLargestPoolSize();
System.out.println("largest pool size=" + largestPoolSize);
}
}
}
/**
* This task counts the files in a directory and its subdirectories that contain a given keyword.
*/
class MatchCounter implements Callable<Integer> {
private File directory;
private String keyword;
private ExecutorService pool;
private int count;
/**
* Constructs a MatchCounter.
* @param directory the directory in which to start the search
* @param keyword the keyword to look for
* @param pool the thread pool for submitting subtasks
*/
public MatchCounter(File directory, String keyword, ExecutorService pool) {
this.directory = directory;
this.keyword = keyword;
this.pool = pool;
}
public Integer call() {
count = 0;
try {
File[] files = directory.listFiles();
List<Future<Integer>> results = new ArrayList<>();
for (File file : files)
if (file.isDirectory()) {
//对于每个subdirectory都开启一个线程
MatchCounter counter = new MatchCounter(file, keyword, pool);
Future<Integer> result = pool.submit(counter);
results.add(result);
}
else {
if (search(file)) count++;
}
for (Future<Integer> result : results)
try {
count += result.get();
}
catch (ExecutionException e) {
e.printStackTrace();
}
}
catch (InterruptedException e) {
}
return count;
}
/**
* Searches a file for a given keyword.
* @param file the file to search
* @return true if the keyword is contained in the file
*/
public boolean search(File file) {
try {
try (Scanner in = new Scanner(file, "UTF-8")) {
boolean found = false;
while (!found && in.hasNextLine()) {
String line = in.nextLine();
if (line.contains(keyword)) found = true;
}
return found;
}
}
catch (IOException e) {
return false;
}
}
}
Fork-in Framework
- 使用fork-in framework可以利用多线程完成递归任务
package forkJoin;
import java.util.concurrent.*;
import java.util.function.*;
/**
* This program demonstrates the fork-join framework.
* @version 1.01 2015-06-21
* @author Cay Horstmann
*/
public class ForkJoinTest {
public static void main(String[] args) {
final int SIZE = 10000000;
double[] numbers = new double[SIZE];
for (int i = 0; i < SIZE; i++) numbers[i] = Math.random();
Counter counter = new Counter(numbers, 0, numbers.length, x -> x > 0.5);
ForkJoinPool pool = new ForkJoinPool();
pool.invoke(counter);
System.out.println(counter.join());
}
}
class Counter extends RecursiveTask<Integer> {
public static final int THRESHOLD = 1000;
private double[] values;
private int from;
private int to;
private DoublePredicate filter;
public Counter(double[] values, int from, int to, DoublePredicate filter) {
this.values = values;
this.from = from;
this.to = to;
this.filter = filter;
}
protected Integer compute() {
if (to - from < THRESHOLD) {
int count = 0;
for (int i = from; i < to; i++) {
if (filter.test(values[i])) count++;
}
return count;
}
else {
int mid = (from + to) / 2;
Counter first = new Counter(values, from, mid, filter);
Counter second = new Counter(values, mid, to, filter);
invokeAll(first, second);
return first.join() + second.join();
}
}
}