前言 通过前一篇的RxJava2 是如何实现线程切换的 (上) 我们已经知道了在RxJava中,subscribeOn 将上游线程切换到指定的子线程 是如何实现的。这里就接着来看,observeOn 是如何将下游线程切换到指定线程的。
RxJava - subscribeOn 这里可以通过UML图简单回顾一下subscribeOn的原理。
通过 subscribeOn 我们完成了以下操作:
创建了一个 ObservableSubscribeOn 对象,本质上来说他就是一个Observable,他同时实现了 AbstractObservableWithUpstream(HasUpstreamObservableSource )这样一个接口,是他变了一个拥有上游 的Observeable。
在 ObservableSubscribeOn 的 subscribeActual 方法中
1 parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
将真正的 subscribe 操作安置在了SubscribeTask这样个一个Runnable当中,这个 Runnable 将由scheduler 这个调度器负责启动,因此就把上游操作放到了 scheduler 所在的线程中。
Schedulers.newThread()或者Schedulers.io() 都是通过工厂方法的模式创建了某种指定类型 的线程, 当这个特定的线程执行是,就是执行真实的 subscribe 方法,这样就把上游操作放到了一个特定的线程中去执行。
RxJava - observeOn 简单回顾完 subscribeOn 之后,我们就来看看 observeOn 是如何工作的。
其实,了解 subscribeOn 的原理之后,再来看 observeOn 就简单多了,类的命名及实现思路都有很多相似之处,可以对照着理解 。
RxJava的代码写的非常巧妙,可以说是百读不厌,可以学习的地方特别多。为了避免陷入只见树木不见森林的噩梦,我们就带着以下问题去探索 observeOn 的奥秘。
在 Android 中线程间传递消息会使用 Handler,这里是否使用?又是如何使用的?
AndroidSchedulers.mainThread() 做了什么 ?
下游任务是如何保证被分配到指定线程的。
示例 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 private void multiThread () { Observable.create(new ObservableOnSubscribe<String>() { @Override public void subscribe (ObservableEmitter<String> e) throws Exception { e.onNext("This msg from work thread :" + Thread.currentThread().getName()); sb.append("\nsubscribe: currentThreadName==" + Thread.currentThread().getName()); } }) .subscribeOn(Schedulers.newThread()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Consumer<String>() { @Override public void accept (String s) throws Exception { Log.e(TAG, "accept: s= " + s); } }); }
我们还是以这段代码为例,来看看 observeOn 的工作原理。这里通过observeOn(AndroidSchedulers.mainThread())将下游线程切换到了我们非常熟悉的 Android UI 线程。这样就可以确保我们在下游所有的操作都是在 UI 线程中完成。这里和讨论 subscribeOn 一样,我们就从这句代码出发,看看这背后到底发生了什么。
有了上一篇的经验,我们知道 AndroidSchedulers.mainThread() 一定去创建了某种类型的调度器,为了方便后面的叙述,这一次我们先从调度器的创建说起,后面再看 observeOn() 的具体实现。
需要注意的是 AndroidSchedulers 并不是 RxJava 的一部分,是为了在 Android 中方便的使用 RxJava 而专门设计的一个调度器实现,源码RxAndroid 设计非常巧妙;使用前记得在gradle文件中配置依赖。
AndroidSchedulers.mainThread() 下面就来看看 AndroidSchedulers.mainThread() 这个我们非常熟悉的 Scheduler 是如何创建的。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 public final class AndroidSchedulers { private static final class MainHolder { static final Scheduler DEFAULT = new HandlerScheduler(new Handler(Looper.getMainLooper())); } private static final Scheduler MAIN_THREAD = RxAndroidPlugins.initMainThreadScheduler( new Callable<Scheduler>() { @Override public Scheduler call () throws Exception { return MainHolder.DEFAULT; } }); public static Scheduler mainThread () { return RxAndroidPlugins.onMainThreadScheduler(MAIN_THREAD); } }
这里我们可以认为,当调用AndroidSchedulers.mainThread() 时,返回了一个HandlerScheduler 的实例,而这个实例使用到了我们非常熟悉的 Handler。那么重点就来到HandlerScheduler 了。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 final class HandlerScheduler extends Scheduler { private final Handler handler; HandlerScheduler(Handler handler) { this .handler = handler; } @Override public Disposable scheduleDirect (Runnable run, long delay, TimeUnit unit) { if (run == null ) throw new NullPointerException("run == null" ); if (unit == null ) throw new NullPointerException("unit == null" ); run = RxJavaPlugins.onSchedule(run); ScheduledRunnable scheduled = new ScheduledRunnable(handler, run); handler.postDelayed(scheduled, Math.max(0L , unit.toMillis(delay))); return scheduled; } @Override public Worker createWorker () { return new HandlerWorker(handler); } private static final class HandlerWorker extends Worker { private final Handler handler; private volatile boolean disposed; HandlerWorker(Handler handler) { this .handler = handler; } @Override public Disposable schedule (Runnable run, long delay, TimeUnit unit) { if (run == null ) throw new NullPointerException("run == null" ); if (unit == null ) throw new NullPointerException("unit == null" ); if (disposed) { return Disposables.disposed(); } run = RxJavaPlugins.onSchedule(run); ScheduledRunnable scheduled = new ScheduledRunnable(handler, run); Message message = Message.obtain(handler, scheduled); message.obj = this ; handler.sendMessageDelayed(message, Math.max(0L , unit.toMillis(delay))); if (disposed) { handler.removeCallbacks(scheduled); return Disposables.disposed(); } return scheduled; } @Override public void dispose () { disposed = true ; handler.removeCallbacksAndMessages(this ); } @Override public boolean isDisposed () { return disposed; } } private static final class ScheduledRunnable implements Runnable , Disposable { private final Handler handler; private final Runnable delegate; private volatile boolean disposed; ScheduledRunnable(Handler handler, Runnable delegate) { this .handler = handler; this .delegate = delegate; } @Override public void run () { try { delegate.run(); } catch (Throwable t) { IllegalStateException ie = new IllegalStateException("Fatal Exception thrown on Scheduler." , t); RxJavaPlugins.onError(ie); Thread thread = Thread.currentThread(); thread.getUncaughtExceptionHandler().uncaughtException(thread, ie); } } @Override public void dispose () { disposed = true ; handler.removeCallbacks(this ); } @Override public boolean isDisposed () { return disposed; } } }
这个类虽然很简单,但是设计非常巧妙。
首先 HandlerScheduler 是一个 Scheduler ,通过构造函数他获取到了主线程所在的 Handler实例。而在他的 createWorker() 方法中,他又通过这个 Handler 实例创建了一个HandlerWorker 的实例,这个HandlerWorker 本质上就是一个 Worker。在他的 schedule 方法中,创建了一个 ScheduleRunnable 对象,并会把这个Runnable对象通过 handler 的 sendMessageDelayed 方法发送出去,而我们知道这个 Handler 是主线程,这样在下游中,就把任务从某个子线程转移到了UI线程。
ScheduleRunnable 不但实现了 Runnable ,而且实现了我们看到过无数次的 Disposable 。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 @Override public void run () { try { delegate.run(); } catch (Throwable t) { } } @Override public void dispose () { disposed = true ; handler.removeCallbacks(this ); }
这样,正确情况下 run 方法会正常执行线程中的任务,而一旦 disposable 对象执行了dispose()方法,那么 handler.removeCallbacks(this),就可确保在 handler 的 dispatchMessage 方法中,不会在执行任何操作,从而达到了 dispose 的效果。
observeOn 下面就来看看 Observable 中的 observeOn 方法
Observable.java — observeOn
1 2 3 4 5 6 7 8 9 10 public final Observable<T> observeOn (Scheduler scheduler) { return observeOn(scheduler, false , bufferSize()); } public final Observable<T> observeOn (Scheduler scheduler, boolean delayError, int bufferSize) { ObjectHelper.requireNonNull(scheduler, "scheduler is null" ); ObjectHelper.verifyPositive(bufferSize, "bufferSize" ); return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this , scheduler, delayError, bufferSize)); }
这个方法的实现和 subscribeOn 的实现非常相似,多了两个参数 delayError 和 buffersize 。 buffersize 可以认为是RxJava内部的一个静态变量,默认情况下他的值是128。通过我们之前的经验,这里可以把 observeOn 的过程简化如下:
1 new ObservableObserveOn<T>(this , scheduler, delayError, bufferSize)
也就是说 observeOn 这个操作符给我们返回了一个 ObservableObserveOn 对象。很容易想到他也是一个 Observeable。那么我们就去看看这个 ObservableObserveOn 到底是什么?我们最关心的 subscribeActual 方法他又是怎样实现的。
ObservableObserveOn 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 public final class ObservableObserveOn <T > extends AbstractObservableWithUpstream <T , T > { final Scheduler scheduler; final boolean delayError; final int bufferSize; public ObservableObserveOn (ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) { super (source); this .scheduler = scheduler; this .delayError = delayError; this .bufferSize = bufferSize; } @Override protected void subscribeActual (Observer<? super T> observer) { if (scheduler instanceof TrampolineScheduler) { source.subscribe(observer); } else { Scheduler.Worker w = scheduler.createWorker(); source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize)); } } }
和 ObservableSubscribeOn 一样,他也继承了 AbstractObservableWithUpstream ,这样他也是一个拥有上游的 Observeable,他的构造函数很简单,没什么可以说。这里我们重点关注一下 subscribeActual 方法的实现。这里我们的使用的**Scheduler 实例是 AndroidSchedulers.mainThread()**,因此就按 else的逻辑分析。
1 2 3 4 Scheduler.Worker w = scheduler.createWorker(); source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
通过 scheduler.createWorker() 创建了 Worker 这个对象。这里结合之前对 AndroidSchedulers.mainThread() 的分析,此处的 worker 对象是就是一个持有主线程 handler 引用的 Worker。
接着用这个worker又创建了一个ObserveOnObserver对象。看看这个类的实现。
1 2 static final class ObserveOnObserver <T > extends BasicIntQueueDisposable <T >implements Observer <T >, Runnable { ....}
这个类功能非常强大,首先是一个 Observer ,同时也是一个Runnable,并且还继承了 BasicIntQueueDisposable(保证原子性、拥有操作队列功能和 Disposable功能)。
1 source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
我们关注一下这行代码,根据之前的说法这里的 source 是其父类(AbstractObservableWithUpstream)中的成员变量,也就是说是上游,那么当前ObservableObserveOn 的上游是谁呢? 就是我们上一篇所说的 ObservableSubscribeOn 。
因此,当这里开始执行订阅方法 subscribe() 后,将以如下顺序响应:
Observable.subscribe—>Observable.subscribeActual—> ObservableObserveOn.subscribeActual—> ObservableSubscribeOn.subscribeActual—>ObservableCreate.subscribeActual
这些方法的参数均为 observer,通过层层回调,最后的 subscribeActual(Observer<? super T> observer) 执行时,这个 observer 持有之前几个 observer 的引用。
我们再看一下 ObservableCreate.subscribeActual
1 2 3 4 5 6 7 8 9 10 11 12 @Override protected void subscribeActual (Observer<? super T> observer) { CreateEmitter<T> parent = new CreateEmitter<T>(observer); observer.onSubscribe(parent); try { source.subscribe(parent); } catch (Throwable ex) { Exceptions.throwIfFatal(ex); parent.onError(ex); } }
可以看到,这里首先会触发 observer.onSubscribe ,我们再看一下 ObservableSubscribeOn.subscribeActual
1 2 3 4 5 6 7 8 @Override public void subscribeActual (final Observer<? super T> s) { final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s); s.onSubscribe(parent); parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent))); }
好了,这样我们又回到了原点:
1 source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
回到了最初的 Observer:ObserveOnObserver
这个 ObserveOnObserver 持有我们一开始创建的observer,也就是一个Consumer对象。
下面就来看看这个 ObserveOnObserver
1 2 3 4 5 6 ObserveOnObserver(Observer<? super T> actual, Scheduler.Worker worker, boolean delayError, int bufferSize) { this .actual = actual; this .worker = worker; this .delayError = delayError; this .bufferSize = bufferSize; }
这里指的注意的一点 ,actual 其实就是observer
1 2 3 4 5 6 7 8 9 10 11 12 @Override public void onSubscribe (Disposable s) { if (DisposableHelper.validate(this .s, s)) { this .s = s; queue = new SpscLinkedArrayQueue<T>(bufferSize); actual.onSubscribe(this ); } }
在ObservableCreate.subscribeActual 中我们知道,当执行subscribe 方法后,首先会执行 observer的 onSubscribe 方法。这里的实现非常简单,就是创建了一个queue,并触发了这个 observer 自己的 onSubscribe 方法。
1 2 3 4 5 6 7 8 9 10 11 @Override public void onNext (T t) { if (done) { return ; } if (sourceMode != QueueDisposable.ASYNC) { queue.offer(t); } schedule(); }
在 onNext 中会执行 scheule() 方法。
1 2 3 4 5 void schedule () { if (getAndIncrement() == 0 ) { worker.schedule(this ); } }
这个地方就有意思了,前面说过这里的 worker 是一个持有主线程handler 的Worker对象,当他的 schedule 执行时,就会把特定的线程任务通过Handler.postDelay 方法转移到主线中去执行 。
那么这里的this 又是什么呢?前面我们说过,ObserveOnObserver 这个类功能非常强大,他是一个Runnable,那么这里就是执行他自己的run方法喽,我们赶紧看看。
1 2 3 4 5 6 7 8 @Override public void run () { if (outputFused) { drainFused(); } else { drainNormal(); } }
这里有一个参数 outputFused 他默认是false,至于他什么时候为true,不作为这里讨论的重点。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 void drainNormal () { int missed = 1 ; final SimpleQueue<T> q = queue; final Observer<? super T> a = actual; for (;;) { if (checkTerminated(done, q.isEmpty(), a)) { return ; } for (;;) { boolean d = done; T v; try { v = q.poll(); } catch (Throwable ex) { Exceptions.throwIfFatal(ex); s.dispose(); q.clear(); a.onError(ex); worker.dispose(); return ; } boolean empty = v == null ; if (checkTerminated(d, empty, a)) { return ; } if (empty) { break ; } a.onNext(v); } missed = addAndGet(-missed); if (missed == 0 ) { break ; } } }
这里大概就是通过一个死循环,不断从 onSubscribe 方法中创建的队列中取出事件,执行observer 的 onNext方法。而当为例为空时,就会执行worker.dispose 取消整个事件流,同时从Handler中移除所有消息。
最后在看一眼 onComplete ,onError 和整个类似
1 2 3 4 5 6 7 8 @Override public void onComplete () { if (done) { return ; } done = true ; schedule(); }
可以看到这里的处理也很简单,done 设置为 true .这样最后便完成了下游事件的执行。
最后 好了,由于一些无以诉说的原因,经历了很久终于把 RxJava 线程切换的下篇给完成了。