Spring Web-Flux中的背壓機制
響應式(反應式)程式設計的好處是背壓Backpressure,可以平衡請求或響應率,這點與非同步機制區別所在,也就是說,當響應堵塞時,會同時堵塞請求,因此reactive響應式=非同步+同步(背壓)。本文解釋了Spring Web-Flux中的背壓機制,假設我們編寫一個Spring Web-Flux的控制器程式碼如下:
@RestController
public class FirstController
{
@GetMapping("/first")
public Mono<String> getAllTweets()
{
return Mono.just("I am First Mono")
}
}
這段程式碼背後的背壓工作機制是什麼?
為了理解Backpressure在WebFlux框架中如何工作,我們必須回顧一下當前預設使用的傳輸層。
我們可能還記得,瀏覽器和伺服器之間的正常通訊(伺服器到伺服器通訊通常也是一樣)是通過TCP連線完成的。WebFlux還是使用該傳輸進行客戶端和伺服器之間的通訊。然後,為了獲得背壓控制,我們必須從Reactive Streams規範的角度來概述背壓的含義。
規範的基本語義定義瞭如何通過背壓來調節流元素的傳輸。
因此,從該宣告中,我們可以得出結論,在Reactive Streams中,背壓是一種通過傳輸(通知)接收者可以消費多少元素來調節生產的機制(消費決定生產); 在這裡,我們有一個棘手的問題。TCP具有位元組抽象而不是邏輯元素抽象。我們通常所說的背壓控制是控制向網路傳送/接收的邏輯元件的數量。即使TCP有自己的流控制,這個流控制仍然是位元組而不是邏輯元素。
在WebFlux模組的當前實現中,背壓由傳輸流控制來調節,但它不會暴露接收方的實際需求。為了最終看到互動流程,請參見下圖:
為簡單起見,上圖顯示了兩個微服務之間的通訊,其中左側傳送生產資料流,右側消費該流。以下編號列表提供了該圖表的簡要說明:
1. 這是WebFlux框架,它正確地將邏輯元素轉換為位元組並返回並將它們傳輸到TCP /從TCP(網路)接收。
2. 這是資料長時間執行處理的開始,一旦作業完成,該資料就會請求下一個資料。
3. 在這裡,雖然沒有來自業務邏輯的需求,但WebFlux將來自網路的位元組排隊,而沒有他們的確認(業務邏輯沒有要求)。
4. 由於TCP流控制的性質,服務A仍然可以向網路傳送資料。
正如我們可能從上圖中注意到的那樣,接收方的請求與傳送方的請求不同(邏輯元素中的請求)。這意味著兩者的請求是獨立的,因此真正背壓僅僅僅適用於WebFlux < - >業務邏輯(服務)互動,服務A < - >服務B互動不是完整的背壓。
所有這些意味著背壓控制在WebFlux中並不像我們預期的那樣公平。
但我仍然想知道如何控制背壓
如果我們仍然希望對WebFlux中的背壓進行公平的控制,我們可以在Project Reactor支援下實現limitRate()。以下示例顯示了我們如何使用:
@PostMapping("/tweets")
public Mono<Void> postAllTweets(Flux<Tweet> tweetsFlux) {
return tweetService.process(tweetsFlux.limitRate(10))
.then();
}
正如我們從示例中看到的那樣,limitRate()運算子允許一次定義要預取的資料數。這意味著即使最終訂戶請求Long.MAX_VALUE元素,limitRate執行者也會將該請求拆分為塊,並且不允許一次消費更多。我們可以用資料元素的傳送過程來做同樣的事情:
@GetMapping("/tweets")
public Flux<Tweet> getAllTweets() {
return tweetService.retreiveAll()
.limitRate(10);
}
上面的示例顯示,即使WebFlux一次請求超過10個元素,也會limitRate()限制對預取大小的需求,並防止一次消費超過指定數量的元素。
另一種選擇是實現自己的Subscriber或擴充套件BaseSubscriber來自Project Reactor。例如,以下是我們如何做到這一點的簡單例子:
class MyCustomBackpressureSubscriber<T> extends BaseSubscriber<T> {
int consumed;
final int limit = 5;
@Override
protected void hookOnSubscribe(Subscription subscription) {
request(limit);
}
@Override
protected void hookOnNext(T value) {
// do business logic there
consumed++;
if (consumed == limit) {
consumed = 0;
request(limit);
}
}
}
使用RSocket協議的公平背壓
為了通過網路邊界實現邏輯元素背壓,我們需要一個適當的協議。幸運的是,有一種稱為RScoket協議。RSocket是一種應用程式級協議,允許通過網路邊界傳輸實際需求。該協議有一個RSocket-Java實現,允許設定RSocket伺服器。在伺服器到伺服器通訊的情況下,相同的RSocket-Java庫也提供客戶端實現。
對於瀏覽器 - 伺服器通訊,有一個RSocket-JS實現,能通過WebSocket連線瀏覽器和伺服器之間的流通訊。