AQS同步元件--CyclicBarrier
CyclicBarrier
CyclicBarrier也是一個同步輔助類,它允許一組執行緒相互等待直到到達某個工作屏障點,通過他可以完成多執行緒之間的相互等待。每個執行緒都就緒之後才能執行後面的操作。和CountLatch有相似的地方都是通過計數器來實現的。當某個執行緒執行了await()方法後就進入等待狀態,計數器進行加1操作,當增加後的值達到我們設定的值後,執行緒被喚醒,繼續執行後續操作。CyclicBarrier是可重用的計數器,CyclicBarrier的使用場景和CountDownLatch的使用場景很相似,可以用於多執行緒計算資料最後總計結果。
CyclicBarrier和CountDownLatch的使用區別:
- CountDownLatch的計數器只能使用一次,CyclicBarrier可以用reset方法重置。
- CountDownLatch是一個執行緒等待其他執行緒完成某個操作後才能繼續執行。也就是一個或個多執行緒等待其他的關係,而CyclicBarrier是實現了多個執行緒之間的相互等待,所有執行緒都滿足了條件之後才能繼續使用。
演示程式碼
@Slf4j public class CyclicBarrierExample1 { private static CyclicBarrier barrier = new CyclicBarrier(5); public static void main(String[] args) throws Exception { ExecutorService executor = Executors.newCachedThreadPool(); for (int i = 0; i < 10; i++) { final int threadNum = i; Thread.sleep(1000); executor.execute(() -> { try { race(threadNum); } catch (Exception e) { log.error("exception", e); } }); } executor.shutdown(); } private static void race(int threadNum) throws Exception { Thread.sleep(1000); log.info("{} is ready", threadNum); barrier.await(); log.info("{} continue", threadNum); } }
輸出結果如下:
20:43:46.324 [pool-1-thread-1] INFO com.concurrency.example.aqs.CyclicBarrierExample1 - 0 is ready 20:43:47.322 [pool-1-thread-2] INFO com.concurrency.example.aqs.CyclicBarrierExample1 - 1 is ready 20:43:48.323 [pool-1-thread-3] INFO com.concurrency.example.aqs.CyclicBarrierExample1 - 2 is ready 20:43:49.323 [pool-1-thread-4] INFO com.concurrency.example.aqs.CyclicBarrierExample1 - 3 is ready 20:43:50.325 [pool-1-thread-5] INFO com.concurrency.example.aqs.CyclicBarrierExample1 - 4 is ready 20:43:50.325 [pool-1-thread-5] INFO com.concurrency.example.aqs.CyclicBarrierExample1 - 4 continue 20:43:50.325 [pool-1-thread-1] INFO com.concurrency.example.aqs.CyclicBarrierExample1 - 0 continue 20:43:50.325 [pool-1-thread-2] INFO com.concurrency.example.aqs.CyclicBarrierExample1 - 1 continue 20:43:50.325 [pool-1-thread-4] INFO com.concurrency.example.aqs.CyclicBarrierExample1 - 3 continue 20:43:50.325 [pool-1-thread-3] INFO com.concurrency.example.aqs.CyclicBarrierExample1 - 2 continue 20:43:51.325 [pool-1-thread-6] INFO com.concurrency.example.aqs.CyclicBarrierExample1 - 5 is ready 20:43:52.326 [pool-1-thread-1] INFO com.concurrency.example.aqs.CyclicBarrierExample1 - 6 is ready 20:43:53.326 [pool-1-thread-2] INFO com.concurrency.example.aqs.CyclicBarrierExample1 - 7 is ready 20:43:54.326 [pool-1-thread-4] INFO com.concurrency.example.aqs.CyclicBarrierExample1 - 8 is ready 20:43:55.327 [pool-1-thread-3] INFO com.concurrency.example.aqs.CyclicBarrierExample1 - 9 is ready 20:43:55.327 [pool-1-thread-3] INFO com.concurrency.example.aqs.CyclicBarrierExample1 - 9 continue 20:43:55.327 [pool-1-thread-6] INFO com.concurrency.example.aqs.CyclicBarrierExample1 - 5 continue 20:43:55.327 [pool-1-thread-1] INFO com.concurrency.example.aqs.CyclicBarrierExample1 - 6 continue 20:43:55.327 [pool-1-thread-2] INFO com.concurrency.example.aqs.CyclicBarrierExample1 - 7 continue 20:43:55.327 [pool-1-thread-4] INFO com.concurrency.example.aqs.CyclicBarrierExample1 - 8 continue
我們定義了 private static CyclicBarrier barrier = new CyclicBarrier(5),每次呼叫await後加1 知道等於5就一起執行接下來的操作。
我們在建立CyclicBarrier的時候是可以傳入一段runable的,下面看一下程式碼
@Slf4j public class CyclicBarrierExample3 { private static CyclicBarrier barrier = new CyclicBarrier(5, () -> { log.info("callback is running"); }); public static void main(String[] args) throws Exception { ExecutorService executor = Executors.newCachedThreadPool(); for (int i = 0; i < 10; i++) { final int threadNum = i; Thread.sleep(1000); executor.execute(() -> { try { race(threadNum); } catch (Exception e) { log.error("exception", e); } }); } executor.shutdown(); } private static void race(int threadNum) throws Exception { Thread.sleep(1000); log.info("{} is ready", threadNum); barrier.await(); log.info("{} continue", threadNum); } }
線上程到達=執行屏障時,執行緒會優先執行這個runable
private static CyclicBarrier barrier = new CyclicBarrier(5, () -> { log.info("callback is running"); });