RxJava原始碼分析及版本對比(一)——1.x版本基本使用分析
RxJava is a Java VM implementation of Reactive Extensions: a library for composing asynchronous and event-based programs by using observable sequences. It extends the observer pattern to support sequences of data/events and adds operators that allow you to compose sequences together declaratively while abstracting away concerns about things like low-level threading, synchronization, thread-safety and concurrent data structures.
RxJava是使用Java VM實現的響應式程式設計庫:一個通過使用可觀察序列來編寫非同步和基於事件的程式的庫。
它擴充套件了觀察者模式以支援資料/事件序列,並添加了允許您以宣告方式組合序列的運算子,同時抽象出對低階執行緒、執行緒同步、執行緒安全和併發資料結構等問題的關注。
發展歷史
RxJava存在1.x版本和2.x版本,API改動較大,接入方法基本不相容,但實現思路類似,做了一定的優化,下面是官方對RxJava2的介紹。
Version 2.x (Javadoc)
- single dependency:Reactive-Streams
-
continued support for Java 6+ &Android 2.3+
-
performance gains through design changes learned through the 1.x cycle and throughReactive-Streams-Commons research project.
-
Java 8 lambda-friendly API
-
non-opinionated about source of concurrency (threads, pools, event loops, fibers, actors, etc)
-
async or synchronous execution
-
virtual time and schedulers for parameterized concurrency
-
單一依賴:Reactive-Streams
-
繼續支援Java 6+和android 2.3+
-
通過1.x和Reactive-Streams-Commons專案的積累,實現設計變更,提高效能。
-
友好地支援Java 8 lambda表示式
-
靈活的處理併發,包括threads, pools, event loops, fibers, actors等
-
同步或非同步操作
-
為引數化的併發設計了排程器
原始碼地址
既然都點到這篇文章裡了,難道還不下載原始碼看看嗎??
這裡1.x版本和2.x版本在不同的分支上
1.x基本使用
public static void main(String[] args) { Observable.create(new Observable.OnSubscribe<String>() { @Override public void call(Subscriber<? super String> subscriber) { if (!subscriber.isUnsubscribed()) { subscriber.onNext("test"); subscriber.onCompleted(); } } }).subscribe(new Observer<String>() { @Override public void onCompleted() { System.out.println("onCompleted"); } @Override public void onError(Throwable e) { } @Override public void onNext(String s) { System.out.println("onNext:" + s); } }); } 複製程式碼
拆分一下,分為兩部分
- 構建一個Observable物件
- 呼叫Observable物件的subscribe方法
// 通過Observable的create靜態方法,傳入一個OnSubscribe物件 // 這個OnSubscribe物件附帶了一個call方法,用於回撥 // 整個create方法返回了一個Observable物件的例項 Observable<String> observable = Observable.create(new Observable.OnSubscribe<String>() { @Override public void call(Subscriber<? super String> subscriber) { if (!subscriber.isUnsubscribed()) { subscriber.onNext("test"); subscriber.onCompleted(); } } }); 複製程式碼
// 呼叫observable的subscribe方法,傳入一個Observer物件, // 這個Observer物件附帶了三個回撥方法 // 通過這裡subscribe方法呼叫上面Observable.OnSubscribe物件中的call方法 // 再在call方法中的subscriber物件呼叫這裡Observer中的onNext,onCompleted方法 observable.subscribe(new Observer<String>() { @Override public void onCompleted() { System.out.println("onCompleted"); } @Override public void onError(Throwable e) { } @Override public void onNext(String s) { System.out.println("onNext:" + s); } }); 複製程式碼
上面的程式打印出的效果是:
onNext:test
onCompleted
1.x原始碼解析
從Observable.create方法開始
@Deprecated public static <T> Observable<T> create(OnSubscribe<T> f) { // 接收一個OnSubscribe引數,呼叫建構函式返回一個Observable // 1)、先看Observable的建構函式做了什麼 // 2)、再看OnSubscribe類是怎樣的 return new Observable<T>(RxJavaHooks.onCreate(f)); } 複製程式碼
Observable的建構函式
// Obserbable類中存在一個final變數是OnSubscribe型別的 final OnSubscribe<T> onSubscribe; // 構造方法將傳入的OnSubscribe物件賦值給區域性變數onSubscribe儲存起來 protected Observable(OnSubscribe<T> f) { this.onSubscribe = f; } 複製程式碼
再來看OnSubscribe類,其實就是一個包含有一個call方法的類,在基本使用的第2步,呼叫observable物件subscribe方法時觸發。
/** * 這裡的OnSubscribe是一個介面,繼承子Action1介面 * * 在Observable.subscribe方法被呼叫的時候執行 * 這裡的subscribe是第2部分,如果印象模糊可以回頭看一下上面的基本使用部分 * @param <T> the output value type */ public interface OnSubscribe<T> extends Action1<Subscriber<? super T>> { // cover for generics insanity } /** * 一個call方法,observable物件subscribe方法時觸發 * 用上面的基本使用的例子,這裡的call攜帶的範型T就是Subscriber的例項subscriber * 上面call方法就是呼叫subscribe中的onNext方法和onCompleted * 可以看出這裡的onNext和onCompleted就是基本使用例子第二步的subscribe方法傳入Observer物件的方法 * * 繼續跟Action和 Action繼承的Function都沒有實現 * @param <T> the first argument type */ public interface Action1<T> extends Action { void call(T t); } 複製程式碼
可以看出例子第1步例項Observable.OnSubscribe物件的時候實現了一個call方法,call方法傳入的引數就是Subscriber例項,方法內呼叫的onNext方法和onCompleted方法執行了第2步中Observer例項的對應方法,所以這裡可以猜測Observer是一個介面,Subscriber實現了Observer。我們來看一下這兩個類。
/** * 很熟悉的三連,自己實現,完成業務 */ public interface Observer<T> { void onCompleted(); void onError(Throwable e); void onNext(T t); } /** * 和我們的猜測一致,Subscriber實現了Observer */ public abstract class Subscriber<T> implements Observer<T>, Subscription { ... } 複製程式碼
到這裡應該對整體有一個理解了,我們再來看一下observable.subscribe方法,結束後在進行總結。
public final Subscription subscribe(final Observer<? super T> observer) { if (observer instanceof Subscriber) { return subscribe((Subscriber<? super T>)observer); } if (observer == null) { throw new NullPointerException("observer is null"); } // 這裡我門傳入的是new Observer,直接進入下面部分 return subscribe(new ObserverSubscriber<T>(observer)); } ... public final Subscription subscribe(Subscriber<? super T> subscriber) { return Observable.subscribe(subscriber, this); } static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) { ... // 方法比較長,這裡只分析重要部分 // 這裡的RxJavaHooks.onObservableStart方法返回了observable.onSubscribe // 再呼叫observable類中的區域性變數onSubscribe的call方法,具體的實現就是我們再例子中實現的call方法,呼叫了subscriber.onNext和subscriber.onCompleted RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber); return RxJavaHooks.onObservableReturn(subscriber); } // RxJavaHooks.onObservableStart分析 public static <T> Observable.OnSubscribe<T> onObservableStart(Observable<T> instance, Observable.OnSubscribe<T> onSubscribe) { Func2<Observable, Observable.OnSubscribe, Observable.OnSubscribe> f = onObservableStart; if (f != null) { // 會進入該方法,最後返回的就是Observable建構函式中儲存的區域性變數onSubscribe,不要將這個call和我門自己實現的call方法弄混了 return f.call(instance, onSubscribe); } return onSubscribe; } 複製程式碼
其實分析到這裡就結束了,將整個呼叫鏈連線了起來
我們再來看一下關於RxJavaHooks.onObservableStart分析,
分析這裡的返回值就是Observable建構函式中儲存的區域性變數onSubscribe。
對RxJavaHooks.onObservableStart方法進行斷點除錯,進入f!=null 判斷。
// Func2是一個介面,我們繼續使用斷點除錯檢視具體的實現 public interface Func2<T1, T2, R> extends Function { R call(T1 t1, T2 t2); } 複製程式碼
// 通過斷點除錯找到call方法的實現 onObservableStart = new Func2<Observable, Observable.OnSubscribe, Observable.OnSubscribe>() { @Override public Observable.OnSubscribe call(Observable t1, Observable.OnSubscribe t2) { return RxJavaPlugins.getInstance().getObservableExecutionHook().onSubscribeStart(t1, t2); } }; 複製程式碼
// 返回了第二個引數 @Deprecated public <T> OnSubscribe<T> onSubscribeStart(Observable<? extends T> observableInstance, final OnSubscribe<T> onSubscribe) { // pass through by default return onSubscribe; } 複製程式碼
最後返回了第二個引數,就是observable.onSubscribe。
1.x總結
第一步:在呼叫create方法構造了一個Observable物件,並且在Observable物件的構造方法中,將區域性變數onSubscribe賦值,該onSubscribe實現了call方法等待被回撥,call方法中提供了一個subscriber例項,該例項實現了Observer,有onCompleted,onError,onNext方法可以進行呼叫。
第二步:執行Observable例項的subscribe方法,傳入了Observer物件,實現了onCompleted,onError,onNext方法。再通過RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber);
方法回撥到了第一步中OnSubscribe的call方法,這裡的subscriber方法就是第二步中例項的Observer。
整體就是在第一步中寫了一個回撥,等待第二步subscribe方法調起。第二步中的Observer實現了一個回撥的三個方法,供第一步中的回撥函式內呼叫。