Java多執行緒11 同步工具類Exchanger
1 Exchanger 介紹
前面分別介紹了CyclicBarrier、CountDownLatch、Semaphore,現在介紹併發工具類中的最後一個Exchange。
Exchanger 是一個用於執行緒間協作的工具類,Exchanger用於進行執行緒間的資料交換,它提供一個同步點,在這個同步點,兩個執行緒可以交換彼此的資料。這兩個執行緒通過exchange 方法交換資料,如果第一個執行緒先執行exchange 方法,它會一直等待第二個執行緒也執行exchange 方法,當兩個執行緒都到達同步點時,這兩個執行緒就可以交換資料。
A synchronization point at which threads can pair and swap elements within pairs. Each thread presents some object on entry to the exchange method, matches with a partner thread, and receives its partner's object on return. An Exchanger may be viewed as a bidirectional form of a SynchronousQueue. Exchangers may be useful in applications such as genetic algorithms and pipeline designs.
在以上的描述中,有幾個要點:
- 此類提供對外的操作是同步的;
- 用於成對出現的執行緒之間交換資料;
- 可以視作雙向的同步佇列;
-
可應用於基因演算法、流水線設計等場景。
接著看api文件,這個類提供對外的介面非常簡潔,一個無參建構函式,兩個過載的範型exchange方法:
public V exchange(V x) throws InterruptedException public V exchange(V x, long timeout, TimeUnit unit) throws InterruptedException, TimeoutException
2 Exchanger 例項
public class ExchangerTest { public static void main(String[] args) { ExecutorService executor = Executors.newCachedThreadPool(); final Exchanger exchanger = new Exchanger(); executor.execute(new Runnable() { String data = "data1"; @Override public void run() { doExchangeWork(data, exchanger); } }); executor.execute(new Runnable() { String data = "data2"; @Override public void run() { doExchangeWork(data, exchanger); } }); executor.shutdown(); } private static void doExchangeWork(String data, Exchanger exchanger) { try { System.out.println(Thread.currentThread().getName() + "正在把資料 " + data + " 交換出去"); Thread.sleep((long) (Math.random() * 1000)); String exchangeData = (String) exchanger.exchange(data); System.out.println(Thread.currentThread().getName() + "交換得到資料" + exchangeData); } catch (InterruptedException e) { e.printStackTrace(); } } }
pool-1-thread-1正在把資料 data1 交換出去 pool-1-thread-2正在把資料 data2 交換出去 pool-1-thread-2交換得到資料data1 pool-1-thread-1交換得到資料data2
當執行緒A呼叫Exchange物件的exchange()方法後,他會陷入阻塞狀態,直到執行緒B也呼叫了exchange()方法,然後以執行緒安全的方式交換資料,之後執行緒A和B繼續執行。
exchange等待超時
public class ExchangerTest { public static void main(String[] args) { ExecutorService executor = Executors.newCachedThreadPool(); final Exchanger exchanger = new Exchanger(); executor.execute(new Runnable() { String data = "data1"; @Override public void run() { doExchangeWork(data, exchanger); } }); executor.execute(new Runnable() { String data = "data2"; @Override public void run() { try { Thread.sleep((long) (3000)); } catch (InterruptedException e) { e.printStackTrace(); } doExchangeWork(data, exchanger); } }); executor.shutdown(); } private static void doExchangeWork(String data, Exchanger exchanger) { try { System.out.println(Thread.currentThread().getName() + "正在把資料 " + data + " 交換出去"); //遠小於3秒丟擲異常 String exchangeData = (String) exchanger.exchange(data,1, TimeUnit.SECONDS); System.out.println(Thread.currentThread().getName() + "交換得到資料" + exchangeData); } catch ( TimeoutException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } }
pool-1-thread-1正在把資料 data1 交換出去 java.util.concurrent.TimeoutException at java.util.concurrent.Exchanger.exchange(Exchanger.java:626) at ExchangerTest.doExchangeWork(ExchangerTest.java:37) at ExchangerTest.access$000(ExchangerTest.java:3) at ExchangerTest$1.run(ExchangerTest.java:12) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) pool-1-thread-2正在把資料 data2 交換出去 java.util.concurrent.TimeoutException at java.util.concurrent.Exchanger.exchange(Exchanger.java:626) at ExchangerTest.doExchangeWork(ExchangerTest.java:37) at ExchangerTest.access$000(ExchangerTest.java:3) at ExchangerTest$2.run(ExchangerTest.java:26) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748)
實戰場景
設計一個定時任務,每日凌晨執行。在定時任務中啟動兩個執行緒,一個執行緒負責對業務明細表(xxx_info)進行查詢統計,把統計的結果放置在記憶體緩衝區,另一個執行緒負責讀取緩衝區中的統計結果並插入到業務統計表(xxx_statistics)中。
親,這樣的場景是不是聽起來很有感覺?沒錯!兩個執行緒在記憶體中批量交換資料,這個事情我們可以使用Exchanger去做!
3 實現原理
Exchanger(交換者)是一個用於執行緒間協作的工具類。Exchanger用於進行執行緒間的資料交換。它提供一個同步點,在這個同步點兩個執行緒可以交換彼此的資料。這兩個執行緒通過exchange方法交換資料, 如果第一個執行緒先執行exchange方法,它會一直等待第二個執行緒也執行exchange,當兩個執行緒都到達同步點時,這兩個執行緒就可以交換資料,將本執行緒生產出來的資料傳遞給對方。因此使用Exchanger的重點是成對的執行緒使用exchange()方法,當有一對執行緒達到了同步點,就會進行交換資料。因此該工具類的執行緒物件是成對的。
Exchanger類提供了兩個方法,String exchange(V x):用於交換,啟動交換並等待另一個執行緒呼叫exchange;String exchange(V x,long timeout,TimeUnit unit):用於交換,啟動交換並等待另一個執行緒呼叫exchange,並且設定最大等待時間,當等待時間超過timeout便停止等待。