Android進階:四、RxJava2 原始碼解析 1
本文適合使用過Rxjava2或者瞭解Rxjava2的基本用法的同學閱讀
一.Rxjava是什麼
Rxjava在GitHub 主頁上的自我介紹是 "a library for composing asynchronous and event-based programs using observable sequences for the Java VM"(一個在 Java VM 上使用可觀測的序列來組成非同步的、基於事件的程式的庫)。
通俗來說,Rxjava是一個採用了觀察者模式設計處理非同步的框架。鏈式呼叫設計讓程式碼優雅易讀。
舉個例子:
Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() { @Override public void subscribe(ObservableEmitter<String> e) throws Exception { e.onNext("a"); } }); observable.subscribe(new Observer<String>() { @Override public void onSubscribe(Disposable d) { } @Override public void onNext(String s) { } @Override public void onError(Throwable e) { } @Override public void onComplete() { } });
這是Rxjava2最簡單的用法:
1.建立一個Observable,重寫subscribe方法,這裡主要處理被觀察的事件。
2.訂閱這個Observable,事件會回撥observer的方法,我們可以對事件做響應的處理
二.Rxjava原始碼解析
2.1. 建立Observable:
建立Observable用的是Observable.create(ObservableOnSubscribe<T> source)方法。這個方法的引數是ObservableOnSubscribe:
public interface ObservableOnSubscribe<T> { /** * Called for each Observer that subscribes. * @param e the safe emitter instance, never null * @throws Exception on error */ void subscribe(@NonNull ObservableEmitter<T> e) throws Exception; }
ObservableOnSubscribe是一個介面,唯一的方法是subscribe,引數是ObservableEmitter<T> e。ObservableEmitter是一個繼承了Emitter的介面,介面Emitter裡定義了onNext、onError、onComplete等方法,和Observer(觀察者)的方法相對應。
public interface Emitter<T> { /** * Signal a normal value. * @param value the value to signal, not null */ void onNext(@NonNull T value); /** * Signal a Throwable exception. * @param error the Throwable to signal, not null */ void onError(@NonNull Throwable error); /** * Signal a completion. */ void onComplete(); }
ObservableEmitter對介面Emitter進行擴充套件,增加了setDisposable、setCancellable等方法
基本引數瞭解了,現在看看create方法裡面做了什麼,程式碼如下:
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) { return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source)); }
呼叫了RxJavaPlugins的onAssembly方法。又有一個新引數ObservableCreate<T>(source),我們看看它是什麼:
final class ObservableCreate<T> extends Observable<T> { public ObservableCreate(ObservableOnSubscribe<T> source) { this.source = source; } }
繼承了Observable,所以也是個被觀察物件,在建構函式中我們看到我們new的ObservableOnSubscribe物件,被存在了ObservableCreate的source裡面
那我們繼續看看onAssembly方法做什麼:
public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) { Function<? super Observable, ? extends Observable> f = onObservableAssembly; if (f != null) { return apply(f, source); } return source; }
一個Hook方法。onObservableAssembly是一個靜態變數,我們沒有設定,預設為空,所以直接返回source物件。也就是說,Observable的create方法其實就是把我們ObservableOnSubscribe物件,儲存在ObservableCreate物件的source裡面,然後返回ObservableCreate物件。
我們知道ObservableCreate是繼承Observable的,所以建立了ObservableCreate物件,我們的Observable也就建立完了。
2.2 訂閱事件(被觀察者)
訂閱被觀察者的操作是observable.subscribe(new Observer<String>())。這個操作符其實是個“被動”,就是事件被觀察者觀察。因為subscribe方法裡的引數Observer才是觀察者。我們也會在Observer裡的各個會調方法裡接收到事件相關的返回值。
我們看看subscribe方法的原始碼:
public final void subscribe(Observer<? super T> observer) { try { subscribeActual(observer); } catch (NullPointerException e) { // NOPMD throw e; } catch (Throwable e) { RxJavaPlugins.onError(e); } }
看程式碼我們知道最主要呼叫的方法是:subscribeActual(observer);,這個方法是Observable裡的抽象方法,而此時我們的Observable是一個ObservableCreate物件(前面create方法返回的物件)。所以我們去看一下ObservableCreate裡面是如何重寫這個方法的。程式碼如下:
protected void subscribeActual(Observer<? super T> observer) { CreateEmitter<T> parent = new CreateEmitter<T>(observer); observer.onSubscribe(parent); try { source.subscribe(parent); } catch (Throwable ex) { Exceptions.throwIfFatal(ex); parent.onError(ex); } }
我們一看到這個方法主要做了三件事:
①建立一個CreateEmitter物件parent。
②把parent傳給source的subscribe方法。上面我們知道source就是剛才存的ObservableOnSubscribe物件,subscribe也就是我們重寫的方法:
@Override public void subscribe(ObservableEmitter<String> e) throws Exception { e.onNext("a"); }
所以我們在這個方法裡就能收到一個CreateEmmiter,通過CreateEmitter可以回撥相應的方法。CreateEmitter是實現ObservableEmitter介面,我們看看它內部實現,如:onNext原始碼如下:
@Override public void onNext(T t) { observer.onNext(t); }
也就是說,當我們在ObservableOnSubscribe的subscribe方法裡呼叫ObservableEmitter的onNext方法的時候,它裡面會呼叫observer的onNext。於是通過這樣的傳遞,我們就能在observer裡響應的回撥方法裡收到事件的相關狀態。
至此一個簡單Rxjava流式傳遞原理已經講完了,總結流程如下:
- 使用Observbable.create方法,產生一個ObservableCreate物件,物件裡存著ObservableOnSubscribe物件source。
- 呼叫ObservableCreate.subscribe方法,實際呼叫的是subscribeActual方法,傳入一個Observer物件。
- subscribeActual方法中建立一個CreateEmmiter物件,呼叫source.subscribe方法,傳入CreateEmmiter物件。
- 於是我們在ObservableOnSubscribe中就接收到了一個CreateEmmiter,CreateEmmiter是ObservableEmmiter的子類。我們可以在這裡呼叫CreateEmmiter的方法進行事件回撥。
- 呼叫CreateEmmiter方法,實際上會呼叫Observer的響應的方法。也就是CreateEmmiter把事件狀態傳遞給觀察者。