并发编程13
高性能读写锁StampedLock(jdk内部的)
- ReentrantReadWriteLock 的性能已经很好了但是他底层还是需要进行一系列的cas操作去加锁;
StampedLock如果是读锁上锁是没有这种cas操作的性能比ReentrantReadWriteLock 更好 - 也称为乐观读锁;即读获取锁的时候 是不加锁 直接返回一个值;然后执行临界区的时候去验证这个值是否有被人修改(写操作加锁)
- 如果没有被人修改则直接执行临界区的代码;如果被人修改了则需要升级为读写锁
(ReentrantReadWriteLock—>readLock);
基本语法
-
//获取戳 不存在锁 long stamp = lock.tryOptimisticRead(); //验证戳 if(lock.validate(stamp)){ //验戳成立则执行临界区的代码 //返回 } //如果没有返回则表示被人修改了 需要升级成为readLock lock.readLock();
两把读锁
-
package BingFaBianCheng.bingFaBianCheng13.shadow.stampedLock; import java.util.concurrent.TimeUnit; public class StampedLockTest { public static void main(String[] args) throws InterruptedException { //实例化数据容器 DataContainer dataContainer = new DataContainer(); //给了一个初始值 不算写 构造方法赋值 dataContainer.setI(1); //读取 new Thread(() -> { dataContainer.read(); }, "t1").start(); // 第二个线程也去读取 new Thread(() -> { dataContainer.read(); }, "t2").start(); } }
-
package BingFaBianCheng.bingFaBianCheng13.shadow.stampedLock; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import java.util.Random; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.StampedLock; /** * 一个数据容器 * 不支持重入 * 不支持条件 */ @Slf4j(topic = "enjoy") public class DataContainer { int i; long stampw=0l; public void setI(int i) { this.i = i; } private final StampedLock lock = new StampedLock(); //首先 加 StampedLock @SneakyThrows public int read() { //尝试一次乐观读 long stamp = lock.tryOptimisticRead(); log.debug("StampedLock 读锁拿到的戳{}", stamp); //1s之后验戳 //TimeUnit.SECONDS.sleep(1); //验戳 if (lock.validate(stamp)) { //线程安全问题 // 这里怎么都不互斥,因为是无锁和写锁不互斥 log.debug("StampedLock 验证完毕stamp{}, data.i:{}", stamp, i); TimeUnit.SECONDS.sleep(10); return i; } //一定验证失败 log.debug("验证失败 被写线程给改变了{}", stampw); try { //锁的升级 也会改戳 stamp = lock.readLock(); log.debug("升级之后的加锁成功 {}", stamp); TimeUnit.SECONDS.sleep(1); log.debug("升级读锁完毕{}, data.i:{}", stamp, i); return i; } finally { log.debug("升级锁解锁 {}", stamp); lock.unlockRead(stamp); } } @SneakyThrows public void write(int i) { //cas 加鎖 stampw = lock.writeLock(); log.debug("写锁加锁成功 {}", stampw); try { TimeUnit.SECONDS.sleep(5); this.i = i; } finally { log.debug("写锁解锁 {},data.i{}", stampw,i); lock.unlockWrite(stampw); } } }
加写锁
-
package BingFaBianCheng.bingFaBianCheng13.shadow.stampedLock; import java.util.concurrent.TimeUnit; public class StampedLockTest { public static void main(String[] args) throws InterruptedException { //实例化数据容器 DataContainer dataContainer = new DataContainer(); //给了一个初始值 不算写 构造方法赋值 dataContainer.setI(1); //读取 new Thread(() -> { dataContainer.read(); }, "t1").start(); // new Thread(() -> { // dataContainer.read(); // }, "t2").start(); TimeUnit.SECONDS.sleep(3); new Thread(() -> { dataContainer.write(9); }, "t2").start(); } }
那么StampedLock的性能这么好能否替代ReentrantReadWriteLock ?
- 1、他不支持重入
- 2、不支持条件队列
- 3、存在一定的并发问题
samephore
-
来限制对资源访问的线程的上限,而不是资源的上线,除非线程和资源是一一对应的;好比洗浴店里面的手牌,比如你进去一个洗浴店里,服务生首先会给你一个手牌;如果手牌没有了你则需要去喝茶等待;等他其他问洗完你才可以去享受服务;手牌相当于你一个许可;你去享受服务的时候先要获取手牌,服务完成之后需要归还手牌;
-
比如mysql的连接池就可以,一次只能拿到1个连接;Tomcat就不行,一次可以拿到多个资源
示例
-
package BingFaBianCheng.bingFaBianCheng13.shadow.semaphore; import lombok.extern.slf4j.Slf4j; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; /** * 来限制对资源访问的线程上线 */ @Slf4j(topic = "enjoy") public class SemaphoreTest { public static void main(String[] args) { //每次访问的线程上限是3 Semaphore semaphore = new Semaphore(3); for (int i = 0; i < 15; i++) { new Thread(() -> { try { // 获取许可 semaphore.acquire(); } catch (InterruptedException e) { e.printStackTrace(); } try { log.debug("start..."); // 睡眠1秒钟 // 每一秒只有3个许可 TimeUnit.SECONDS.sleep(1); log.debug("end..."); } catch (InterruptedException e) { e.printStackTrace(); } finally { // 释放 +(之前释放都是 -) semaphore.release(); } }).start(); } } }
获取许可
- 上面前三个线程都会顺利拿到锁,因为tryAcquireShared(arg) >=0
- 第四个线程来了,不满足上面的条件,会执行doAcquireSharedInterruptibly(arg)
- 形成aqs队列
解释
-
Node head ->{ thread=null prev=null next=t4 ws=-1} Node tail ->{ thread=t4 pre=head next=null ws=0} exclusiveOwnerThread:t1、t2、t3 state:0
释放许可
- 先执行tryReleaseShared方法,取出state,通过cas加1,如果成功,则释放成功
- 如果上一步释放锁成功,则继续执行doReleaseShared方法,唤醒aqs队列
总结
- samephore给这把锁的state赋了一个初值,限定了同时可以获取锁的线程数目(同时持有并且都没是方法)
- ReentrantLock初始值是0,加锁成功会加1,1表示的是锁被占有,刚好与samephore相反
CountDownLatch
- 倒计时锁;某个线程x等待倒计时为0的时候才执行;所谓的倒计时其实就是一个int类型的变量,在初始化CountDownLatch的时候会给他一个初始值(程序员定的);在多线程工作的时候可以通过countDown()方法来对计数器-1;当等于0的时候x则会解阻塞运行
基本语法
-
//初始化对象,给一个初始值 CountDownLatch latch = new CountDownLatch(3); //x线程 调用await阻塞 等待计数器为0的时候才会解阻塞 latch.await(); //其他线程调用countDown();对计数器-1 latch.countDown();
通过new thread方法获取新线程
-
package BingFaBianCheng.bingFaBianCheng13.shadow.countDownLatch; import lombok.extern.slf4j.Slf4j; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; @Slf4j(topic = "enjoy") public class CountDownLatchTest1 { public static void main(String[] args) throws InterruptedException { // ExecutorService executorService = Executors.newFixedThreadPool(3); // executorService //计数器=3 CountDownLatch latch = new CountDownLatch(3); Thread thread = new Thread(() -> { log.debug("t1 thread start"); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } //t1 把计数器-1 latch.countDown(); log.debug("t1 thread end;count[{}]", latch.getCount()); }, "t1"); thread.start(); new Thread(() -> { log.debug("t2 thread start"); try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } latch.countDown(); log.debug("t2 thread end;count[{}]", latch.getCount()); },"t2").start(); new Thread(() -> { log.debug("t3 thread start"); try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); } latch.countDown();//主线程可以执行了 log.debug("t3 thread end;count[{}]", latch.getCount()); },"t3").start(); log.debug("main watiing"); latch.await(); log.debug("main wait end..."); } }
countDownLatch和thread.join有什么区别
- thread.join阻塞的是执行这句的线程
- 1.前者可以很方便的配合线程池使用,因为实际写代码不可能通过new thread这种方法来创建线程
- 2.join是一定等待线程执行完软方法才解阻塞,而countDownLatch在latch.countDown()方法之后就解阻塞了,该方法之后的代码有可能和主线程同时执行
线程池初始化线程
-
package BingFaBianCheng.bingFaBianCheng13.shadow.countDownLatch; import lombok.extern.slf4j.Slf4j; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; @Slf4j(topic = "enjoy") public class CountDownLatchTest2 { public static void main(String[] args) throws InterruptedException { //线程池里面创建4个线程 其中三个是计算的 第四个是汇总的 AtomicInteger i= new AtomicInteger(); ExecutorService executorService = Executors.newFixedThreadPool(4,(e)-> new Thread(e,"t"+i.incrementAndGet())); CountDownLatch latch = new CountDownLatch(3); executorService.submit(()->{ log.debug("t1 thread start"); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } latch.countDown(); log.debug("t1 thread end;count[{}]", latch.getCount()); }); executorService.submit(()->{ log.debug("t2 thread start"); try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } latch.countDown(); log.debug("t2 thread end;count[{}]", latch.getCount()); }); executorService.submit(()->{ log.debug("t3 thread start"); try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); } latch.countDown(); log.debug("t3 thread end;count[{}]", latch.getCount()); }); executorService.submit(()->{ log.debug("t4 watiing"); try { latch.await(); } catch (InterruptedException e) { e.printStackTrace(); } log.debug("t4 wait end..."); }); executorService.shutdown(); } }
-
线程池使用完之后,要注意shutdown()
-
其实前三个方法内容都是一样的,可以用一个for循环来精简代码
-
//线程池里面创建4个线程 其中三个是计算的 第四个是汇总的 AtomicInteger i= new AtomicInteger(); // 通过ThreadFactory来执行线程中线程的名字 ExecutorService executorService = Executors.newFixedThreadPool(4,(e)->{ return new Thread(e,"t"+i.incrementAndGet()); });
模拟四个子线程执行不同的工作,主线程等所有子线程执行完后才继续往下执行
-
package BingFaBianCheng.bingFaBianCheng13.shadow.countDownLatch; import lombok.extern.slf4j.Slf4j; import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.Random; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; @Slf4j(topic = "enjoy") public class CountDownLatchTest3 { public static void main(String[] args) throws InterruptedException { //线程池里面创建3个线程 List<String> list = new ArrayList<>(); list.add("Angel"); list.add("baby"); list.add("rose"); list.add("joyce"); AtomicInteger i= new AtomicInteger(); ExecutorService executorService = Executors.newFixedThreadPool(4,(runnable)->{ //技师的名字 return new Thread(runnable,list.get(i.getAndIncrement())); }); //让你先去沙发上休息 CountDownLatch latch = new CountDownLatch(4); Random random = new Random(); for (int j = 0; j <4 ; j++) { //new 4个线程 并发执行 int temp =j; executorService.submit(()->{ //k标识的是准备进度 直到准备到100% 才开始服务 这个时间每个技师不固定 因为是random for (int k = 0; k <100 ; k++) { try { //模拟每一个技师准备的时间 TimeUnit.MILLISECONDS.sleep(random.nextInt(200)); } catch (InterruptedException e) { e.printStackTrace(); } String name = Thread.currentThread().getName(); name=name+"("+k+"%)";//angel(3%) baby(10%) ... list.set(temp,name); System.out.print("\r"+Arrays.toString(list.toArray())); } //某个人准备好了 latch.countDown(); }); } latch.await(); System.out.println("\n 登上人生巅峰..."); executorService.shutdown(); } }
CyclicBarrier
- cyclicBarrier 重复栅栏(CountDownLatch),语法和CountDownLatch差不多,但是可重复
基本语法
-
//初始化一个cyclicBarrier 计数器为2 CyclicBarrier cyclicBarrier = new CyclicBarrier(2); //阻塞 计数器不为0的时候并且会把计数器-1 cyclicBarrier.await();
与countDownlatch区别
-
1.需要cyclicBarrier管理的代码块是可以重复执行的,而countDownLatch想重复执行,必须new多个相同的countDownLatch
-
CyclicBarrier cyclicBarrier = new CyclicBarrier(2,()->{ log.debug("t1 t2 end"); });
2.CyclicBarrier第二个参数的方法会在CyclicBarrier等于0后执行,起到汇总的作用
示例代码1
-
package BingFaBianCheng.bingFaBianCheng13.shadow.cyclic; import lombok.extern.slf4j.Slf4j; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; @Slf4j(topic = "enjoy") public class CyclicBarrierTest { public static void main(String[] args) { AtomicInteger i= new AtomicInteger(); CyclicBarrier cyclicBarrier = new CyclicBarrier(2,()->{ log.debug("t1 t2 end"); }); ExecutorService service = Executors.newFixedThreadPool(2); for (int j = 0; j <2 ; j++) { service.submit(()->{ log.debug("start"); try { TimeUnit.SECONDS.sleep(1); log.debug("working"); cyclicBarrier.await(); } catch (Exception e) { e.printStackTrace(); } }); service.submit(()->{ log.debug("start"); try { TimeUnit.SECONDS.sleep(3); log.debug("working"); cyclicBarrier.await(); } catch (Exception e) { e.printStackTrace(); } }); } service.shutdown(); } }