高度可靠系统,数据不再丢失?SynchronousQueue使用场景及实现

在Java并发编程中,我们大多数时候所用到的阻塞队列,一般消费者等待生产者提供数据,消费者可能阻塞而生产者一般不阻塞(除非队列满)。但是,在有的时候,我们希望生产者在把任务成功交至消费者之前也阻塞,其目的可能是为了保证任务数据的安全转移、防止丢失等。因为,如果我们使用生产者不阻塞的队列,那么生产者把任务放进队列后就直接返回,消费者过一段时间再来获取,如果在这段时间内,系统宕机或者被强制重启(例如线程池shutdownNow),那么队列中的数据就会丢失。如果生产者事先没有备份,那么这些数据就会永久丢失。生产者把数据放入阻塞队列,和消费者从队列取出数据,这两个操作之间存在一段时间,在这段时间内数据可能丢失。

高度可靠系统,数据不再丢失?SynchronousQueue使用场景及实现
使用SynchronousQueue(或者TransferQueue)就可以避免这个问题,因为它会保证任务数据在生产者和消费者之间的直接转移,不再经过阻塞队列。然而,目前我们在网上几乎找不到几篇解析SynchronousQueue使用场景及其实现原理的文章,也没有文章讲清这种阻塞队列的用途。因此,本文将分析SynchronousQueue的使用方法及实现原理,值得一看,或许能为你设计高度数据安全的系统提供启发和参考。

高度可靠系统,数据不再丢失?SynchronousQueue使用场景及实现
注意,SynchronousQueue实现了生产者和消费者线程在执行时间上的对齐(如上图所示)。怎么理解?例如有一个Runnable的任务,它一开始在线程A中运行,后来借助SynchronousQueue转移到了线程B,那么这个Runnabled的run方法中计时操作就不包含在队列中进行等待的时间(包含阻塞等待的时间)。

  1. SynchronousQueue阻塞队列
    JDK并发包提供了五花八门的阻塞队列,包括ArrayBlockingQueue(有界阻塞队列)、LinkedBlockingQueue(无界阻塞队列)、PriorityBlockingQueue(优先级阻塞队)等。其中,SynchronousQueue(同步阻塞队列)是比较特殊的一种阻塞队列,特点如下:

队列存储空间始终等于0(1个元素也不保存)。
当没有消费者时,生产者往队列放数据会阻塞。
当没有生产者时,消费者从队列取数据会阻塞。
上面第1个特征,在介绍线程池的时候提到过,容量一定是0,只有等消费者到来之后,生产者才可以把数据放进去。而一旦放进去,就立即被消费者取走。如果你对线程池的使用不太熟悉,请看线程池使用方法1和线程池使用方法2,如果对线程池的底层实现感兴趣,想要碾压某些面试官,请看线程池实现原理详解。

如果生产者和消费者两者中只有一个操作队列(put或take),则会阻塞;只有当生产者调用put且消费者调用take,形成一条同步的连接,才会继续往下执行。从最终效果看,对象从生产者转移到消费者线程,相当于跨线程同步执行。

高度可靠系统,数据不再丢失?SynchronousQueue使用场景及实现
2. SynchronousQueue的实现原理
虽然SynchronousQueue容量始终为0,但是其内部有数据结构,用来临时存放被阻塞的生产者/消费者线程。这个数据结构就是dual stack(非公平模式)或dual queue(公平模式)。当生产者往队列提交数据(put操作)而此时没有消费者时,就往这个dual中放入一个"data"节点;当消费者从队列取数据(take操作)而此时没有生产者时,就往这个dual中放入一个"request"节点。"data"节点与"request"节点互相抵消(或称为匹配,match)。某种程度上,类似于表达式匹配问题的左右括号匹配。

高度可靠系统,数据不再丢失?SynchronousQueue使用场景及实现
3. 非公平模式的dual stack算法
dual stack其实就是用一个栈,保存等待线程。至于为什么叫dual stack请继续看下文。

当生产者执行put或消费者执行take时,

1)如果此时栈为空或栈顶已经存在同类节点,则创建节点(put对应"data"节点,take对应"request"节点)并阻塞等待匹配。

高度可靠系统,数据不再丢失?SynchronousQueue使用场景及实现
2)如果此时栈顶存在一个匹配节点(put操作遇到"request"节点,或take操作遇到"data"节点),那么把新节点标记为fulfilling,然后入栈,把新节点放到原来等待匹配节点的match字段中,唤醒等待线程,然后从栈弹出两个节点(这就是为什么叫dual stack)。

高度可靠系统,数据不再丢失?SynchronousQueue使用场景及实现
3)如果此时栈顶已经是一个被标记了fulfilling的节点(其他线程放进去的),那么协助执行步骤2的"唤醒所对应线程,然后从栈弹出两个节点"操作(与步骤2操作相同)。

以上3个步骤就是dual stack算法实现无锁TransferQueue的步骤。实现代码中还加入了一些自旋、判断是否取消、计时等操作细节,这些操作是插入到以上3个步骤的小代码段,比较琐碎,就不分析了。

  1. 公平模式的dual queue算法
    dual queue算法与dual stack算法比较接近,但是更简单一些,执行步骤如下:

当生产者执行put或消费者执行take时,

1) 队列为空或队列头部不是匹配节点,在队列尾部插入当前操作所对应的节点(put操作对应"data"节点,take操作对应"request"),阻塞等待匹配。

2) 队列头部是匹配节点,则取出头部节点,唤起等待线程;

心得:大牛总喜欢在代码中构造很多"巧妙之处"。例如,执行到某个地方,仅做一个简单判断,"正好"把各种情况封装成一个非常简洁的判断,设计得非常优雅,有数学上的美感,值得学习。

补充一下,之前的文章可能对某些分析得不够深入,我可能会更加深入分析,如果你感兴趣的话,请及时本关注公众号。感谢支持,祝工作顺利,面试成功!
下面是我整理了一份2018年最全面的java架构的学习资料和总结多年经验开发的架构师知识体系送给各位小伙伴们。
在这里插入图片描述

欢迎工作一到五年的Java工程师朋友们加入Java高级互联网架构:714526711
提供免费的Java架构学习资料(里面有高可用、高并发、高性能及分布式、Jvm性能调优、Spring源码,MyBatis,Netty,Redis,Kafka,Mysql,Zookeeper,Tomcat,Docker,Dubbo,Nginx等多个知识点的架构资料)合理利用自己每一分每一秒的时间来学习提升自己,不要再用"没有时间“来掩饰自己思想上的懒惰!趁年轻,使劲拼,给未来的自己一个交代!

猜你喜欢

转载自blog.csdn.net/javaxueyuan_yezi/article/details/88725160