Java多執行緒9 同步工具類CyclicBarrier
CyclicBarrier是一個同步工具類,它允許一組執行緒互相等待,直到達到某個公共屏障點。與CountDownLatch不同的是該barrier在釋放執行緒等待後可以重用,所以它稱為迴圈(Cyclic)的屏障(Barrier)。
CyclicBarrier支援一個可選的Runnable命令,在一組執行緒中的最後一個執行緒到達之後(但在釋放所有執行緒之前),該命令只在每個屏障點執行一次。若再繼續所有的參與執行緒之前更新共享狀態,此遮蔽操作很有用。
1 CyclicBarrier方法說明
CyclicBarrier提供的方法有:
CyclicBarrier(parties):初始化相互等待的執行緒數量的構造方法。
CyclicBarrier(parties,Runnable barrierAction):初始化相互等待的執行緒數量以及屏障執行緒的構造方法。
屏障執行緒的執行時機:
等待的執行緒數量=parties之後,CyclicBarrier開啟屏障之前。
舉例:在分組計算中,每個執行緒負責一部分計算,最終這些執行緒計算結束之後,交由屏障執行緒進行彙總計算。
int getParties():獲取CyclicBarrier開啟屏障的執行緒數量,也成為方數。
int getNumberWaiting():獲取正在CyclicBarrier上等待的執行緒數量。
int await():在CyclicBarrier上進行阻塞等待,直到發生以下情形之一:
- 在CyclicBarrier上等待的執行緒數量達到parties,則所有執行緒被釋放,繼續執行。
- 當前執行緒被中斷,則丟擲InterruptedException異常,並停止等待,繼續執行。
- 其他等待的執行緒被中斷,則當前執行緒丟擲BrokenBarrierException異常,並停止等待,繼續執行。
- 其他等待的執行緒超時,則當前執行緒丟擲BrokenBarrierException異常,並停止等待,繼續執行。
- 其他執行緒呼叫CyclicBarrier.reset()方法,則當前執行緒丟擲BrokenBarrierException異常,並停止等待,繼續執行。
int await(timeout,TimeUnit):在CyclicBarrier上進行限時的阻塞等待,直到發生以下情形之一:
- 在CyclicBarrier上等待的執行緒數量達到parties,則所有執行緒被釋放,繼續執行。
- 當前執行緒被中斷,則丟擲InterruptedException異常,並停止等待,繼續執行。
- 當前執行緒等待超時,則丟擲TimeoutException異常,並停止等待,繼續執行。
- 其他等待的執行緒被中斷,則當前執行緒丟擲BrokenBarrierException異常,並停止等待,繼續執行。
- 其他等待的執行緒超時,則當前執行緒丟擲BrokenBarrierException異常,並停止等待,繼續執行。
- 其他執行緒呼叫CyclicBarrier.reset()方法,則當前執行緒丟擲BrokenBarrierException異常,並停止等待,繼續執行。
boolean isBroken():獲取是否破損標誌位broken的值,此值有以下幾種情況:
- CyclicBarrier初始化時,broken=false,表示屏障未破損。
- 如果正在等待的執行緒被中斷,則broken=true,表示屏障破損。
- 如果正在等待的執行緒超時,則broken=true,表示屏障破損。
- 如果有執行緒呼叫CyclicBarrier.reset()方法,則broken=false,表示屏障回到未破損狀態。
void reset():使得CyclicBarrier迴歸初始狀態,直觀來看它做了兩件事:
- 如果有正在等待的執行緒,則會丟擲BrokenBarrierException異常,且這些執行緒停止等待,繼續執行。
- 將是否破損標誌位broken置為false。
2 CyclicBarrier例項
假若有若干個執行緒都要進行寫資料操作,並且只有所有執行緒都完成寫資料操作之後,這些執行緒才能繼續做後面的事情,此時就可以利用CyclicBarrier了:
public static void main(String[] args) { int N = 4; CyclicBarrier barrier= new CyclicBarrier(N); for(int i=0;i<N;i++) new Writer(barrier).start(); } static class Writer extends Thread{ private CyclicBarrier cyclicBarrier; public Writer(CyclicBarrier cyclicBarrier) { this.cyclicBarrier = cyclicBarrier; } @Override public void run() { System.out.println("執行緒"+Thread.currentThread().getName()+"正在寫入資料..."); try { Thread.sleep(5000);//以睡眠來模擬寫入資料操作 System.out.println("執行緒"+Thread.currentThread().getName()+"寫入資料完畢,等待其他執行緒寫入完畢"); cyclicBarrier.await(); } catch (InterruptedException e) { e.printStackTrace(); }catch(BrokenBarrierException e){ e.printStackTrace(); } System.out.println("所有執行緒寫入完畢,繼續處理其他任務..."); } }
執行緒Thread-0正在寫入資料... 執行緒Thread-3正在寫入資料... 執行緒Thread-1正在寫入資料... 執行緒Thread-2正在寫入資料... 執行緒Thread-1寫入資料完畢,等待其他執行緒寫入完畢 執行緒Thread-3寫入資料完畢,等待其他執行緒寫入完畢 執行緒Thread-2寫入資料完畢,等待其他執行緒寫入完畢 執行緒Thread-0寫入資料完畢,等待其他執行緒寫入完畢 所有執行緒寫入完畢,繼續處理其他任務... 所有執行緒寫入完畢,繼續處理其他任務... 所有執行緒寫入完畢,繼續處理其他任務... 所有執行緒寫入完畢,繼續處理其他任務...
從上面輸出結果可以看出,每個寫入執行緒執行完寫資料操作之後,就在等待其他執行緒寫入操作完畢。
當所有執行緒執行緒寫入操作完畢之後,所有執行緒就繼續進行後續的操作了。
如果想在所有執行緒寫入操作完之後,進行額外的其他操作可以為CyclicBarrier提供Runnable引數:
public class CyclicBarrierTest { public static void main(String[] args) { int N = 4; CyclicBarrier barrier= new CyclicBarrier(N,new Runnable() { @Override public void run() { System.out.println("當前執行緒"+Thread.currentThread().getName()); } }); for(int i=0;i<N;i++) new Writer(barrier).start(); } static class Writer extends Thread{ private CyclicBarrier cyclicBarrier; public Writer(CyclicBarrier cyclicBarrier) { this.cyclicBarrier = cyclicBarrier; } @Override public void run() { System.out.println("執行緒"+Thread.currentThread().getName()+"正在寫入資料..."); try { Thread.sleep(3000);//以睡眠來模擬寫入資料操作 System.out.println("執行緒"+Thread.currentThread().getName()+"寫入資料完畢,等待其他執行緒寫入完畢"); cyclicBarrier.await(); } catch (InterruptedException e) { e.printStackTrace(); }catch(BrokenBarrierException e){ e.printStackTrace(); } System.out.println("所有執行緒寫入完畢,繼續處理其他任務..."); } } }
執行緒Thread-0正在寫入資料... 執行緒Thread-3正在寫入資料... 執行緒Thread-2正在寫入資料... 執行緒Thread-1正在寫入資料... 執行緒Thread-1寫入資料完畢,等待其他執行緒寫入完畢 執行緒Thread-3寫入資料完畢,等待其他執行緒寫入完畢 執行緒Thread-0寫入資料完畢,等待其他執行緒寫入完畢 執行緒Thread-2寫入資料完畢,等待其他執行緒寫入完畢 當前執行緒Thread-2 所有執行緒寫入完畢,繼續處理其他任務... 所有執行緒寫入完畢,繼續處理其他任務... 所有執行緒寫入完畢,繼續處理其他任務... 所有執行緒寫入完畢,繼續處理其他任務...
從結果可以看出,當四個執行緒都到達barrier狀態後,會從四個執行緒中選擇一個執行緒去執行Runnable。
await指定時間的效果:
public class CyclicBarrierTest { public static void main(String[] args) { int N = 4; CyclicBarrier barrier = new CyclicBarrier(N); for (int i = 0; i < N; i++) { if (i < N - 1) new Writer(barrier).start(); else { try { //執行時間遠小於2000(cyclicBarrier.await 指定時間) 就不會丟擲TimeoutException Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } new Writer(barrier).start(); } } } static class Writer extends Thread { private CyclicBarrier cyclicBarrier; public Writer(CyclicBarrier cyclicBarrier) { this.cyclicBarrier = cyclicBarrier; } @Override public void run() { System.out.println("執行緒" + Thread.currentThread().getName() + "正在寫入資料..."); try { Thread.sleep(3000);//以睡眠來模擬寫入資料操作 System.out.println("執行緒" + Thread.currentThread().getName() + "寫入資料完畢,等待其他執行緒寫入完畢"); try { cyclicBarrier.await(2000, TimeUnit.MILLISECONDS); } catch (TimeoutException e) { e.printStackTrace(); } } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + "所有執行緒寫入完畢,繼續處理其他任務..."); } } }
執行緒Thread-0正在寫入資料... 執行緒Thread-2正在寫入資料... 執行緒Thread-1正在寫入資料... 執行緒Thread-0寫入資料完畢,等待其他執行緒寫入完畢 執行緒Thread-2寫入資料完畢,等待其他執行緒寫入完畢 執行緒Thread-1寫入資料完畢,等待其他執行緒寫入完畢 執行緒Thread-3正在寫入資料... java.util.concurrent.TimeoutException at java.util.concurrent.CyclicBarrier.dowait(CyclicBarrier.java:257) at java.util.concurrent.CyclicBarrier.await(CyclicBarrier.java:435) at CyclicBarrierTest$Writer.run(CyclicBarrierTest.java:43) Thread-0所有執行緒寫入完畢,繼續處理其他任務... java.util.concurrent.BrokenBarrierException at java.util.concurrent.CyclicBarrier.dowait(CyclicBarrier.java:250) at java.util.concurrent.CyclicBarrier.await(CyclicBarrier.java:435) at CyclicBarrierTest$Writer.run(CyclicBarrierTest.java:43) Thread-1所有執行緒寫入完畢,繼續處理其他任務... java.util.concurrent.BrokenBarrierException at java.util.concurrent.CyclicBarrier.dowait(CyclicBarrier.java:250) at java.util.concurrent.CyclicBarrier.await(CyclicBarrier.java:435) at CyclicBarrierTest$Writer.run(CyclicBarrierTest.java:43) Thread-2所有執行緒寫入完畢,繼續處理其他任務... 執行緒Thread-3寫入資料完畢,等待其他執行緒寫入完畢 java.util.concurrent.BrokenBarrierException at java.util.concurrent.CyclicBarrier.dowait(CyclicBarrier.java:207) at java.util.concurrent.CyclicBarrier.await(CyclicBarrier.java:435) at CyclicBarrierTest$Writer.run(CyclicBarrierTest.java:43) Thread-3所有執行緒寫入完畢,繼續處理其他任務...
上面的程式碼在main方法的for迴圈中,故意讓最後一個執行緒啟動延遲,因為在前面三個執行緒都達到barrier之後,等待了指定的時間發現第四個執行緒還沒有達到barrier,就丟擲異常並繼續執行後面的任務。
另外CyclicBarrier是可以重用的,看下面這個例子:
public class CyclicBarrierTest { public static void main(String[] args) { int N = 4; CyclicBarrier barrier= new CyclicBarrier(N); for(int i=0;i<N;i++) { new Writer(barrier).start(); } try { Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("CyclicBarrier重用"); for(int i=0;i<N;i++) { new Writer(barrier).start(); } } static class Writer extends Thread{ private CyclicBarrier cyclicBarrier; public Writer(CyclicBarrier cyclicBarrier) { this.cyclicBarrier = cyclicBarrier; } @Override public void run() { System.out.println("執行緒"+Thread.currentThread().getName()+"正在寫入資料..."); try { Thread.sleep(3000);//以睡眠來模擬寫入資料操作 System.out.println("執行緒"+Thread.currentThread().getName()+"寫入資料完畢,等待其他執行緒寫入完畢"); cyclicBarrier.await(); } catch (InterruptedException e) { e.printStackTrace(); }catch(BrokenBarrierException e){ e.printStackTrace(); } System.out.println(Thread.currentThread().getName()+"所有執行緒寫入完畢,繼續處理其他任務..."); } } }
執行緒Thread-0正在寫入資料... 執行緒Thread-3正在寫入資料... 執行緒Thread-2正在寫入資料... 執行緒Thread-1正在寫入資料... 執行緒Thread-1寫入資料完畢,等待其他執行緒寫入完畢 執行緒Thread-0寫入資料完畢,等待其他執行緒寫入完畢 執行緒Thread-3寫入資料完畢,等待其他執行緒寫入完畢 執行緒Thread-2寫入資料完畢,等待其他執行緒寫入完畢 Thread-2所有執行緒寫入完畢,繼續處理其他任務... Thread-1所有執行緒寫入完畢,繼續處理其他任務... Thread-3所有執行緒寫入完畢,繼續處理其他任務... Thread-0所有執行緒寫入完畢,繼續處理其他任務... CyclicBarrier重用 執行緒Thread-4正在寫入資料... 執行緒Thread-5正在寫入資料... 執行緒Thread-6正在寫入資料... 執行緒Thread-7正在寫入資料... 執行緒Thread-5寫入資料完畢,等待其他執行緒寫入完畢 執行緒Thread-4寫入資料完畢,等待其他執行緒寫入完畢 執行緒Thread-7寫入資料完畢,等待其他執行緒寫入完畢 執行緒Thread-6寫入資料完畢,等待其他執行緒寫入完畢 Thread-6所有執行緒寫入完畢,繼續處理其他任務... Thread-5所有執行緒寫入完畢,繼續處理其他任務... Thread-4所有執行緒寫入完畢,繼續處理其他任務... Thread-7所有執行緒寫入完畢,繼續處理其他任務...
從執行結果可以看出,在初次的4個執行緒越過barrier狀態後,又可以用來進行新一輪的使用。而CountDownLatch無法進行重複使用。
3 CyclicBarrier原始碼解析
先看一下CyclicBarrier中成員變數的組成:
/** The lock for guarding barrier entry */ private final ReentrantLock lock = new ReentrantLock(); /** Condition to wait on until tripped */ private final Condition trip = lock.newCondition(); /** The number of parties */ private final int parties;//攔截的執行緒數量 /* The command to run when tripped */ private final Runnable barrierCommand; //當屏障撤銷時,需要執行的屏障操作 //當前的Generation。每當屏障失效或者開閘之後都會自動替換掉。從而實現重置的功能。 private Generation generation = new Generation(); /** * Number of parties still waiting. Counts down from parties to 0 * on each generation.It is reset to parties on each new * generation or when broken. */ private int count;
可以看出,CyclicBarrier是由ReentrantLock和Condition來實現的。具體每個變數都有什麼意義,我們在分析原始碼的時候具體說。
我們主要從CyclicBarrier的構造方法和它的await方法分析說起。
CyclicBarrier建構函式
CyclicBarrier有兩個建構函式:
//帶Runnable引數的函式 public CyclicBarrier(int parties, Runnable barrierAction) { if (parties <= 0) throw new IllegalArgumentException(); this.parties = parties;//有幾個運動員要參賽 this.count = parties;//目前還需要幾個運動員準備好 //你要在所有執行緒都繼續執行下去之前要執行什麼操作,可以為空 this.barrierCommand = barrierAction; } //不帶Runnable引數的函式 public CyclicBarrier(int parties) { this(parties, null); }
其中,第二個建構函式呼叫的是第一個建構函式,這個 Runnable barrierAction 引數是什麼呢?其實在上面的小示例中我們就用到了這個Runnable引數,它就是在所有執行緒都準備好之後,滿足Barrier條件時,並且在所有執行緒繼續執行之前,我們可以執行這個Runnable。但是值得注意的是,這不是新起了一個執行緒,而是通過最後一個準備好的(也就是最後一個到達Barrier的)執行緒承擔啟動的。這一點我們在上面示例中列印的執行結果中也可以看出來:Thread-2執行緒是最後一個準備好的,就是它執行的這個barrierAction。
這裡parties和count不要混淆,parties是表示必須有幾個執行緒要到達Barrier,而count是表示目前還有幾個執行緒未到達Barrier。也就是說,只有當count引數為0時,Barrier條件即滿足,所有執行緒可以繼續執行。
count變數是怎麼減少到0的呢?是通過Barrier執行的await方法。下面我們就看一下await方法。
await方法
public int await() throws InterruptedException, BrokenBarrierException { try { return dowait(false, 0L); } catch (TimeoutException toe) { throw new Error(toe); // cannot happen } }
await方法呼叫的dowait方法:
private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException { final ReentrantLock lock = this.lock; lock.lock();//獲取ReentrantLock互斥鎖 try { final Generation g = generation;//獲取generation物件 if (g.broken)//如果generation損壞,丟擲異常 throw new BrokenBarrierException(); if (Thread.interrupted()) { //如果當前執行緒被中斷,則呼叫breakBarrier方法,停止CyclicBarrier,並喚醒所有執行緒 breakBarrier(); throw new InterruptedException(); } int index = --count;// 看到這裡了吧,count減1 //index=0,也就是說,有0個執行緒未滿足CyclicBarrier條件,也就是條件滿足, //可以喚醒所有的執行緒了 if (index == 0) {// tripped boolean ranAction = false; try { //這就是構造器的第二個引數,如果不為空的話,就執行這個Runnable的run方法, //你看,這裡是執行的是run方法,也就是說,並沒有新起一個另外的執行緒, //而是最後一個執行await操作的執行緒執行的這個run方法。 final Runnable command = barrierCommand; if (command != null) command.run(); //同步執行barrierCommand ranAction = true; nextGeneration(); //執行成功設定下一個nextGeneration return 0; } finally { if (!ranAction) . //如果barrierCommand執行失敗,進行屏障破壞處理 breakBarrier(); } } //如果當前執行緒不是最後一個到達的執行緒 // loop until tripped, broken, interrupted, or timed out for (;;) { try { if (!timed) trip.await(); //呼叫Condition的await()方法阻塞 else if (nanos > 0L) nanos = trip.awaitNanos(nanos); //呼叫Condition的awaitNanos()方法阻塞 } catch (InterruptedException ie) { //如果當前執行緒被中斷,則判斷是否有其他執行緒已經使屏障破壞。若沒有則進行屏障破壞處理,並丟擲異常;否則再次中斷當前執行緒 if (g == generation && ! g.broken) { breakBarrier();//執行breakBarrier,喚醒所有執行緒 throw ie; } else { // We're about to finish waiting even if we had not // been interrupted, so this interrupt is deemed to // "belong" to subsequent execution. Thread.currentThread().interrupt(); } } if (g.broken)//如果當前generation已經損壞,丟擲異常 throw new BrokenBarrierException(); if (g != generation)//如果generation已經更新換代,則返回index return index; //如果是引數是超時等待,並且已經超時,則執行breakBarrier()方法 //喚醒所有等待執行緒。 if (timed && nanos <= 0L) { breakBarrier(); throw new TimeoutException(); } } } finally { lock.unlock(); } }
簡單來說,如果不發生異常,執行緒不被中斷,那麼dowait方法會呼叫Condition的await方法(具體Condition的原理請看前面的文章),直到所有執行緒都準備好,即都執行了dowait方法,(做count的減操作,直到count=0),即CyclicBarrier條件已滿足,就會執行喚醒執行緒操作,也就是上面的nextGeneration()方法。可能大家會有疑惑,這個Generation是什麼東西呢?其實這個Generation定義的很簡單,就一個布林值的成員變數:
private Generation generation = new Generation(); private static class Generation { boolean broken = false; }
Generation 可以理解成“代”,我們要知道,CyclicBarrier是可以重複使用的,CyclicBarrier中的同一批執行緒屬於同一“代”,當所有執行緒都滿足了CyclicBarrier條件,執行喚醒操作nextGeneration()方法時,會新new 出一個Generation,代表一下“代”。
nextGeneration的原始碼
private void nextGeneration() { // signal completion of last generation trip.signalAll();//呼叫Condition的signalAll方法,喚醒所有await的執行緒 // set up next generation count = parties;//重置count值 //生成新的Generation,表示上一代的所有執行緒已經喚醒,進行更新換代 generation = new Generation(); }
breakBarrier原始碼
再來看一下breakBarrier的程式碼,breakBarrier方法是在當前執行緒被中斷時執行的,用來喚醒所有的等待執行緒:
private void breakBarrier() { generation.broken = true;//表示當代因為執行緒被中斷,已經發成損壞了 count = parties;//重置count值 trip.signalAll();//呼叫Condition的signalAll方法,喚醒所有await的執行緒 }
isBroken方法
public boolean isBroken() { final ReentrantLock lock = this.lock; lock.lock(); try { return generation.broken; } finally { lock.unlock(); } }
判斷此屏障是否處於中斷狀態。如果因為構造或最後一次重置而導致中斷或超時,從而使一個或多個參與者擺脫此屏障點,或者因為異常而導致某個屏障操作失敗,則返回true;否則返回false。
reset方法
//將屏障重置為其初始狀態。 public void reset() { final ReentrantLock lock = this.lock; lock.lock(); try { //喚醒所有等待的執行緒繼續執行,並設定屏障中斷狀態為true breakBarrier();// break the current generation //喚醒所有等待的執行緒繼續執行,並設定屏障中斷狀態為false nextGeneration(); // start a new generation } finally { lock.unlock(); } }
getNumberWaiting方法
//返回當前在屏障處等待的參與者數目,此方法主要用於除錯和斷言。 public int getNumberWaiting() { final ReentrantLock lock = this.lock; lock.lock(); try { return parties - count; } finally { lock.unlock(); } }
總結:
1.CyclicBarrier可以用於多執行緒計算資料,最後合併計算結果的應用場景。
2.這個等待的await方法,其實是使用ReentrantLock和Condition控制實現的。
3.CyclicBarrier可以重複使用。