掌握Kotlin Coroutine之 資料共享
Coroutine 既然是非同步操作,並且可以通過多執行緒的dispatcher
來併發執行。所以同樣會遇到多執行緒併發非同步操作的各種問題。其中最重要的一個問題就是如何控制對shared mutable state(可變的共享資料)
的訪問。多執行緒中的一些解決方法在Coroutine 中也能使用,但是 Coroutine 也有自己特有的解決方式。
下面先來看一些在其他多執行緒中所採用的解決資料共享問題的方法,這些方法同樣也可以在 Coroutine 中使用。
問題
我們使用下面的示例當做需要解決的問題,通過這個示例來演示各種解決方式。
suspend fun CoroutineScope.massiveRun(action: suspend () -> Unit) { val n = 100 // 子 coroutines 的數目 val k = 1000 // 每個子 coroutine 重複執行 action 函式的次數 val time = measureTimeMillis { val jobs = List(n) { launch { repeat(k) { action() } } } jobs.forEach { it.join() } } // measureTimeMillis 函式可以計算裡面程式碼塊所執行的時間, // 然後列印計算的時間 println("Completed ${n * k} actions in $time ms") }
下面使用一個非常簡單的任務 —- 遞增一個共享可變的變數。注意下面的程式碼使用的是GlobalScope
來執行的,使用的是預設的Dispatchers.Default
來執行:
GlobalScope.massiveRun { counter++ } println("Counter = $counter")
下面是在Pixel手機上多次執行的結果:
03-16 13:40:46.101 I Completed 100000 actions in 417 ms 03-16 13:40:46.101 I Counter = 97752 03-16 13:47:44.759 I Completed 100000 actions in 330 ms 03-16 13:47:44.759 I Counter = 95835 03-16 13:47:50.163 I Completed 100000 actions in 75 ms 03-16 13:47:50.163 I Counter = 90270 03-16 13:47:55.725 I Completed 100000 actions in 236 ms 03-16 13:47:55.725 I Counter = 94529
請注意:如果你的測試手機或者電腦CPU只有2核或者1核的話,列印的結果可能是”Counter = 100000″,這樣因為Coroutine 都在同一個執行緒內執行導致的。
volatile 變數無法解決上面的問題
通常使用volatile
修飾符來解決一個變數多執行緒訪問的問題。但是對於上面的示例是無效的,比如:
class MainActivity : AppCompatActivity(), CoroutineScope by MainScope() { @Volatile // 在 Kotlin 中`volatile` 是通過註解實現的 var counter = 0 override fun onCreate(savedInstanceState: Bundle?) { super.onCreate(savedInstanceState) setContentView(R.layout.activity_main) launch { GlobalScope.massiveRun { counter++ } println("Counter = $counter") } } }
上面的程式碼執行的稍微慢一點,但是列印的結果依然不是Counter = 100000
。原因在於 volatile 變數只能保證原子操作,而上面的counter++
是遞增操作,是先遞增再賦值 不是原子操作。
使用執行緒安全的資料型別
執行緒安全(Thread-safe)的資料型別是另外一種常見的解決方式,比如AtomicInteger
類,裡面的一些函式都是執行緒安全的,對裡面的狀態做了保護,下面是使用這種方式的示例:
class MainActivity : AppCompatActivity(), CoroutineScope by MainScope() { var counter = AtomicInteger() override fun onCreate(savedInstanceState: Bundle?) { super.onCreate(savedInstanceState) setContentView(R.layout.activity_main) launch { GlobalScope.massiveRun { counter.incrementAndGet() } println("Counter = ${counter.get()}") } } }
執行結果如下:
03-16 16:58:49.346 I Completed 100000 actions in 421 ms 03-16 16:58:49.346 I Counter = 100000
上面這種使用執行緒安全的資料結構的方式,可以解決大部分的簡單計算問題以及集合操作問題。但是對於複雜的狀態管理和複雜的運算方法如果找不到對應的封裝好的執行緒安全的資料結構可以使用的話,則是需要另尋他法的。
Thread confinement fine-grained(細粒度執行緒限制)
執行緒限制(Thread confinement)是一種把訪問可變共享資料限制到同一個執行緒去訪問的方法。通常在 UI 框架中都是這樣的,比如 安卓系統只有一個 UI執行緒,所有針對 UI 的操作都需要在該執行緒完成。 使用一個單執行緒執行環境就可以實現了:
class MainActivity : AppCompatActivity(), CoroutineScope by MainScope() { val counterContext = newSingleThreadContext("CounterContext") var counter = 0 override fun onCreate(savedInstanceState: Bundle?) { super.onCreate(savedInstanceState) setContentView(R.layout.activity_main) launch { GlobalScope.massiveRun { withContext(counterContext) { //在單執行緒中執行對共享變數的訪問 counter++ } } println("Counter = ${counter}") } } }
下面是執行結果:
03-16 16:05:28.945 I Completed 100000 actions in 3386 ms 03-16 16:05:28.945 I Counter = 100000 03-16 16:06:04.649 I Completed 100000 actions in 3526 ms 03-16 16:06:04.649 I Counter = 100000
可以看到上面執行結果是正確的,但是執行的非常慢。之前執行完畢只需要 400毫秒,上面的單執行緒執行結果需要 3秒多。每次在計算counter
值的時候都需要從另外一個執行緒切換到計算執行緒,所以比較耗時。
Thread confinement coarse-grained(粗粒度執行緒限制)
上面細粒度的執行緒限制導致每次執行到遞增操作的時候,都需要切換一下執行緒,導致執行比較慢。而通過粗粒度的執行緒控制,可以把更大範圍的計算邏輯放到同一個執行緒中去執行,避免執行緒的頻繁切換。如下示例中把massiveRun
函式在同一個執行緒中去執行:
CoroutineScope(counterContext).massiveRun { counter++ } println("Counter = $counter")
這樣的話,執行的就快多了,並且結果依然是正確的:
03-16 16:09:28.145 I Completed 100000 actions in 281 ms 03-16 16:09:28.145 I Counter = 100000
Mutual exclusion(互斥鎖)
互斥鎖(Mutual exclusion)的解決方法是通過一個鎖來保護關鍵的程式碼不會同時被多執行緒執行。在 Java 中通常使用synchronized
關鍵字或者ReentrantLock
物件來實現。在 Coroutine 中使用Mutex
,使用lock
和unlock
函式來保護關鍵的程式碼。Mutex.lock()
是一個suspending function
所以不會阻塞執行緒的執行。
由於使用mutex
程式碼通常會先獲取鎖,執行完程式碼後再釋放鎖,所以程式碼通常是這麼寫的:mutex.lock(); try { ... } finally { mutex.unlock() }
,而為了方便使用鎖,Kotlin 提供了一個withLock
擴充套件函式,這樣使用鎖就很簡單了:
class MainActivity : AppCompatActivity(), CoroutineScope by MainScope() { val mutex = Mutex() var counter = 0 override fun onCreate(savedInstanceState: Bundle?) { super.onCreate(savedInstanceState) setContentView(R.layout.activity_main) launch { GlobalScope.massiveRun { mutex.withLock { // 使用擴充套件函式來簡化 mutex 的使用 counter++ } } println("Counter = ${counter}") } } }
上面示例中也是在細粒度上使用鎖。所以同樣會涉及到頻繁獲取釋放鎖的操作導致比較耗時。 當你必須要週期性的修改共享資料並且找不到其他更好方式的時候,可以採用上面這種解決方案。
Actors
Actor 模型是 Carl Hewitt 於1973年發表的《A Universal Modular Actor Formalism for Artificial Intelligence 》論文中提出的一種用於併發計算的模型。雖然該模型出現很早,但是由於早期硬體發展達不到該模型的執行條件,所以 Actor 在最近10來年才有所發展。特別是在2000年以後,CPU速度達到了頂點,CPU速度無法成指數級增長了。隨著業務的複雜,人們對計算能力的要求越來越高,當CPU時鐘速度無法快速增長後,人們提出了多核CPU的概念,多個CPU來分工協作,從而提高一個任務的執行效率。這樣多執行緒併發的計算場景就出現了。
而 Actor 模型就是解決多執行緒併發計算的。早期在Smalltalk
語言中實現,後來在 Java 中也通過akka
框架實現。
Actor 模型和麵向物件程式設計(OOP)概念類似,在 OOP 中everything is an object
;而在 Actor 模型中everything is an actor
。
一個 Actor 是一個獨立的計算單元,根據其受到的訊息來做出不同的響應,當一個 Actor 接收到一個訊息的時候可以做出如下不同的響應:
– 建立更多的 Actor
– 把訊息傳送給其他 Actor
– 指定如何處理下一條訊息
上面的操作並沒有先後順序,並且可以併發的執行。所以 Actor 模型有兩個重要的概念: Actor 和訊息佇列(mailbox)。
每個 Actor 都是獨立的計算單元,和其他的 Actor 沒有任何依賴關係。每個 Actor 也有一個識別符號,這樣其他 Actor 可以向它傳送訊息。
Actor 有個訊息佇列(在 Actor 模型中有個專業的叫法 — mailbox),傳送給一個 Actor 的訊息都處於 mailbox 中,Actor 每次從 mailbox 中拿出一個訊息來處理。
注意:一個 Actor 處理訊息是按順序一個一個處理的,如果想要併發效果,就需要把多個訊息分別傳送給不同的 Actor。
Actor 有自己的狀態,相互不影響,Actor 收到訊息後可以更新自己的狀態,然後繼續處理下一條訊息。由於 Actor 是獨立的,相互不影響的,所以具有很好的容錯性 ,如果一個 Actor 出錯了,不會影響其他的 Actor, 這個出錯的 Actor 可以有父 Actor來處理它,可以讓他恢復初始狀態繼續工作,也可以把他隔離開來,不再給他傳送訊息。
可以舉個產品開發測試的例子來說明 Actor 模型。在產品開發中會涉及到下面一些人員:產品經理PM、開發人員RD、測試人員Tester,每個人員都是一個 Actor,而產品每個需求、每個bug都是一個要處理的訊息。每個訊息可以傳送給不同的 Actor 來處理,比如有個需求不明確的問題被開發人員發現了,開發人員把這個問題傳送給了測試人員,測試人員收到後發現這個訊息他處理不了,所以找到產品經理把這個訊息又轉發過去了(轉發訊息給 Actor),然後產品經理處理這個訊息。當產品經理收到這個訊息後,發現這個問題比較複雜,自己處理不過來,則可以指定更多的產品經理(建立更多的 Actor)來處理這個訊息。
下面通過示例看看如何使用 Coroutine 提供的actor
函式來使用 Actor 模型。
首先需要定義一些不同的訊息型別。Actor 針對不同的訊息由不同的處理方式。在 Kotlin 中訊息型別可用sealed classes
來定義。下面定義了一個 sealed classCounterMsg
,裡面有兩個不同的類IncCounter
和GetCounter
。IncCounter
代表遞增的訊息,GetCounter
代表獲取其值的訊息。在GetCounter
中使用CompletableDeferred
來獲取結果。
// counterActor 的訊息型別 sealed class CounterMsg // 實現遞增操作的訊息型別 object IncCounter : CounterMsg() // 請求當前值的訊息型別 class GetCounter(val response: CompletableDeferred<Int>) : CounterMsg()
然後使用actor
函式來建立一個counterActor
函式來處理不同的訊息:
// 該函式建立一個 counter actor fun CoroutineScope.counterActor() = actor<CounterMsg> { var counter = 0 // actor 的狀態 for (msg in channel) { // 遍歷 actor mailbox 中的訊息 when (msg) { is IncCounter -> counter++ // 如果是 IncConter 訊息,則執行遞增操作 is GetCounter -> msg.response.complete(counter) // 如果是取值的訊息,則返回當前的值 } } }
下面是具體呼叫的程式碼:
class MainActivity : AppCompatActivity(), CoroutineScope by MainScope() { override fun onCreate(savedInstanceState: Bundle?) { super.onCreate(savedInstanceState) setContentView(R.layout.activity_main) launch { val counter = counterActor() // 建立一個 actor GlobalScope.massiveRun { counter.send(IncCounter) // 給 Actor 傳送訊息 } // 傳送獲取最終結果的訊息 val response = CompletableDeferred<Int>() counter.send(GetCounter(response)) println("Counter = ${response.await()}") counter.close() // 關閉 actor } } }
上面這個示例可以解決我們最前面提出的這個問題,但是其實並沒有完全應用 Actor 模型的能力,上面的示例中只有一個 Actor 來處理不同的訊息。只不過,不同的訊息在不同的執行緒中被處理。在 Kotlin 中一個 Actor 就是一個按順序執行所收到訊息的 coroutine ,所以把共享的資料放到一個 Actor 上作為一種併發共享資料的解決方案。
在 Kotlin 中使用 Actor 比使用鎖更高效,actor 獨立封裝了其狀態可以在不同的執行緒中執行,所以 actor 不需要來回切換執行的執行緒。
上面只是簡單介紹了 Actor 模型的概念,以及在 Coroutine 中 actor 的基礎用法,請搜尋 Actor 模型 來了解更詳細的介紹。
總結
本文介紹了在 Coroutine 中共享資料的一些方法。這是本系列的最後一篇文章,到此關於 Kotlin 語言 Coroutine 的介紹就結束了,為了進一步熟練掌握這些概念,後面呢計劃改進安卓的 Todo 示例專案,用 Coroutine 來實現。具體啥時候能改進完成目前還不清楚,到時候有空弄完了再告訴大家。
Mutex API 文件:https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.sync/-mutex/index.html
withLock 擴充套件函式文件:https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.sync/with-lock.html
Actor API 文件:https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/actor.html
CompletableDeferred API 文件:https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-completable-deferred/index.html
Carl Hewitt 論文地址:http://worrydream.com/refs/Hewitt-ActorModel.pdf
Akka 官網:https://akka.io/