最近一直在研究AQS的源码,希望可以更深刻的理解AQS的实现原理。虽然网上有很多关于AQS的源码分析,但是看完以后感觉还是一知半解。于是,我将自己的整个理解过程记录下来了,希望对大家有所帮助。
AQS是Java中锁的基础,主要由两个队列组成。一个队列是 同步队列 ,另一个是 条件队列 。
head
tail
next
prev
Node
nextWaiter
null
firstWaiter
lastWaiter
signal
signalAll
首先,了解以下同步队列中队列的节点Node的数据结构
static final class Node { /** 共享锁的标识 */ static final Node SHARED = new Node(); /** 排他锁的标识 */ static final Node EXCLUSIVE = null; /** 线程取消 */ static final int CANCELLED = 1; /** 持有锁的线程的后继线程被挂起 */ static final int SIGNAL = -1; /** 条件队列标识 */ static final int CONDITION = -2; /** * 共享锁情况下,通知所有其他节点 */ static final int PROPAGATE = -3; /** * waitStatus的取值如下: * SIGNAL(-1): 当前节点的后继节点应该被挂起 * CANCELLED(1): 当前节点被取消 * CONDITION(-2): 当前节点在条件队列 * PROPAGATE(-3): 释放共享锁时需要通知所有节点 * 0: 初始值 * */ volatile int waitStatus; /** * 前驱节点 */ volatile Node prev; /** * 后继节点 */ volatile Node next; /** * 节点对应的线程 */ volatile Thread thread; /** * 在共享锁的情况下,该节点的值为SHARED * 在排他锁的情况下,该节点的值为EXCLUSIVE * 在条件队列的情况下,链接的是下一个等待条件的线程 */ Node nextWaiter; }
其次,我们来看一下 同步队列 的链表结构
接着,我们根据同步队列的原理来分析以下acquire和release需要做哪些事情:
acquire
release
node
waitStatus
SIGNAL
CANCEL
步骤3
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
tryAcquire(arg):对外提供的一个扩展方法,常用的锁都要实现这个方法,具体实现与锁相关
tryAcquire(arg)
addWaiter(Node.EXCLUSIVE): 创建一个排他锁节点,并将该节点添加到 同步队列 尾部,代码如下:
addWaiter(Node.EXCLUSIVE)
private Node addWaiter(Node mode) { // 创建一个node,EXCLUSIVE类型 Node node = new Node(mode); for (;;) { // 获取尾节点 Node oldTail = tail; if (oldTail != null) { // 设置即将成为尾节点的前驱 node.setPrevRelaxed(oldTail); // CAS操作设置尾节点 if (compareAndSetTail(oldTail, node)) { // 将新尾节点的前驱节点与新的尾节点关联起来 oldTail.next = node; // 返回添加的节点 // 这个节点现在不一定是尾节点,因为如果有多个线程调用这个方法时, // 可能还有节点添加在这个节点后面 return node; } } else { // 如果队列为空,初始化头节点 initializeSyncQueue(); } } }
acquireQueued
final boolean acquireQueued(final Node node, int arg) { try { // 线程是否中断 boolean interrupted = false; for (;;) { // 获取前驱节点 final Node p = node.predecessor(); // 如果前驱节点是头节点,获取锁 if (p == head && tryAcquire(arg)) { // 修改头节点 setHead(node); // 释放头节点的资源 p.next = null; // help GC // 返回线程中断的状态 // 这也是该方法唯一的返回值 // 没有获取锁的线程会一直执行该方法直到获取锁以后再返回 return interrupted; } // 获取锁失败后是否需要将线程挂起 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) // 线程挂起并返回是否被中断 interrupted = true; } } catch (Throwable t) { // 取消该节点 cancelAcquire(node); throw t; } }
shouldParkAfterFailedAcquire
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { // 前驱节点的状态 int ws = pred.waitStatus; if (ws == Node.SIGNAL) /* * 状态已经设置成SIGNAL,可以直接挂起该节点 */ return true; // 节点被取消 if (ws > 0) { /* * 找到pred第一个有效的前驱节点 */ do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); // pred可能是一个新的节点,需要将pred的next重写设置为node pred.next = node; } else { /* * CAS操作将pred节点的状态设置为SIGNAL */ pred.compareAndSetWaitStatus(ws, Node.SIGNAL); } // 只有当pred节点的waitStatus已经是SIGNAL状态时,才可以安全的挂起线程 // 否则需要不能被挂起 return false; }
parkAndCheckInterrupt
private final boolean parkAndCheckInterrupt() { // 线程挂起 LockSupport.park(this); // 检测线程是否中断 return Thread.interrupted(); }
cancelAcquire
private void cancelAcquire(Node node) { // 如果节点为空,什么都不做 if (node == null) return; // 释放线程 node.thread = null; // 从后往前过滤掉所有的被取消的节点 Node pred = node.prev; while (pred.waitStatus > 0) node.prev = pred = pred.prev; // 有效前驱节点的nex节点 Node predNext = pred.next; // 将node设置为CANCELLED node.waitStatus = Node.CANCELLED; // 如果是尾节点,设置新的尾节点 if (node == tail && compareAndSetTail(node, pred)) { // 将新的尾节点的后续设置为null pred.compareAndSetNext(predNext, null); } else { // If successor needs signal, try to set pred's next-link // so it will get one. Otherwise wake it up to propagate. int ws; // 如果前驱节点的线程不为null并且waitStatus为SIGNAL if (pred != head && ((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && pred.compareAndSetWaitStatus(ws, Node.SIGNAL))) && pred.thread != null) { Node next = node.next; // 将node设置成pred的后继节点 if (next != null && next.waitStatus <= 0) pred.compareAndSetNext(predNext, next); } else { // 唤起node节点的后继节点 // 因为node节点已经释放锁了 unparkSuccessor(node); } node.next = node; // help GC } }
unparkSuccessor
private void unparkSuccessor(Node node) { /* * 获取node节点的waitStatus */ int ws = node.waitStatus; // 用CSA操作将waitStatus设置成初始状态 // 不管设置是否成功,都无所谓,因为该节点即将被销毁 if (ws < 0) node.compareAndSetWaitStatus(ws, 0); /* * 获取node的后继节点 */ Node s = node.next; // 如果后继节点为null或者被取消, // 通过从同步队列的尾节点开始一直往前找到一个有效的后继节点 if (s == null || s.waitStatus > 0) { s = null; for (Node p = tail; p != node && p != null; p = p.prev) if (p.waitStatus <= 0) s = p; } // 如果后继节点不为空 if (s != null) LockSupport.unpark(s.thread);// 唤醒后继节点的线程 }
与acquire方法类似的还有acquireInterruptibly、tryAcquireNanos、acquireShared、acquireSharedInterruptibly和tryAcquireSharedNanos,我们都一一分析以下
acquireInterruptibly
tryAcquireNanos
acquireShared
acquireSharedInterruptibly
tryAcquireSharedNanos
public final void acquireInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) // 如果线程中断,直接返回 throw new InterruptedException(); if (!tryAcquire(arg)) doAcquireInterruptibly(arg); // 中断式的获取锁 }
doAcquireInterruptibly
private void doAcquireInterruptibly(int arg) throws InterruptedException { // 创建一个排他节点加入同步队列 final Node node = addWaiter(Node.EXCLUSIVE); try { for (;;) { // 获取前驱节点 final Node p = node.predecessor(); // 如果前驱节点是头节点,说明已经获取的锁 if (p == head && tryAcquire(arg)) { // 修改头节点 setHead(node); p.next = null; // help GC return; } // 如果没有获取锁,检测是否需要挂起 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); // 如果发现线程已经被中断,需要抛出异常 } } catch (Throwable t) { // 发生异常取消节点 cancelAcquire(node); throw t; } }
public final boolean tryAcquireNanos(int arg, long nanosTimeout) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); // 线程中断直接返回 return tryAcquire(arg) || doAcquireNanos(arg, nanosTimeout); // 超时获取排他锁 }
doAcquireNanos
private boolean doAcquireNanos(int arg, long nanosTimeout) throws InterruptedException { // 如果超时直接返回 if (nanosTimeout <= 0L) return false; // 获取超时时长 final long deadline = System.nanoTime() + nanosTimeout; // 添加一个排他节点到同步队列尾部 final Node node = addWaiter(Node.EXCLUSIVE); try { for (;;) { // 获取前驱节点 final Node p = node.predecessor(); // 已经获取锁 if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC return true; } nanosTimeout = deadline - System.nanoTime(); // 如果超时了就取消 if (nanosTimeout <= 0L) { cancelAcquire(node); return false; } // 检测节点是否需要被挂起 if (shouldParkAfterFailedAcquire(p, node) && nanosTimeout > SPIN_FOR_TIMEOUT_THRESHOLD) // 如果需要挂起,且超时时长大于SPIN_FOR_TIMEOUT_THRESHOLD // 线程挂起nanosTimeout时间 LockSupport.parkNanos(this, nanosTimeout); if (Thread.interrupted()) throw new InterruptedException(); } } catch (Throwable t) { // 发生异常取消节点 cancelAcquire(node); throw t; } }
public final void acquireShared(int arg) { // 对外提供的一个扩展方法,常用的锁都要实现这个方法, // 该方法的实现与锁的用途有关 if (tryAcquireShared(arg) < 0) doAcquireShared(arg); // 获取共享锁 }
doAcquireShared
private void doAcquireShared(int arg) { // 添加一个共享节点到同步队列尾部 final Node node = addWaiter(Node.SHARED); try { boolean interrupted = false; for (;;) { // 获取前驱节点 final Node p = node.predecessor(); if (p == head) { // 返回结果大于等于0表示获取共享锁 int r = tryAcquireShared(arg); if (r >= 0) { // 设置头节点并广播通知其他获取共享锁的节点 setHeadAndPropagate(node, r); p.next = null; // help GC // 如果线程被中断,将该线程中断 // 共享锁会被多个线程获取,如果需要中断 // 所有获取共享锁的线程都要被中断 if (interrupted) selfInterrupt(); return; } } // 检测是否需要挂起 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) // 挂起并中断 interrupted = true; } } catch (Throwable t) { // 发生异常取消节点 cancelAcquire(node); throw t; } }
setHeadAndPropagate
private void setHeadAndPropagate(Node node, int propagate) { Node h = head; // 记录旧的头节点 setHead(node);// 设置新的头节点 /* * 如果头节点为null或者是不是取消状态,尝试唤醒后继节点 */ if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) { Node s = node.next; // node节点的next是SHARED,即共享锁 if (s == null || s.isShared()) // 唤起获取共享锁的线程 doReleaseShared(); } }
doReleaseShared
private void doReleaseShared() { /* * 唤醒时是从头节点开始先唤醒第一个共享节点, * 第一个共享节点被唤醒后会在doAcquireShared方法里继续执行(之前就是在这个方法里被挂起的) * 第一个共享节点如果获取锁会调用setHeadAndPropagate方法修改头节点,然后再调用doReleaseShared方法 * 唤醒第二个共享节点,以此类推,最后把所有的共享节点都唤醒 */ for (;;) { Node h = head; if (h != null && h != tail) { // 获取头节点的状态 int ws = h.waitStatus; // 如果头节点是SIGNAL,需要将状态设置为0,表示已经即将被唤醒 if (ws == Node.SIGNAL) { if (!h.compareAndSetWaitStatus(Node.SIGNAL, 0)) continue; // 如果失败了说明有其他线程在修改头节点,需要继续重试 unparkSuccessor(h); // 唤醒头节点的后继节点 } else if (ws == 0 && !h.compareAndSetWaitStatus(0, Node.PROPAGATE)) continue; // 将头节点状态从0设置成PROPAGATE,如果失败了继续,因为也有其他获取共享锁的线程在更改头节点 } // 如果头节点未改变(因为没有后继节点需要等待共享锁),跳出循环 if (h == head) break; } }
selfInterrupt
static void selfInterrupt() { Thread.currentThread().interrupt(); }
public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); // 如果线程被中断抛出异常 if (tryAcquireShared(arg) < 0) doAcquireSharedInterruptibly(arg); // 可中断的方式获取共享锁 }
doAcquireSharedInterruptibly
private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { // 添加共享锁节点到同步队列尾部 final Node node = addWaiter(Node.SHARED); try { for (;;) { // 获取前驱节点 final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0) { // 获取共享锁以后修改头节点,通知其他等待共享锁的节点 setHeadAndPropagate(node, r); p.next = null; // help GC return; } } // 线程获取共享锁失败后需要挂起,并且发现线程被中断,所以抛出异常 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } catch (Throwable t) { // 发生异常取消节点 cancelAcquire(node); throw t; } }
public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException { if (Thread.interrupted()) // 线程如果中断了,直接抛出异常 throw new InterruptedException(); return tryAcquireShared(arg) >= 0 || doAcquireSharedNanos(arg, nanosTimeout); // 超时获取共享锁 }
doAcquireSharedNanos
private boolean doAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException { // 超时直接返回 if (nanosTimeout <= 0L) return false; final long deadline = System.nanoTime() + nanosTimeout; // 添加共享节点到同步队列尾部 final Node node = addWaiter(Node.SHARED); try { for (;;) { // 获取前驱节点 final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0) { // 获取锁,修改头节点,通知所有其他等待共享锁的节点 setHeadAndPropagate(node, r); p.next = null; // help GC return true; } } nanosTimeout = deadline - System.nanoTime(); if (nanosTimeout <= 0L) { // 超时取消节点 cancelAcquire(node); return false; } if (shouldParkAfterFailedAcquire(p, node) && nanosTimeout > SPIN_FOR_TIMEOUT_THRESHOLD) // 如果需要挂起,且超时时长大于SPIN_FOR_TIMEOUT_THRESHOLD // 线程挂起nanosTimeout时间 LockSupport.parkNanos(this, nanosTimeout); if (Thread.interrupted()) throw new InterruptedException(); // 中断了抛出异常 } } catch (Throwable t) { // 发生异常取消节点 cancelAcquire(node); throw t; } }
public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; // 头节点不能是一个中间态 if (h != null && h.waitStatus != 0) // 唤醒后继节点 unparkSuccessor(h); return true; } return false; }
public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { // 释放共享锁,从头节点开始一个一个的释放 // 如果存在多个共享节点在同步队列时,doReleaseShared方式其实是递归调用 doReleaseShared(); return true; } return false; }
至此,将所有获取锁和释放锁的方法相关的源码全部分析完
我们来看一下 条件队列 的链表结构
CONDITION
Condition.await()
public final void await() throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); // 如果线程中断,直接抛出异常 // 创建一个CONDITION类型的节点,将该节点添加到条件队列尾部 Node node = addConditionWaiter(); // 释放锁 // 在调用await方法之前都会调用lock方法,这个时候已经获取锁了 // 有时候锁还是可重入的,所以需要将所有的资源都释放掉 int savedState = fullyRelease(node); int interruptMode = 0; // 如果节点不再同步队列,全部都要挂起 while (!isOnSyncQueue(node)) { LockSupport.park(this); // 如果在等待期间发生过中断(不管是调用signal之前还是之后),直接退出 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } // 让线程尝试去获取锁,如果无法获取锁就挂起 if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; // 清除所有在条件队列中是取消状态的节点 if (node.nextWaiter != null) // clean up if cancelled unlinkCancelledWaiters(); // 发生中断,上报中断情况 if (interruptMode != 0) reportInterruptAfterWait(interruptMode); }
addConditionWaiter
private Node addConditionWaiter() { Node t = lastWaiter; // 清除条件队列中无效的节点 if (t != null && t.waitStatus != Node.CONDITION) { unlinkCancelledWaiters(); t = lastWaiter; } // 创建一个节点 Node node = new Node(Node.CONDITION); // 添加到条件队列尾部 if (t == null) firstWaiter = node; else t.nextWaiter = node; lastWaiter = node; return node; }
unlinkCancelledWaiters
private void unlinkCancelledWaiters() { Node t = firstWaiter; Node trail = null; // 遍历条件队列将所有不是CONDITION状态的节点全部清除掉 // 这些节点都是取消状态的节点 while (t != null) { Node next = t.nextWaiter; if (t.waitStatus != Node.CONDITION) { t.nextWaiter = null; if (trail == null) firstWaiter = next; else trail.nextWaiter = next; if (next == null) lastWaiter = trail; } else trail = t; t = next; } }
fullyRelease
final int fullyRelease(Node node) { try { int savedState = getState(); // 释放所有的资源 // 如果是可重入锁,savedState就是重入的次数 if (release(savedState)) return savedState; throw new IllegalMonitorStateException(); } catch (Throwable t) { // 发生异常就取消该节点 node.waitStatus = Node.CANCELLED; throw t; } }
isOnSyncQueue
final boolean isOnSyncQueue(Node node) { // waitStatus是CONDITION或者node没有前驱节点,说明node不在同步队列 if (node.waitStatus == Node.CONDITION || node.prev == null) return false; if (node.next != null) // 有后继节点一定在同步队列 return true; /* * 在同步队列中查找node,看是否在同步队列中 */ return findNodeFromTail(node); }
findNodeFromTail
private boolean findNodeFromTail(Node node) { // 从尾节点开始查找 for (Node p = tail;;) { if (p == node) // 找到了 return true; if (p == null) // 找到头了还没找到 return false; p = p.prev; } }
checkInterruptWhileWaiting
private int checkInterruptWhileWaiting(Node node) { // 没有发生中断返回0 // 调用signal之前发生中断返回THROW_IE // 调用signal之后发生中断返回REINTERRUPT return Thread.interrupted() ? (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) : 0; }
transferAfterCancelledWait
// 只有线程处于中断状态,才会调用此方法 // 如果需要的话,将这个已经取消等待的节点转移到阻塞队列 // 返回 true,如果此线程在 signal 之前被取消,否则返回false final boolean transferAfterCancelledWait(Node node) { // 用 CAS 将节点状态设置为 0 // 如果这步 CAS 成功,说明是 signal 方法之前发生的中断, // 因为如果 signal 先发生的话,signal 中会将 waitStatus 设置为 0 if (node.compareAndSetWaitStatus(Node.CONDITION, 0)) { enq(node); // 将节点放入阻塞队列 return true; } // 到这里是因为 CAS 失败,肯定是因为 signal 方法已经将 waitStatus 设置为了 0 // signal 方法会将节点转移到阻塞队列,但是可能还没完成,这边自旋等待其完成 // 当然,这种事情还是比较少的吧:signal 调用之后,没完成转移之前,发生了中断 while (!isOnSyncQueue(node)) Thread.yield(); return false; }
enq
private Node enq(Node node) { // 无限循环,将节点添加到同步队列尾部 for (;;) { Node oldTail = tail; if (oldTail != null) { node.setPrevRelaxed(oldTail); if (compareAndSetTail(oldTail, node)) { oldTail.next = node; return oldTail; } } else { // 如果同步队列为空,初始化 initializeSyncQueue(); } } }
reportInterruptAfterWait
private void reportInterruptAfterWait(int interruptMode) throws InterruptedException { // 如果是THROW_IE状态,抛异常 if (interruptMode == THROW_IE) throw new InterruptedException(); else if (interruptMode == REINTERRUPT) // 再次中断,因为中断状态被使用过一次 selfInterrupt(); }
awaitNanos、awaitUntil和await(long time, TimeUnit unit)这几个方法的整体逻辑是一样的,就不再分析了
awaitNanos
awaitUntil
await(long time, TimeUnit unit)
public final void signal() { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); // 获取条件队列中的第一个节点 Node first = firstWaiter; if (first != null) // 唤醒等待条件的节点 doSignal(first); }
doSignal
private void doSignal(Node first) { do { // 去掉无效的节点 if ( (firstWaiter = first.nextWaiter) == null) lastWaiter = null; first.nextWaiter = null; } while (!transferForSignal(first) && // 将节点转移到同步队列 (first = firstWaiter) != null); }
transferForSignal
final boolean transferForSignal(Node node) { /* * 取消的节点不需要转移 */ if (!node.compareAndSetWaitStatus(Node.CONDITION, 0)) return false; /* * 将节点加入同步队列尾部 */ Node p = enq(node); int ws = p.waitStatus; // ws > 0 说明 node 在阻塞队列中的前驱节点取消了等待锁,直接唤醒 node 对应的线程 // 如果 ws <= 0, 那么 compareAndSetWaitStatus 将会被调用 // 节点入队后,需要把前驱节点的状态设为SIGNAL if (ws > 0 || !p.compareAndSetWaitStatus(ws, Node.SIGNAL)) // 如果前驱节点取消或者 CAS 失败,会进到这里唤醒线程 LockSupport.unpark(node.thread); return true; }
public final void signalAll() { // 如果是当前线程 if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; if (first != null) // 唤醒所有等待条件的节点 doSignalAll(first); }
doSignalAll
// 将所有的节点都转移到同步队列 private void doSignalAll(Node first) { lastWaiter = firstWaiter = null; do { Node next = first.nextWaiter; first.nextWaiter = null; transferForSignal(first); first = next; } while (first != null); }
现在将与AQS相关的核心代码都整理了一遍,里面如果有描述不清晰或者不准确的地方希望大家可以帮忙指出!
原文链接:https://www.cnblogs.com/pinxiong/p/13288201.html