package github.com.AllenDuke.concurrentTest.threadPoolTest; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicInteger; /** * @description 简单模仿线程池,原理基本一致,先由核心线程处理,处理不来则加入队列,队列若满则新建非核心线程,达最大线程数则拒绝。 * 每个线程完成当前任务后(还没有进入空闲,避免在空闲与非空闲之间频繁切换,而且造成线程不安全,因为如果此时有新任务进来,主线程尝试 * 为它设定任务,而它自己又尝试从队列中拉取,由此覆盖,造成任务丢失)尝试从任务队列中拉取任务, * 拉取失败(队列已空)进入空闲状态(自己不再尝试从队列中拉取,而是‘轮询等待’主线程分配,注意防止分配到一半就去拉取), * 非核心线程记录和空闲时间有关的数据,拉取成功重置。 * 非核心线程空闲一定时间后消亡,消亡后要从空闲非核心线程中移除,否则会造成任务丢失。 * shutDonw后拒绝任务,所有存活线程在完成当前任务后消亡。 * @contact [email protected] * @since 2019/11/30 */ public class ThreadPoolService { private int coreSize = 2;//核心线程数 private int maxSize = 4;//最大线程数 private long keepAlive = 2 * 1000;//空闲时间,单位毫秒 private boolean isShutDown = false; private ConcurrentLinkedQueue<Runnable> taskQueue = new ConcurrentLinkedQueue();//并发任务队列,控制并发拉取 private int queueCapacity = 10;//任务队列容量 private AtomicInteger queueSize = new AtomicInteger(0);//当前任务队列大小,原子变量控制并发提交 private RejectHandler rejectHandler = new MyRejectHandler();//拒绝策略 private ConcurrentLinkedQueue<CoreThread> freeCorePool = new ConcurrentLinkedQueue();//空闲核心线程队列 private ConcurrentLinkedQueue<NonCoreThread> freeNonCorePool = new ConcurrentLinkedQueue();//空闲非核心线程队列 private AtomicInteger curSize = new AtomicInteger(2);//当前线程数,原子变量控制并发新建和消亡 //初始化空闲核心线程队列 { CoreThread core0 = new CoreThread(); core0.setName("core0"); CoreThread core1 = new CoreThread(); core1.setName("core1"); freeCorePool.add(core0); freeCorePool.add(core1); } /** * @param task 任务 * @description: 有以下几种情况: * 1.如果线程池已经关闭则直接拒绝 * 2.成功交付空闲线程 * 3.成功加任务队列 * 4.如果队列已满且已达最大线程数,那么拒绝 * @return: void * @date: 2020/2/10 */ public void execute(Runnable task) { if (isShutDown) reject((task)); else if(!executeByCore(task)&&!executeByNonCore(task)&&!waitInQueue(task)) { if(curSize.get()==maxSize) reject(task);//有可能刚询问完,队列就为空了 else if(curSize.incrementAndGet()<=maxSize){//注意并发新建非核心线程 NonCoreThread nonCore=new NonCoreThread(); nonCore.setName("nonCore"+(curSize.get()-2)); freeNonCorePool.add(nonCore); executeByNonCore(task); }else{//新建失败 curSize.decrementAndGet(); reject(task); } } } /** * @param task * @description: 尝试从空闲的核心线程队列中弹出一个线程,任务就给该核心线程执行 * @return: boolean true为交付成功,false为交付失败 * @date: 2020/2/10 */ private boolean executeByCore(Runnable task) { CoreThread core = freeCorePool.poll(); if (core != null) { core.setTask(task); if (core.getState() == Thread.State.NEW) core.start(); return true; } else return false; } /** * @param task * @description: 尝试把任务加入队列,用原子变量控制 * @return: boolean true为添加成功,false为添加失败 * @date: 2020/2/10 */ private boolean waitInQueue(Runnable task) { //如果+1后仍在容量内,则添加成功 if (queueSize.incrementAndGet() <= queueCapacity) { taskQueue.add(task); System.out.println("加入队列任务——" + task); return true; } //+1后超出容量,则添加失败,需要把-1 else { queueSize.decrementAndGet(); return false; } } /** * @param task * @description: 尝试从空闲的非核心线程队列中弹出一个线程,任务就给该非核心线程执行 * @return: boolean true为交付成功,false为交付失败 * @date: 2020/2/10 */ private boolean executeByNonCore(Runnable task) { NonCoreThread nonCore = freeNonCorePool.poll(); if (nonCore != null) { nonCore.setTask(task); if (nonCore.getState() == Thread.State.NEW) nonCore.start(); return true; } else return false; } //拒绝当前任务 private void reject(Runnable task) { rejectHandler.reject(task); } //关闭线程池,每个线程完成当前任务后消亡 public void shutDown() { isShutDown = true; System.out.println("线程池关闭"); } /** * @description: 核心线程不断工作,完成当前任务后会尝试从队列中拉取任务,在shutDown前不会消亡 * @date: 2020/2/10 */ class CoreThread extends Thread { private volatile boolean isFree = false;//volatile禁止指令重排序,且确保主线程的修改对它本身可见 private volatile Runnable task; /** * @description: 尝试从阻塞队列拉取任务,若成功则设置当前任务,失败则当前线程回到空闲核心线程队列 * @return: void * @date: 2020/2/10 */ public void pullTask() { Runnable task = taskQueue.poll(); if (task != null) { setTask(task); queueSize.decrementAndGet(); } else {//失败说明任务队列已为空 isFree = true; freeCorePool.add(this); } } public void setTask(Runnable task) { isFree = false; this.task = task; } @Override public void run() { while (!isShutDown) { if (task != null) { task.run(); System.out.println(Thread.currentThread().getName() + "完成任务——" + task); task = null; } //这个判断条件是很苛刻的 if(!isFree&&task==null) pullTask();//二者都用volatile遵循happens-bofore,防止主线程修改到一半就去拉取 } System.out.println(Thread.currentThread().getName() + "消亡"); } } /** * @description: 非核心线程不断工作,完成当前任务后会尝试从队列中拉取任务,在空闲一定时间后消亡 * @date: 2020/2/10 */ class NonCoreThread extends Thread { private volatile boolean isFree = false;//volatile确保主线程的修改对它本身可见 private long beginFree; private volatile Runnable task; /** * @param task * @description: 每一次设置任务后重置空闲标志 * @return: void * @date: 2020/2/10 */ public void setTask(Runnable task) { this.task = task; isFree = false; } /** * @description: 尝试从阻塞队列拉取任务,若成功则重新设置任务, * 若失败,不是空闲则设置空闲标记,进行空闲计时,当前线程回到空闲队列 * @return: void * @date: 2020/2/10 */ public void pullTask() { Runnable task = taskQueue.poll(); if (task != null) { setTask(task); queueSize.decrementAndGet(); } else {//失败说明任务队列已为空 isFree = true; beginFree = System.currentTimeMillis(); freeNonCorePool.add(this); } } /** * @param * @description: 在没有shutDown且(在忙或者剩余时间 > 0)时不会消亡, * 每次完成任务后将当前任务置空 * 总是尝试从阻塞队列中拉取任务 * 消亡时,当前线程数-1 * @return: void * @date: 2020/2/10 */ @Override public void run() { while (!isShutDown && (!isFree || System.currentTimeMillis() - beginFree < keepAlive)) { if (task != null) { task.run(); System.out.println(Thread.currentThread().getName() + "完成任务——" + task); task = null; } if(!isFree&&task==null) pullTask(); } curSize.decrementAndGet(); freeNonCorePool.remove(this);//把消亡的线程从队列中移除,消亡后的线程不为null,不会被上面的executrByNonCore感知到,进而造成任务丢失 System.out.println(Thread.currentThread().getName() + "消亡"); } } }
package github.com.AllenDuke.concurrentTest.threadPoolTest; import java.util.concurrent.atomic.AtomicInteger; /** * @description 任务类 * @contact [email protected] * @since 2019/11/30 */ public class Task implements Runnable{ private static AtomicInteger sum=new AtomicInteger(0); public int num; public Task(){num=sum.getAndIncrement();} @Override public void run() { try { System.out.println(Thread.currentThread().getName() + "准备执行任务——" + num); Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } } @Override public String toString() { return num+""; } public static void main(String[] args) throws InterruptedException { ThreadPoolService threadPoolService=new ThreadPoolService(); //并发提交 new Thread(()->{ for (int i = 0; i < 10; i++) { threadPoolService.execute(new Task()); } }).start(); new Thread(()->{ for (int i = 0; i < 20; i++) { threadPoolService.execute(new Task()); } }).start(); Thread.sleep(3000); for (int i = 0; i < 20; i++) { threadPoolService.execute(new Task()); } //threadPoolService.shutDown(); } }
package github.com.AllenDuke.concurrentTest.threadPoolTest; /** * @description 拒绝策略 * @contact [email protected] * @since 2020/2/10 */ public interface RejectHandler { void reject(Runnable task); }
package github.com.AllenDuke.concurrentTest.threadPoolTest; /** * @description * @contact [email protected] * @since 2020/2/10 */ public class MyRejectHandler implements RejectHandler { @Override public void reject(Runnable task) { System.out.println("队列已满,已达最大线程数,无空闲线程,抛弃当前任务——"+task.toString()); } }