Spring Cloud 原始碼學習之 Hystrix 工作原理
參考資訊與文中連結請點選 閱讀原文,感興趣可移步PC端閱讀,移動端閱讀原始碼很不友好(截圖字太小,貼程式碼排版太亂)。
本文學習了 Hystrix 工作原理及原始碼,關注點在整體處理流程,不涉及具體的實現細節。後續將逐漸寫Metrics收集、斷路器、隔離、請求快取等,有興趣可以關注奧。
下面 流程圖 來源於 Hystrix Wiki,展現了 Hystrix 工作原理,官方 Wiki 中對每一步都做了詳細的描述,可以直接參考。
文中原始碼基於 Spring Cloud Finchley.SR1 、Spring Boot 2.0.6.RELEASE .
工作原理簡述
當需要完成某項任務時,通過 Hystrix 將任務包裹起來,交由 Hystrix 來完成任務,從而享受 Hystrix 帶來保護。這和古代鏢局生意有點類似,將任務委託給鏢局,以期安全完成任務。
上圖展示了 Hystrix 完成任務的處理流程,下面對1到9步驟進行簡述:
1.構建命令
Hystrix 提供了兩個Command, HystrixCommand
和 HystrixObservableCommand
,可以使用這兩個物件來包裹待執行的任務。
例如使用 @HystrixCommand 註解標記方法,Hystrix 將利用AOP自動將目標方法包裝成HystrixCommand來執行。
@HystrixCommand public String hello() { ... }
也可以繼承HystrixCommand或HystrixObservableCommand來建立Command,例如:
public class MyCommand extends HystrixCommand { public MyCommand(HystrixCommandGroupKey group) { super(group); } @Override protected Object run() throws Exception { // 需要做的事情及需要返回的結果 return null; } }
任務委託給 Hystrix 後,Hystrix 可以應用自己的一系列保護機制,在執行使用者任務的各節點(執行前、執行後、異常、超時等)做一系列的事情。
2.執行命令
有四種方式執行command。
-
R execute():同步執行,從依賴服務得到單一結果物件
-
Future
queue() :非同步執行,返回一個 Future 以便獲取執行結果,也是單一結果物件 -
Observable
observe() :hot observable,建立Observable後會訂閱Observable,可以返回多個結果 -
Observable
toObservable() :cold observable,返回一個Observable,只有訂閱時才會執行,可以返回多個結果
execute()的實現為 queue().get() ; queue() 的實現為 toObservable().toBlocking().toFuture() 。
最後Obserable都由toObservable()來建立,本文的主要內容就是toObservable()。
// 利用queue()拿到Future, 執行 get()同步等待拿到執行結果 public R execute() { ... return queue().get(); } // 利用toObservable()得到Observable最後轉成Future public Future<R> queue() { final Future<R> delegate = toObservable().toBlocking().toFuture(); ... } // 利用toObservable()得到Observable並直接訂閱它,立即執行命令 public Observable<R> observe() { ReplaySubject<R> subject = ReplaySubject.create(); final Subscription sourceSubscription = toObservable().subscribe(subject); ... }
3.檢查快取
第3到9步驟構成了 Hystrix 的保護能力,通過這一些列步驟來執行任務,從而起到保護作用。
如果啟用了 Hystrix Cache,任務執行前將先判斷是否有相同命令執行的快取。如果有則直接返回快取的結果;如果沒有快取的結果,但啟動了快取,將快取本次執行結果以供後續使用。
4.檢查斷路器是否開啟
斷路器(circuit-breaker)和保險絲類似,保險絲在發生危險時將會燒斷以保護電路,而斷路器可以在達到我們設定的閥值時觸發短路(比如請求失敗率達到50%),拒絕執行任何請求。
如果斷路器被開啟,Hystrix 將不會執行命令,直接進入Fallback處理邏輯。
5.檢查執行緒池/訊號量情況
Hystrix 隔離方式有執行緒池隔離和訊號量隔離。當使用Hystrix執行緒池時,Hystrix 預設為每個依賴服務分配10個執行緒,當10個執行緒都繁忙時,將拒絕執行命令。訊號量同理。
6.執行具體的任務
通過 HystrixObservableCommand.construct()
或者 HystrixCommand.run()
來執行使用者真正的任務。
7.計算鏈路健康情況
每次開始執行command、結束執行command以及發生異常等情況時,都會記錄執行情況,例如:成功、失敗、拒絕以及超時等情況,會定期處理這些資料,再根據設定的條件來判斷是否開啟斷路器。
8.命令失敗時執行 Fallback 邏輯
在命令失敗時執行使用者指定的 Fallback 邏輯。上圖中的斷路、執行緒池拒絕、訊號量拒絕、執行執行、執行超時都會進入 Fallback 處理。
9.返回執行結果
原始結果將以Observable形式返回,在返回給使用者之前,會根據呼叫方式的不同做一些處理。
下面是 Hystrix Return flow。
原始碼學習
小故事
由於最終入口都是 toObservable() ,就從 AbstractCommand的 Observable<R> toObservable()
方法開始。
Hystrix 使用觀察者模式, Observable 即被觀察者,被觀察者狀態變更時,觀察者可以做出各項響應。舉個例子:大廳中一位演講者正在分享,廳中有觀眾和工作人員,可能發生如下事情:
被觀察者 事件 觀察者 ----------------------------------- 演講者 分享到精彩處 -> 觀眾鼓掌 演講者 講的口乾舌燥 -> 工作人員遞上一瓶水 演講者 放出自己的二維碼 -> 觀眾掃描
因為 Hystrix 基於RxJava,RxJava 初次看會比較複雜。為了便於下文理解,可以將Observable理解為資料來源、資料發射器,上面例子中,演講者各種行為都可以抽象為資料來源在發射資料,而各種接收者可以做出各種響應。
toObservable()
toObservable()主要原始碼如下:
public Observable<R> toObservable() { final AbstractCommand<R> _cmd = this; // 命令執行結束後的清理者 final Action0 terminateCommandCleanup = new Action0() {...}; // 取消訂閱時處理者 final Action0 unsubscribeCommandCleanup = new Action0() {...}; // 重點:Hystrix 核心邏輯: 斷路器、隔離 final Func0<Observable<R>> applyHystrixSemantics = new Func0<Observable<R>>() {...}; // 發射資料(OnNext表示發射資料)時的Hook final Func1<R, R> wrapWithAllOnNextHooks = new Func1<R, R>() {...}; // 命令執行完成的Hook final Action0 fireOnCompletedHook = new Action0() {...}; // 通過Observable.defer()建立一個Observable return Observable.defer(new Func0<Observable<R>>() { @Override public Observable<R> call() { final boolean requestCacheEnabled = isRequestCachingEnabled(); final String cacheKey = getCacheKey(); // 首先嚐試從請求快取中獲取結果 if (requestCacheEnabled) { HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.get(cacheKey); if (fromCache != null) { isResponseFromCache = true; return handleRequestCacheHitAndEmitValues(fromCache, _cmd); } } // 使用上面的Func0:applyHystrixSemantics 來建立Observable Observable<R> hystrixObservable = Observable.defer(applyHystrixSemantics) .map(wrapWithAllOnNextHooks); Observable<R> afterCache; // 如果啟用請求快取,將Observable包裝成HystrixCachedObservable並進行相關處理 if (requestCacheEnabled && cacheKey != null) { HystrixCachedObservable<R> toCache = HystrixCachedObservable.from(hystrixObservable, _cmd); ... } else { afterCache = hystrixObservable; } // 返回Observable return afterCache .doOnTerminate(terminateCommandCleanup) .doOnUnsubscribe(unsubscribeCommandCleanup) .doOnCompleted(fireOnCompletedHook); } }); }
上面的程式碼可以換種思維方式來理解。平時開發時都是下面這種模式,按順序不斷的做事情,是一個很好的執行者。
public void methodA{ try { // 1. 做第一件事情 // 2. 呼叫methodB()做第二件事情 // 3. 做第三件事情 ... } catch (Exception e) { // 處理錯誤 } finally { // 最後一定要做的事情 } }
用一張圖來看 toObservable() 方法。這種方式是“軍師型”,排兵佈陣,先創造了各個處理者,然後創造被觀察者,再設定Observable發生各種情況時由誰來處理,完全掌控全域性。
解釋下Action0、Func1這種物件。Action、Func和Runnable、Callable類似,是一個可以被執行的實體。Action沒有返回值,Action0…ActionN表示有0..N個引數,Action0就表示沒有引數;Func有返值,0..N一樣表示引數。
public interface Action0 extends Action { void call(); } public interface Func1<T, R> extends Function { R call(T t); }
下面用核心的 applyHystrixSemantics 來闡述一下。
// applyHystrixSemantics 是一個Func0(理解為執行實體或處理者),表示沒有引數,返回值是Observable。 final Func0<Observable<R>> applyHystrixSemantics = new Func0<Observable<R>>() { // Func0 做的事情如下 @Override public Observable<R> call() { // 如果未訂閱,返回一個"啞炮" Observable, 即一個不會發射任何資料的Observable if (commandState.get().equals(CommandState.UNSUBSCRIBED)) { return Observable.never(); } // 呼叫applyHystrixSemantics()來建立Observable return applyHystrixSemantics(_cmd); } };
因此,當執行Func0: applyHystrixSemantics時,可以得到一個Observable。 toObservable() 大量程式碼在準備處理者(觀察者),實際使用時是方法最後的 Observable.defer(new Func0<observable
Observable.defer
defer譯為延遲,表示演講者會等有觀眾來時才開始分享。Observable.defer() 就是說:必須有觀察者訂閱時, Observable 才開始發射資料。而defer()的引數是個Func0,是一個會返回Observable的執行實體。下面看看defer():
return Observable.defer(new Func0<Observable<R>>() { @Override public Observable<R> call() { // 再一次使用Observable.defer()技能,這次用的是applyHystrixSemantics這個Func0 Observable<R> hystrixObservable = Observable.defer(applyHystrixSemantics) .map(wrapWithAllOnNextHooks); ... // 此處忽略了請求快取處理,上面已有提及 Observable<R> afterCache; ... // 為Observable繫結幾個特定事件的處理者,這都是上門建立的Action0 return afterCache .doOnTerminate(terminateCommandCleanup) .doOnUnsubscribe(unsubscribeCommandCleanup) .doOnCompleted(fireOnCompletedHook); } });
applyHystrixSemantics()
接著看applyHystrixSemantics這個Func0,Func0的call()中呼叫的是applyHystrixSemantics()函式。
// Semantics 譯為語義, 應用Hystrix語義很拗口,其實就是應用Hystrix的斷路器、隔離特性 private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) { // 原始碼中有很多executionHook、eventNotifier的操作,這是Hystrix拓展性的一種體現。這裡面啥事也沒做,留了個口子,開發人員可以拓展 executionHook.onStart(_cmd); // 判斷斷路器是否開啟 if (circuitBreaker.attemptExecution()) { // 獲取執行訊號 final TryableSemaphore executionSemaphore = getExecutionSemaphore(); final AtomicBoolean semaphoreHasBeenReleased = new AtomicBoolean(false); final Action0 singleSemaphoreRelease = new Action0() {...}; final Action1<Throwable> markExceptionThrown = new Action1<Throwable>() {...}; // 判斷是否訊號量拒絕 if (executionSemaphore.tryAcquire()) { try { // 重點:處理隔離策略和Fallback策略 return executeCommandAndObserve(_cmd) .doOnError(markExceptionThrown) .doOnTerminate(singleSemaphoreRelease) .doOnUnsubscribe(singleSemaphoreRelease); } catch (RuntimeException e) { return Observable.error(e); } } else { return handleSemaphoreRejectionViaFallback(); } } // 開啟了斷路器,執行Fallback else { return handleShortCircuitViaFallback(); } }
executeCommandAndObserve()
下面看 executeCommandAndObserve() 方法,處理隔離策略和各種Fallback.
private Observable<R> executeCommandAndObserve(final AbstractCommand<R> _cmd) { final HystrixRequestContext currentRequestContext = HystrixRequestContext.getContextForCurrentThread(); final Action1<R> markEmits = new Action1<R>() {...}; final Action0 markOnCompleted = new Action0() {...}; // 利用Func1獲取處理Fallback的 Observable final Func1<Throwable, Observable<R>> handleFallback = new Func1<Throwable, Observable<R>>() { @Override public Observable<R> call(Throwable t) { circuitBreaker.markNonSuccess(); Exception e = getExceptionFromThrowable(t); executionResult = executionResult.setExecutionException(e); // 拒絕處理 if (e instanceof RejectedExecutionException) { return handleThreadPoolRejectionViaFallback(e); // 超時處理 } else if (t instanceof HystrixTimeoutException) { return handleTimeoutViaFallback(); } else if (t instanceof HystrixBadRequestException) { return handleBadRequestByEmittingError(e); } else { ... return handleFailureViaFallback(e); } } }; final Action1<Notification<? super R>> setRequestContext ... Observable<R> execution; // 利用特定的隔離策略來處理 if (properties.executionTimeoutEnabled().get()) { execution = executeCommandWithSpecifiedIsolation(_cmd) .lift(new HystrixObservableTimeoutOperator<R>(_cmd)); } else { execution = executeCommandWithSpecifiedIsolation(_cmd); } return execution.doOnNext(markEmits) .doOnCompleted(markOnCompleted) // 繫結Fallback的處理者 .onErrorResumeNext(handleFallback) .doOnEach(setRequestContext); }
executeCommandWithSpecifiedIsolation()
接著看隔離特性的處理:executeCommandWithSpecifiedIsolation()
private Observable<R> executeCommandWithSpecifiedIsolation(final AbstractCommand<R> _cmd) { // 執行緒池隔離 if (properties.executionIsolationStrategy().get() == ExecutionIsolationStrategy.THREAD) { // 再次使用 Observable.defer(), 通過執行Func0來得到Observable return Observable.defer(new Func0<Observable<R>>() { @Override public Observable<R> call() { // 收集metric資訊 metrics.markCommandStart(commandKey, threadPoolKey, ExecutionIsolationStrategy.THREAD); ... try { ... // 獲取真正的使用者Task return getUserExecutionObservable(_cmd); } catch (Throwable ex) { return Observable.error(ex); } ... } // 繫結各種處理者 }).doOnTerminate(new Action0() {...}) .doOnUnsubscribe(new Action0() {...}) // 繫結超時處理者 .subscribeOn(threadPool.getScheduler(new Func0<Boolean>() { @Override public Boolean call() { return properties.executionIsolationThreadInterruptOnTimeout().get() && _cmd.isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT; } })); } // 訊號量隔離,和執行緒池大同小異,全部省略了 else { return Observable.defer(new Func0<Observable<R>>() {...} } }
getUserExecutionObservable()就不接著寫了,可以自己看下,就是拿到使用者真正要執行的任務。這個任務就是這樣被Hystrix包裹著,置於層層防護之下。
倒過來看
上面方法層層呼叫,倒過來看,就是先建立一個Observable,然後繫結各種事件對應的處理者,如下圖:
各類doOnXXXX,表示發生XXX事件時做什麼事情。