JVM平臺上的響應式流(Reactive Streams)規範
//
Reactive Streams
//
響應式流是一個倡議,用來為具有非阻塞後壓的非同步流處理提供一個標準。大家努力的目標集中在執行時環境(JVM和JavaScript)和網路協議上。
注:響應式流其實就是一個規範,本文講解的正是這個規範,且這個規範已經被引入到JDK9裡了。
後壓:就是下游出現了問題,得不到解決時,這個問題就會逆流而上,繼而影響上游。
如果一個路口紅綠燈壞了造成堵車,如果不管的話,用不了太長時間,車就會堵到上一個路口,如果再不管的話,整條路都會被賭滿。
//
JDK9裡的java.util.concurrent.Flow
//
本規範裡的這些介面在JDK9的java.util.concurrent.Flow裡都已經可用,它們在語義上與響應式流的各介面基本上一比一相等。
這意味著將有一個遷移週期,直至第三方庫都採用JDK裡的新型別,這個週期自然希望短一些。
這取決於第三方庫的完整語義相等,和Reactive Streams和JDK的Flow之間的介面卡庫和一個與JDK的Flow型別可直接相容的TCK。
因為這個標準在JDK9才引入,在此之前一些第三方庫都已經存在,所以需要一個過渡階段,讓第三方庫慢慢採用JDK的標準。
TCK是一個工具,下文有介紹。
//
Goals, Design and Scope
//
處理流資料,尤其是線上資料,它們的量是無法預知的,在一個非同步系統中要求格外小心。
最重要的問題是資源的消耗需要被小心地控制,以便一個快速的資料來源不會淹沒流的目的地(下游)。
需要非同步的目的是為了並行地使用計算資源,如協調網路上多個主機,或一個機器的多個CPU核。
響應式流的主要目標是控制橫穿一個非同步邊界的流資料的交換。
考慮到向另一個執行緒或執行緒池傳遞元素,同時確保接收端不被強迫緩衝任意數量的資料。
換句話說,後壓是這個模型的一個必須部分,目的是允許佇列在被界定的執行緒之間進行調節(斡旋)。
如果後壓訊號是同步的,非同步處理的好處將被否定,因此對一個響應式流實現的所有方面的完全非阻塞和非同步行為的授權需要小心一些。
這個規範的意圖就是允許建立許多種一致的實現,它們憑藉遵守規則將能夠平滑地互操作,在一個流應用的整個處理圖中保留前文提到的好處和特徵。
需要注意的是流操作的精確特性(轉化,分割,合併等)並沒有被這個規範包括。響應式流只關心在不同的API元件間調節流資料。在他們的開發中,已經非常細心地確保所有組合流的基本方式都能夠被表達。
總之,響應式流是JVM上面向流的庫的一個標準和規範:
處理一個潛在的無限數目元素,
依次地,
非同步地在元件間傳遞元素,
帶有強制的非阻塞後壓。
響應式流規範由以下部分組成:
1、API規定了需要實現的響應式流型別,並且在不同的實現間完成互操作性。
2、技術相容性工具(TCK)是一個標準的測試套件,用於各種實現的一致性測試。
各種實現可以自由地實現規範中沒有提到的額外特性,只要它們遵從API要求和在TCK中通過測試。
//
API Components
//
API由以下元件組成,響應式流的實現必須提供它們:
1、Publisher,釋出者(生產者)
2、Subscriber,訂閱者(消費者)
3、Subscription,訂閱
4、Processor,處理者
它們其實是4個介面,先睹為快:
public interface Publisher<T> {
public void subscribe(Subscriber<? super T> s);
}
public interface Subscriber<T> {
public void onSubscribe(Subscription s);
public void onNext(T t);
public void onError(Throwable t);
public void onComplete();
}
public interface Subscription {
public void request(long n);
public void cancel();
}
public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {
}
一個釋出者是一個潛在的無限數量的序列元素的一個提供者,按照收到的來自於它的訂閱者的需要來發布這些元素。
onSubscribe onNext* (onError | onComplete)?
這意味著onSubscribe方法總是被呼叫,後面跟著一個可能的無限數量onNext方法呼叫(因為訂閱者的請求)。如果失敗的話,後跟一個onError方法呼叫,或當沒有更多的元素可用時,是一個onComplete方法呼叫,只要這個Subscription(訂閱關係)沒有被取消。
//
Glossary
//
術語,釋義
Signal,本義是訊號。作為一個名詞,指的是這些方法onSubscribe,onNext,onComplete,onError,request(n)或cancel中的一個。作為一個動詞,指的是呼叫這些方法中的一個。
表面上可以理解為發訊號進行通知,本質上也是通過方法呼叫來實現的。
Demand,本義是需求。作為一個名詞,指的是一個訂閱者(向釋出者)請求的一定數量的元素,它還沒有被髮布者分發。作為一個動詞,指的是請求更多元素的行為動作。
可以看作是訂閱者向釋出者發出的需求/動作,想要獲取更多的元素。釋出者暫時還沒有迴應。
Synchronous(ly),本義是同步的。指的是在呼叫執行緒上執行(沒有新開執行緒)。
Return normally,本義是正常返回。指的是僅返回已宣告過的型別的值給呼叫者。如果想傳送一個失敗給訂閱者,唯一合法的方式是通過onError(回撥)方法。
Responsivity,本義是響應度。指的是已準備就緒有能力來做出響應。在這個文件裡用來指示不同的元件不應該互相削弱響應的能力。
Non-obstructing(堵塞),本義是不堵塞。指的是描述一個方法的質量(品質),即在呼叫執行緒上儘可能快地執行完。這意味著,例如,避免重的計算和其它將拖住呼叫者執行緒執行的事情(因為沒有新開執行緒)。
Terminal state,本義是終止狀態。對於一個釋出者,指的是當onComplete或者onError已經被呼叫。對於一個訂閱者,指的是當一個onComplete或onError(回撥方法)已經收到。
NOP,指的是執行對於呼叫執行緒來說沒有可檢測到的影響,能夠像這樣安全地被呼叫任意次。
External synchronization,本義是外部同步。為了執行緒安全的目的,協調訪問在這個規範裡定義的結構之外被實現,使用的技術像但不限於atomics,monitors或locks。
Thread-safe,能夠安全地被同步或非同步呼叫,不需要外部的同步來確保程式的正確性。
//
SPECIFICATION
//
Publisher
public interface Publisher<T> {
public void subscribe(Subscriber<? super T> s);
}
1、一個釋出者對一個訂閱者的onNext呼叫總次數必須總是小於或等於訂閱者的Subscription請求的元素總數。
2、一個釋出者可能呼叫的onNext次數比要求的少,然後通過呼叫onComplete或onError來終止Subscription。
3、對一個訂閱者的onSubscribe,onNext,onError和onComplete呼叫必須以一個執行緒安全的方式進行,如果被多個執行緒執行,使用external synchronization。
4、如果一個釋出者失敗,它必須呼叫一個onError。
5、如果一個釋出者成功地終止(對於有限流),它必須呼叫一個onComplete。
6、如果一個釋出者呼叫一個訂閱者上的onError或onComplete方法,那個訂閱者的Subscription必須認為已被取消。
7、一旦一個terminal state已經被呼叫(onError,onComplete),它要求沒有進一步的呼叫發生。
8、如果一個訂閱被取消,它的訂閱者必須最終停止被呼叫。
9、釋出者的subscribe方法裡必須在早於對訂閱者上的任何方法呼叫之前先呼叫onSubscribe方法,且必須return normally。當訂閱者是null的時候,此時必須向呼叫者丟擲java.lang.NullPointerException異常。對於其它任何情況,通知失敗(或拒絕訂閱者)的唯一合法方式是呼叫onError。
10、釋出者的subscribe方法可能被呼叫任意多次,但是每次必須使用一個不同的訂閱者。
Subscriber
public interface Subscriber<T> {
public void onSubscribe(Subscription s);
public void onNext(T t);
public void onError(Throwable t);
public void onComplete();
}
1、一個訂閱者必須通過訂閱(Subscription)的request(long n)方法宣告需求,然後接收onNext呼叫。
2、如果一個訂閱者懷疑它的呼叫處理將消極地影響它的釋出者的響應度,建議非同步地分發它的呼叫。
3、訂閱者的onComplete()和onError(Throwable t)這兩個方法裡禁止呼叫訂閱或釋出者上的任何方法。
4、訂閱者的onComplete()和onError(Throwable t)這兩個方法在接收到呼叫後必須認為訂閱已經被取消。
5、一個訂閱者必須在收到onSubscriber之後呼叫指定訂閱上的cancel()方法取消該訂閱,如果它已經有一個活動的訂閱。
6、一個訂閱者必須呼叫訂閱的cancel()方法,如果這個訂閱不再需要的話。
7、一個訂閱者必須確保所有對訂閱發生的呼叫都來自於同一個執行緒或為它們各自提供external synchronization。
8、一個訂閱者必須準備好接收一到多個onNext呼叫,在已經呼叫過訂閱的cancel()方法之後如果還有請求的元素即將傳送。訂閱的cancel()方法並不保證立即執行底層的清理操作。
9、一個訂閱者必須準備好接收一個onComplete呼叫,不管之前有或沒有呼叫過訂閱的request(long n)方法。
10、一個訂閱者必須準備好接收一個onError呼叫,不管之前有或沒有呼叫過訂閱的request(long n)方法。
11、一個訂閱者必須確保它的所有的方法呼叫發生在它們各自的處理之前。該訂閱者必須小心合適地釋出呼叫到它的處理邏輯。
12、訂閱者的onSubscribe方法必須最多被呼叫一次,對於一個給定的訂閱者。
Subscription
public interface Subscription {
public void request(long n);
public void cancel();
}
1、訂閱的request和cancel方法必須在它的訂閱者上下文裡被呼叫。
2、訂閱必須允許訂閱者在onNext或onComplete方法裡同步地呼叫訂閱的request方法。
3、訂閱的request方法必須放置一個關於釋出者和訂閱者間的同步遞迴呼叫的上界。
4、訂閱的request方法應該尊重它的呼叫者的響應度,通過以一個適時的方式返回。
5、呼叫的cancel方法必須尊重它的呼叫者的響應度,通過以一個適時的方式返回,必須是冪等的和執行緒安全的。
6、在訂閱取消之後,額外的request(long n)呼叫必須是NOP。
7、在訂閱取消之後,額外的cancel()呼叫必須時NOP。
8、當訂閱沒有被取消時,request(long n)方法必須註冊給定數目的額外元素,這些元素將被生產並分發給各自的訂閱者。
9、當訂閱沒有被取消時,request(long n)必須使用一個java.lang.IllegalArgumentException異常來呼叫onError,如果引數小於等於0。引起的原因應該解釋為不是正數的呼叫是非法的。
10、當訂閱沒有被取消,request(long n)可以同步地呼叫這個(或其它)訂閱者上的onNext。
11、當訂閱沒有被取消,request(long n)可以同步地呼叫這個(或其它)訂閱者上的onComplete或onError。
12、當訂閱沒有被取消,cancel()必須請求釋出者最終停止呼叫它的訂閱者上的方法。這個操作不要求立即影響訂閱。
13、當訂閱沒有被取消,cancel()必須請求釋出者最終刪除對相關訂閱者的任何引用。
14、當訂閱沒有被取消,呼叫cancel()可以引起釋出者(如果是有狀態的)進入關閉狀態,如果在此刻沒有其它的訂閱存在。
15、呼叫訂閱的cancel方法必須是return normally。
16、呼叫訂閱的request方法必須是return normally。
17、一個訂閱必須支援無數次地呼叫request方法,必須支援到2^63 - 1(Long.MAX_VALUE)次。如果一個需求等於或大於2^63 - 1(Long.MAX_VALUE),或許被髮布者認為是真正的無界。
Processor
public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {
}
1、一個處理器表示一個處理階段,它既是一個訂閱者又是一個釋出者,必須遵守它們兩者的契約。
2、一個處理器可以選擇恢復一個onError呼叫。如果它選擇這樣做,必須認為訂閱被取消,否則必須立即傳播onError呼叫到它的訂閱者。
在不被強制時,當最後一個訂閱者取消它的訂閱時,取消一個處理器的上游訂閱是一個好主意,可以讓這個取消呼叫往上游傳播。
//
Asynchronous vs Synchronous Processing
//
響應式流API要求所有的元素處理(onNext呼叫)或終止呼叫(onError,onComplete)禁止阻塞釋出者。然而,每一個on*(以on開頭的方法)處理器可以同步地處理事件,也可以非同步地處理。
看下面這個示例:
nioSelectorThreadOrigin map(f) filter(p) consumeTo(toNioSelectorOutput)
它有一個非同步的來源和非同步的目的地。讓我們假設來源和目的地都是selector事件迴圈。Subscription.request(n)必須是一個連結從目的地到來源。這就是現在每一個實現都能選擇如何做這些的地方。
下面使用管道操作符(|)來呼叫非同步邊界(佇列和排程器),R#表示資源(可能是執行緒)。
nioSelectorThreadOrigin | map(f) | filter(p) | consumeTo(toNioSelectorOutput)
-------------- R1 ----| - R2 - | -- R3 --- | ---------- R4 ----------------
在這個示例中,這3個消費者中的每一個,map,filter和consumeTo非同步地排程work。它們可能在同一個事件迴圈上,也可能是分離的執行緒。
看下面這個示例
nioSelectorThreadOrigin map(f) filter(p) | consumeTo(toNioSelectorOutput)
------------------- R1 ----------------- | ---------- R2 ----------------
這裡只有最後一步是非同步地排程,通過把work新增到NioSelectorOutput事件迴圈。map和filter步驟都在來源執行緒上同步地執行。
或者另一種實現把這些操作都安裝到最終消費者那裡:
nioSelectorThreadOrigin | map(f) filter(p) consumeTo(toNioSelectorOutput)
--------- R1 ---------- | ------------------ R2 -------------------------
所有這些變體都是“非同步流”。它們都有自己的位置,每一個有不同的權衡包括效能和實現複雜度。
響應式流允許實現靈活性來管理資源和排程,混合非同步和同步處理,在一個非阻塞,非同步,動態的推拉式流的限制內。
為了允許所有參與API元素(Publisher/Subscription/Subscriber/Processor)的完全的非同步實現,這些介面定義的所有方法都返回void。
//
Subscriber controlled queue bounds
//
其中一個底層設計原則是,所有緩衝區大小都是有界的,這些界限必須是知道的,且由訂閱者控制。這些界限用元素數目(它依次轉化為onNext的呼叫次數)這樣的術語來表達。任何實現的目標都是為了支援無限流(尤其是高輸出速率流),一般需要強迫界限都沿著避免OOM錯誤和限制資源使用的方式。
因為後壓是強制的,使用無界緩衝區能夠被避免。一般來說,只有在當一個佇列可能無界增長時,此時也是釋出者端比訂閱者端保持一個更高的速率,且持續了一段較大的時間,但是這種情形是被後壓來處理的。
佇列界限能夠被控制通過一個訂閱者為了適合數目的元素而呼叫需求。在任何時候訂閱者都知道:
請求的總元素數目:P
已經處理的元素數目:N
然後最大數量的元素可能達到是P - N,直到更多的需求被髮送通知給釋出者。這種情況下,訂閱者也知道在它的輸入緩衝區裡的元素數目B,然後這個界限可以被重新精確為P - B - N。
這些界限必須被一個釋出者尊重,獨立於無論它表示的源是能夠被後壓的或不能。在這種源的生產速率不能被影響的情形下,例如鐘錶的滴滴答答或滑鼠的移動,釋出者必須選擇要麼緩衝元素或拋棄元素來遵守這個強加的界限。
訂閱者在接收一個元素後發一個請求獲取另一個元素,可以有效地實現一個停止和等待協議,這裡這個需求訊號和一個ACK(迴應)相等。通過提需求的方式獲取多個元素,ACK花銷是分期償還的。
這是值的注意的,訂閱者被允許在任何時間點發訊號提出需求,允許它是為了避免釋出者和訂閱者之間不必要的延遲(例如,保持它的輸入緩衝區是滿的,不需要等待完整的往返時間)。
//
Legal
//
這個工程是一個協作成果,完成它的工程師來自於Kaazing,Lightbend,Netflix,Pivotal,Red Hat,Twitter和許多其他公司。程式碼被開放到公共領域是為了允許感興趣的團體自由地使用,它們想建立一個相容性的實現。詳細內容請檢視COPYING。
程式設計新說
用獨特的視角說技術