并发编程14
定义一个线程池
-
package BingFaBianCheng.bingFaBianCheng14.poolv1; import lombok.extern.slf4j.Slf4j; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; /** * @Author 钢牌讲师--子路 **/ @Slf4j(topic = "e") public class TestPool { public static void main(String[] args) { //线程池里面创建4个线程 其中三个是计算的 第四个是汇总的 AtomicInteger i= new AtomicInteger(); ExecutorService executorService = Executors.newFixedThreadPool(4,(e)-> new Thread(e,"t"+i.incrementAndGet())); executorService.submit(() -> System.out.println("提交任务")); } }
threadpool
-
set(核心线程集合) core(核心线程数) Queue[4](队列){ queue Lock put() poll() queueSize} submit(提交任务方法)
-
提交的线程数大于core,之后会放入Queue中
模拟线程池v1
组成
自定义的线程池
-
package BingFaBianCheng.bingFaBianCheng14.poolv1; import lombok.extern.slf4j.Slf4j; import java.util.HashSet; /** * @Author 钢牌讲师--子路 **/ @Slf4j(topic = "e") public class EnjoyThreadPool { //核心线程的集合 // 存线程 HashSet<EnjoyNode> set; EnjoyQueue enjoyQueue; // 支持同时几个线程运行 //核心线程数 int core; public EnjoyThreadPool(int core){ //初始化核心线程数 this.core=core; // 默认队列最多存放两个 // 实际应该再抽象一个参数出来 enjoyQueue = new EnjoyQueue(2); // 实例化hastset set = new HashSet<>(); } public EnjoyQueue getEnjoyQueue() { return enjoyQueue; } public void remove(EnjoyNode enjoyNode){ set.remove(enjoyNode); } // 1.首先提交任务到这里来 // 2.考虑当前核心线程数有没有达到上限 // 3.如果还有达到核心数上限,直接new一个线程出来,并执行 // 4.核心线程数达到上限,应该让task去队列中排队 // 5.如果线程执行完了,不能直接结束,应该让它复用 // 6. // CustomTask是一个runnable接口 // 不需要提交线程,因为hashset里面已经保存线程了 public void submitTask(CustomTask target){ if(set.size()<core){ log.debug("核心线程数还有空闲 new node"); EnjoyNode enjoyNode = new EnjoyNode(target,"t"+(set.size()+1),this); log.debug("把new出来的node add到set集合当中"); // 把new出来的core线程加入到hashset中 set.add(enjoyNode); // 线程启动 enjoyNode.start(); //log.debug("核心线程数还有空闲 直接执行不需要去队列当中 new t"); }else{ log.debug("核心线程数達到上限 应该让这个task去队列当中[{}]",target.getName()); // 核心线程满了,提交的多余线程放入阻塞队列 enjoyQueue.put(target); } } public void remove(EnjoyNode enjoyNode){ set.remove(enjoyNode); } }
自定义线程节点(为了线程做更多的事)
-
package BingFaBianCheng.bingFaBianCheng14.poolv1; import lombok.extern.slf4j.Slf4j; /** * @Author 钢牌讲师--子路 **/ @Slf4j(topic = "e") public class EnjoyNode extends Thread{ private CustomTask target; // 为了核心线程执行完,能从队列中继续取任务,需要把线程传进来 private EnjoyThreadPool enjoyThreadPool; private EnjoyQueue enjoyQueue; // 生成线程时,把runnable软方法传进来 // 传入线程的自定义名字tname public EnjoyNode(CustomTask target, String tname,EnjoyThreadPool enjoyThreadPool){ // 设置线程名字 setName(tname); this.target=target; this.enjoyThreadPool = enjoyThreadPool; // 利用线程池的get方法取出阻塞队列 enjoyQueue = enjoyThreadPool.getEnjoyQueue(); } /** * 这里执行完成之后不能直接结束 * 执行到这里其实有两种情况 * 1、这个任务是直接给到我的 线程池给我的 target!=null * 2、这个任务从阻塞队列当中取到的 (target=enjoyQueue.poll())!=null */ @Override public void run() { // 线程执行完了不能死,应该继续从队列中取任务 // 只要队列中有任务,这个线程就不能死 // target!=null 是第一次进入到这个线程中自己提交的任务 // while (target!=null||(target=enjoyQueue.poll())!=null){ log.debug("task---[{}]",target.getName()); target.run(); // 跑完后置空,再从队列中取任务 // 如果不置空 target一直是不等于null的,永远不会去队列中取 target=null; } } }
自定义线程中的软方法
-
package BingFaBianCheng.bingFaBianCheng14.poolv1; import lombok.extern.slf4j.Slf4j; import java.util.concurrent.TimeUnit; /** * @Author 钢牌讲师--子路 **/ @Slf4j(topic = "e") public class CustomTask implements Runnable{ // 为了知道线程池中每个线程执行了哪些task // 所以拓展runnable,给其增加个名字 String name; public CustomTask(String name){ this.name=name; } public String getName() { return name; } @Override public void run() { log.debug("==========================================-task-------{}",name); } }
线程阻塞队列
-
package BingFaBianCheng.bingFaBianCheng14.poolv1; import lombok.extern.slf4j.Slf4j; import java.util.ArrayDeque; import java.util.Deque; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; /** * @Author 钢牌讲师--子路 **/ @Slf4j(topic = "e") public class EnjoyQueue { Lock lock = new ReentrantLock(); //第一个条件队列 //如果deque当中满了的条件队列 Condition busyws = lock.newCondition(); //第二个条件队列 //当deque当中没有了task则进入条件队列 Condition emptyws = lock.newCondition(); // 双向链表实现的队列 Deque<CustomTask> deque = new ArrayDeque(); // 队列的数量上限 private int queueSize; public EnjoyQueue(int queueSize) { this.queueSize = queueSize; } // 往阻塞队列中添加任务 // 1.首先需要判断队列上限 // 2.这里要用while循环,因为要阻塞,不然释放后,不会再去判断 // 3. public void put(CustomTask task){ lock.lock(); try{ // 阻塞队列满了,加入到条件队列中阻塞 while (deque.size()==queueSize){ log.debug("put 队列已经满了 应该去阻塞"); try { busyws.await(); } catch (InterruptedException e) { e.printStackTrace(); } } log.debug("put 队列可以存放 则put一个task[{}]",task.getName()); // 添加到队尾 deque.addLast(task); emptyws.signal(); }finally { lock.unlock(); } } public CustomTask poll(){ lock.lock(); try{ log.debug("poll 一个task"); // 如果队列为空,阻塞来去任务的线程 while (deque.isEmpty()) { log.debug("poll 队列当中没有则阻塞"); emptyws.await(); } //拿出第一个并且在队列中删除掉取出的这个 CustomTask task = deque.removeFirst(); log.debug("poll 能够获取 一个正常的task[{}]",task.getName()); //唤醒满员队列中的线程 busyws.signal(); return task; } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } return null; } }
测试线程池
-
package BingFaBianCheng.bingFaBianCheng14.poolv1; import lombok.extern.slf4j.Slf4j; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; /** * @Author 钢牌讲师--子路 **/ @Slf4j(topic = "e") public class TestPool { public static void main(String[] args) { EnjoyThreadPool enjoyThreadPool = new EnjoyThreadPool(2); // 单线程提交任务 for (int i = 0; i <5 ; i++) { enjoyThreadPool.submitTask(new CustomTask("task"+i)); } EnjoyNode enjoyNode = new EnjoyNode(new CustomTask("s"), "s", enjoyThreadPool); } }
模拟线程池v2
组成
自定义的线程池
-
package BingFaBianCheng.bingFaBianCheng14.poolv1; import lombok.extern.slf4j.Slf4j; import java.util.HashSet; /** * @Author 钢牌讲师--子路 **/ @Slf4j(topic = "e") public class EnjoyThreadPool { //核心线程的集合 // 存线程 HashSet<EnjoyNode> set; EnjoyQueue enjoyQueue; PolicyHandler policyHandler; // 支持同时几个线程运行 //核心线程数 int core; public EnjoyThreadPool(int core,PolicyHandler policyHandler){ //初始化核心线程数 this.core=core; // 默认队列最多存放两个 // 实际应该再抽象一个参数出来 enjoyQueue = new EnjoyQueue(2,policyHandler); // 实例化hastset set = new HashSet<>(); } public EnjoyQueue getEnjoyQueue() { return enjoyQueue; } public void remove(EnjoyNode enjoyNode){ set.remove(enjoyNode); } // 1.首先提交任务到这里来 // 2.考虑当前核心线程数有没有达到上限 // 3.如果还有达到核心数上限,直接new一个线程出来,并执行 // 4.核心线程数达到上限,应该让task去队列中排队 // 5.如果线程执行完了,不能直接结束,应该让它复用 // 6. // CustomTask是一个runnable接口 // 不需要提交线程,因为hashset里面已经保存线程了 public void submitTask(CustomTask target){ if(set.size()<core){ log.debug("核心线程数还有空闲 new node"); EnjoyNode enjoyNode = new EnjoyNode(target,"t"+(set.size()+1),this); log.debug("把new出来的node add到set集合当中"); // 把new出来的core线程加入到hashset中 set.add(enjoyNode); // 线程启动 enjoyNode.start(); //log.debug("核心线程数还有空闲 直接执行不需要去队列当中 new t"); }else{ log.debug("核心线程数達到上限 应该让这个task去队列当中[{}]",target.getName()); // 核心线程满了,提交的多余线程放入阻塞队列 enjoyQueue.put(target); } } public void remove(EnjoyNode enjoyNode){ set.remove(enjoyNode); } }
自定义线程节点(为了线程做更多的事)
-
package BingFaBianCheng.bingFaBianCheng14.poolv1; import lombok.extern.slf4j.Slf4j; /** * @Author 钢牌讲师--子路 **/ @Slf4j(topic = "e") public class EnjoyNode extends Thread{ private CustomTask target; // 为了核心线程执行完,能从队列中继续取任务,需要把线程传进来 private EnjoyThreadPool enjoyThreadPool; private EnjoyQueue enjoyQueue; // 生成线程时,把runnable软方法传进来 // 传入线程的自定义名字tname public EnjoyNode(CustomTask target, String tname,EnjoyThreadPool enjoyThreadPool){ // 设置线程名字 setName(tname); this.target=target; this.enjoyThreadPool = enjoyThreadPool; // 利用线程池的get方法取出阻塞队列 enjoyQueue = enjoyThreadPool.getEnjoyQueue(); } /** * 这里执行完成之后不能直接结束 * 执行到这里其实有两种情况 * 1、这个任务是直接给到我的 线程池给我的 target!=null * 2、这个任务从阻塞队列当中取到的 (target=enjoyQueue.poll())!=null */ @Override public void run() { // 线程执行完了不能死,应该继续从队列中取任务 // 只要队列中有任务,这个线程就不能死 // target!=null 是第一次进入到这个线程中自己提交的任务 // v2:如果去队列中拿任务,规定时间内都拿不到,就不拿了 while (target!=null||(target=enjoyQueue.poll())!=null){ log.debug("task---[{}]",target.getName()); target.run(); // 跑完后置空,再从队列中取任务 // 如果不置空 target一直是不等于null的,永远不会去队列中取 target=null; } } }
自定义线程中的软方法
-
package BingFaBianCheng.bingFaBianCheng14.poolv1; import lombok.extern.slf4j.Slf4j; import java.util.concurrent.TimeUnit; /** * @Author 钢牌讲师--子路 **/ @Slf4j(topic = "e") public class CustomTask implements Runnable{ // 为了知道线程池中每个线程执行了哪些task // 所以拓展runnable,给其增加个名字 String name; public CustomTask(String name){ this.name=name; } public String getName() { return name; } @Override public void run() { log.debug("==========================================-task-------{}",name); } }
线程阻塞队列
-
package BingFaBianCheng.bingFaBianCheng14.poolv1; import lombok.extern.slf4j.Slf4j; import java.util.ArrayDeque; import java.util.Deque; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; /** * @Author 钢牌讲师--子路 **/ @Slf4j(topic = "e") public class EnjoyQueue { Lock lock = new ReentrantLock(); //第一个条件队列 //如果deque当中满了的条件队列 Condition busyws = lock.newCondition(); //第二个条件队列 //当deque当中没有了task则进入条件队列 Condition emptyws = lock.newCondition(); // 双向链表实现的队列 Deque<CustomTask> deque = new ArrayDeque(); // 队列的数量上限 private int queueSize; long timeOut = 2000; public EnjoyQueue(int queueSize) { this.queueSize = queueSize; } // 往阻塞队列中添加任务 // 1.首先需要判断队列上限 // 2.这里要用while循环,因为要阻塞,不然释放后,不会再去判断 // v2版本:如果阻塞超过一定时间后,则放弃这个任务 // 超过线程核心数+阻塞队列大小的线程, // 也就是测试中的task5会一直阻塞 // public void put(CustomTask task){ lock.lock(); try{ // 阻塞队列满了,加入到条件队列中阻塞 while (deque.size()==queueSize){ log.debug("put 队列已经满了 应该去阻塞"); try { // 带超时时间的阻塞 busyws.await(); } catch (InterruptedException e) { e.printStackTrace(); } } log.debug("put 队列可以存放 则put一个task[{}]",task.getName()); // 添加到队尾 deque.addLast(task); emptyws.signal(); }finally { lock.unlock(); } } public void tryPut(CustomTask task){ lock.lock(); try{ // 阻塞队列满了,加入到条件队列中阻塞 long n = timeOut; while (deque.size()==queueSize){ log.debug("put 队列已经满了 应该去阻塞"); try { if(n<=0){ return; } // 带超时时间的阻塞 // nanos是纳秒 // 如果睡眠500纳秒被打断,则返回1500纳秒 n = busyws.awiteNanos(n); } catch (InterruptedException e) { e.printStackTrace(); } } log.debug("put 队列可以存放 则put一个task[{}]",task.getName()); // 添加到队尾 deque.addLast(task); emptyws.signal(); }finally { lock.unlock(); } } // 1.队列满了直接放弃 // 2.队列满了永远等 // 3.队列满了抛异常 // 4.队列满了调用者自己执行(就不是多线程了) // 利用一种设计模式,spring里面用的很多的,策略模式 public void tryPut(CustomTask task){ lock.lock(); try{ // 阻塞队列满了,加入到条件队列中阻塞 long n = timeOut; while (deque.size()==queueSize){ log.debug("put 队列已经满了 应该去阻塞"); try { if(n<=0){ return; } // 带超时时间的阻塞 // nanos是纳秒 // 如果睡眠500纳秒被打断,则返回1500纳秒 n = busyws.awiteNanos(n); } catch (InterruptedException e) { e.printStackTrace(); } } log.debug("put 队列可以存放 则put一个task[{}]",task.getName()); // 添加到队尾 deque.addLast(task); emptyws.signal(); }finally { lock.unlock(); } } public CustomTask poll(){ lock.lock(); try{ log.debug("poll 一个task"); // 如果队列为空,阻塞来去任务的线程 while (deque.isEmpty()) { log.debug("poll 队列当中没有则阻塞"); emptyws.await(); } //拿出第一个并且在队列中删除掉取出的这个 CustomTask task = deque.removeFirst(); log.debug("poll 能够获取 一个正常的task[{}]",task.getName()); //唤醒满员队列中的线程 busyws.signal(); return task; } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } return null; } public CustomTask pollTimeOut(){ lock.lock(); try{ log.debug("poll 一个task"); // 如果队列为空,阻塞来去任务的线程 while (deque.isEmpty()) { log.debug("poll 队列当中没有则阻塞"); emptyws.await(4,TimeUnit.SECONDS); if(deque.isEmpty()){ // 返回null,核心线程也就停止了 return null; } } //拿出第一个并且在队列中删除掉取出的这个 CustomTask task = deque.removeFirst(); log.debug("poll 能够获取 一个正常的task[{}]",task.getName()); //唤醒满员队列中的线程 busyws.signal(); return task; } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } return null; } }
v2新增:tryPut的不同策略
-
package BingFaBianCheng.bingFaBianCheng14.poolv2; /** * @Author 钢牌讲师--子路 **/ public interface PolicyHandler { public void handler(EnjoyQueue queue, CustomTask task); }
-
package BingFaBianCheng.bingFaBianCheng14.poolv2; /** * @Author 钢牌讲师--子路 **/ public class A implements PolicyHandler{ @Override public void handler(EnjoyQueue queue, CustomTask task) { throw new RuntimeException("线程池不支持这么多队列"); } }
-
PolicyHandler policyHandler; public EnjoyQueue(int queueSize, PolicyHandler policyHandler) { this.queueSize = queueSize; this.policyHandler = policyHandler; } // 1.队列满了直接放弃 // 2.队列满了永远等 // 3.队列满了抛异常 // 4.队列满了调用者自己执行(就不是多线程了) // 利用一种设计模式,spring里面用的很多的,策略模式 public void tryPut(CustomTask task){ lock.lock(); // 不要在while里面try-catch,如果try里面的方法抛异常,会死循环一直抛异常 try{ while (deque.size()==queueSize){ log.debug("put 队列已满,"); policyHandler.handler(this,task); // 防止又执行一遍 return; } log.debug("put 队列可以存放 则put一个task[{}]",task.getName()); // 添加到队尾 deque.addLast(task); emptyws.signal(); }catch(Exception e){ e.printStackTrace(); }finally { lock.unlock(); } }
测试线程池
-
package BingFaBianCheng.bingFaBianCheng14.poolv1; import lombok.extern.slf4j.Slf4j; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; /** * @Author 钢牌讲师--子路 **/ @Slf4j(topic = "e") public class TestPool { public static void main(String[] args) { EnjoyThreadPool enjoyThreadPool = new EnjoyThreadPool(2,(queue,task)->{ // 以下策略中选一种 // 1.永远阻塞 queue.put(task); // 2.null 什么都不执行 // 3.自己调用 task.run(); // 4.抛异常 throw new RuntimeException("线程不支持这么多队列") }); PolicyHandler policyHandler; // 单线程提交任务 for (int i = 0; i <5 ; i++) { enjoyThreadPool.submitTask(new CustomTask("task"+i),policyHandler); } EnjoyNode enjoyNode = new EnjoyNode(new CustomTask("s"), "s", enjoyThreadPool); } }