介绍
线程池技术用于应对高并发场景、合理利用服务器资源。线程池预先创建并复用一组线程,无需为每个任务重新创建线程,从而减少了线程创建和销毁的开销,提高了任务的执行效率;典型的应用场景是用多个线程异步地发起请求。
本文介绍一个简单线程池的实现:启动多个Worker(implements Runnable)线程,并用一个任务(Job extends Runnable)队列来维护所有待执行的任务;Worker从队列中取出任务后,直接调用该任务的run()方法来执行它。
代码实现
线程池抽象
/**
 * Minimal contract for a pool of worker threads that run submitted jobs.
 *
 * @param <Job> the kind of task the pool accepts; must be a {@link Runnable}
 */
public interface ThreadPool<Job extends Runnable> {

    /**
     * Submits a job to the pool for execution.
     *
     * @param job the job to run
     */
    void execute(Job job);

    /** Shuts the thread pool down. */
    void shutdown();

    /**
     * Adds worker threads to the pool.
     *
     * @param num how many workers to add
     */
    void addWorkers(int num);

    /**
     * Removes worker threads from the pool.
     *
     * @param num how many workers to remove
     */
    void removeWorker(int num);

    /**
     * Reports the number of jobs.
     *
     * @return the job count
     */
    int getJobSize();
}
线程池实现
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
 * A simple fixed-bound thread pool: worker threads take jobs from a shared
 * FIFO queue and call each job's {@code run()} method.
 *
 * <p>The job queue and the worker list are both guarded by one lock. A single
 * condition on that lock is used for two events: "a job was queued" and
 * "a worker was asked to stop".
 *
 * @param <Job> the kind of task the pool accepts
 */
public class DefaultThreadPool<Job extends Runnable> implements ThreadPool<Job> {
    /** Maximum number of worker threads in the pool. */
    private static final int MAX_WORKER_NUMBERS = 10;
    /** Number of worker threads created by the no-argument constructor. */
    private static final int DEFAULT_WORKER_NUMBERS = 5;
    /** Minimum number of worker threads a pool is constructed with. */
    private static final int MIN_WORKER_NUMBERS = 1;

    /** Pending jobs, in submission order. Guarded by {@link #lock}. */
    private final Queue<Job> jobs = new LinkedList<Job>();
    /** Workers currently owned by the pool. Guarded by {@link #lock}. */
    private final List<Worker> workerList = new ArrayList<Worker>();
    /** Guards {@link #jobs} and {@link #workerList}. */
    private final Lock lock = new ReentrantLock();
    /** Signalled when a job is queued and when workers are asked to stop. */
    private final Condition condition = lock.newCondition();

    /** Creates a pool with the default number of workers. */
    public DefaultThreadPool() {
        startWorkers(DEFAULT_WORKER_NUMBERS);
    }

    /**
     * Creates a pool with the requested number of workers, clamped to the
     * range [MIN_WORKER_NUMBERS, MAX_WORKER_NUMBERS].
     *
     * @param num requested number of workers
     */
    public DefaultThreadPool(int num) {
        startWorkers(Math.max(MIN_WORKER_NUMBERS, Math.min(MAX_WORKER_NUMBERS, num)));
    }

    /**
     * Queues a job and wakes the waiting workers.
     *
     * @param job the job to run; must not be null
     * @throws NullPointerException if {@code job} is null
     */
    @Override
    public void execute(Job job) {
        if (job == null) {
            throw new NullPointerException("job不能为空");
        }
        lock.lock();
        try {
            jobs.add(job);
            // signalAll rather than signal: the condition is shared with stop
            // requests, so a single wake-up could land on a worker that is
            // exiting and the job would sit in the queue unnoticed.
            condition.signalAll();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Asks every worker to stop and forgets them. A worker that is running a
     * job finishes that job first; idle workers are woken so they can exit.
     * Jobs still in the queue are left there and are not executed by the
     * stopped workers.
     */
    @Override
    public void shutdown() {
        lock.lock();
        try {
            for (Worker worker : workerList) {
                worker.shutDown();
            }
            workerList.clear();
            // Wake idle workers so they observe the stop flag.
            condition.signalAll();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Adds workers, never letting the pool grow beyond MAX_WORKER_NUMBERS.
     *
     * @param num number of workers to add; values below 1 add nothing
     */
    @Override
    public void addWorkers(int num) {
        startWorkers(num);
    }

    /**
     * Stops and removes up to {@code num} workers. A removed worker that is
     * running a job finishes that job before its thread ends.
     *
     * @param num number of workers to remove; capped at the current worker
     *            count, values below 1 remove nothing
     */
    @Override
    public void removeWorker(int num) {
        lock.lock();
        try {
            int count = Math.min(num, workerList.size());
            for (int i = 0; i < count; i++) {
                // Always take the last element: indexing with i while the list
                // shrinks would skip workers and run past the end of the list.
                workerList.remove(workerList.size() - 1).shutDown();
            }
            if (count > 0) {
                // Wake idle workers so the removed ones observe the stop flag.
                condition.signalAll();
            }
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns the number of jobs waiting in the queue (jobs already taken by
     * a worker are not counted).
     *
     * @return number of queued jobs
     */
    @Override
    public int getJobSize() {
        lock.lock();
        try {
            return jobs.size();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Creates and starts worker threads, never letting the pool grow beyond
     * MAX_WORKER_NUMBERS.
     *
     * @param num number of workers to start; values below 1 start nothing
     */
    public void initWorkers(int num) {
        startWorkers(num);
    }

    /**
     * Starts up to {@code num} workers under the lock, capped by the remaining
     * capacity. Private so the constructors never call an overridable method.
     */
    private void startWorkers(int num) {
        lock.lock();
        try {
            int count = Math.min(num, MAX_WORKER_NUMBERS - workerList.size());
            for (int i = 0; i < count; i++) {
                Worker worker = new Worker();
                workerList.add(worker);
                new Thread(worker).start();
            }
        } finally {
            lock.unlock();
        }
    }

    /** Loop run by each pool thread: take a job, run it, repeat until stopped. */
    private class Worker implements Runnable {
        /** Cleared to ask this worker to exit once its current job is done. */
        private volatile boolean running = true;

        @Override
        public void run() {
            while (running && !Thread.currentThread().isInterrupted()) {
                Job job = nextJob();
                if (job == null) {
                    // Stop was requested or the thread was interrupted.
                    break;
                }
                try {
                    job.run();
                } catch (RuntimeException e) {
                    // A failing job must not take the worker thread down with it.
                    e.printStackTrace();
                }
            }
        }

        /**
         * Waits until a job is available and removes it from the queue.
         *
         * @return the next job, or null if this worker was asked to stop or
         *         its thread was interrupted while waiting
         */
        private Job nextJob() {
            lock.lock();
            try {
                // Re-check the stop flag on every wake-up; otherwise an idle
                // worker would keep waiting on an empty queue after shutdown.
                while (running && jobs.isEmpty()) {
                    condition.await();
                }
                return running ? jobs.poll() : null;
            } catch (InterruptedException e) {
                // Restore the flag so the loop in run() sees it and exits.
                Thread.currentThread().interrupt();
                return null;
            } finally {
                lock.unlock();
            }
        }

        /**
         * Marks this worker as stopped. The caller is responsible for
         * signalling the condition so that an idle worker notices.
         */
        public void shutDown() {
            running = false;
        }
    }
}
测试类
/** Demo driver: submits a batch of slow counting tasks to a thread pool. */
public class Test {

    /** Prints the numbers 0 through 9, sleeping one second after each one. */
    private static class Task implements Runnable {
        @Override
        public void run() {
            int n = 0;
            while (n < 10) {
                System.out.println(n);
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                n++;
            }
        }
    }

    /**
     * Creates a pool with the no-argument constructor and hands it 20 tasks.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        ThreadPool<Runnable> threadPool = new DefaultThreadPool<Runnable>();
        int submitted = 0;
        while (submitted < 20) {
            threadPool.execute(new Task());
            submitted++;
        }
    }
}