一、读写锁
读(共享锁):可以被多线程持有
写(独占锁):只能被一个线程持有
二、阻塞队列
1、什么情况下会阻塞
第一种:当队列满了以后,还要继续往队列里面添加元素
第二种:当队列空了以后,还要从队列里面取元素,
2、为什么使用阻塞队列
多线程下,如果不关心线程什么时候唤醒,就可以使用阻塞队列,例如:MQ,消息发送发只管发送,消息接收方只管接收
第一组会抛出异常 | 返回一个布尔值,不会抛出异常 | 延时等待 | 一直等待 | |
---|---|---|---|---|
插入 | add() | offer(e) | offer(e,time) | put() |
取出 | remove() | poll() | poll(time) | take() |
检查 | element() | peek() | - |
三、线程池
1、池化技术
程序运行的本质:占用系统资源!提高程序的使用率,降低我们的性能消耗
线程池、连接池、内存池、对象池
2、为什么要用线程池:线程复用
3、三大方法、七大参数、4种拒绝策略
三大方法创建线程池:
//创建只有一个线程的线程池 ExecutorService threadPool= Executors.newSingleThreadExecutor(); //创建一个有固定线程数的线程池 ExecutorService threadPool=Executors.newFixedThreadPool(5); //创建一个遇强则强,遇弱则弱的、可扩容的线程池 ExecutorService threadPool=Executors.newCachedThreadPool(); try { for (int i = 1; i <=10 ; i++) { threadPool.execute(()->{ System.out.println(Thread.currentThread().getName()+"\t"+"OK"); }); } } catch (Exception e) { e.printStackTrace(); } finally { //使用完毕关闭,避免内存泄漏 threadPool.shutdown(); }
三大方法创建源码:
1、Executors.newSingleThreadExecutor(); new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); 2、Executors.newFixedThreadPool(5); new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); 3、Executors.newCachedThreadPool(); new ThreadPoolExecutor(0, Integer.MAX_VALUE,//约等于21亿 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
ThreadPoolExecutor源码
public ThreadPoolExecutor(int corePoolSize,//核心池线程大小,即永久存在的线程数量 int maximumPoolSize,//核心池最大线程数 long keepAliveTime,//超时等待时间(除核心线程数意外的线程的等待时间,超过这个时间就关闭该线程) TimeUnit unit,//超时等待时间单位 BlockingQueue<Runnable> workQueue,//阻塞队列(核心池里面所有的线程都在使用,这个就是后面进来的等待队列,这个队列最好用双端队列,效率高,注意:使用的队列必须填上大小,不然默认的Integer.MAX_VALUE) ThreadFactory threadFactory,//线程工厂 RejectedExecutionHandler handler//拒绝策略) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.acc = System.getSecurityManager() == null ? null : AccessController.getContext(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }
自定义线程池的策略
public class ThreadPoolTest { public static void main(String[] args) { //创建只有一个线程的线程池 //ExecutorService threadPool= Executors.newSingleThreadExecutor(); //创建一个有固定线程数的线程池 //ExecutorService threadPool=Executors.newFixedThreadPool(5); //创建一个遇强则强,遇弱则弱的、可扩容的线程池 //ExecutorService threadPool=Executors.newCachedThreadPool(); System.out.println(Runtime.getRuntime().availableProcessors()); ExecutorService threadPool=new ThreadPoolExecutor( 2, Runtime.getRuntime().availableProcessors(),//根据电脑的配置来 3L, TimeUnit.SECONDS, new LinkedBlockingDeque<>(3), Executors.defaultThreadFactory(), new ThreadPoolExecutor.DiscardOldestPolicy()// /** * 1、new ThreadPoolExecutor.AbortPolicy():抛出异常,丢弃任务 * 2、new ThreadPoolExecutor.DiscardPolicy():不抛出异常,丢弃任务 * 3、new ThreadPoolExecutor.DiscardOldestPolicy():尝试获取任务,但不一定执行 * 4、new ThreadPoolExecutor.CallerRunsPolicy:线程原路返回,交给调用者去执行 */ ); try { for (int i = 1; i <=9 ; i++) { threadPool.execute(()->{ System.out.println(Thread.currentThread().getName()+"\t"+"OK"); }); } } catch (Exception e) { e.printStackTrace(); } finally { //使用完毕关闭,避免内存泄漏 threadPool.shutdown(); } } }
四大拒绝策略:
1、new ThreadPoolExecutor.AbortPolicy():抛出异常,丢弃任务 */ public static class AbortPolicy implements RejectedExecutionHandler { /** * Creates an {@code AbortPolicy}. */ public AbortPolicy() { } /** * Always throws RejectedExecutionException. * * @param r the runnable task requested to be executed * @param e the executor attempting to execute this task * @throws RejectedExecutionException always */ public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { throw new RejectedExecutionException("Task " + r.toString() + " rejected from " + e.toString()); } 2、new ThreadPoolExecutor.DiscardPolicy():不抛出异常,丢弃任务 /** * A handler for rejected tasks that silently discards the * rejected task. */ public static class DiscardPolicy implements RejectedExecutionHandler { /** * Creates a {@code DiscardPolicy}. */ public DiscardPolicy() { } /** * Does nothing, which has the effect of discarding task r. * * @param r the runnable task requested to be executed * @param e the executor attempting to execute this task */ public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { } } 3、new ThreadPoolExecutor.DiscardOldestPolicy():尝试获取任务,但不一定执行 public static class DiscardOldestPolicy implements RejectedExecutionHandler { /** * Creates a {@code DiscardOldestPolicy} for the given executor. */ public DiscardOldestPolicy() { } /** * Obtains and ignores the next task that the executor * would otherwise execute, if one is immediately available, * and then retries execution of task r, unless the executor * is shut down, in which case task r is instead discarded. * * @param r the runnable task requested to be executed * @param e the executor attempting to execute this task */ public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { e.getQueue().poll(); e.execute(r); } } } 4、new ThreadPoolExecutor.CallerRunsPolicy:线程原路返回,交给调用者去执行 public static class CallerRunsPolicy implements RejectedExecutionHandler { /** * Creates a {@code CallerRunsPolicy}. */ public CallerRunsPolicy() { } /** * Executes task r in the caller's thread, unless the executor * has been shut down, in which case the task is discarded. * * @param r the runnable task requested to be executed * @param e the executor attempting to execute this task */ public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { r.run(); } } }
最好效率:所有CPU同时执行
1、CPU密集型:根据CPU数量来决定,保证最大效率
2、IO密集型:50个线程操作IO大资源的,比较耗时,所以做大线程数要大于IO的线程数,