RxJava2 原始碼分析二
文章目錄
- 前言
- RxJava2 執行緒排程
- RxJava2 怎麼進行執行緒排程
- 總結
前言
經過RxJava2原始碼分析一,我們瞭解了RxJava2是怎麼進行資料的傳送和接收。那麼這次我們主要是來分析,RxJava2是如何進行執行緒排程的。
RxJava2 執行緒排程
我們知道執行緒的排程還是很關鍵的,因為如果正常情況下,android給我們提供的方式是通過Hadler機制來進行執行緒間通訊的,當了解了RxJava2,我們就多了一種通訊方式,而且它進行執行緒間的切換是更加簡潔的。
直接通過鏈式呼叫,就解決了執行緒間的切換
RxJava2 怎麼進行執行緒排程
1、執行緒排程 subscribeOn
subscribeOn這裡會指定Observable的排程器,也就是指定事件產生的執行緒。比如我們的事件是emitter.onNext("1");那麼指定到哪個執行緒呢,這裡我們指定到Schedulers.io
Schedulers.io( )預設是一個CachedThreadScheduler,很像一個有執行緒快取的新執行緒排程器,這裡把它理解成一個IO執行緒就可以了
Observable.create(new ObservableOnSubscribe<String>() { @Override public void subscribe(ObservableEmitter<String> emitter) throws Exception { emitter.onNext("1");//傳送訊息,事件產生的地方 emitter.onComplete(); Log.e(TAG, "subscribe called with: e = [" + emitter + "]" + Thread.currentThread()); } }).subscribeOn(Schedulers.io()) .map(new Function<String, String>() {//這裡加了一個map操作符,主要是看事件處理所在的執行緒 @Override public String apply(String s) throws Exception { Log.e(TAG, "apply() called with: s = [" + s + "]" + Thread.currentThread()); return s; } }) .subscribeOn(Schedulers.computation()) .subscribe(new Observer<String>() { @Override public void onSubscribe(Disposable d) { Log.e(TAG,"onSubscribe() d=" + d ); } @Override public void onNext(String s) { Log.e(TAG,"onNext() s=" + s ); } @Override public void onError(Throwable e) { Log.e(TAG,"onError() e=" + e ); } @Override public void onComplete() { Log.e(TAG,"onComplete() " ); Log.e(TAG, "onComplete() called with:執行緒:" + Thread.currentThread()); } });
我進到subscribeOn 去檢視裡面的實現
Observable#subscribeOn
public final Observable<T> subscribeOn(Scheduler scheduler) { ObjectHelper.requireNonNull(scheduler, "scheduler is null"); return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));//主要部分 }
繼續看 ObservableSubscribeOn
ObservableSubscribeOn #subscribeActual
@Override public void subscribeActual(final Observer<? super T> observer) { final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(observer); observer.onSubscribe(parent); parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent))); //主要部分 }
ObservableSubscribeOn#SubscribeTask
final class SubscribeTask implements Runnable { private final SubscribeOnObserver<T> parent; SubscribeTask(SubscribeOnObserver<T> parent) { this.parent = parent; } @Override public void run() { source.subscribe(parent);//被觀察者subscribe 觀察者,這部分的任務已經被切換到子執行緒中了 } }
SunscribeTask 實現了Runnable,我們看到source.subscribe(parent); 這一步,是完成訂閱,而這部分內容是在子執行緒中處理的,這個時候parent是SubscribeOnObserver
我們看一下SubscribeOnObserver的實現
static final class SubscribeOnObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable { ... SubscribeOnObserver(Observer<? super T> downstream) { this.downstream = downstream; this.upstream = new AtomicReference<Disposable>(); } @Override public void onNext(T t) { downstream.onNext(t);//new Observer中的onNext被呼叫了 } @Override public void onError(Throwable t) { downstream.onError(t);//new Observer中的onError被呼叫了 } @Override public void onComplete() { downstream.onComplete();//new Observer中的onComplete被呼叫了 } @Override public void dispose() { DisposableHelper.dispose(upstream); DisposableHelper.dispose(this); } ... }
看程式碼中的註釋,我們發現完成了new Observer 那幾個方法的呼叫onNext,onError,onComplete。也就在那幾個方法中可以接收訊息。不同的是這部分內容是在指定的IO執行緒中進行的。
//接上前面的內容 parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
看完SunscribeTask 繼續看scheduleDirect
Scheduler#scheduleDirect
public Disposable scheduleDirect(@NonNull Runnable run) { return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS); }
Scheduler#scheduleDirect
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) { final Worker w = createWorker(); final Runnable decoratedRun = RxJavaPlugins.onSchedule(run); DisposeTask task = new DisposeTask(decoratedRun, w); w.schedule(task, delay, unit);//開始執行任務 return task; }
w.schedule(task, delay, unit);這是最關鍵部分,在Worker的執行緒開始執行任務
順便我們看下DisposeTask,其實它主要是對之前的任務進行包裝,以便後面可以方便的取消執行緒任務,Disposable#dispose(用它取消任務)
static final class DisposeTask implements Disposable, Runnable, SchedulerRunnableIntrospection { ... DisposeTask(@NonNull Runnable decoratedRun, @NonNull Worker w) { this.decoratedRun = decoratedRun; this.w = w; } @Override public void run() { runner = Thread.currentThread(); try { decoratedRun.run();//任務執行 } finally { dispose(); runner = null; } } ... }
最終我們看下我們的程式碼執行的結果:
apply() called with: s = [1]Thread[RxCachedThreadScheduler-1,5,main] onComplete() called with:執行緒:Thread[RxCachedThreadScheduler-1,5,main] subscribe() called with: e = [CreateEmitter{DISPOSED}]Thread[RxCachedThreadScheduler-1,5,main]
你可能會問 onComplete()怎麼也在IO執行緒呢,因為我們沒有指定Observer在哪裡執行,這是接下來要講的。
2、執行緒排程 observeOn
observeOn指定observer將會在哪個Scheduler觀察這個Observable.
具體點就是:ObserveOn 指定觀察者的onNext, onError和onCompleted方法在哪個執行緒被執行
這次我們增加observeOn(AndroidSchedulers.mainThread())
Observable.create(new ObservableOnSubscribe<String>() { @Override public void subscribe(ObservableEmitter<String> emitter) throws Exception { emitter.onNext("1"); emitter.onComplete(); Log.e(TAG, "subscribe() called with: e = [" + emitter + "]" + Thread.currentThread()); } }).subscribeOn(Schedulers.io()) .map(new Function<String, String>() { @Override public String apply(String s) throws Exception { //依然是io執行緒 Log.e(TAG, "apply() called with: s = [" + s + "]" + Thread.currentThread()); return s; } }) .observeOn(AndroidSchedulers.mainThread())//新增加部分 .subscribe(new Observer<String>() { @Override public void onSubscribe(Disposable d) { Log.e(TAG,"onSubscribe() d=" + d ); } @Override public void onNext(String s) { Log.e(TAG,"onNext() s=" + s ); } @Override public void onError(Throwable e) { Log.e(TAG,"onError() e=" + e ); } @Override public void onComplete() { Log.e(TAG,"onComplete() " ); Log.e(TAG, "onComplete() called with:執行緒:" + Thread.currentThread()); } });
我們去看observeOn的實現
Observable#observeOn
public final Observable<T> observeOn(Scheduler scheduler) { return observeOn(scheduler, false, bufferSize()); }
繼續看observeOn
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) { ObjectHelper.requireNonNull(scheduler, "scheduler is null"); ObjectHelper.verifyPositive(bufferSize, "bufferSize"); return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));//關鍵部分 }
接下來我們看ObservableObserveOn
ObservableObserveOn #ObservableObserveOn
public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) { super(source); this.scheduler = scheduler;//它就是AndroidSchedulers.mainThread() this.delayError = delayError; this.bufferSize = bufferSize; }
ObservableObserveOn #subscribeActual
@Override protected void subscribeActual(Observer<? super T> observer) { if (scheduler instanceof TrampolineScheduler) { source.subscribe(observer); } else { Scheduler.Worker w = scheduler.createWorker();//scheduler就是上面的AndroidSchedulers.mainThread() source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));//主要部分 } }
這裡我們主要看ObserveOnObserver的實現
ObserveOnObserver#schedule
static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T> implements Observer<T>, Runnable { void schedule() { if (getAndIncrement() == 0) { worker.schedule(this);//this就是當前的ObserveOnObserver,它實現了Runnable,這把它放到了AndroidSchedulers.mainThread()建立的work 中處理 } } }
通過上面我們知道,worker.schedule(this) 把當前任務切換到了主執行緒中
apply() called with: s = [1]Thread[RxCachedThreadScheduler-1,5,main] subscribe() called with: e = [CreateEmitter{DISPOSED}]Thread[RxCachedThreadScheduler-1,5,main] onComplete() called with:執行緒:Thread[main,5,main]
從日誌中,我們能看到onComplete也確實已經被切換到了主執行緒中執行。
總結
- 我們看到了Observable主要是通過subscribeOn來指定被觀察者事件處理所在的執行緒
- Observer 主要是通過observeOn 來指定觀察者的事件處理所在的執行緒
- 都會在原始碼中做相應的轉換
如果對你有一點點幫助,那是值得高興的事情。:)