asp.net core microservices 架構之Task 事務一致性 事件源 詳解
一 aspnetcore之task的任務狀態-CancellationToken
我有一篇文章講解了asp.net的執行緒方面的知識。我們知道.net的針對於多執行緒的一個亮點就是Task,net clr維護了一個執行緒池,自動的分派給task執行,執行完成,迅速返回執行緒池,並且維護異常和狀態,針對於基礎的thread和其他兩種非同步程式設計,Task非常的靈巧,但是針對和應用生命週期關聯的非同步任務,還是使用Workbackgroup比較合適,或者甚至是基礎的thread,因為Task比較高階的執行緒類,操作也比較簡化,人為控制比較弱。那這一節為什麼要說執行緒尼?大家有沒有遇到過,部署或者人為的去重啟,往往會造成一些不必要的業務中斷,web api有這樣的情況,後臺程式也有這樣的情況。異常和系統硬體的故障已經讓我們防不勝防了,那麼就儘量的人為的情況少那麼一點點,系統的健壯性也就高那麼一點點。
目前有兩個技巧可以處理這一類事情,第一是讓主機graceful方式關閉,並且超時時間設定長一點,這樣就有足夠的時間,讓執行的請求執行完畢,看程式碼:
public static async Task Main(string[] args) { var host = new HostBuilder() .Build(); await host.RunAsync(); }
這是官方上的一段話: IHostedService 是執行程式碼的入口點。 每個 IHostedService
實現都按照 ConfigureServices 中服務註冊 的順序執行。 主機啟動時,每個 IHostedService
上都會呼叫 StartAsync ,主機正常關閉時,以反向註冊順序呼叫 StopAsync 。
//關閉超時值 ShutdownTimeout 設定 StopAsync 的超時值。 預設值為 5 秒。 Program.Main 中的以下選項配置將預設值為 5 秒的關閉超時值增加至 20 秒: C# //複製 var host = new HostBuilder() .ConfigureServices((hostContext, services) => { services.Configure<HostOptions>(option => { option.ShutdownTimeout = System.TimeSpan.FromSeconds(20); }); }) .Build();
而我們看看原始碼中StopAsync方法:
/// <summary> /// Attempts to gracefully stop the host with the given timeout. /// </summary> /// <param name="host"></param> /// <param name="timeout">The timeout for stopping gracefully. Once expired the /// server may terminate any remaining active connections.</param> /// <returns></returns> public static Task StopAsync(this IHost host, TimeSpan timeout) { return host.StopAsync(new CancellationTokenSource(timeout).Token); }
系統接受到Ctrl+c和sign,就會呼叫這個方法,以比較禮貌的方式關閉。
那麼看原始碼,這兩個都是具有阻塞功能的非同步方法,對應的非非同步方法,都是同步呼叫的這兩個方法:
/// <summary> /// Runs an application and returns a Task that only completes when the token is triggered or shutdown is triggered. /// </summary> /// <param name="host">The <see cref="IHost"/> to run.</param> /// <param name="token">The token to trigger shutdown.</param> public static async Task RunAsync(this IHost host, CancellationToken token = default) { using (host) { await host.StartAsync(token); await host.WaitForShutdownAsync(token); } } /// <summary> /// Returns a Task that completes when shutdown is triggered via the given token. /// </summary> /// <param name="host">The running <see cref="IHost"/>.</param> /// <param name="token">The token to trigger shutdown.</param> public static async Task WaitForShutdownAsync(this IHost host, CancellationToken token = default) { var applicationLifetime = host.Services.GetService<IApplicationLifetime>(); //當前token執行取消的時候,激發這個委託。 token.Register(state => { ((IApplicationLifetime)state).StopApplication(); //當程序取消的時候,通知註冊IApplicationLifetime的程序也取消。 }, applicationLifetime); var waitForStop = new TaskCompletionSource<object>(TaskCreationOptions.RunContinuationsAsynchronously); //應用程式生命週期中的停止應用token激發的時候,執行這個委託,去釋放阻塞,執行host的停止方法。 applicationLifetime.ApplicationStopping.Register(obj => { var tcs = (TaskCompletionSource<object>)obj; tcs.TrySetResult(null); }, waitForStop); await waitForStop.Task;//阻塞,直到 tcs.TrySetResult(null);執行完畢。 // Host will use its default ShutdownTimeout if none is specified. await host.StopAsync(); //呼叫關閉 }
具體原理就是Host使用這個applicationLifetime,去控制。而applicationLifetime主要的是用到了CancellationTokenSource這個類,使用這個類是可以控制task的取消執行的。
所以,兩個解決方案,如果是webapi,就將將超時時間設定大一點。
第二,如果在非webapi中,使用了超長執行的Task,就使用CancellationTokenSource吧,將它的Token傳進去,在外邊判斷是否執行中,如果不在執行中,就執行 Cancel方法,當然在task內部,也可以
判斷token,是否自己主動取消掉。
這是官方的一個例子,瞭解CancellationTokenSource這個類,那麼就會明白如何去處理Task
using System; using System.Collections.Generic; using System.Threading; using System.Threading.Tasks; public class Example { public static void Main() { // Define the cancellation token. CancellationTokenSource source = new CancellationTokenSource(); CancellationToken token = source.Token; Random rnd = new Random(); Object lockObj = new Object(); List<Task<int[]>> tasks = new List<Task<int[]>>(); TaskFactory factory = new TaskFactory(token); for (int taskCtr = 0; taskCtr <= 10; taskCtr++) { int iteration = taskCtr + 1; tasks.Add(factory.StartNew( () => { int value; int[] values = new int[10]; for (int ctr = 1; ctr <= 10; ctr++) { lock (lockObj) { value = rnd.Next(0,101); } if (value == 0) { source.Cancel(); Console.WriteLine("Cancelling at task {0}", iteration); break; } values[ctr-1] = value; } return values; }, token)); } try { Task<double> fTask = factory.ContinueWhenAll(tasks.ToArray(), (results) => { Console.WriteLine("Calculating overall mean..."); long sum = 0; int n = 0; foreach (var t in results) { foreach (var r in t.Result) { sum += r; n++; } } return sum/(double) n; } , token); Console.WriteLine("The mean is {0}.", fTask.Result); } catch (AggregateException ae) { foreach (Exception e in ae.InnerExceptions) { if (e is TaskCanceledException) Console.WriteLine("Unable to compute mean: {0}", ((TaskCanceledException) e).Message); else Console.WriteLine("Exception: " + e.GetType().Name); } } finally { source.Dispose(); } } } // Repeated execution of the example produces output like the following: //Cancelling at task 5 //Unable to compute mean: A task was canceled. // //Cancelling at task 10 //Unable to compute mean: A task was canceled. // //Calculating overall mean... //The mean is 5.29545454545455. // //Cancelling at task 4 //Unable to compute mean: A task was canceled. // //Cancelling at task 5 //Unable to compute mean: A task was canceled. // //Cancelling at task 6 //Unable to compute mean: A task was canceled. // //Calculating overall mean... //The mean is 4.97363636363636. // //Cancelling at task 4 //Unable to compute mean: A task was canceled. // //Cancelling at task 5 //Unable to compute mean: A task was canceled. // //Cancelling at task 4 //Unable to compute mean: A task was canceled. // //Calculating overall mean... //The mean is 4.86545454545455.
二 業務的事務一致性
因為微服務的理念中是犧牲了系統業務的一致性,我們知道事務的一致性都是靠的資料庫的本地事務,或者分散式事務來實現的,但是微服務是嚴禁使用分散式事務。那麼如何保證整個系統的事務完整性尼?舉個例子:比如訂單服務中,新接受一個訂單,這個訂單需要同步到庫房的訂單子系統,那麼在訂單服務中的這個訂單在最後更新自己訂單狀態的時候,是需要同時傳送非同步訊息給庫房訊息伺服器的,如果這時候網路斷了,本地訂單更新成功了,但是非同步訊息沒有傳送過去,這樣就會引起業務的缺失,目前有兩個方法可以實現:
第一:為本地資料庫建立事件源表,記錄下訊息和本地資料更新的全部狀態,比如訂單在更新前就可以新增事件,事件狀態可以有,準備更新訂單,訂單已更新,傳送訊息佇列,訊息傳送成功等。
這樣的好處就是最後跟蹤這個事務處理的時候,每個步驟都可以找到,而且完全不用事務。最後job去跟蹤失敗情況,然後根據情況處理。
第二:只是用本地事務,就是在訂單更新的時候,同時給事件源表新增訊息內容,然後讓後臺job去傳送訊息,這樣是最簡單和最穩定的方式。
當然,最合適的還是第一種方法,雖然程式碼能複雜點,但是最後的效果是一樣的,而且效率是比第二種方法更高效,但是考慮打事件源表並不是併發頻繁操作的表,所以這個看自己的喜好了。
針對一個系統,業務的一致性,也並不是全部,針對於一些關鍵業務做好一致性,但是很多其實可以設計成為在使用者ui層面去補償操作,唯一的壞處就是一部分資料需要重新填寫。
三 事件源
這個事件源並不是為了解決業務的一致性,而是為了應對大資料量的請求,比如,客戶管理,一個分類下有上萬條記錄需要處理,那麼往往我們需要對效能和實時反饋上有個折衷。
系統設計如下:
這樣看來,會增加1個api服務和一個後臺服務,但是對於系統的問題,卻得到了一個緩衝,或許這個設計不是最好的,但是卻可以做一個拋磚引玉的案例,現實中案例非常多變,所以設計也會有很多方案。
因為目前我們看到的大部分app,請求的時候,某些功能確實會有少許等待事件,這個都是為了折衷,當然這一篇內容並不是討論雲或者分散式計算,但是在後臺這塊處理越快,反饋也越快。
這套方案的設計理念其實就是非同步處理,可以有自己的優化空間,而並不會消耗api這個輕量級服務,後臺分散式計算越快,app反應也越快,到一定程度,就並不會感覺到有延遲,這就是大師比喻的鼻子與眼睛的關係。