欢迎光临
我们一直在努力

【死磕Java并发】—–J.U.C之阻塞队列:SynchronousQueue

【注】:SynchronousQueue实现算法看的晕乎乎的,写了好久才写完,如果当中有什么错误之处,忘各位指正

作为BlockingQueue中的一员,SynchronousQueue与其他BlockingQueue有着不同特性:

  1. SynchronousQueue没有容量。与其他BlockingQueue不同,SynchronousQueue是一个不存储元素的BlockingQueue。每一个put操作必须要等待一个take操作,否则不能继续添加元素,反之亦然。
  2. 因为没有容量,所以对应 peek, contains, clear, isEmpty … 等方法其实是无效的。例如clear是不执行任何操作的,contains始终返回false,peek始终返回null。
  3. SynchronousQueue分为公平和非公平,默认情况下采用非公平性访问策略,当然也可以通过构造函数来设置为公平性访问策略(为true即可)。
  4. 若使用 TransferQueue, 则队列中永远会存在一个 dummy node(这点后面详细阐述)。

SynchronousQueue非常适合做交换工作,生产者的线程和消费者的线程同步以传递某些信息、事件或者任务。

SynchronousQueue

与其他BlockingQueue一样,SynchronousQueue同样继承AbstractQueue和实现BlockingQueue接口:

public class SynchronousQueue<E> extends AbstractQueue<E>
 implements BlockingQueue<E>, java.io.Serializable

SynchronousQueue提供了两个构造函数:

 public SynchronousQueue() {
 this(false);
 }
 public SynchronousQueue(boolean fair) {
 // 通过 fair 值来决定公平性和非公平性
 // 公平性使用TransferQueue,非公平性采用TransferStack
 transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
 }

TransferQueue、TransferStack继承Transferer,Transferer为SynchronousQueue的内部类,它提供了一个方法transfer(),该方法定义了转移数据的规范,如下:

 abstract static class Transferer<E> {
 abstract E transfer(E e, boolean timed, long nanos);
 }

transfer()方法主要用来完成转移数据的,如果e != null,相当于将一个数据交给消费者,如果e == null,则相当于从一个生产者接收一个消费者交出的数据。

SynchronousQueue采用队列TransferQueue来实现公平性策略,采用堆栈TransferStack来实现非公平性策略,他们两种都是通过链表实现的,其节点分别为QNode,SNode。TransferQueue和TransferStack在SynchronousQueue中扮演着非常重要的作用,SynchronousQueue的put、take操作都是委托这两个类来实现的。

TransferQueue

TransferQueue是实现公平性策略的核心类,其节点为QNode,其定义如下:

 static final class TransferQueue<E> extends Transferer<E> {
 /** 头节点 */
 transient volatile QNode head;
 /** 尾节点 */
 transient volatile QNode tail;
 // 指向一个取消的结点
 //当一个节点中最后一个插入时,它被取消了但是可能还没有离开队列
 transient volatile QNode cleanMe;
 /**
 * 省略很多代码O(∩_∩)O
 */
 }

在TransferQueue中除了头、尾节点外还存在一个cleanMe节点。该节点主要用于标记,当删除的节点是尾节点时则需要使用该节点。

同时,对于TransferQueue需要注意的是,其队列永远都存在一个dummy node,在构造时创建:

 TransferQueue() {
 QNode h = new QNode(null, false); // initialize to dummy node.
 head = h;
 tail = h;
 }

在TransferQueue中定义了QNode类来表示队列中的节点,QNode节点定义如下:

【死磕Java并发】—–J.U.C之阻塞队列:SynchronousQueue

 

【死磕Java并发】—–J.U.C之阻塞队列:SynchronousQueue

 

上面代码没啥好看的,需要注意的一点就是isData,该属性在进行数据交换起到关键性作用,两个线程进行数据交换的时候,必须要两者的模式保持一致。

TransferStack

TransferStack用于实现非公平性,定义如下:

 static final class TransferStack<E> extends Transferer<E> {
 static final int REQUEST = 0;
 static final int DATA = 1;
 static final int FULFILLING = 2;
 volatile SNode head;
 /**
 * 省略一堆代码 O(∩_∩)O~
 */
 }

TransferStack中定义了三个状态:REQUEST表示消费数据的消费者,DATA表示生产数据的生产者,FULFILLING,表示匹配另一个生产者或消费者。任何线程对TransferStack的操作都属于上述3种状态中的一种(对应着SNode节点的mode)。同时还包含一个head域,表示头结点。

内部节点SNode定义如下:

【死磕Java并发】—–J.U.C之阻塞队列:SynchronousQueue

 

【死磕Java并发】—–J.U.C之阻塞队列:SynchronousQueue

 

【死磕Java并发】—–J.U.C之阻塞队列:SynchronousQueue

 

上面简单介绍了TransferQueue、TransferStack,由于SynchronousQueue的put、take操作都是调用Transfer的transfer()方法,只不过是传递的参数不同而已,put传递的是e参数,所以模式为数据(公平isData = true,非公平mode= DATA),而take操作传递的是null,所以模式为请求(公平isData = false,非公平mode = REQUEST),如下:

【死磕Java并发】—–J.U.C之阻塞队列:SynchronousQueue

 

公平模式

公平性调用TransferQueue的transfer方法:

【死磕Java并发】—–J.U.C之阻塞队列:SynchronousQueue

 

【死磕Java并发】—–J.U.C之阻塞队列:SynchronousQueue

 

【死磕Java并发】—–J.U.C之阻塞队列:SynchronousQueue

 

【死磕Java并发】—–J.U.C之阻塞队列:SynchronousQueue

 

整个transfer的算法如下:

  1. 如果队列为null或者尾节点模式与当前节点模式一致,则尝试将节点加入到等待队列中(采用自旋的方式),直到被匹配或、超时或者取消。匹配成功的话要么返回null(producer返回的)要么返回真正传递的值(consumer返回的),如果返回的是node节点本身则表示当前线程超时或者取消了。
  2. 如果队列不为null,且队列的节点是当前节点匹配的节点,则进行数据的传递匹配并返回匹配节点的数据
  3. 在整个过程中都会检测并帮助其他线程推进

当队列为空时,节点入列然后通过调用awaitFulfill()方法自旋,该方法主要用于自旋/阻塞节点,直到节点被匹配返回或者取消、中断。

【死磕Java并发】—–J.U.C之阻塞队列:SynchronousQueue

 

【死磕Java并发】—–J.U.C之阻塞队列:SynchronousQueue

 

在自旋/阻塞过程中做了一点优化,就是判断当前节点是否为对头元素,如果是的则先自旋,如果自旋次数过了,则才阻塞,这样做的主要目的就在如果生产者、消费者立马来匹配了则不需要阻塞,因为阻塞、唤醒会消耗资源。在整个自旋的过程中会不断判断是否超时或者中断了,如果中断或者超时了则调用tryCancel()取消该节点。

tryCancel

 void tryCancel(Object cmp) {
 UNSAFE.compareAndSwapObject(this, itemOffset, cmp, this);
 }

取消过程就是将节点的item设置为自身(itemOffset是item的偏移量)。所以在调用awaitFulfill()方法时,如果当前线程被取消、中断、超时了那么返回的值肯定时S,否则返回的则是匹配的节点。如果返回值是节点S,那么if(x == s)必定成立,如下:

 Object x = awaitFulfill(s, e, timed, nanos);
 if (x == s) { // wait was cancelled
 clean(t, s);
 return null;
 }

如果返回的x == s成立,则调用clean()方法清理节点S:

【死磕Java并发】—–J.U.C之阻塞队列:SynchronousQueue

 

【死磕Java并发】—–J.U.C之阻塞队列:SynchronousQueue

 

这个clean()方法感觉有点儿难度,我也看得不是很懂。这里是引用http://www.jianshu.com/p/95cb570c8187

  1. 删除的节点不是queue尾节点, 这时 直接 pred.casNext(s, s.next) 方式来进行删除(和ConcurrentLikedQueue中差不多)
  2. 删除的节点是队尾节点
  • 此时 cleanMe == null, 则 前继节点pred标记为 cleanMe, 为下次删除做准备
  • 此时 cleanMe != null, 先删除上次需要删除的节点, 然后将 cleanMe至null, 让后再将 pred 赋值给 cleanMe

非公平模式

非公平模式transfer方法如下:

【死磕Java并发】—–J.U.C之阻塞队列:SynchronousQueue

 

【死磕Java并发】—–J.U.C之阻塞队列:SynchronousQueue

 

【死磕Java并发】—–J.U.C之阻塞队列:SynchronousQueue

 

整个处理过程分为三种情况,具体如下:

  1. 如果当前栈为空获取节点模式与栈顶模式一样,则尝试将节点加入栈内,同时通过自旋方式等待节点匹配,最后返回匹配的节点或者null(被取消)
  2. 如果栈不为空且节点的模式与首节点模式匹配,则尝试将该节点打上FULFILLING标记,然后加入栈中,与相应的节点匹配,成功后将这两个节点弹出栈并返回匹配节点的数据
  3. 如果有节点在匹配,那么帮助这个节点完成匹配和出栈操作,然后在主循环中继续执行

当节点加入栈内后,通过调用awaitFulfill()方法自旋等待节点匹配:

【死磕Java并发】—–J.U.C之阻塞队列:SynchronousQueue

 

【死磕Java并发】—–J.U.C之阻塞队列:SynchronousQueue

 

awaitFulfill()方法会一直自旋/阻塞直到匹配节点。在S节点阻塞之前会先调用shouldSpin()方法判断是否采用自旋方式,为的就是如果有生产者或者消费者马上到来,就不需要阻塞了,在多核条件下这种优化是有必要的。同时在调用park()阻塞之前会将当前线程设置到S节点的waiter上。匹配成功,返回匹配节点m。

shouldSpin()方法如下:

 boolean shouldSpin(SNode s) {
 SNode h = head;
 return (h == s || h == null || isFulfilling(h.mode));
 }

同时在阻塞过程中会一直检测当前线程是否中断了,如果中断了,则调用tryCancel()方法取消该节点,取消过程就是将当前节点的math设置为当前节点。所以如果线程中断了,那么在返回m时一定是S节点自身。

 void tryCancel() {
 UNSAFE.compareAndSwapObject(this, matchOffset, null, this);
 }

awaitFullfill()方法如果返回的m == s,则表示当前节点已经中断取消了,则需要调用clean()方法,清理节点S:

 void clean(SNode s) {
 // 清理item域
 s.item = null;
 // 清理waiter域
 s.waiter = null;
 // past节点
 SNode past = s.next;
 if (past != null && past.isCancelled())
 past = past.next;
 // 从栈顶head节点,取消从栈顶head到past节点之间所有已经取消的节点
 // 注意:这里如果遇到一个节点没有取消,则会退出while
 SNode p;
 while ((p = head) != null && p != past && p.isCancelled())
 casHead(p, p.next); // 如果p节点已经取消了,则剔除该节点
 // 如果经历上面while p节点还没有取消,则再次循环取消掉所有p 到past之间的取消节点
 while (p != null && p != past) {
 SNode n = p.next;
 if (n != null && n.isCancelled())
 p.casNext(n, n.next);
 else
 p = n;
 }
 }

clean()方法就是将head节点到S节点之间所有已经取消的节点全部移出。【不清楚为何要用两个while,一个不行么】

至此,SynchronousQueue的源码分析完成了,说下我个人感觉吧:个人感觉SynchronousQueue实现好复杂(可能是自己智商不够吧~~~~(>_<)~~~~),源码看了好久,这篇博客写了将近一个星期,如果有什么错误之处,烦请各位指正!!

 收藏 (0) 打赏

您可以选择一种方式赞助本站

支付宝扫一扫赞助

微信钱包扫描赞助

未经允许不得转载:英协网 » 【死磕Java并发】—–J.U.C之阻塞队列:SynchronousQueue

分享到: 生成海报
avatar

热门文章

  • 评论 抢沙发

    • QQ号
    • 昵称 (必填)
    • 邮箱 (必填)
    • 网址

    登录

    忘记密码 ?

    切换登录

    注册

    我们将发送一封验证邮件至你的邮箱, 请正确填写以完成账号注册和激活