AbsractQueuedSynchronizer - AQS
谈到并发,不得不谈ReentrantLock;而谈到ReentrantLock,不得不谈AbstractQueuedSynchronized(AQS)!
本文介绍CountDownLatch、Semaphore、CyclicBarrier的原理和用法。
它的两种底层数据结构如下:
使用Node实现FIFO队列,可以用于构建锁和或者其他同步装置的基础框架
利用了一个int类型表示状态
使用方法是继承
子类通过继承并通过实现它的方法管理其状态{acquire和release}的方法操纵状态
可以同时实现排他锁和共享锁模式(独占、共享)
CountDownLatch
共享锁CountDownLatch
它是java5中新增的一个并发工具类,其使用非常简单,下面通过伪代码简单看一下使用方式:
这是一个使用CountDownLatch非常简单的例子,创建的时候,需要指定一个初始状态值,本例为2,主线程调用 latch.await时,除非latch状态值为0,否则会一直阻塞休眠。当所有任务执行完后,主线程唤醒,最终执行打印动作。
以上只是一个最简单的例子,接着咱们再来看一个,这回,咱们想要在任务执行完后做更多的事情,如下图所示:
这一次,在线程3和线程4中,分别调用了latch.await(),当latch状态值为0时,这两个线程将会继续执行任务,但是顺序性是无法保证的。
CountDownLatch的方便之处在于,你可以在一个线程中使用,也可以在多个线程上使用,一切只依据状态值,这样便不会受限于任何的场景。
我们以CountDownLatch第二个例子作为案例来分析一下,一开始,我们创建了一个CountDownLatch实例。
此时,AQS中,状态值state=2,对于 CountDownLatch 来说,state=2表示所有调用await方法的线程都应该阻塞,等到同一个latch被调用两次countDown后才能唤醒沉睡的线程。接着线程3和线程4执行了 await方法,这会的状态图如下:
注意,上面的通知状态是节点的属性,表示该节点出队后,必须唤醒其后续的节点线程。当线程1和线程2分别执行完latch.countDown方法后,会把state值置为0,此时,通过CAS成功置为0的那个线程将会同时承担起唤醒队列中第一个节点线程的任务,从上图可以看出,第一个节点即为线程3,当线程3恢复执行之后,其发现状态值为通知状态,所以会唤醒后续节点,即线程4节点,然后线程3继续做自己的事情,到这里,线程3和线程4都已经被唤醒,CountDownLatch功成身退。
上面的流程,如果落实到代码,把 state置为0的那个线程,会判断head指向节点的状态,如果为通知状态,则唤醒后续节点,即线程3节点,然后head指向线程3节点,head指向的旧节点会被删除掉。当线程3恢复执行后,发现自身为通知状态,又会把head指向线程4节点,然后删除自身节点,并唤醒线程4。
这里可能读者会有个疑问,线程节点的状态是什么时候设置上去的。其实,一个线程在阻塞之前,就会把它前面的节点设置为通知状态,这样便可以实现链式唤醒机制了。
其基本用法的代码如下:
package concurrency.example.AQS; import lombok.extern.slf4j.Slf4j; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; /* *Created by William on 2018/5/3 0003 */ @Slf4j public class CountDownLatchExample1 { private static int threadCount = 200; public static void main(String[] args) throws InterruptedException { ExecutorService executorService = Executors.newCachedThreadPool(); final CountDownLatch countDownLatch = new CountDownLatch(threadCount); for (int i = 0; i < threadCount; i++) { final int thredNum = i; executorService.execute(() -> { try { test(thredNum); } catch (Exception e) { log.error("exception", e); } finally { countDownLatch.countDown(); } }); } countDownLatch.await();//也可以设置时间超时,超过时间就不再等待 log.info("finish"); executorService.shutdown(); } private static void test(int threadNum) throws InterruptedException { Thread.sleep(100); log.info("{}", threadNum); Thread.sleep(100); } }
Semaphore
Semaphore又称信号量,是操作系统中的一个概念,在Java并发编程中,信号量控制的是线程并发的数量。
所以简单来讲,Semaphore可以控制线程并发的数量。
举例如下,定一个线程池有200个线程同时执行,但Semphore规定线程同时并发数量为3.
package concurrency.example.AQS; import lombok.extern.slf4j.Slf4j; import java.util.concurrent.*; /* *Created by William on 2018/5/3 0003 */ @Slf4j public class SemaphoreExample1 { private static int threadCount = 200; public static void main(String[] args) throws InterruptedException { ExecutorService executorService = Executors.newCachedThreadPool(); Semaphore semaphore = new Semaphore(3); for (int i = 0; i < threadCount; i++) { final int thredNum = i; executorService.execute(() -> { try { semaphore.acquire();//获取一个许可 test(thredNum); semaphore.release();//释放一个许可 } catch (Exception e) { log.error("exception", e); } }); } executorService.shutdown(); } private static void test(int threadNum) throws InterruptedException { Thread.sleep(1000); log.info("{}", threadNum); } }
输出结果如下:
15:00:54.824 [pool-1-thread-3] INFO concurrency.example.AQS.SemaphoreExample1 - 2 15:00:54.824 [pool-1-thread-1] INFO concurrency.example.AQS.SemaphoreExample1 - 0 15:00:54.824 [pool-1-thread-2] INFO concurrency.example.AQS.SemaphoreExample1 - 1 15:00:55.845 [pool-1-thread-4] INFO concurrency.example.AQS.SemaphoreExample1 - 3 15:00:55.845 [pool-1-thread-5] INFO concurrency.example.AQS.SemaphoreExample1 - 4 15:00:55.845 [pool-1-thread-6] INFO concurrency.example.AQS.SemaphoreExample1 - 5 15:00:56.845 [pool-1-thread-7] INFO concurrency.example.AQS.SemaphoreExample1 - 6 15:00:56.845 [pool-1-thread-9] INFO concurrency.example.AQS.SemaphoreExample1 - 8 15:00:56.845 [pool-1-thread-8] INFO concurrency.example.AQS.SemaphoreExample1 - 7 15:00:57.845 [pool-1-thread-11] INFO concurrency.example.AQS.SemaphoreExample1 - 10 15:00:57.845 [pool-1-thread-12] INFO concurrency.example.AQS.SemaphoreExample1 - 11 15:00:57.847 [pool-1-thread-10] INFO concurrency.example.AQS.SemaphoreExample1 - 9 15:00:58.847 [pool-1-thread-14] INFO concurrency.example.AQS.SemaphoreExample1 - 13 15:00:58.847 [pool-1-thread-13] INFO concurrency.example.AQS.SemaphoreExample1 - 12 15:00:58.848 [pool-1-thread-15] INFO concurrency.example.AQS.SemaphoreExample1 - 14 15:00:59.847 [pool-1-thread-16] INFO concurrency.example.AQS.SemaphoreExample1 - 15 15:00:59.848 [pool-1-thread-17] INFO concurrency.example.AQS.SemaphoreExample1 - 16 15:00:59.850 [pool-1-thread-18] INFO concurrency.example.AQS.SemaphoreExample1 - 17 15:01:00.847 [pool-1-thread-19] INFO concurrency.example.AQS.SemaphoreExample1 - 18 15:01:00.853 [pool-1-thread-21] INFO concurrency.example.AQS.SemaphoreExample1 - 20 15:01:00.853 [pool-1-thread-20] INFO concurrency.example.AQS.SemaphoreExample1 - 19由左边的时间可以看出每秒只有三个线程执行,所以Semaphore控制了并发量。
但是当请求线程数实在过高,能不能丢弃部分线程呢,也是可以的,例子如下:
package concurrency.example.AQS; import lombok.extern.slf4j.Slf4j; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; /* *Created by William on 2018/5/3 0003 */ @Slf4j public class SemaphoreExample2 { private static int threadCount = 200; public static void main(String[] args) throws InterruptedException { ExecutorService executorService = Executors.newCachedThreadPool(); Semaphore semaphore = new Semaphore(3); for (int i = 0; i < threadCount; i++) { final int thredNum = i; executorService.execute(() -> { try { if(semaphore.tryAcquire()){//尝试获取一个许可 test(thredNum); semaphore.release(); } } catch (Exception e) { log.error("exception", e); } }); } executorService.shutdown(); } private static void test(int threadNum) throws InterruptedException { Thread.sleep(1000); log.info("{}", threadNum); } }
输出只有三条:
15:12:36.326 [pool-1-thread-1] INFO concurrency.example.AQS.SemaphoreExample2 - 0 15:12:36.326 [pool-1-thread-2] INFO concurrency.example.AQS.SemaphoreExample2 - 1 15:12:36.326 [pool-1-thread-3] INFO concurrency.example.AQS.SemaphoreExample2 - 2 Process finished with exit code 0
改变semaphore.tryAcquire的参数,意思在3秒内获取许可
if(semaphore.tryAcquire(3,TimeUnit.SECONDS)){ test(thredNum); semaphore.release(); }
结果输出9条:
15:14:12.957 [pool-1-thread-3] INFO concurrency.example.AQS.SemaphoreExample2 - 2 15:14:12.964 [pool-1-thread-2] INFO concurrency.example.AQS.SemaphoreExample2 - 1 15:14:13.062 [pool-1-thread-1] INFO concurrency.example.AQS.SemaphoreExample2 - 0 15:14:14.032 [pool-1-thread-4] INFO concurrency.example.AQS.SemaphoreExample2 - 3 15:14:14.033 [pool-1-thread-6] INFO concurrency.example.AQS.SemaphoreExample2 - 5 15:14:14.064 [pool-1-thread-7] INFO concurrency.example.AQS.SemaphoreExample2 - 6 15:14:15.032 [pool-1-thread-8] INFO concurrency.example.AQS.SemaphoreExample2 - 7 15:14:15.033 [pool-1-thread-10] INFO concurrency.example.AQS.SemaphoreExample2 - 9 15:14:15.064 [pool-1-thread-11] INFO concurrency.example.AQS.SemaphoreExample2 - 10
解释以下,一秒执行三条,不考虑其他因素的情况下3秒可以执行9条,结果就是如此。
CyclicBarrier
CountDownLatch是一个同步的辅助类,允许一个或多个线程,等待其他一组线程完成操作,再继续执行。
CyclicBarrier是一个同步的辅助类,允许一组线程相互之间等待,达到一个共同点,再继续执行。
CountDownLatch:可以把他理解成倒计时锁;只能使用一次
CyclicBarrier可看成是个障碍,所有的线程必须到齐后才能一起通过这个障碍。它可以循环使用(Reset重置)
举例:等待5个线程执行完后,才执行await后面的操作,代码如下:
import lombok.extern.slf4j.Slf4j; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; /* *Created by William on 2018/5/3 0003 */ @Slf4j public class CyclicBarrierExample1 { private static CyclicBarrier cyclicBarrier = new CyclicBarrier(5);//等待5个线程执行完后,才执行await后面的操作 public static void main(String[] args) throws InterruptedException { ExecutorService executorService = Executors.newCachedThreadPool(); for (int i = 0; i < 10; i++) { Thread.sleep(1000); final int threadNum = i; executorService.execute(() -> { try { race(threadNum); } catch (Exception e) { log.error("exception", e); } }); } executorService.shutdown(); } private static void race(int threadNum) throws Exception { Thread.sleep(1000); log.info("{} is ready ", threadNum); cyclicBarrier.await(); log.info("{} continue", threadNum); } }
输出结果如下,先等待5个ready了,才同时输出5个continue。
15:44:24.141 [pool-1-thread-1] INFO concurrency.example.AQS.CyclicBarrierExample1 - 0 is ready 15:44:25.130 [pool-1-thread-2] INFO concurrency.example.AQS.CyclicBarrierExample1 - 1 is ready 15:44:26.155 [pool-1-thread-3] INFO concurrency.example.AQS.CyclicBarrierExample1 - 2 is ready 15:44:27.160 [pool-1-thread-4] INFO concurrency.example.AQS.CyclicBarrierExample1 - 3 is ready 15:44:28.160 [pool-1-thread-5] INFO concurrency.example.AQS.CyclicBarrierExample1 - 4 is ready 15:44:28.160 [pool-1-thread-5] INFO concurrency.example.AQS.CyclicBarrierExample1 - 4 continue 15:44:28.160 [pool-1-thread-4] INFO concurrency.example.AQS.CyclicBarrierExample1 - 3 continue 15:44:28.160 [pool-1-thread-2] INFO concurrency.example.AQS.CyclicBarrierExample1 - 1 continue 15:44:28.160 [pool-1-thread-1] INFO concurrency.example.AQS.CyclicBarrierExample1 - 0 continue 15:44:28.160 [pool-1-thread-3] INFO concurrency.example.AQS.CyclicBarrierExample1 - 2 continue
15:44:29.160 [pool-1-thread-6] INFO concurrency.example.AQS.CyclicBarrierExample1 - 5 is ready 15:44:30.160 [pool-1-thread-2] INFO concurrency.example.AQS.CyclicBarrierExample1 - 6 is ready 15:44:31.160 [pool-1-thread-5] INFO concurrency.example.AQS.CyclicBarrierExample1 - 7 is ready 15:44:32.160 [pool-1-thread-1] INFO concurrency.example.AQS.CyclicBarrierExample1 - 8 is ready 15:44:33.160 [pool-1-thread-4] INFO concurrency.example.AQS.CyclicBarrierExample1 - 9 is ready 15:44:33.160 [pool-1-thread-4] INFO concurrency.example.AQS.CyclicBarrierExample1 - 9 continue 15:44:33.160 [pool-1-thread-6] INFO concurrency.example.AQS.CyclicBarrierExample1 - 5 continue 15:44:33.160 [pool-1-thread-2] INFO concurrency.example.AQS.CyclicBarrierExample1 - 6 continue 15:44:33.161 [pool-1-thread-5] INFO concurrency.example.AQS.CyclicBarrierExample1 - 7 continue 15:44:33.161 [pool-1-thread-1] INFO concurrency.example.AQS.CyclicBarrierExample1 - 8 continue
还有一种用法,等待5个线程执行完后,先执行Runable里操作,再执行await后面的操作。
package concurrency.example.AQS; import lombok.extern.slf4j.Slf4j; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; /* *Created by William on 2018/5/3 0003 */ @Slf4j public class CyclicBarrierExample3 { private static CyclicBarrier cyclicBarrier = new CyclicBarrier(5, () -> { log.info("callback is ready!!!!"); });//等待5个线程执行完后,先执行Runable里操作,再执行await后面的操作 public static void main(String[] args) throws InterruptedException { ExecutorService executorService = Executors.newCachedThreadPool(); for (int i = 0; i < 10; i++) { Thread.sleep(1000); final int threadNum = i; executorService.execute(() -> { try { race(threadNum); } catch (Exception e) { log.error("exception", e); } }); } executorService.shutdown(); } private static void race(int threadNum) throws Exception { Thread.sleep(1000); log.info("{} is ready ", threadNum); cyclicBarrier.await(); log.info("{} continue", threadNum); } }结果如下:
15:57:00.250 [pool-1-thread-1] INFO concurrency.example.AQS.CyclicBarrierExample3 - 0 is ready 15:57:01.203 [pool-1-thread-2] INFO concurrency.example.AQS.CyclicBarrierExample3 - 1 is ready 15:57:02.203 [pool-1-thread-3] INFO concurrency.example.AQS.CyclicBarrierExample3 - 2 is ready 15:57:03.203 [pool-1-thread-4] INFO concurrency.example.AQS.CyclicBarrierExample3 - 3 is ready 15:57:04.203 [pool-1-thread-5] INFO concurrency.example.AQS.CyclicBarrierExample3 - 4 is ready 15:57:04.203 [pool-1-thread-5] INFO concurrency.example.AQS.CyclicBarrierExample3 - callback is ready!!!! 15:57:04.203 [pool-1-thread-5] INFO concurrency.example.AQS.CyclicBarrierExample3 - 4 continue 15:57:04.203 [pool-1-thread-4] INFO concurrency.example.AQS.CyclicBarrierExample3 - 3 continue 15:57:04.203 [pool-1-thread-1] INFO concurrency.example.AQS.CyclicBarrierExample3 - 0 continue 15:57:04.203 [pool-1-thread-2] INFO concurrency.example.AQS.CyclicBarrierExample3 - 1 continue 15:57:04.203 [pool-1-thread-3] INFO concurrency.example.AQS.CyclicBarrierExample3 - 2 continue 15:57:05.203 [pool-1-thread-6] INFO concurrency.example.AQS.CyclicBarrierExample3 - 5 is ready 15:57:06.203 [pool-1-thread-4] INFO concurrency.example.AQS.CyclicBarrierExample3 - 6 is ready 15:57:07.203 [pool-1-thread-1] INFO concurrency.example.AQS.CyclicBarrierExample3 - 7 is ready 15:57:08.203 [pool-1-thread-2] INFO concurrency.example.AQS.CyclicBarrierExample3 - 8 is ready 15:57:09.203 [pool-1-thread-3] INFO concurrency.example.AQS.CyclicBarrierExample3 - 9 is ready 15:57:09.203 [pool-1-thread-3] INFO concurrency.example.AQS.CyclicBarrierExample3 - callback is ready!!!! 15:57:09.203 [pool-1-thread-3] INFO concurrency.example.AQS.CyclicBarrierExample3 - 9 continue 15:57:09.203 [pool-1-thread-6] INFO concurrency.example.AQS.CyclicBarrierExample3 - 5 continue 15:57:09.203 [pool-1-thread-4] INFO concurrency.example.AQS.CyclicBarrierExample3 - 6 continue 15:57:09.203 [pool-1-thread-1] INFO concurrency.example.AQS.CyclicBarrierExample3 - 7 continue 15:57:09.204 [pool-1-thread-2] INFO concurrency.example.AQS.CyclicBarrierExample3 - 8 continue
参考文章:https://blog.csdn.net/yanyan19880509/article/details/52349056