Java 多執行緒設計模式之 Single Threades Execution
所謂 Single Threades Execution 模式,意即“以一個執行緒執行”。就像獨木橋同一時間內只允許一個人通行一樣,該模式用於設定限制,以確保同一時間內只能讓一個執行緒執行處理。
Demo
不使用 Single Threades Execution 模式的程式
使用程式模擬三個人頻繁地通過一個只允許一個人經過的門情形。當人們通過門的時候,統計人數便會遞增。另外程式還會記錄通行者的“姓名和出生地”
類一覽表
名字 | 說明 |
---|---|
Main | 建立門,並讓三個人不斷地通過的類 |
Gate | 表示門的類。它會在人們通過門時記錄其姓名與出生地 |
UserThread | 表示人的類。人們不斷地通過門 |
// Main.java public class Main { public static void main(String[] args) { Gate gate = new Gate(); new UserThread(gate, "Bob", "Britain").start(); new UserThread(gate, "Cao", "China").start(); new UserThread(gate, "Uber", "USA").start(); } } 複製程式碼
// Gate.java public class Gate { private int counter = 0; private String name = "Nobody"; private String address = "NoWhere"; public void pass(String name, String address) { this.counter++; this.name = name; this.address = address; check(); } private void check() { if (this.name.charAt(0) != this.address.charAt(0)) { System.out.println("******** BROKEN ********** : " + toString()); } } @Override public String toString() { return "No. " + this.counter + " : " + this.name + " , " + this.address; } } 複製程式碼
// UserThread.java public class UserThread extends Thread { private final Gate gate; private final String name; private final String address; public UserThread(Gate gate, String name, String address) { this.gate = gate; this.name = name; this.address = address; } @Override public void run() { System.out.println(this.name + " BEGIN"); while (true) { gate.pass(this.name, this.address); } } } 複製程式碼
當這個程式執行時,時間點不同,生成的結果也會不一樣,以下是打印出來的 log
Bob BEGIN Cao BEGIN ******** BROKEN ********** : No. 59622 : Bob , Britain Uber BEGIN ******** BROKEN ********** : No. 77170 : Uber , USA ******** BROKEN ********** : No. 89771 : Uber , USA ******** BROKEN ********** : No. 93128 : Cao , China ******** BROKEN ********** : No. 95654 : Uber , USA ******** BROKEN ********** : No. 98440 : Cao , China ******** BROKEN ********** : No. 102283 : Cao , China ******** BROKEN ********** : No. 104491 : Cao , China ******** BROKEN ********** : No. 106791 : Uber , USA ******** BROKEN ********** : No. 110022 : Uber , USA ******** BROKEN ********** : No. 112073 : Uber , USA ******** BROKEN ********** : No. 113973 : Uber , USA ******** BROKEN ********** : No. 77170 : Uber , USA ******** BROKEN ********** : No. 116050 : Bob , China ******** BROKEN ********** : No. 117334 : Bob , Britain ******** BROKEN ********** : No. 119992 : Bob , USA ******** BROKEN ********** : No. 124427 : Uber , USA ******** BROKEN ********** : No. 117152 : Bob , Britain ******** BROKEN ********** : No. 129298 : Bob , China ******** BROKEN ********** : No. 130552 : Cao , Britain ******** BROKEN ********** : No. 147176 : Cao , China ******** BROKEN ********** : No. 148546 : Uber , USA 複製程式碼
通過 log 可以知道執行結果與預期不一致,所以說 Gate 類是不安全的,是非執行緒安全類。
如果仔細看一下 counter 的值,最開始顯示 BROKEN 的時候,counter 的值已經變為了 59622。也就是說,在檢察處第一個錯誤的時候 Gate 的 pass 方法已經運行了 5 萬多次了。在這裡,因為 UserThread 類的 run 方法執行的是無限迴圈,所以才檢查除了錯誤。但是如果只測試幾次,是根本找不出錯誤的。
這就是多執行緒程式設計的難點之一。如果檢察出錯誤,那麼說明程式並不安全。但是就算沒有檢察出錯誤,也不能說程式就一定是安全的。
除錯資訊也不可靠
仔細看 log 會發現還有一個奇怪的現象,比如:
******** BROKEN ********** : No. 59622 : Bob , Britain 複製程式碼
雖然此處輸出了 BROKEN 資訊,但是姓名和出生地首字母是一樣的。儘管顯示了 BROKEN,但是除錯資訊好像並沒有錯。
導致這種現象的原因是,在某個執行緒執行 check 方法時,其他執行緒不斷執行 pass 方法,改謝了 name 欄位和 address 欄位的值。
這也是多執行緒程式設計的難點之一。如果顯示除錯資訊的程式碼本身就是非執行緒安全的,那麼顯示的除錯資訊就很可能是錯誤的。
如果連操作測試和除錯資訊都無法確保安全性,那就進行程式碼評審吧。多個人一起仔細閱讀程式碼,確認是否會發生問題,這是確保程式安全性的一個有效方法。
修改 Gate 類使其執行緒安全
// Gate.java public class Gate { ... public synchronized void pass(String name, String address) { this.counter++; this.name = name; this.address = address; check(); } ... } 複製程式碼
之後程式就可以正常的執行,也不在列印 BROKEN 的 log 資訊了
Single Threaded Execution 模式歸納
SharedResource 共享資源
在剛才的示例中,Gate 類扮演 SharedResource 的角色
SharedResource 角色是可被多個執行緒訪問的類,包含很多方法,但這些方法主要分為如下兩類:
- safeMethod: 多個執行緒同時呼叫也不會發生問題的方法
- unsafeMethod:多個執行緒同時呼叫會發生問題,因此必須加以保護的方法
而 unsafeMethod 在被多個執行緒同時執行時,例項狀態有可能發生分歧。這時就需要保護該方法,使其不被多個執行緒同時訪問。Java 則是通過將 unsafeMethod 宣告為 synchronized 方法來進行保護
死鎖
在該模式下,滿足下列條件時,死鎖就會發生
- 存在多個 SharedResource 角色
- 執行緒在持有著某個 SharedResource 角色鎖的同時,還想獲取其他 SharedResource 角色的鎖
- 獲取 SharedResource 角色的鎖的順序並不固定
原子操作
不可分割的操作通常稱為原子操作。
上述示例中 Gate類是執行緒安全的 我們將 pass 宣告為了 synchronized 方法,這樣 pass 方法也就成為了原子操作
Java 程式設計規範中定義了一些原子操作。例如 char、int 等基本型別的賦值和引用操作都是原子的。另外,物件等引用型別的賦值和引用操作也是原子的。由於本身就是原子的,所以就算不加上 synchronized,這些操作也不會被分割。但是 long、double 的賦值和引用操作並不是原子的
總結如下:
- 基本型別、引用型別的賦值和引用是原子操作
- 但 long 和 double 的賦值和引用是非原子操作
- long 或 double 線上程間共享時,需要將其放入 synchronized 中操作,或者宣告為 volatile
計數訊號量和 Semaphore 類
上面介紹 Single Threaded Execution 模式用於確保某個區域“只能由一個執行緒”執行。下面我們將這種模式進一步擴充套件,以確保某個區域“最多隻能由 N 個執行緒”執行。這時就要用計數訊號量來控制執行緒數量。
java.util.concurrent 包提供了表示計數訊號量的 Semaphore 類
資源的許可個數將通過 Semaphore 的建構函式來指定
Semaphore 的 acquire 方法用於確保存在可用資源。當存在可用資源時,執行緒會立即從 acquire 方法返回,同時訊號量內部的資源個數會減 1 。 如無可用資源,執行緒阻塞在 acquire 方法內,直至出現可用資源。
Semaphore 的 release 方法用於釋放資源。釋放資源後,訊號量內部的資源個數會增加 1。另外如果 acquire 中存在等待的執行緒,那麼其中一個執行緒會被喚醒,並從 acquire 方法返回。
示例
// BoundedResource.java public class BoundedResource { private final int permits; private final Semaphore semaphore; private final Random random = new Random(314159); public BoundedResource(int permits) { this.semaphore = new Semaphore(permits); this.permits = permits; } public void use() throws InterruptedException { try { this.semaphore.acquire(); doUse(); } finally { this.semaphore.release(); } } private void doUse() throws InterruptedException { System.out.println(Thread.currentThread().getName() + " : BEGIN used = " + (this.permits - this.semaphore.availablePermits())); Thread.sleep(this.random.nextInt(500)); System.out.println(Thread.currentThread().getName() + " : END used = " + (this.permits - this.semaphore.availablePermits())); } } 複製程式碼
// SemaphoreThread.java public class SemaphoreThread extends Thread{ private final Random random = new Random(26535); private final BoundedResource resource; public SemaphoreThread(BoundedResource resource) { this.resource = resource; } @Override public void run() { try { while (true) { this.resource.use(); Thread.sleep(this.random.nextInt(2000)); } } catch (InterruptedException e) { e.printStackTrace(); } } } 複製程式碼
// Main.java public class Main { public static void main(String[] args) { BoundedResource boundedResource = new BoundedResource(3); new SemaphoreThread(boundedResource).start(); new SemaphoreThread(boundedResource).start(); new SemaphoreThread(boundedResource).start(); } } 複製程式碼
列印結果:
Thread-0 : BEGIN used = 2 Thread-2 : BEGIN used = 3 Thread-1 : BEGIN used = 2 Thread-2 : END used = 3 Thread-1 : END used = 2 Thread-0 : END used = 1 Thread-2 : BEGIN used = 1 Thread-2 : END used = 1 Thread-1 : BEGIN used = 1 Thread-0 : BEGIN used = 2 Thread-1 : END used = 2 Thread-0 : END used = 1 Thread-2 : BEGIN used = 1 Thread-2 : END used = 1 Thread-1 : BEGIN used = 1 Thread-0 : BEGIN used = 2 Thread-2 : BEGIN used = 3 Thread-0 : END used = 3 複製程式碼