AQS核心思想是,如果被请求的共享资源空闲,那么就将当前请求资源的线程设置为有效的工作线程,将共享资源设置为锁定状态;如果共享资源被占用,就需要一定的阻塞等待唤醒机制来保证锁分配。这个机制主要用的是CLH队列的变体实现的,将暂时获取不到锁的线程加入到队列中。
CLH:Craig、Landin and Hagersten队列,是单向链表,AQS中的队列是CLH变体的虚拟双向队列(FIFO),AQS是通过将每条请求共享资源的线程封装成一个节点来实现锁的分配。
主要原理图如下:
1.1 AQS内部类 (数据结构–Node)先来看下AQS中最基本的数据结构—Node,Node即为上面CLH变体队列中的节点。 解释一下几个方法和属性值的含义:
两种锁模式:
//指示节点在共享模式下等待的标记 static final Node SHARED = new Node(); //指示节点以独占模式等待的标记 static final Node EXCLUSIVE = null;
waitStatus有下面几个枚举值:
首先我们先想到是获取锁:
- 尝试获取锁,不过有没有获取到,立即返回是否获取的标识
- 必须获取锁,如果锁被占用,进行等待
AQS有以下两个方法对应上述两种操作分别是tryAcquire和acquire,下面以独占方式获取锁为例 :
// 该方法必须被子类重写 //以独占的方式获取锁,成功返回true 失败返回false protected boolean tryAcquire(int arg) { throw new UnsupportedOperationException(); } // final 修饰,子类不可修改。 这是独占式获取锁的主逻辑 public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }tryAcquire
tryAcquire是一个被protected修饰的方法,参数是int值,代表对int state 的增加操作,返回值是boolean,代表是否成功获得锁。 在子类自己实现中,需要更新 同步状态state和当前锁的持有线程exclusiveOwnerThread。 tryAcquire必须由子类实现,这样可以在获取锁的时候加上自己的业务逻辑,比如是否“可重入”等。 tryAcquire返回true,线程获得锁,此时可以对相应的共享资源进行操作,使用完释放。返回false,上层逻辑上不想等待锁,可以自己进行处理。 如果想要等待锁,可以直接调用acquire方法,该方法封装了复杂的排队处理逻辑,非常易用。
acquireacquire方法 被final修饰,不能overwrite,道哥看起来对自己的等待并获取锁 很自信哈。
if判断条件包含了两部分: !tryAcquire(arg) acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
!tryAcquire 为false 表示获取锁成功,无需参与排队。
acquireQueued(addWaiter(Node.EXCLUSIVE), arg) 嵌套了addWaiter(Node.EXCLUSIVE),我们先看addWaiter
addwaiterprivate Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { ---1 pred.next = node; ---2 return node; ---3 } } enq(node); ---4 return node; }
顾名思义,这个方法的作用就是将当前线程封装成一个Node,然后加入等待队列,返回值即为该Node。 逻辑是新建一个Node对象,然后插入队尾。我们多线程场景下,假设存在多个线程调用addWait方法。
新建pred节点引用,指向当前的尾节点,如果尾节点不为空,进行下面三步操作: 1.将当前节点的prev指针指向pred节点(尾节点) 2.尝试通过CAS操作将当前节点置为尾节点 a. 如果返回false,说明pred节点已经不是尾节点了,其他线程修改了尾节点,退出判断,执行enq方法,准备重新入队。 b.如果返回true,CAS操作成功,pred为尾节点,CAS使得当前节点成为尾节点。那么需要将pred的next指针指向当前节点。这一步是不会存在线程安全的,因为其他节点不会操作pred节点了。
多线程环境下执行,容易迷糊的细节,也是理解该方法的重点。 1.程序执行到代码中 1 时,pred引用的对象很可能已经不是尾节点了,所以CAS失败; 2. 如果CAS成功,代码块2,3执行时不再具有原子性,但不会有线程不安全,此时pred节点和当前节点的相对位置已确定,其他线程只能在新的尾节点后插入; 3. 需要注意的是,当前后两个节点建立连接的时候,首先是后节点的pre指针指向前节点,当后节点成为尾节点后,前节点的next才会指向后节点。
如果理解了这些,什么情况下会执行到代码块4呢?
- 队列是空的
- 快速插入失败,需要完整的插入流程,也就是代码1处执行失败。
看看enq 完整的插入流程。
private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { // Must initialize if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
逻辑是:最外层是死循环,首先判断当前队列是不是空(tail==null),先初始化 头尾指针指向同一个节点,也就是第一个节点(CLH队列需要一个虚拟头结点启动,在第一次入队时构造该节点并设置头尾指针),然后尾节点插入,失败了不断重试CAS。
现在等待队列入队完成了,怎么出队获取锁的权限呢?我们接着来看acquireQueued源码
acquireQueuedfinal boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
逻辑是: 定义一个获取锁的标识符failed,默认是失败的。定义一个中断的标识符interrupted ,默认是非中断的。 死循环中逻辑: 1.predecessor方法拿到当前节点的前置节点,为空抛空指针异常; 2.当head指针指向前置节点,说明当前节点有权限去竞争锁了,这是一种约定。如果获取锁成功,将head指针指向当前节点,将前置节点的next指针置为null,利于GC回收,将获取锁的标识符置为成功,返回未中断的标识符。 3.不满足2中的条件则判断shouldParkAfterFailedAcquire和parkAndCheckInterrupt,看函数名,首先判断是否挂起等待,需要就挂起,并且判断外部是否调用线程中断;如果不需要,继续尝试获取锁。 4.finally块,判断是否竞争锁失败,如果是,取消当前节点的获取锁的行为,并从队列移出。 5.返回中断的标识符,在上层acquire方法判断是否中断,来选择是否调用当前线程中断,这里属于一种延迟中断机制。
这里有三点注意:
- 一个约定:head节点代表当前持有锁的节点。若当前节点的前置节点是head,那么该节点就开始自选的获取锁。一旦head节点释放,当前节点就能第一时间获取到
- shouldParkAfterFailedAcquire和parkAndCheckInterrupt 的具体细节。
- nterrupted变量最终被返回出去后,上层acquire判断该值,来选择是否调用线程中断。
接下来我们看看上面2中的两个方法。
shouldParkAfterFailedAcquire该方法的作用是 没有竞争锁权限的 或者 竞争失败的节点,是否应该被挂起。
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) return true; if (ws > 0) { do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }
逻辑是: 1.若前置节点pred的waitstatus是signal,说明pred节点也在等待锁,并且之后会唤醒当前节点,所以当前节点可以挂起休息,返回true。 2.如果前置节点ws>0,说明pred节点是cancelled,将其从队列移出,通过从后向前遍历,将pred指向遍历中第一个状态是非cancel的节点,相当于链式删除被cancel的节点,然后返回false,代表当前节点不需要被挂起,因为pred指向新的Node,需要重试外层逻辑。 3.除此之外ws还有两种可能,0和Propagate,为什么不可能是condition,因为waitstatus只有在其他条件模式下,才会被修改为condition,这里不会出现。 并且只有在共享模式下,才可能出现waitStatus为propagate,暂时不用管。那么在独占模式下,ws在这里只会出现 0的情况,代表pred出于初始化默认状态,通过CAS修改pred的状态为signal,然后返回false,重试外层逻辑。
这个方法涉及到对Node的waitstatus修改,相比比较关键。
如果shouldParkAfterFailedAcquire返回false,进行下一轮重试;如果返回true,代表当前节点需要被挂起,则执行parkAndCheckInterrupt方法。
parkAndCheckInterruptprivate final boolean parkAndCheckInterrupt() { LockSupport.park(this); return Thread.interrupted(); }
LockSupport.park(this)本质时通过Unsafe下的native方法调用操作系统原语将当前线程挂起。 此时当前Node中的线程将阻塞在此处,知道持有锁的线程调用release方法,唤醒后续节点。
Thread.interrupted() 有什么作用呢? 这是因为在线程挂起期间,线程可能被中断,park期间无法响应中断,所以只有当线程唤醒时,检查park期间是否被调用中断,有的话,传递出去,通过外层accquire 来响应中断。
总结以下,通过accquireQueued这个方法,我们可以明白,如果当前线程所在的系欸但处于头节点后一个,那么它将不断尝试自旋拿锁,直到成功,否则进行判断,是否需要挂起。这样就能保证head之后的一个节点在自旋CAS获取锁,其他线程在挂起或正在被挂起。这样最大限度避免无用的自旋消耗CPU。
上面我们讲到 大量线程被挂起,那么就会有被唤醒的时机。也提到,当持有所得线程释放了锁,会唤醒后续节点。 我们来看看 release方法。
releasepublic final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; }
和tryAcquire一样,tryRelease也是AQS开放给上层自由实现的抽象方法。 在release中,假如尝试释放锁成功,下一步就要唤醒等待队列的其他节点,这里我们重点看下unparkSucessor这个方法。参数是head Node。
unparkSucessorprivate void unparkSuccessor(Node node) { int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0); Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) LockSupport.unpark(s.thread); }
首先拿到head节点的waitStatus,如果不为0,将其置为0,表示锁释放。接下来后续指针为null,或者处于canceled状态,那么从后向前遍历,找到除了head的最靠前的非canceled状态的node,对其unpark,让其尝试拿锁。
这里注意为什么要从尾节点向前遍历,而不是从前往后?
- 因为我们在addWaiter方法中,是先将后节点的prev指针指向前节点;然后原子操作将尾指针指向新插入的节点。然后才将前节点,next指针指向当前节点 操作不是原子性的。
- 新插入的节点是先感知到前节点的,如果从前往后遍历,可能前节点的next 还为建立好,遍历就结束了。
这时,拿锁 、挂起 、释放、唤醒 都能够有条不紊,且高效的运行。
2.同步状态—state同步状态State来控制整体可重入的情况。State是Volatile修饰的,用于保证一定的可见性和有序性。
private volatile int state;
State这个字段主要的过程:
- State初始化的时候为0,表示没有任何线程持有锁。
- 当有线程持有该锁时,值就会在原来的基础上+1,同一个线程多次获得锁是,就会多次+1,这里就是可重入的概念。
- 解锁也是对这个字段-1,一直到0,此线程对锁释放。
- 共享锁是初始化state为n,每次获取锁-1,直到减到0,其他线程阻塞。
下面提供了几个访问这个字段的方法:
这几个方法都是Final修饰的,说明子类中无法重写它们。我们可以通过修改State字段表示的同步状态来实现多线程的独占模式和共享模式(加锁过程)。 对于我们自定义的同步工具,需要自定义获取同步状态和释放状态的方式,也就是AQS架构图中的第一层:API层。
二.AQS在JUC中应用 1.ReentrantLock 引申:Java AQS 核心数据结构-CLH 锁 从ReentrantLock的实现看AQS的原理及应用
1.《JUC核心--AQS》援引自互联网,旨在传递更多网络信息知识,仅代表作者本人观点,与本网站无关,侵删请联系页脚下方联系方式。
2.《JUC核心--AQS》仅供读者参考,本网站未对该内容进行证实,对其原创性、真实性、完整性、及时性不作任何保证。
3.文章转载时请保留本站内容来源地址,https://www.lu-xu.com/jiaoyu/2371955.html