Kotlin協程原始碼分析(二)之Coroutine
這章理一下channel,先分享一句學習時候看到的話:Do not communicate by sharing memory; instead, share memory by communicating.
。本來好像是用在go
上的,但也有著異曲同工之妙啊
channel
顧名思義是管道,有入口與出口。因此最底層有sendChannel&receiveChannel
produce
Produce = Coroutine + Channel
example:
val channel: ReceiveChannel<Int> = produce<Int>(CommonPool) { for (i in 0 .. 100) { delay(1000) channel.send(i) } } launch(UI) { for (number in channel) { textView.text = "Latest number is $number" } }
produce
也是產生協程,跟普通的launch
不同他會返回一個receiveChannel
,後面會看到receiveChannel
是一個迭代器,同時會suspend
在hasNext和next()
上,因此另一個協程就可以使用for...in...
等待接受。
@ExperimentalCoroutinesApi public fun <E> CoroutineScope.produce( context: CoroutineContext = EmptyCoroutineContext, capacity: Int = 0, @BuilderInference block: suspend ProducerScope<E>.() -> Unit ): ReceiveChannel<E> { val channel = Channel<E>(capacity) val newContext = newCoroutineContext(context) val coroutine = ProducerCoroutine(newContext, channel) coroutine.start(CoroutineStart.DEFAULT, coroutine, block) return coroutine }
同時,produce
發射完成後是會自己關閉的,省的我們自己關閉通道:
override fun onCompletionInternal(state: Any?, mode: Int, suppressed: Boolean) { val cause = (state as? CompletedExceptionally)?.cause val processed = _channel.close(cause) if (cause != null && !processed && suppressed) handleExceptionViaHandler(context, cause) }
通過job
的invokeOnCompletion
實現。
actor
example
val channel: SendChannel<View> = actor(UI) { for (number in channel) { textView.text = "A new click happend!" } } button.setOnClickListener { launch(CommonPool) { channel.send(it) } }
與produce
相反返回sendChannel
高階用法
fun Node.onClick(action: suspend (MouseEvent) -> Unit) { // launch one actor to handle all events on this node val eventActor = GlobalScope.actor<MouseEvent>(Dispatchers.Main) { for (event in channel) action(event) // pass event to action } // install a listener to offer events to this actor onMouseClicked = EventHandler { event -> eventActor.offer(event) } }
我們看這裡用了offer
而不是send
,我們可以把for..in..
先簡單的寫成以下形式:
while(iterator.hasNext()){ //suspend fuction val event = iterator.next() //suspend function action(event) } private suspend fun hasNextSuspend(): Boolean = suspendAtomicCancellableCoroutine sc@ { cont -> val receive = ReceiveHasNext(this, cont) while (true) { if (channel.enqueueReceive(receive)) { channel.removeReceiveOnCancel(cont, receive) return@sc } // hm... something is not right. try to poll val result = channel.pollInternal() this.result = result if (result is Closed<*>) { if (result.closeCause == null) cont.resume(false) else cont.resumeWithException(result.receiveException) return@sc } if (result !== POLL_FAILED) { cont.resume(true) return@sc } } }
假設佇列裡沒有東西時,enqueue
一個receiveHasNext
進行等待。過會解釋一下channel
的原理。現在只要知道,當有sender.send
時,與receive
關聯的cont
就會被呼叫resume
,那麼顯而易見,當action
正在處理時佇列中沒有receiver
,而offer
是不會suspend
的,因此事件就被拋棄。
conflation事件合併
fun Node.onClick(action: suspend (MouseEvent) -> Unit) { // launch one actor to handle all events on this node val eventActor = GlobalScope.actor<MouseEvent>(Dispatchers.Main, capacity = Channel.CONFLATED) { // <--- Changed here for (event in channel) action(event) // pass event to action } // install a listener to offer events to this actor onMouseClicked = EventHandler { event -> eventActor.offer(event) } }
這裡我們使用CONFALTED
,即合併所有事件,因此接受者永遠處理最近一個。原理如下:
result === OFFER_FAILED -> { // try to buffer val sendResult = sendConflated(element) when (sendResult) { null -> return OFFER_SUCCESS is Closed<*> -> { conflatePreviousSendBuffered(sendResult) return sendResult } } // otherwise there was receiver in queue, retry super.offerInternal }
當offer
失敗時需要suspend
等待,(說明還沒有接受者或者人家正忙著),插入sendBuffered
,同時移除前面已有的sendBuffered
var prev = node.prevNode while (prev is SendBuffered<*>) { if (!prev.remove()) { prev.helpRemove() } prev = prev.prevNode }
這樣永遠是最近一個生效。
大概channel原理
其實看abstractChannel
會先看到一個queue
,這時候顯而易見會把它當做是像linkedlist
那種塞資料的地方。但其實queue
是用來放receive/send node
。當佇列為空時,send
時會先從佇列取第一個receiveNode
,取不到就suspend
,把自己當成sendNode
放入;不然就把資料直接交給receiveNode
。
具體channel實現時,例如ArrayChannel(buffer)
,會多加一個buffer
佇列,當佇列為空時,send
時會先從佇列取第一個receiveNode
,取不到就放入buffer
佇列,如果buffer
佇列滿了,把自己當成sendNode
放入就suspend
;同時把不然就把資料直接交給receiveNode
。
select
suspend fun selectInsult(john: ReceiveChannel<String>, mike: ReceiveChannel<String>) { select<Unit> { //<Unit> means that this select expression does not produce any result john.onReceive { value ->// this is the first select clause println("John says '$value'") } mike.onReceive { value ->// this is the second select clause println("Mike says '$value'") } } }
select
可以等任何一個回來,也可以等await
:
fun adult(): Deferred<String> = async(CommonPool) { // the adult stops the exchange after a while delay(Random().nextInt(2000).toLong()) "Stop it!" } suspend fun selectInsult(john: ReceiveChannel<String>, mike: ReceiveChannel<String>, adult: Deferred<String>) { select { // [..] the rest is like before adult.onAwait { value -> println("Exasperated adult says '$value'") } } }
跟linux
裡的select
其實類似,(能知道是哪個嗎?):
final override val onReceive: SelectClause1<E> get() = object : SelectClause1<E> { override fun <R> registerSelectClause1(select: SelectInstance<R>, block: suspend (E) -> R) { registerSelectReceive(select, block) } } private fun <R> registerSelectReceive(select: SelectInstance<R>, block: suspend (E) -> R) { while (true) { if (select.isSelected) return if (isEmpty) { val enqueueOp = TryEnqueueReceiveDesc(select, block as (suspend (E?) -> R), nullOnClose = false) val enqueueResult = select.performAtomicIfNotSelected(enqueueOp) ?: return when { enqueueResult === ALREADY_SELECTED -> return enqueueResult === ENQUEUE_FAILED -> {} // retry else -> error("performAtomicIfNotSelected(TryEnqueueReceiveDesc) returned $enqueueResult") } } else { val pollResult = pollSelectInternal(select) when { pollResult === ALREADY_SELECTED -> return pollResult === POLL_FAILED -> {} // retry pollResult is Closed<*> -> throw recoverStackTrace(pollResult.receiveException) else -> { block.startCoroutineUnintercepted(pollResult as E, select.completion) return } } } } }
能看到onReceive
是實現SelectCaluse1
,同時在selectBuilderImpl
環境下:
override fun <Q> SelectClause1<Q>.invoke(block: suspend (Q) -> R) { registerSelectClause1(this@SelectBuilderImpl, block) }
所以會往queue
中enqueue
兩個receive節點。
以
同時能看到如果任何一次select
節點獲取資料以後:
when { pollResult === ALREADY_SELECTED -> return pollResult === POLL_FAILED -> {} // retry pollResult is Closed<*> -> throw recoverStackTrace(pollResult.receiveException) else -> { block.startCoroutineUnintercepted(pollResult as E, select.completion) return } }
會呼叫block.startCoroutineUnintercepted
:
/** * Use this function to restart coroutine directly from inside of [suspendCoroutine], * when the code is already in the context of this coroutine. * It does not use [ContinuationInterceptor] and does not update context of the current thread. */ internal fun <R, T> (suspend (R) -> T).startCoroutineUnintercepted(receiver: R, completion: Continuation<T>) { startDirect(completion) {actualCompletion -> startCoroutineUninterceptedOrReturn(receiver, actualCompletion) } }
之前講過startCoroutineUnintercepted
其實就是function.invoke()
,所以就呼叫block.invoke(select的completion是自己)
,獲得值後通過uCont.resume
即可。
onAwait
這個和defered
即job(Support)
搞在一起:
private class SelectAwaitOnCompletion<T, R>( job: JobSupport, private val select: SelectInstance<R>, private val block: suspend (T) -> R ) : JobNode<JobSupport>(job) { override fun invoke(cause: Throwable?) { if (select.trySelect(null)) job.selectAwaitCompletion(select, block) } override fun toString(): String = "SelectAwaitOnCompletion[$select]" }
可以看到當任務成功後,select
會被繼續進行
broadcast
首先解決一個問題,一個sender
多個receiver
是怎麼處理的。
val channel = Channel<Int>() launch { val value1 = channel.receive() } launch { val value2 = channel.receive() } launch { channel.send(1) }
因為是1vs1消費。只有第一個會收到,因為它插在等待佇列的第一個。用broadcast
可以保證大家都收到。它維護一個subscribe
的user list
,所有消費者都能收到channel.send
的element
operation
map
public fun <E, R> ReceiveChannel<E>.map(context: CoroutineContext = Dispatchers.Unconfined, transform: suspend (E) -> R): ReceiveChannel<R> = GlobalScope.produce(context, onCompletion = consumes()) { consumeEach { send(transform(it)) } }
可以實現跟RX
一樣的操作符,接受者收到後經過轉換再進行傳送返回最終新的receiveChannel
hot or cold
channel
是hot
的。
When the data is produced by the Observable itself, we call it a cold Observable. When the data is produced outside the Observable, we call it a hot Observable.
Provide abstraction for cold streams
... 這個todo,後續再說。