Exchanger 原始碼分析
摘要:
Exchanger
此類提供對外的操作是同步的;
用於成對出現的執行緒之間交換資料【主場景】;
可以視作雙向的同步佇列;
可應用於基因演算法、流水線設計、資料校對等場景
建立例項
/**
* arena 陣列中兩個已使用的 slot 之間的索引距離,將它們分開以避免錯誤的...
Exchanger
此類提供對外的操作是同步的; 用於成對出現的執行緒之間交換資料【主場景】; 可以視作雙向的同步佇列; 可應用於基因演算法、流水線設計、資料校對等場景
建立例項
/** * arena 陣列中兩個已使用的 slot 之間的索引距離,將它們分開以避免錯誤的共享 */ private static final int ASHIFT = 5; /** * arena 陣列的最大索引值,最大 size 值為 0xff+1 */ private static final int MMASK = 0xff; /** * Unit for sequence/version bits of bound field. Each successful * change to the bound also adds SEQ. */ private static final int SEQ = MMASK + 1; /** JVM 的 CPU 核數,用於自旋和擴容控制 */ private static final int NCPU = Runtime.getRuntime().availableProcessors(); /** *arena 的最大索引值:原則上可以讓所有執行緒不發生競爭 */ static final int FULL = NCPU >= MMASK << 1 ? MMASK : NCPU >>> 1; /** * 當前執行緒阻塞等待匹配節點前的自旋次數,CPU==1 時不進行自旋 */ private static final int SPINS = 1 << 10; /** * Value representing null arguments/returns from public methods. * 舊 API 不支援 null 值所以需要適配。 */ private static final Object NULL_ITEM = new Object(); /** *交換超時的返回值物件 */ private static final Object TIMED_OUT = new Object(); @jdk.internal.vm.annotation.Contended static final class Node { // Arena index int index; // Last recorded value of Exchanger.bound int bound; // Number of CAS failures at current bound int collides; // 自旋的偽隨機數 int hash; // 執行緒的當前資料物件 Object item; // 匹配執行緒的資料物件 volatile Object match; // 駐留阻塞執行緒 volatile Thread parked; } /** 參與者 */ static final class Participant extends ThreadLocal<Node> { @Override public Node initialValue() { return new Node(); } } /** *每個執行緒的狀態 */ private final Participant participant; /** *消除陣列,只在出現競爭時初始化。 */ private volatile Node[] arena; /** *未發生競爭時使用的 slot */ private volatile Node slot; /** * The index of the largest valid arena position, OR'ed with SEQ * number in high bits, incremented on each update.The initial * update from 0 to SEQ is used to ensure that the arena array is * constructed only once. */ private volatile int bound; /** * Creates a new Exchanger. */ public Exchanger() { participant = new Participant(); }
執行緒間交換資料
/** *阻塞等待其他執行緒到達交換點後執行資料交換,支援中斷 */ @SuppressWarnings("unchecked") public V exchange(V x) throws InterruptedException { Object v; // 將目標物件 v 進行編碼 final Object item = x == null ? NULL_ITEM : x; // translate null args /** * 1)arena==null,表示未出現執行緒競爭,則使用 slot 進行資料交換 * 2)執行緒已經中斷,則丟擲 InterruptedException * 3)arena!=null,則使用 arena 中的 slot 進行資料交換 */ if ((arena != null || (v = slotExchange(item, false, 0L)) == null) && (Thread.interrupted() || // disambiguates null return (v = arenaExchange(item, false, 0L)) == null)) { throw new InterruptedException(); } // 解碼目標物件 return v == NULL_ITEM ? null : (V)v; } /** *未出現競爭時的資料交換方式 * @param item需要交換的目標物件 * @param timed 是否是超時模式 * @param ns超時的納秒數 * @return *1)目標執行緒的資料物件 *2)null slot 交換出現競爭、執行緒被中斷 *3)TIMED_OUT 交換超時 */ private final Object slotExchange(Object item, boolean timed, long ns) { // 讀取參與者節點 final Node p = participant.get(); // 讀取當前執行緒 final Thread t = Thread.currentThread(); // 執行緒被設定了中斷標識,則返回 null if (t.isInterrupted()) { return null; } for (Node q;;) { // 1)已經有執行緒在阻塞等待交換資料 if ((q = slot) != null) { // 將 slot 置為 null if (SLOT.compareAndSet(this, q, null)) { // 讀取目標物件 final Object v = q.item; // 寫入交換物件 q.match = item; // 如果執行緒在阻塞等待 final Thread w = q.parked; if (w != null) { // 則喚醒交換執行緒 LockSupport.unpark(w); } // 返回交換到的物件 return v; } /** * NCPU > 1 多核 CPU 才會啟用競技場 && * 設定最大有效的 arena 索引值 */ if (NCPU > 1 && bound == 0 && BOUND.compareAndSet(this, 0, SEQ)) { // 建立競技場 arena = new Node[FULL + 2 << ASHIFT]; } } // 2)啟用了 arena else if (arena != null) { return null; // caller must reroute to arenaExchange // 3)slot 為空 && 未啟用 arena } else { // 寫入交換資料 p.item = item; // 將 Node 寫入 slot,成功則退出迴圈 if (SLOT.compareAndSet(this, null, p)) { break; } // 出現競爭,則重試 p.item = null; } } // 等待釋放 int h = p.hash; // 計算截止時間 final long end = timed ? System.nanoTime() + ns : 0L; // 計算自旋次數,多核 CPU 為 1024 int spins = NCPU > 1 ? SPINS : 1; Object v; // 只要沒有匹配的交換資料 while ((v = p.match) == null) { // 1)自旋還未完成 if (spins > 0) { h ^= h << 1; h ^= h >>> 3; h ^= h << 10; if (h == 0) { h = SPINS | (int)t.getId(); } else if (h < 0 && (--spins & (SPINS >>> 1) - 1) == 0) { Thread.yield(); } } // 2)slot 已經更新 else if (slot != p) { spins = SPINS; /** * 3)執行緒未中斷 && 未啟用競技場 && 不是超時模式; *如果是超時模式,則計算剩餘時間,當前還未超時 */ } else if (!t.isInterrupted() && arena == null && (!timed || (ns = end - System.nanoTime()) > 0L)) { // 寫入駐留執行緒 p.parked = t; // 如果 slot 未更新,沒有執行緒來進行資料交換 if (slot == p) { // 1)阻塞等待 if (ns == 0L) { LockSupport.park(this); // 2)超時阻塞等待 } else { LockSupport.parkNanos(this, ns); } } // 執行緒釋放後,清空 parked p.parked = null; } // 如果執行緒被中斷或已經超時,則將 slot 清空 else if (SLOT.compareAndSet(this, p, null)) { // 如果是超時,則返回 TIMED_OUT;執行緒中斷,則返回 null v = timed && ns <= 0L && !t.isInterrupted() ? TIMED_OUT : null; break; } } // 清空 match MATCH.setRelease(p, null); // 清空 item p.item = null; p.hash = h; // 返回交換到的資料物件 return v; } /** * Exchange function when arenas enabled. See above for explanation. * * @param item需要交換的目標物件 * @param timed 是否是超時模式 * @param ns超時的納秒數 * @return *1)目標執行緒的資料物件 *2)null 執行緒被中斷 *3)TIMED_OUT 交換超時 */ private final Object arenaExchange(Object item, boolean timed, long ns) { // 讀取 arena final Node[] a = arena; // 讀取陣列長度 final int alen = a.length; // 讀取當前執行緒的參與者,初始值為 0 final Node p = participant.get(); for (int i = p.index;;) { // access slot at i int b, m, c; // 一般為 31 int j = (i << ASHIFT) + (1 << ASHIFT) - 1; if (j < 0 || j >= alen) { j = alen - 1; } // 讀取指定 slot 的 Node final Node q = (Node) AA.getAcquire(a, j); // 1)目標 slot 已經有執行緒在等待交換資料,則嘗試清空 slot if (q != null && AA.compareAndSet(a, j, q, null)) { // 讀取目標物件 final Object v = q.item; // release // 寫入交換物件 q.match = item; final Thread w = q.parked; if (w != null) { // 喚醒駐留執行緒 LockSupport.unpark(w); } // 返回交換到的值 return v; // 2)目標索引 i 在有效索引範圍內 && slot 為 null } else if (i <= (m = (b = bound) & MMASK) && q == null) { // 寫入 item p.item = item; // offer // 寫入節點 if (AA.compareAndSet(a, j, null, p)) { // 計算截止時間 final long end = timed && m == 0 ? System.nanoTime() + ns : 0L; // 讀取當前執行緒 final Thread t = Thread.currentThread(); // wait // 讀取自旋次數 1024 for (int h = p.hash, spins = SPINS;;) { // 讀取匹配資料 final Object v = p.match; // 1)已經有執行緒將交換資料寫入 if (v != null) { MATCH.setRelease(p, null); p.item = null; // clear for next use p.hash = h; return v; // 2)自旋還未結束 } else if (spins > 0) { h ^= h << 1; h ^= h >>> 3; h ^= h << 10; // xorshift if (h == 0) { h = SPINS | (int) t.getId(); } else if (h < 0 && // approx 50% true (--spins & (SPINS >>> 1) - 1) == 0) { Thread.yield(); // two yields per wait } // 3)slot 已經更新 } else if (AA.getAcquire(a, j) != p) { spins = SPINS; // releaser hasn't set match yet // 4) 執行緒未中斷、未超時 } else if (!t.isInterrupted() && m == 0 && (!timed || (ns = end - System.nanoTime()) > 0L)) { // 寫入駐留執行緒 p.parked = t; // minimize window // 如果 slot 未更新,則執行緒被阻塞 if (AA.getAcquire(a, j) == p) { if (ns == 0L) { LockSupport.park(this); } else { LockSupport.parkNanos(this, ns); } } p.parked = null; // 5)slot 未更新 && 執行緒超時或中斷,則清空 slot } else if (AA.getAcquire(a, j) == p && AA.compareAndSet(a, j, p, null)) { if (m != 0) { BOUND.compareAndSet(this, b, b + SEQ - 1); } p.item = null; p.hash = h; i = p.index >>>= 1; // descend // 執行緒被中斷 if (Thread.interrupted()) { return null; } // 執行緒超時 if (timed && m == 0 && ns <= 0L) { return TIMED_OUT; } break; // expired; restart } } // 2)寫入 slot 出現競爭 } else { p.item = null; // clear offer } } else { if (p.bound != b) { // stale; reset p.bound = b; p.collides = 0; i = i != m || m == 0 ? m : m - 1; } else if ((c = p.collides) < m || m == FULL || !BOUND.compareAndSet(this, b, b + SEQ + 1)) { p.collides = c + 1; i = i == 0 ? m : i - 1; // cyclically traverse } else { i = m + 1; // grow } p.index = i; } } }