Java併發-CyclicBarrier原始碼分析
ofollow,noindex">CyclicBarrier
是一個同步工具類,它讓一組執行緒等待直到一個屏障條件到達才接著執行後續程式碼。名如其類,它的意思就是迴圈屏障,就是可以服複用的,而我們知道另一個同步類CountDownLatch
是不能複用的。
用法舉例
public class CB { static class MyThread extends Thread { private CyclicBarrier cyclicBarrier; public MyThread(CyclicBarrier cyclicBarrier) { this.cyclicBarrier = cyclicBarrier; } @Override public void run() { System.out.println("執行緒" + Thread.currentThread().getName() + "正在準備..."); try { Thread.sleep(5000);//以睡眠來模擬準備操作 System.out.println("執行緒" + Thread.currentThread().getName() + "準備完畢"); cyclicBarrier.await(); } catch (Exception e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + "所有執行緒準備完畢,各自繼續處理其他任務..."); } } public static void main(String[] args) { int cnt = 4; CyclicBarrier cyclicBarrier = new CyclicBarrier(cnt); for (int i = 0; i < cnt; i++) { new MyThread(cyclicBarrier).start(); } } }
以上面程式碼為例子,其輸出是:
執行緒Thread-0正在準備... 執行緒Thread-3正在準備... 執行緒Thread-2正在準備... 執行緒Thread-1正在準備... 執行緒Thread-3準備完畢 執行緒Thread-0準備完畢 執行緒Thread-2準備完畢 執行緒Thread-1準備完畢 Thread-3所有執行緒準備完畢,各自繼續處理其他任務... Thread-2所有執行緒準備完畢,各自繼續處理其他任務... Thread-0所有執行緒準備完畢,各自繼續處理其他任務... Thread-1所有執行緒準備完畢,各自繼續處理其他任務...
可以看出,每一個執行緒都必須等到所有執行緒準備完畢之後才能各自執行其後續操作,這個條件就是屏障狀態barrier,當呼叫cyclicBarrier.await()方法之後,執行緒處於等待barrier狀態而無法執行後續程式碼。
構造器和成員變數
CyclicBarrier提供了兩個構造器:
public CyclicBarrier(int parties, Runnable barrierAction) { if (parties <= 0) throw new IllegalArgumentException(); this.parties = parties; this.count = parties; this.barrierCommand = barrierAction; } public CyclicBarrier(int parties) { this(parties, null); }
引數parties記錄一共等待至barrier狀態的執行緒個數,count記錄依然等待至barrier狀態的執行緒的個數,引數barrierAction為當這些執行緒都達到barrier狀態時會執行的內容。
看下它的成員變數:
/**守護barrier狀態的鎖 */ private final ReentrantLock lock = new ReentrantLock(); /** 條件佇列 */ private final Condition trip = lock.newCondition(); /** 參與的執行緒的數目 */ private final int parties; /* 所有執行緒到達barrier狀態後的執行操作 */ private final Runnable barrierCommand; /** 當前代,複用通過這個類實現 */ private Generation generation = new Generation(); /**依然在等待barrier狀態的執行緒數目*/ private int count;
核心方法
核心方法是dowait
:
private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException { final ReentrantLock lock = this.lock; lock.lock(); try { final Generation g = generation; if (g.broken) throw new BrokenBarrierException(); if (Thread.interrupted()) { breakBarrier(); throw new InterruptedException(); } int index = --count; if (index == 0) {// tripped boolean ranAction = false; try { final Runnable command = barrierCommand; if (command != null) command.run(); ranAction = true; nextGeneration(); return 0; } finally { if (!ranAction) breakBarrier(); } } // loop until tripped, broken, interrupted, or timed out for (;;) { try { if (!timed) trip.await(); else if (nanos > 0L) nanos = trip.awaitNanos(nanos); } catch (InterruptedException ie) { if (g == generation && ! g.broken) { breakBarrier(); throw ie; } else { // We're about to finish waiting even if we had not // been interrupted, so this interrupt is deemed to // "belong" to subsequent execution. Thread.currentThread().interrupt(); } } if (g.broken) throw new BrokenBarrierException(); if (g != generation) return index; if (timed && nanos <= 0L) { breakBarrier(); throw new TimeoutException(); } } } finally { lock.unlock(); } }
所有執行緒進入屏障後(即count==0)會呼叫nextGeneration()
方法進入下一代所有執行緒又可以重新進入到屏障中,原始碼:
private void nextGeneration() { // signal completion of last generation trip.signalAll(); // set up next generation count = parties; generation = new Generation(); }
會呼叫signalAll()
方法喚醒所有的執行緒繼續執行後續操作。