RxJava:執行緒切換
上一篇:RxJava:基本訂閱流程
我們在Rxjava中最常用的兩個方法:
-
subscribeOn(Scheduler scheduler)
-
observeOn(Scheduler scheduler)
通常,我們使用subscribeOn(Schedulers.io())
方法指定在IO執行緒中訂閱-----進行資料處理,observeOn(AndroidSchedulers.mainThread())
方法指定在主執行緒中觀察-----進行UI操作。
so,Rxjava是如何進行執行緒切換的?
Observable.subscribeOn(Scheduler scheduler)
public final Observable<T> subscribeOn(Scheduler scheduler) { ObjectHelper.requireNonNull(scheduler, "scheduler is null"); return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));//1 }
可以看到,subscribeOn()方法與Observable.create()類似,都返回了一個Observable<T>物件,其內部主要呼叫了ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler)
方法,構造了一個ObservableSubscribeOn物件。ObservableSubscribeOn類不長,我們貼出它的全部程式碼,如下:
public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> { final Scheduler scheduler; public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {//2 super(source); this.scheduler = scheduler; } @Override public void subscribeActual(final Observer<? super T> s) { final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s); s.onSubscribe(parent); //Disposable用來做資源處理,在這裡我們不用關心 parent.setDisposable(scheduler.scheduleDirect(new Runnable() {//3 @Override public void run() { source.subscribe(parent); } })); } static final class SubscribeOnObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable { private static final long serialVersionUID = 8094547886072529208L; final Observer<? super T> actual; final AtomicReference<Disposable> s; SubscribeOnObserver(Observer<? super T> actual) { this.actual = actual; this.s = new AtomicReference<Disposable>(); } @Override public void onSubscribe(Disposable s) { DisposableHelper.setOnce(this.s, s); } @Override public void onNext(T t) { actual.onNext(t); } @Override public void onError(Throwable t) { actual.onError(t); } @Override public void onComplete() { actual.onComplete(); } @Override public void dispose() { DisposableHelper.dispose(s); DisposableHelper.dispose(this); } @Override public boolean isDisposed() { return DisposableHelper.isDisposed(get()); } void setDisposable(Disposable d) { DisposableHelper.setOnce(this, d); } } }
ObservableSubscribeOn類繼承關係如下:
ObservableSubscribeOn<T> extends AbstractObservableWithUpstream
bstractObservableWithUpstream<T, U> extends Observable<U>
ObservableSubscribeOn類也是一個Observable,我們從名字上也可以看出,這是一個進行了SubscribeOn的Observable。那麼,他和我們上一篇提到的ObservableCreate類最大的區別在哪裡?
我們再貼出Observable實現類中的關鍵方法:ObservableCreate.subscribeActual(Observer<? super T> observer)
,做一個對比:
protected void subscribeActual(Observer<? super T> observer) { CreateEmitter<T> parent = new CreateEmitter<T>(observer); observer.onSubscribe(parent); try { source.subscribe(parent); } catch (Throwable ex) { Exceptions.throwIfFatal(ex); parent.onError(ex); } }
-
-
ObservableCreate內部定義了發射器類-----CreateEmitter,observer.onSubscribe()方法傳入的是CreateEmitter發射器物件。
-
ObservableSubscribeOn內部定義了一個SubscribeOnObserver物件,subscribeActual方法中,s.onSubscribe()傳入的是這個SubscribeOnObserver物件。即對Observer做一次重新包裝,放入一個Runnable中執行。
SubscribeOnObserver也是一個Observer,將我們傳入的Observeable進行了包裝
-
-
-
ObservableCreate.subscribeActual
中,執行了source.subscribe(parent)
即使用發射器執行我們定義的subscribe(ObservableEmitter<Bundle> e)
方法; -
ObservableSubscribeOn.subscribeActual
中,執行的是scheduler.scheduleDirect
,傳入一個Runnable,來執行source.subscribe(parent)
。
-
我們主要看一下關鍵邏輯步驟:scheduler.scheduleDirect()方法,瞭解Runnable是如何在我們指定執行緒中執行的。
public Disposable scheduleDirect(Runnable run, long delay, TimeUnit unit) { final Worker w = createWorker(); final Runnable decoratedRun = RxJavaPlugins.onSchedule(run); w.schedule(new Runnable() { @Override public void run() { try { decoratedRun.run(); } finally { w.dispose(); } } }, delay, unit); return w; }
我們可以看到,其內部呼叫的了Worker.schedule()方法,來實際執行我們傳入的Runnable。而createWorker()
是一個抽象方法,其具體實現由我們傳入的Scheduler的實現類決定,即我們呼叫subscribeOn(Scheduler scheduler)
中傳入的Scheduler。在上文程式碼中,我用註釋//1 //2 //3標明瞭這條引用鏈。
我們就以Schedulers.io()
為例:我們具體看下它的實現。我們先來看一下Schedulers.io()
所代表的Scheduler:
public static Scheduler io() { return RxJavaPlugins.onIoScheduler(IO); } static { ...... IO = RxJavaPlugins.initIoScheduler(new Callable<Scheduler>() { @Override public Scheduler call() throws Exception { return IoHolder.DEFAULT; } }); ...... } static final class IoHolder { static final Scheduler DEFAULT = new IoScheduler(); }
Schedulers.io()
返回的是一個Scheduler物件。在IoScheduler類中,createWorker()返回了一個EventLoopWorker物件,EventLoopWorker.schedule方法如下:
public Disposable schedule(Runnable action, long delayTime, TimeUnit unit) { if (tasks.isDisposed()) { // don't schedule, we are unsubscribed return EmptyDisposable.INSTANCE; } return threadWorker.scheduleActual(action, delayTime, unit, tasks); }
scheduleActual方法中,對Runnable進行了一系列封裝,最終使用的是一個Android的一個ExecutorService實現類----ScheduledExecutorService來執行。
public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, TimeUnit unit, DisposableContainer parent) { Runnable decoratedRun = RxJavaPlugins.onSchedule(run); ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent); if (parent != null) { if (!parent.add(sr)) { return sr; } } Future<?> f; try { if (delayTime <= 0) { f = executor.submit((Callable<Object>)sr); } else { f = executor.schedule((Callable<Object>)sr, delayTime, unit); } sr.setFuture(f); } catch (RejectedExecutionException ex) { parent.remove(sr); RxJavaPlugins.onError(ex); } return sr; }
這個executor構造如下:
public NewThreadWorker(ThreadFactory threadFactory) { executor = SchedulerPoolFactory.create(threadFactory); }
public static ScheduledExecutorService create(ThreadFactory factory) { final ScheduledExecutorService exec = Executors.newScheduledThreadPool(1, factory); if (exec instanceof ScheduledThreadPoolExecutor) { ScheduledThreadPoolExecutor e = (ScheduledThreadPoolExecutor) exec; POOLS.put(e, exec); } return exec; }
我們從執行緒池中獲取一個ExecutorService,使用該ExecutorService執行我們傳入的Runnable。總結以上:Observable.observeOn(Scheduler)
切換執行緒的原理在於:將目標 Observer 的 onNext(T)/onError(Throwable)/onComplete() 置於指定執行緒中執行
。
Observable. observeOn(Scheduler scheduler)
public final Observable<T> observeOn(Scheduler scheduler) { return observeOn(scheduler, false, bufferSize()); }
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) { ObjectHelper.requireNonNull(scheduler, "scheduler is null"); ObjectHelper.verifyPositive(bufferSize, "bufferSize"); return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize)); }
observeOn實際上返回的是一個ObservableObserveOn例項。我們知道,subscribe實際執行的是Observable實現類的subscribeActual方法,所以,我們需要關注的是ObservbleObserveOn.subscribeActual()
。
ObservableObserveOn.subscribeActual()
程式碼如下:
protected void subscribeActual(Observer<? super T> observer) { //如果傳入的scheduler指代當前執行緒,不進行執行緒切換,直接呼叫原observer的subscribe()方法,此處的//source是上游的ObservableSource(Observable) if (scheduler instanceof TrampolineScheduler) { source.subscribe(observer); } else { Scheduler.Worker w = scheduler.createWorker(); source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize)); } }
我們重點關注的是else中的執行緒切換流程,和subscribeOn()方法進行執行緒切換類似,也是由傳入的Scheduler決定Worker。這裡引入ObserveOnObserver類,傳入的observer進行封裝,即決定observer的執行執行緒。
ObserveOnObserver類實現了Runnable介面,run方法如下:
//ObserveOnObserver實現的run方法,drainFused和drainNormal中執行了Observer的回撥方法。 public void run() { if (outputFused) { drainFused(); } else { drainNormal(); } }
ObserveOnObserver的worker主要作用於schedule()
方法,而schedule()
如下,即呼叫worker.schedule()方法,執行當前ObserveOnObserver。
void schedule() { if (getAndIncrement() == 0) { worker.schedule(this); } }
So,observeOn方法也是通過Scheduler來實現執行緒切換的。而AndroidSchedulers.mainThread()
所代表的Scheduler如下:
static final Scheduler DEFAULT = new HandlerScheduler(new Handler(Looper.getMainLooper()));
在schedle()方法中,我們看到是通過handler,將傳入的Runnable傳送到主執行緒執行。
HandlerWorker(Handler handler) { this.handler = handler; } @Override public Disposable schedule(Runnable run, long delay, TimeUnit unit) { if (run == null) throw new NullPointerException("run == null"); if (unit == null) throw new NullPointerException("unit == null"); if (disposed) { return Disposables.disposed(); } run = RxJavaPlugins.onSchedule(run); //將傳入的Runnable和handler封裝成一個ScheduledRunnable,作為Message的Callback ScheduledRunnable scheduled = new ScheduledRunnable(handler, run); //使用Handler將訊息傳送MainLooper Message message = Message.obtain(handler, scheduled); message.obj = this; // Used as token for batch disposal of this worker's runnables. handler.sendMessageDelayed(message, Math.max(0L, unit.toMillis(delay))); // Re-check disposed state for removing in case we were racing a call to dispose(). if (disposed) { handler.removeCallbacks(scheduled); return Disposables.disposed(); } return scheduled; }