死磕 java併發包之LongAdder原始碼分析
問題
(1)java8中為什麼要新增LongAdder?
(2)LongAdder的實現方式?
(3)LongAdder與AtomicLong的對比?
簡介
LongAdder是java8中新增的原子類,在多執行緒環境中,它比AtomicLong效能要高出不少,特別是寫多的場景。
它是怎麼實現的呢?讓我們一起來學習吧。
原理
LongAdder的原理是,在最初無競爭時,只更新base的值,當有多執行緒競爭時通過分段的思想,讓不同的執行緒更新不同的段,最後把這些段相加就得到了完整的LongAdder儲存的值。
原始碼分析
LongAdder繼承自Striped64抽象類,Striped64中定義了Cell內部類和各重要屬性。
主要內部類
// Striped64中的內部類,使用@sun.misc.Contended註解,說明裡面的值消除偽共享 @sun.misc.Contended static final class Cell { // 儲存元素的值,使用volatile修飾保證可見性 volatile long value; Cell(long x) { value = x; } // CAS更新value的值 final boolean cas(long cmp, long val) { return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val); } // Unsafe例項 private static final sun.misc.Unsafe UNSAFE; // value欄位的偏移量 private static final long valueOffset; static { try { UNSAFE = sun.misc.Unsafe.getUnsafe(); Class<?> ak = Cell.class; valueOffset = UNSAFE.objectFieldOffset (ak.getDeclaredField("value")); } catch (Exception e) { throw new Error(e); } } }
Cell類使用@sun.misc.Contended註解,說明是要避免偽共享的。
使用Unsafe的CAS更新value的值,其中value的值使用volatile修飾,保證可見性。
關於Unsafe的介紹請檢視【 死磕 java魔法類之Unsafe解析 】。
關於偽共享的介紹請檢視【 雜談 什麼是偽共享(false sharing)? 】。
主要屬性
// 這三個屬性都在Striped64中 // cells陣列,儲存各個段的值 transient volatile Cell[] cells; // 最初無競爭時使用的,也算一個特殊的段 transient volatile long base; // 標記當前是否有執行緒在建立或擴容cells,或者在建立Cell // 通過CAS更新該值,相當於是一個鎖 transient volatile int cellsBusy;
最初無競爭或有其它執行緒在建立cells陣列時使用base更新值,有過競爭時使用cells更新值。
最初無競爭是指一開始沒有執行緒之間的競爭,但也有可能是多執行緒在操作,只是這些執行緒沒有同時去更新base的值。
有過競爭是指只要出現過競爭不管後面有沒有競爭都使用cells更新值,規則是不同的執行緒hash到不同的cell上去更新,減少競爭。
add(x)方法
add(x)方法是LongAdder的主要方法,使用它可以使LongAdder中儲存的值增加x,x可為正可為負。
public void add(long x) { // as是Striped64中的cells屬性 // b是Striped64中的base屬性 // v是當前執行緒hash到的Cell中儲存的值 // m是cells的長度減1,hash時作為掩碼使用 // a是當前執行緒hash到的Cell Cell[] as; long b, v; int m; Cell a; // 條件1:cells不為空,說明出現過競爭,cells已經建立 // 條件2:cas操作base失敗,說明其它執行緒先一步修改了base,正在出現競爭 if ((as = cells) != null || !casBase(b = base, b + x)) { // true表示當前競爭還不激烈 // false表示競爭激烈,多個執行緒hash到同一個Cell,可能要擴容 boolean uncontended = true; // 條件1:cells為空,說明正在出現競爭,上面是從條件2過來的 // 條件2:應該不會出現 // 條件3:當前執行緒所在的Cell為空,說明當前執行緒還沒有更新過Cell,應初始化一個Cell // 條件4:更新當前執行緒所在的Cell失敗,說明現在競爭很激烈,多個執行緒hash到了同一個Cell,應擴容 if (as == null || (m = as.length - 1) < 0 || // getProbe()方法返回的是執行緒中的threadLocalRandomProbe欄位 // 它是通過隨機數生成的一個值,對於一個確定的執行緒這個值是固定的 // 除非刻意修改它 (a = as[getProbe() & m]) == null || !(uncontended = a.cas(v = a.value, v + x))) // 呼叫Striped64中的方法處理 longAccumulate(x, null, uncontended); } }
(1)最初無競爭時只更新base;
(2)直到更新base失敗時,建立cells陣列;
(3)當多個執行緒競爭同一個Cell比較激烈時,可能要擴容;
longAccumulate()方法
final void longAccumulate(long x, LongBinaryOperator fn, boolean wasUncontended) { // 儲存執行緒的probe值 int h; // 如果getProbe()方法返回0,說明隨機數未初始化 if ((h = getProbe()) == 0) { // 強制初始化 ThreadLocalRandom.current(); // force initialization // 重新獲取probe值 h = getProbe(); // 都未初始化,肯定還不存在競爭激烈 wasUncontended = true; } // 是否發生碰撞 boolean collide = false;// True if last slot nonempty for (;;) { Cell[] as; Cell a; int n; long v; // cells已經初始化過 if ((as = cells) != null && (n = as.length) > 0) { // 當前執行緒所在的Cell未初始化 if ((a = as[(n - 1) & h]) == null) { // 當前無其它執行緒在建立或擴容cells,也沒有執行緒在建立Cell if (cellsBusy == 0) {// Try to attach new Cell // 新建一個Cell,值為當前需要增加的值 Cell r = new Cell(x);// Optimistically create // 再次檢測cellsBusy,並嘗試更新它為1 // 相當於當前執行緒加鎖 if (cellsBusy == 0 && casCellsBusy()) { // 是否建立成功 boolean created = false; try {// Recheck under lock Cell[] rs; int m, j; // 重新獲取cells,並找到當前執行緒hash到cells陣列中的位置 // 這裡一定要重新獲取cells,因為as並不在鎖定範圍內 // 有可能已經擴容了,這裡要重新獲取 if ((rs = cells) != null && (m = rs.length) > 0 && rs[j = (m - 1) & h] == null) { // 把上面新建的Cell放在cells的j位置處 rs[j] = r; // 建立成功 created = true; } } finally { // 相當於釋放鎖 cellsBusy = 0; } // 建立成功了就返回 // 值已經放在新建的Cell裡面了 if (created) break; continue;// Slot is now non-empty } } // 標記當前未出現衝突 collide = false; } // 當前執行緒所在的Cell不為空,且更新失敗了 // 這裡簡單地設為true,相當於簡單地自旋一次 // 通過下面的語句修改執行緒的probe再重新嘗試 else if (!wasUncontended)// CAS already known to fail wasUncontended = true;// Continue after rehash // 再次嘗試CAS更新當前執行緒所在Cell的值,如果成功了就返回 else if (a.cas(v = a.value, ((fn == null) ? v + x : fn.applyAsLong(v, x)))) break; // 如果cells陣列的長度達到了CPU核心數,或者cells擴容了 // 設定collide為false並通過下面的語句修改執行緒的probe再重新嘗試 else if (n >= NCPU || cells != as) collide = false;// At max size or stale // 上上個elseif都更新失敗了,且上個條件不成立,說明出現衝突了 else if (!collide) collide = true; // 明確出現衝突了,嘗試佔有鎖,並擴容 else if (cellsBusy == 0 && casCellsBusy()) { try { // 檢查是否有其它執行緒已經擴容過了 if (cells == as) {// Expand table unless stale // 新陣列為原陣列的兩倍 Cell[] rs = new Cell[n << 1]; // 把舊陣列元素拷貝到新陣列中 for (int i = 0; i < n; ++i) rs[i] = as[i]; // 重新賦值cells為新陣列 cells = rs; } } finally { // 釋放鎖 cellsBusy = 0; } // 已解決衝突 collide = false; // 使用擴容後的新陣列重新嘗試 continue;// Retry with expanded table } // 更新失敗或者達到了CPU核心數,重新生成probe,並重試 h = advanceProbe(h); } // 未初始化過cells陣列,嘗試佔有鎖並初始化cells陣列 else if (cellsBusy == 0 && cells == as && casCellsBusy()) { // 是否初始化成功 boolean init = false; try {// Initialize table // 檢測是否有其它執行緒初始化過 if (cells == as) { // 新建一個大小為2的Cell陣列 Cell[] rs = new Cell[2]; // 找到當前執行緒hash到陣列中的位置並建立其對應的Cell rs[h & 1] = new Cell(x); // 賦值給cells陣列 cells = rs; // 初始化成功 init = true; } } finally { // 釋放鎖 cellsBusy = 0; } // 初始化成功直接返回 // 因為增加的值已經同時建立到Cell中了 if (init) break; } // 如果有其它執行緒在初始化cells陣列中,就嘗試更新base // 如果成功了就返回 else if (casBase(v = base, ((fn == null) ? v + x : fn.applyAsLong(v, x)))) break;// Fall back on using base } }
(1)如果cells陣列未初始化,當前執行緒會嘗試佔有cellsBusy鎖並建立cells陣列;
(2)如果當前執行緒嘗試建立cells陣列時,發現有其它執行緒已經在建立了,就嘗試更新base,如果成功就返回;
(3)通過執行緒的probe值找到當前執行緒應該更新cells陣列中的哪個Cell;
(4)如果當前執行緒所在的Cell未初始化,就佔有佔有cellsBusy鎖並在相應的位置建立一個Cell;
(5)嘗試CAS更新當前執行緒所在的Cell,如果成功就返回,如果失敗說明出現衝突;
(5)當前執行緒更新Cell失敗後並不是立即擴容,而是嘗試更新probe值後再重試一次;
(6)如果在重試的時候還是更新失敗,就擴容;
(7)擴容時當前執行緒佔有cellsBusy鎖,並把陣列容量擴大到兩倍,再遷移原cells陣列中元素到新陣列中;
(8)cellsBusy在建立cells陣列、建立Cell、擴容cells陣列三個地方用到;
sum()方法
sum()方法是獲取LongAdder中真正儲存的值的大小,通過把base和所有段相加得到。
public long sum() { Cell[] as = cells; Cell a; // sum初始等於base long sum = base; // 如果cells不為空 if (as != null) { // 遍歷所有的Cell for (int i = 0; i < as.length; ++i) { // 如果所在的Cell不為空,就把它的value累加到sum中 if ((a = as[i]) != null) sum += a.value; } } // 返回sum return sum; }
可以看到sum()方法是把base和所有段的值相加得到,那麼,這裡有一個問題,如果前面已經累加到sum上的Cell的value有修改,不是就沒法計算到了麼?
答案確實如此,所以LongAdder可以說不是強一致性的,它是最終一致性的。
LongAdder VS AtomicLong
直接上程式碼:
public class LongAdderVSAtomicLongTest { public static void main(String[] args){ testAtomicLongVSLongAdder(1, 10000000); testAtomicLongVSLongAdder(10, 10000000); testAtomicLongVSLongAdder(20, 10000000); testAtomicLongVSLongAdder(40, 10000000); testAtomicLongVSLongAdder(80, 10000000); } static void testAtomicLongVSLongAdder(final int threadCount, final int times){ try { System.out.println("threadCount:" + threadCount + ", times:" + times); long start = System.currentTimeMillis(); testLongAdder(threadCount, times); System.out.println("LongAdder elapse:" + (System.currentTimeMillis() - start) + "ms"); long start2 = System.currentTimeMillis(); testAtomicLong(threadCount, times); System.out.println("AtomicLong elapse:" + (System.currentTimeMillis() - start2) + "ms"); } catch (InterruptedException e) { e.printStackTrace(); } } static void testAtomicLong(final int threadCount, final int times) throws InterruptedException { AtomicLong atomicLong = new AtomicLong(); List<Thread> list = new ArrayList<>(); for (int i=0;i<threadCount;i++){ list.add(new Thread(() -> { for (int j = 0; j<times; j++){ atomicLong.incrementAndGet(); } })); } for (Thread thread : list){ thread.start(); } for (Thread thread : list){ thread.join(); } } static void testLongAdder(final int threadCount, final int times) throws InterruptedException { LongAdder longAdder = new LongAdder(); List<Thread> list = new ArrayList<>(); for (int i=0;i<threadCount;i++){ list.add(new Thread(() -> { for (int j = 0; j<times; j++){ longAdder.add(1); } })); } for (Thread thread : list){ thread.start(); } for (Thread thread : list){ thread.join(); } } }
執行結果如下:
threadCount:1, times:10000000 LongAdder elapse:158ms AtomicLong elapse:64ms threadCount:10, times:10000000 LongAdder elapse:206ms AtomicLong elapse:2449ms threadCount:20, times:10000000 LongAdder elapse:429ms AtomicLong elapse:5142ms threadCount:40, times:10000000 LongAdder elapse:840ms AtomicLong elapse:10506ms threadCount:80, times:10000000 LongAdder elapse:1369ms AtomicLong elapse:20482ms
可以看到當只有一個執行緒的時候,AtomicLong反而效能更高,隨著執行緒越來越多,AtomicLong的效能急劇下降,而LongAdder的效能影響很小。
總結
(1)LongAdder通過base和cells陣列來儲存值;
(2)不同的執行緒會hash到不同的cell上去更新,減少了競爭;
(3)LongAdder的效能非常高,最終會達到一種無競爭的狀態;
彩蛋
在longAccumulate()方法中有個條件是 n >= NCPU
就不會走到擴容邏輯了,而n是2的倍數,那是不是代表cells陣列最大隻能達到大於等於NCPU的最小2次方?
答案是明確的。因為同一個CPU核心同時只會執行一個執行緒,而更新失敗了說明有兩個不同的核心更新了同一個Cell,這時會重新設定更新失敗的那個執行緒的probe值,這樣下一次它所在的Cell很大概率會發生改變,如果執行的時間足夠長,最終會出現同一個核心的所有執行緒都會hash到同一個Cell(大概率,但不一定全在一個Cell上)上去更新,所以,這裡cells陣列中長度並不需要太長,達到CPU核心數足夠了。
比如,筆者的電腦是8核的,所以這裡cells的陣列最大隻會到8,達到8就不會擴容了。
歡迎關注我的公眾號“彤哥讀原始碼”,檢視更多原始碼系列文章, 與彤哥一起暢遊原始碼的海洋。