C# 8中的Async Streams
關鍵要點
- 非同步程式設計技術提供了一種提高程式響應能力的方法。
- Async/Await模式在C# 5中首次亮相,但只能返回單個標量值。
- C# 8添加了非同步流(Async Streams),允許非同步方法返回多個值,從而擴充套件了其可用性。
- 非同步流提供了一種用於表示非同步資料來源的絕佳方法。
- 非同步流是Java和JavaScript中使用的反應式程式設計模型的替代方案。
C# 5引入了Async/Await,用以提高使用者介面響應能力和對Web資源的訪問能力。換句話說,非同步方法用於執行不阻塞執行緒並返回一個標量結果的非同步操作。
微軟多次嘗試簡化非同步操作,因為Async/Await模式易於理解,所以在開發人員當中獲得了良好的認可。
現有非同步方法的一個重要不足是它必須提供一個標量返回結果(一個值)。比如這個方法async Task<int> DoAnythingAsync(),DoAnythingAsync的結果是一個整數(一個值)。
由於存在這個限制,你不能將這個功能與yield關鍵字一起使用,並且也不能將其與async IEnumerable<int>(返回非同步列舉)一起使用。
如果可以將Async/Await特性與yield操作符一起使用,我們就可以使用非常強大的程式設計模型(如非同步資料拉取或基於拉取的列舉,在F#中被稱為非同步序列)。
C# 8中新提出的Async Streams去掉了標量結果的限制,並允許非同步方法返回多個結果。
這個變更將使非同步模式變得更加靈活,這樣就可以按照延遲非同步序列的方式從資料庫中獲取資料,或者按照非同步序列的方式下載資料(這些資料在可用時以塊的形式返回)。
例如:
foreach await (var streamChunck in asyncStreams) { Console.WriteLine($“Received data count = {streamChunck.Count}”); }
Reactive Extensions(Rx)是解決非同步程式設計問題的另一種方法。Rx越來越受到開發人員的歡迎。很多其他程式語言(如Java和JavaScript)已經實現了這種技術(RxJava、RxJS)。Rx基於推送式程式設計模型(Push Programming Model),也稱為反應式程式設計。反應式程式設計是事件驅動程式設計的一種型別,它處理的是資料而不是通知。
通常,在推送式程式設計模型中,你不需要控制Publisher。資料被非同步推送到佇列中,消費者在資料到達時消費資料。與Rx不同,Async Streams可以按需被呼叫,並生成多個值,直到達到列舉的末尾。
在本文中,我將對拉取模型和推送模型進行比較,並演示每一種技術各自的適用場景。我將使用很多程式碼示例向你展示整個概念和它們的優點,最後,我將討論Async Streams功能,並向你展示示例程式碼。
拉取式程式設計模型與推送式程式設計模型
圖-1-拉取式程式設計模型與推送式程式設計模型
我使用的例子是著名的生產者和消費者問題,但在我們的場景中,生產者不是生成食物,而是生成資料,消費者消費的是生成的資料,如圖-1所示。拉取模型很容易理解。消費者詢問並拉取生產者的資料。另一種方法是使用推送模型。生產者將資料釋出到佇列中,消費者通過訂閱佇列來接收所需的資料。
拉取模型更合適“快生產者和慢消費者”的場景,因為消費者可以從生產者那裡拉取其所需的資料,避免消費者出現溢位。推送模型更適合“慢生產者和快消費者”的場景,因為生產者可以將資料推送給消費者,避免消費者不必要的等待時間。
Rx和ofollow,noindex" target="_blank">Akka Streams (流式程式設計模型)使用了回壓技術(一種流量控制機制)。它使用拉取模型或推送模型來解決上面提到的生產者和消費者問題。
在下面的示例中,我使用了一個慢消費者從快生產者那裡非同步拉取資料序列。消費者在處理完一個元素後,會向生產者請求下一個元素,依此類推,直到到達序列的末尾。
動機和背景
要了解我們為什麼需要Async Streams,讓我們來看下面的程式碼。
// 對引數(count)進行迴圈相加操作 static int SumFromOneToCount(int count) { ConsoleExt.WriteLine("SumFromOneToCount called!"); var sum = 0; for (var i = 0; i <= count; i++) { sum = sum + i; } return sum; }
方法呼叫:
const int count = 5; ConsoleExt.WriteLine($"Starting the application with count: {count}!"); ConsoleExt.WriteLine("Classic sum starting."); ConsoleExt.WriteLine($"Classic sum result: {SumFromOneToCount(count)}"); ConsoleExt.WriteLine("Classic sum completed."); ConsoleExt.WriteLine("################################################"); ConsoleExt.WriteLine(Environment.NewLine);
輸出:
我們可以通過使用yield運算子讓這個方法變成惰性的,如下所示。
static IEnumerable<int> SumFromOneToCountYield(int count) { ConsoleExt.WriteLine("SumFromOneToCountYield called!"); var sum = 0; for (var i = 0; i <= count; i++) { sum = sum + i; yield return sum; } }
呼叫方法:
const int count = 5; ConsoleExt.WriteLine("Sum with yield starting."); foreach (var i in SumFromOneToCountYield(count)) { ConsoleExt.WriteLine($"Yield sum: {i}"); } ConsoleExt.WriteLine("Sum with yield completed."); ConsoleExt.WriteLine("################################################"); ConsoleExt.WriteLine(Environment.NewLine);
輸出:
正如你在輸出視窗中看到的那樣,結果被分成幾個部分返回,而不是作為一個值返回。以上顯示的累積結果被稱為惰性列舉。但是,仍然存在一個問題,即sum方法阻塞了程式碼的執行。如果你檢視執行緒,可以看到所有東西都在主執行緒中執行。
現在,讓我們將async應用於第一個方法SumFromOneToCount上(沒有yield關鍵字)。
static async Task<int> SumFromOneToCountAsync(int count) { ConsoleExt.WriteLine("SumFromOneToCountAsync called!"); var result = await Task.Run(() => { var sum = 0; for (var i = 0; i <= count; i++) { sum = sum + i; } return sum; }); return result; }
呼叫方法:
const int count = 5; ConsoleExt.WriteLine("async example starting."); // 相加操作是非同步進行得!這樣還不夠,我們要求不僅是非同步的,還必須是惰性的。 var result = await SumFromOneToCountAsync(count); ConsoleExt.WriteLine("async Result: " + result); ConsoleExt.WriteLine("async completed."); ConsoleExt.WriteLine("################################################"); ConsoleExt.WriteLine(Environment.NewLine);
輸出:
我們可以看到計算過程是在另一個執行緒中執行,但結果仍然是作為一個值返回!
想象一下,我們可以按照命令式風格將惰性列舉(yield return)與非同步方法結合起來。這種組合稱為Async Streams。這是C# 8中新提出的功能。這個新功能為我們提供了一種很好的技術來解決拉取式程式設計模型問題,例如從網站下載資料或從檔案或資料庫中讀取記錄。
讓我們嘗試使用當前的C# 版本。我將async關鍵字新增到SumFromOneToCountYield方法中,如下所示。
圖-2 組合使用async關鍵字和yield發生錯誤
我們試著將async新增到SumFromOneToCountYield,但直接出現錯誤,如上所示!
讓我們試試別的吧。我們可以將IEnumerable放入任務中並刪除yield關鍵字,如下所示:
static async Task<IEnumerable<int>> SumFromOneToCountTaskIEnumerable(int count) { ConsoleExt.WriteLine("SumFromOneToCountAsyncIEnumerable called!"); var collection = new Collection<int>(); var result = await Task.Run(() => { var sum = 0; for (var i = 0; i <= count; i++) { sum = sum + i; collection.Add(sum); } return collection; }); return result; }
呼叫方法:
const int count = 5; ConsoleExt.WriteLine("SumFromOneToCountAsyncIEnumerable started!"); var scs = await SumFromOneToCountTaskIEnumerable(count); ConsoleExt.WriteLine("SumFromOneToCountAsyncIEnumerable done!"); foreach (var sc in scs) { // 這不是我們想要的,結果將作為塊返回!!!! ConsoleExt.WriteLine($"AsyncIEnumerable Result: {sc}"); } ConsoleExt.WriteLine("################################################"); ConsoleExt.WriteLine(Environment.NewLine);
輸出:
可以看到,我們非同步計算所有的內容,但仍然存在一個問題。結果(所有結果都在集合中累積)作為一個塊返回,但這不是我們想要的惰性行為,我們的目標是將惰性行為與非同步計算風格相結合。
為了實現所需的行為,你需要使用外部庫,如Ix(Rx的一部分),或者你必須使用新提出的C#特性Async Streams。
回到我們的程式碼示例。我使用了一個外部庫 來顯示非同步行為。
static async Task ConsumeAsyncSumSeqeunc(IAsyncEnumerable<int> sequence) { ConsoleExt.WriteLineAsync("ConsumeAsyncSumSeqeunc Called"); await sequence.ForEachAsync(value => { ConsoleExt.WriteLineAsync($"Consuming the value: {value}"); // 模擬延遲! Task.Delay(TimeSpan.FromSeconds(1)).Wait(); }); } static IEnumerable<int> ProduceAsyncSumSeqeunc(int count) { ConsoleExt.WriteLineAsync("ProduceAsyncSumSeqeunc Called"); var sum = 0; for (var i = 0; i <= count; i++) { sum = sum + i; // 模擬延遲! Task.Delay(TimeSpan.FromSeconds(0.5)).Wait(); yield return sum; } }
呼叫方法:
const int count = 5; ConsoleExt.WriteLine("Starting Async Streams Demo!"); // 啟動一個新任務,用於生成非同步資料序列! IAsyncEnumerable<int> pullBasedAsyncSequence = ProduceAsyncSumSeqeunc(count).ToAsyncEnumerable(); ConsoleExt.WriteLineAsync("X#X#X#X#X#X#X#X#X#X# Doing some other work X#X#X#X#X#X#X#X#X#X#"); // 啟動另一個新任務,用於消費非同步資料序列! var consumingTask = Task.Run(() => ConsumeAsyncSumSeqeunc(pullBasedAsyncSequence)); // 出於演示目的,等待任務完成! consumingTask.Wait(); ConsoleExt.WriteLineAsync("Async Streams Demo Done!");
輸出:
最後,我們實現了我們想要的行為!我們可以在列舉上進行非同步迭代。
原始碼在這裡 。
客戶端/伺服器端的非同步拉取
我將使用一個更現實的例子來解釋這個概念。客戶端/伺服器端架構是演示這一功能優勢的絕佳方法。
客戶端/伺服器端同步呼叫
客戶端向伺服器端傳送請求,客戶端必須等待(客戶端被阻塞),直到伺服器端做出響應,如圖-3所示。
圖-3 同步資料拉取,客戶端等待請求完成
非同步資料拉取
客戶端發出資料請求然後繼續執行其他操作。一旦有資料到達,客戶端就繼續處理達到的資料。
圖-4 非同步資料拉取,客戶端可以在請求資料時執行其他操作
非同步序列資料拉取
客戶端發出資料塊請求,然後繼續執行其他操作。一旦資料塊到達,客戶端就處理接收到的資料塊並詢問下一個資料塊,依此類推,直到達到最後一個數據塊為止。這正是Async Streams想法的來源。圖-5顯示了客戶端可以在收到任何資料時執行其他操作或處理資料塊。
圖-5 非同步序列資料拉取(Async Streams),客戶端未被阻塞!
Async Streams
與IEnumerable<T>和IEnumerator<T>類似,Async Streams提供了兩個新介面IAsyncEnumerable<T>和IAsyncEnumerator<T>,定義如下:
public interface IAsyncEnumerable<out T> { IAsyncEnumerator<T> GetAsyncEnumerator(); } public interface IAsyncEnumerator<out T> : IAsyncDisposable { Task<bool> MoveNextAsync(); T Current { get; } } // Async Streams Feature可以被非同步銷燬 public interface IAsyncDisposable { Task DiskposeAsync(); }
Jonathan Allen已經在InfoQ網站上介紹過這個主題,我不想在這裡再重複一遍,所以我建議你也閱讀一下他的文章。
關鍵在於Task<bool> MoveNextAsync()的返回值(從bool改為Task<bool>,bool IEnumerator.MoveNext())。這樣可以讓整個計算和迭代都保持非同步。大多數情況下,這仍然是拉取模型,即使它是非同步的。IAsyncDisposable介面可用於進行非同步清理。有關非同步的更多資訊,請點選此處 。
語法
最終語法應如下所示:
foreach await (var dataChunk in asyncStreams) { // 處理資料塊或做一些其他的事情! }
如上所示,我們現在可以按順序計算多個值,而不只是計算單個值,同時還能夠等待其他非同步操作結束。
重寫微軟的示例
我重寫了微軟的演示程式碼,你可以從我的GitHub下載相關程式碼 。
這個例子背後的想法是建立一個大的MemoryStream(20000位元組的陣列),並按順序非同步迭代集合中的元素或MemoryStream。每次迭代從陣列中拉取8K位元組。
在(1)處,我們建立了一個大位元組陣列並填充了一些虛擬值。在(2)處,我們定義了一個叫作checksum的變數。我們將使用checksum來確保計算的總和是正確的。陣列和checksum位於記憶體中,並通過一個元組返回,如(3)所示。
在(4)處,AsEnumarble(或者叫AsAsyncEnumarble)是一種擴充套件方法,用於模擬由8KB塊組成的非同步流( (6)處所示的BufferSize = 8000)。
通常,你不必繼承IAsyncEnumerable,但在上面的示例中,微軟這樣做是為了簡化演示,如(5)處所示。
(7)處是“foreach”,它從非同步記憶體流中拉取8KB的塊資料。當消費者(foreach程式碼塊)準備好接收更多資料時,拉取過程是順序進行的,然後它從生產者(記憶體流陣列)中拉取更多的資料。最後,當迭代完成後,應用程式將’c’的校驗和與checksum進行比較,如果它們匹配,就打印出“Checksums match!”,如(8)所示!
微軟演示的輸出視窗:
概要
我們已經討論過Async Streams,它是一種出色的非同步拉取技術,可用於進行生成多個值的非同步計算。
Async Streams背後的程式設計概念是非同步拉取模型。我們請求獲取序列的下一個元素,並最終得到答覆。這與IObservable<T>的推送模型不同,後者生成與消費者狀態無關的值。Async Streams提供了一種表示非同步資料來源的絕佳方法,例如,當消費者尚未準備好處理更多資料時。示例包含了Web應用程式或從資料庫中讀取記錄。
我已經演示瞭如何生成非同步列舉資料,並使用外部非同步序列庫來消費列舉資料。我也演示瞭如何將這個功能用於從Web站點下載內容。最後,我們看到了新的Async Streams語法和一個完整的示例,該示例是基於微軟的Build Demo Code(2018年5月7日至9日,西雅圖,華盛頓州 )。
關於作者
Bassam Alugili 是STRATEC AG的高階軟體專家和資料庫專家。STRATEC是全球領先的全自動分析儀系統、實驗室資料管理軟體和智慧耗材的合作伙伴。
檢視英文原文:Async Streams in C# 8