Java 基礎--佇列同步器(AQS)
在 Java 5 之前,Java 程式是靠 synchronized 關鍵字實現鎖的功能的,在 Java 5 之後併發包中提供了 Lock 介面及相關實現類(ReentrantLock、CountDownLatch …)來實現鎖的功能,而這些實現類內部正是用到了 AbstractQueuedSynchronizer 來實現對應的功能。
簡介
佇列同步器 AbstractQueuedSynchronizer(簡稱同步器)是鎖和其他同步元件的基礎框架,內部使用了一個 int 型別的成員變數來表示同步狀態,還使用了一個 FIFO 佇列來管理執行緒的排隊工作。
同步狀態
同步器內部提供了對同步狀態操作的方法,包括設定和獲取:
// 同步狀態 private volatile int state; // 獲取同步狀態,1 表示獲取同步狀態成功,0 表示獲取同步狀態失敗 protected final int getState(){ return state; } // 設定同步狀態 protected final void setState(int newState){ state = newState; } // 採用 CAS 方式設定同步狀態 protected final boolean compareAndSetState(int expect, int update){ return unsafe.compareAndSwapInt(this, stateOffset, expect, update); }
同步器中還有幾個空方法,在自定義同步器時可以按需重寫,當需要操作同步狀態時可通過上面三個方法來完成:
// 獨佔式獲取同步狀態 protected boolean tryAcquire(int arg){ throw new UnsupportedOperationException(); } // 獨佔式釋放同步狀態 protected boolean tryRelease(int arg){ throw new UnsupportedOperationException(); } // 共享式獲取同步狀態 protected int tryAcquireShared(int arg){ throw new UnsupportedOperationException(); } // 共享式釋放同步狀態 protected boolean tryReleaseShared(int arg){ throw new UnsupportedOperationException(); } // 是否被當前執行緒佔用 protected boolean isHeldExclusively(){ throw new UnsupportedOperationException(); }
此外,同步器還提供了以下常用的模板方法:
tryAcquire(int arg) acquire(int arg) acquireInterruptibly(int arg) tryAcquireShared(int arg) acquireShared(int arg) acquireSharedInterruptibly(int arg) tryRelease(int arg) tryReleaseShared(int arg)
同步佇列
同步器內部通過 FIFO 佇列(雙向連結串列)來管理那些獲取同步狀態失敗的執行緒。當執行緒獲取同步狀態失敗後,會將當前執行緒以及一些狀態資訊構造成一個節點(Node)新增到佇列的尾部,同時阻塞該執行緒;當同步狀態被釋放後,會把後繼節點的執行緒喚醒並嘗試獲取同步狀態。
Node 是 AQS 的靜態內部類:
static final class Node{ // 共享式 static final Node SHARED = new Node(); // 獨佔式 static final Node EXCLUSIVE = null; // 等待狀態 static final int CANCELLED =1; static final int SIGNAL= -1; static final int CONDITION = -2; static final int PROPAGATE = -3; /** * 節點的等待狀態有以下幾種: * CANCELLED:由於超時或中斷導致取消,節點的狀態將不再變化 * SIGNAL:後繼節點處於等待狀態,如果當前節點釋放同步狀態或被取消就會通知後繼節點,讓後繼節點去獲取同步狀態 * CONDITION:節點在 Condition 佇列(等待佇列),此時不會去獲取同步狀態,直到呼叫 signal() 方法將其轉移到同步佇列 * PROPAGATE:下一次共享式獲取同步狀態將會傳播下去 * 0:初始狀態 */ volatile int waitStatus; // 前驅節點 volatile Node prev; // 後繼節點 volatile Node next; // 當前節點的執行緒 volatile Thread thread; Node nextWaiter; // 是否共享 final boolean isShared(){ return nextWaiter == SHARED; } // 獲取前驅節點 final Node predecessor()throws NullPointerException { Node p = prev; if (p == null) throw new NullPointerException(); else return p; } Node() {} Node(Thread thread, Node mode) {// Used by addWaiter this.nextWaiter = mode; this.thread = thread; } Node(Thread thread, int waitStatus) { // Used by Condition this.waitStatus = waitStatus; this.thread = thread; } }
節點(Node)是構成同步佇列的基礎,同步器擁有頭節點和尾節點的引用。獲取同步狀態失敗的執行緒會構建成節點被新增到同步佇列的尾部。
同步佇列示意圖:
當一個執行緒獲取同步狀態成功後,其他執行緒則無法獲取同步狀態,轉而被構造成節點新增到同步佇列的尾部。這個新增過程必須是執行緒安全的,所以同步器提供了一個基於 CAS 的方法 compareAndSetTail(Node expect, Node update)
來完成新增。
頭節點在釋放同步狀態後會通知後繼節點,當後繼節點獲取同步狀態成功後將自己設定為頭節點。同步器同樣提供了一個基於 CAS 的方法 compareAndSetHead(Node update)
。
實現分析
下面將從獨佔式獲取與釋放同步狀態、共享式獲取與釋放同步狀態來分析同步器的實現。
獨佔式獲取與釋放同步狀態
首先看一個自定義獨佔式同步器用法的示例:
// 這是一個獨佔鎖 class Muteximplements Lock,java.io.Serializable{ // 推薦自定義靜態內部類繼承 AbstractQueuedSynchronizer private static class Syncextends AbstractQueuedSynchronizer{ // 判斷是否同步狀態 protected boolean isHeldExclusively(){ return getState() == 1; } // 如果狀態為 0 就獲取同步狀態 public boolean tryAcquire(int acquires){ assert acquires == 1; // Otherwise unused if (compareAndSetState(0, 1)) { setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; } // 如果狀態為 1 就釋放同步狀態 protected boolean tryRelease(int releases){ assert releases == 1; // Otherwise unused if (getState() == 0) throw new IllegalMonitorStateException(); setExclusiveOwnerThread(null); setState(0); return true; } // 提供一個 Condition ConditionnewCondition(){ return new ConditionObject(); } private void readObject(ObjectInputStream s) throws IOException, ClassNotFoundException { s.defaultReadObject(); setState(0); // reset to unlocked state } } // 將所有操作代理到 Sync private final Sync sync = new Sync(); // 暴露給外部使用 public void lock(){ sync.acquire(1); } public boolean tryLock(){ return sync.tryAcquire(1); } public void unlock(){ sync.release(1); } public Condition newCondition(){ return sync.newCondition(); } public boolean isLocked(){ return sync.isHeldExclusively(); } public boolean hasQueuedThreads(){ return sync.hasQueuedThreads(); } public void lockInterruptibly()throws InterruptedException { sync.acquireInterruptibly(1); } public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireNanos(1, unit.toNanos(timeout)); } }
Mutex 是一個獨佔鎖,在同一個時刻只允許一個執行緒佔有鎖,Sync 是個繼承了 AbstractQueuedSynchronizer 的靜態內部類,重寫了同步器的空方法並實現了具體的邏輯,這種方式是官方所推薦的。
獨佔式獲取同步狀態
呼叫同步器的 acquire(int arg)
方法可以獲取同步狀態,該方法不響應中斷操作。也就是說當執行緒獲取同步狀態失敗後會加入到同步佇列中,如果此時對執行緒進行中斷操作,執行緒不會從同步佇列中移出。
下面看看 acquire(int arg)
方法的實現:
public final void acquire(int arg){ if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
acquire(int arg)
方法的程式碼雖然少,但是做的事卻不少,接下來分幾步進行介紹。
獲取同步狀態
這一步是通過 tryAcquire(arg)
方法來完成的,而這個方法是需要重寫的。
構建節點
如果獲取同步狀態失敗,會用當前執行緒和其他狀態資訊構建一個節點:
private Node addWaiter(Node mode){ // 構建 Node,引數 mode 是 Node.EXCLUSIVE,表示獨佔式 Node node = new Node(Thread.currentThread(), mode); // 嘗試快速新增 Node pred = tail; if (pred != null) { node.prev = pred; // 新增到尾節點 if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } // 新增到同步佇列 enq(node); return node; }
新增到同步佇列
新增節點到同步佇列,這裡有兩種情況:一種是同步佇列為空的時候,也就是說當前執行緒是第二個獲取同步狀態的,此時還沒有頭節點和尾節點,然後添加了一個空節點( new Node()
);另一種情況是同步佇列不為空的時候:
private Node enq(final Node node){ for (;;) { Node t = tail; if (t == null) { // 此時同步佇列為空,新增第一個節點 if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; // 新增到同步佇列尾部 if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
自旋
這個過程其實就是當前節點在死迴圈中獲取同步狀態:
final boolean acquireQueued(final Node node, int arg){ boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); // 只有前驅節點是頭節點並且當前節點獲取同步狀態成功才退出迴圈 if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } // 更新等待狀態並阻塞執行緒 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } } // 設定當前節點為頭節點 private void setHead(Node node){ head = node; node.thread = null; node.prev = null; } // 阻塞當前執行緒 private final boolean parkAndCheckInterrupt(){ LockSupport.park(this); return Thread.interrupted(); }
獨佔式的獲取同步狀態經過前面四步就完成了,畫個流程圖加深下印象:
在獲取同步狀態時,同步器維護了一個同步佇列,獲取狀態失敗的執行緒都會被構建成節點加入到佇列中並進行自旋;移出佇列的條件是前驅節點為頭節點且成功獲取了同步狀態。
獨佔式釋放同步狀態
執行緒獲取同步狀態成功並執行完相關邏輯後就要釋放同步狀態,後繼節點就可以繼續獲取同步狀態。而呼叫同步器的 release(int arg)
方法可以釋放同步狀態:
public final boolean release(int arg){ // tryRelease() 需要重寫 if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; } // 喚醒後繼節點 private void unparkSuccessor(Node node){ // 獲取、修改頭節點等待狀態 int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0); Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) // 解除阻塞 LockSupport.unpark(s.thread); }
所以釋放同步狀態就是修改同步狀態,並且喚醒後繼節點的執行緒。
共享式獲取與釋放同步狀態
共享式與獨佔式最大的區別在於:同一個時刻能否允許多個執行緒同時獲取到同步狀態。
再來看自定義的共享式同步器的示例:
class BooleanLatch{ private static class Syncextends AbstractQueuedSynchronizer{ boolean isSignalled(){ return getState() != 0; } // 返回大於等於 0 表示獲取同步狀態成功 protected int tryAcquireShared(int ignore){ return isSignalled() ? 1 : -1; } // 釋放同步狀態 protected boolean tryReleaseShared(int ignore){ setState(1); return true; } } // 將所有操作代理到 Sync private final Sync sync = new Sync(); // 暴露給外部使用 public boolean isSignalled(){ return sync.isSignalled(); } public void signal(){ sync.releaseShared(1); } public void await()throws InterruptedException { sync.acquireSharedInterruptibly(1); } }
BooleanLatch 是一個共享鎖,在同一個時刻允許多個執行緒佔有鎖,Sync 是個繼承了 AbstractQueuedSynchronizer 的靜態內部類。
共享式獲取同步狀態
呼叫同步器的 acquireShared(int arg)
方法可以獲取同步狀態。下面看看程式碼實現:
public final void acquireShared(int arg){ // 大於等於 0 表示獲取同步狀態成功 if (tryAcquireShared(arg) < 0) doAcquireShared(arg); } // 自旋 private void doAcquireShared(int arg){ // addWaiter() 過程和獨佔式基本是一致的,唯一不同是的此時傳入了 Node.SHARED,表示共享式 final Node node = addWaiter(Node.SHARED); boolean failed = true; try { boolean interrupted = false; for (;;) { // 獲取前驅節點 final Node p = node.predecessor(); if (p == head) { // 獲取同步狀態,大於等於 0 表示獲取成功 int r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC if (interrupted) selfInterrupt(); failed = false; return; } } // 更新等待狀態並阻塞執行緒 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
在 acquireShared()
方法中嘗試呼叫 tryAcquireShared()
方法獲取同步狀態,當 tryAcquireShared()
的返回值大於等於 0 表示獲取同步狀態成功。 doAcquireShared()
方法表示自旋,跳出迴圈的條件是前驅節點是頭節點並且 tryAcquireShared()
返回值大於等於 0。
共享式釋放同步狀態
和獨佔式一樣,共享式也需要釋放同步狀態,通過呼叫同步器的 releaseShared(int arg)
方法釋放同步狀態:
public final boolean releaseShared(int arg){ if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; } // 修改等待狀態 private void doReleaseShared(){ for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue;// loop to recheck cases // 喚醒後繼節點 unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue;// loop on failed CAS } if (h == head)// loop if head changed break; } }
在 releaseShared()
方法中嘗試呼叫 tryReleaseShared()
方法釋放同步狀態,當 tryReleaseShared()
返回 true 表示同步狀態已經修改成功。 doReleaseShared()
方法主要是修改頭節點的等待狀態以及喚醒後繼節點的執行緒。
小結
獲取同步狀態失敗後會把當前執行緒以及其他一些狀態資訊構建成節點新增到同步佇列中(如果此時同步佇列是空的,那麼會先新增一個空節點,然後再新增這個節點),如果是獨佔式獲取,新構建的節點是 Node.EXCLUSIVE
,否則是 Node.SHARED
,加入到同步佇列後節點會自旋(死迴圈獲取同步狀態)。
釋放同步狀態成功後,將會喚醒後繼節點的執行緒,後繼節點會在自旋狀態中獲取到同步狀態,然後從同步佇列中移除。