RxJava原始碼分析(一):基本語法
最近看了下網上的RxJava原始碼分析,發現所基於的原始碼版本和最新的略有不同,於是自己動手翻閱了一下最新的原始碼版本(rxjava:2.2.8,rxandroid:2.1.1),並寫分析部落格作分享。
//示例程式碼 private static void rxJavaTest() { Observable.create(new ObservableOnSubscribe<String>() { @Override public void subscribe(ObservableEmitter<String> emitter) { //1 emitter.onNext("onNext"); emitter.onComplete(); } }).subscribe(new Observer<String>() { @Override public void onSubscribe(Disposable d) { //2 Log.d(TAG, "onSubscribe"); } @Override public void onNext(String s) { //3 Log.d(TAG, s); } @Override public void onError(Throwable e) { //4 Log.d(TAG, "onError"); } @Override public void onComplete() { //5 Log.d(TAG, "onComplete"); } }); }
上面RxJava最簡單的使用,主要涉及被觀察者Observable、觀察者Observer和事件訂閱subscribe()三個角色。
首先分析Observable的建立過程,即Observable的create()方法:
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) { ObjectHelper.requireNonNull(source, "source is null"); //這裡傳入的source物件是我們傳入的匿名內部類 return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source)); }
這裡先看一下我們傳入的匿名內部類型別ObservableOnSubscribe原始碼:
public interface ObservableOnSubscribe<T> { void subscribe(@NonNull ObservableEmitter<T> emitter) throws Exception; }
可見ObservableOnSubscribe是一個只含有一個抽象方法subscribe()的介面。
接著呼叫RxJavaPlugins的onAssembly()方法並傳入一個新建的ObservableCreate物件,而ObservableCreate的建構函式如下:
final ObservableOnSubscribe<T> source; public ObservableCreate(ObservableOnSubscribe<T> source) { this.source = source; }
內部操作很簡單,只是把ObservableCreate的成員變數source賦值為傳入的ObservableOnSubscribe物件,即最開始我們建立的匿名內部類。
RxJavaPlugins的onAssembly()方法呼叫如下:
public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) { Function<? super Observable, ? extends Observable> f = onObservableAssembly; if (f != null) { return apply(f, source); } return source; }
首先會建立一個Function f,賦值為onObservableAssembly,而onObservableAssembly預設為null(當使用轉換操作符時會進行賦值,在後面的文章中會進一步分析),所以會直接返回source。至此Observable建立完畢。
然後看Observer的內部實現,是一個包含4個抽象方法的介面:
public interface Observer<T> { void onSubscribe(@NonNull Disposable d); void onNext(@NonNull T t); void onError(@NonNull Throwable e); void onComplete(); }
最後來看重點subscribe()方法:
@Override public final void subscribe(Observer<? super T> observer) { ObjectHelper.requireNonNull(observer, "observer is null"); try { observer = RxJavaPlugins.onSubscribe(this, observer);//1 ObjectHelper.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null Observer..."); subscribeActual(observer);//2 } catch (NullPointerException e) { throw e; } catch (Throwable e) { Exceptions.throwIfFatal(e); RxJavaPlugins.onError(e); NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS"); npe.initCause(e); throw npe; } }
在1處呼叫RxJavaPlugins.onSubscribe(),將我們傳入的observer進行包裝:
public static <T> Observer<? super T> onSubscribe(@NonNull Observable<T> source, @NonNull Observer<? super T> observer) { BiFunction<? super Observable, ? super Observer, ? extends Observer> f = onObservableSubscribe; if (f != null) { return apply(f, source, observer); } return observer; }
和上文的onAssembly()方法一樣,這裡的onObservableSubscribe也預設為null,所以返回值還是我們傳入的observer本身。
重中之重在於2處的subscribeActual(observer):
//subscribeActual()是Observable中的抽象方法,本文示例的具體實現是在ObservableCreate類中 protected void subscribeActual(Observer<? super T> observer) { //1、建立事件發射器 CreateEmitter<T> parent = new CreateEmitter<T>(observer); //2、呼叫observer的onSubscribe()方法 observer.onSubscribe(parent); try { //3、呼叫source(即包裝過的observer)的onSubscribe()方法,傳入發射器 source.subscribe(parent); } catch (Throwable ex) { Exceptions.throwIfFatal(ex); parent.onError(ex); } }
第1步,建立CreateEmitter,CreateEmitter類的主要程式碼如下:
static final class CreateEmitter<T> extends AtomicReference<Disposable> implements ObservableEmitter<T>, Disposable { ... final Observer<? super T> observer; CreateEmitter(Observer<? super T> observer) { //將observer賦值給成員變數 this.observer = observer; } @Override public void onNext(T t) { if (t == null) { onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources.")); return; } if (!isDisposed()) { observer.onNext(t); } } @Override public void onError(Throwable t) { if (!tryOnError(t)) { RxJavaPlugins.onError(t); } } ... @Override public void onComplete() { if (!isDisposed()) { try { observer.onComplete(); } finally { dispose(); } } } ... //取消傳送事件 @Override public void dispose() { DisposableHelper.dispose(this); } @Override public boolean isDisposed() { return DisposableHelper.isDisposed(get()); } ... }
第2步,呼叫observer的onSubscribe()方法,這樣會走到示例程式碼的2處打印出"onSubscribe"。
第3步,呼叫source.subscribe(parent),source實際上就是示例程式碼中的傳入的匿名內部類:
new ObservableOnSubscribe<String>() { @Override public void subscribe(ObservableEmitter<String> emitter) { //1 emitter.onNext("onNext"); emitter.onComplete(); } }
所以會走到示例程式碼的1處分別執行emitter的onNext()和onComplete()方法,而從CreateEmitter的內部實現可見emitter的onNext()和onComplete()方法的具體操作實際上就是呼叫觀察者observer的onNext()和onComplete()方法,observer即示例程式碼中的
new Observer<String>() { @Override public void onSubscribe(Disposable d) { //2 Log.d(TAG, "onSubscribe"); } @Override public void onNext(String s) { //3 Log.d(TAG, s); } @Override public void onError(Throwable e) { //4 Log.d(TAG, "onError"); } @Override public void onComplete() { //5 Log.d(TAG, "onComplete"); } }
於是就走到了示例程式碼3、4處,並在呼叫onNext()時傳入1處emitter.onNext()方法傳入的引數。
以上只是RxJava最基礎的用法的分析,主要是對觀察者模式的不同角色進行封裝,達到鏈式呼叫形式的目的,並且設計了發射器Emitter的概念,形成流式事件訂閱的模式。