0%

RxJava2 是如何实现线程切换的 (上)

前言

通过前一篇的从观察者模式出发,聊聊RxJava,我们大致理解了RxJava的实现原理,在RxJava中可以非常方便的实现不同线程间的切换。subscribeOn 用于指定上游线程,observeOn 用于指定下游线程,多次用 subscribeOn 指定上游线程只有第一次有效,多次用 observeOn 指定下次线程,每次都有效;简直太方便了,比直接使用Handler省了不少力气,同时也不用去关注内存泄漏的问题了。本篇就来看看在RxJava中上游是如何实现线程切换。

RxJava 基础原理

为了方便后面的叙述,这里通过下面的UML图简单回顾一下上一篇的内容。

此图并没有完整的展现图中各个接口和类之间的各种关系,因为那样会导致整个图错综复杂,不便于查看,这里只绘制出了RxJava各个类之间核心关系网络

从上面的UML图中可以看出,具体的实现类只有ObservableCreate和CreateEmitter。CreateEmitter是ObservableCreate的内部类(PlantUML 怎么绘制内部类,没搞懂,玩的转的同学请赐教呀(^▽^))。

上篇说过Observable创建的过程,可以简化如下:

1
Observable mObservable=new ObservableCreate(new ObservableOnSubscribe())

结合图可以更直观的体现出这一点。ObservableCreate 内部持有ObservableOnSubscribe的引用。

当观察者订阅主题后:

1
mObservable.subscribe(mObserver);

ObservableCreate 中的subscribeActual()方法就会执行,

1
2
3
4
5
6
7
8
9
10
11
12
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);

try {
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}

在这个过程中会创建CreateEmitter 的实例,而这个CreateEmitter实现了Emitter和Disposable接口,同时又持有Observer的引用(当然这个引用是ObservableCreate传递给他的)。接着就会执行ObservableOnSubscribe的subscribe 方法,方法的参数即为刚刚创建的CreateEmitter 的实例,接着一系列连锁反应,Emitter 接口中的方法(onNext,onComplete等)开始执行,在CreateEmitter内部,Observer接口中对应的方法依次执行,这样就实现了一次从主题(上游)到观察者(下游)的事件传递。

source.subscribe(parent)

这里的 source 是ObservableOnSubscribe的实例,parent是CreateEmitter的实例。上面加粗文本叙述的内容,就是这行代码,可以说这是整个订阅过程最核心的实现。

好了,回顾完基础知识后,马上进入正题,看看RxJava是如何实现线程切换的。

RxJava 之 subscribeOn

我们知道正常情况下,所有的内容都是在主线程执行,既然这里提到了线程切换,那么必然是切换到了子线程,因此,这里需要关注线程的问题,我们就带着下面这几个问题去阅读代码。

  • 1.是哪个对象在什么时候创建了子线程,是一种怎样的方式创建的?
  • 2.子线程又是如何启动的?
  • 3.上游事件是怎么跑到子线程里执行的?
  • 4.多次用 subscribeOn 指定上游线程为什么只有第一次有效 ?

示例

首先看一下,日常开发中实现线程切换的具体实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19

private void multiThread() {
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
e.onNext("This msg from work thread :" + Thread.currentThread().getName());
sb.append("\nsubscribe: currentThreadName==" + Thread.currentThread().getName());
}
})
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.e(TAG, "accept: s= " + s);
}
});
}

这段代码,使用过RxJava的同学再熟悉不过了,上游事件会在一个名为 RxNewThreadScheduler-1 的线程执行,下游线程会切换回我们熟悉的Android UI线程。

我们就从subscribeOn(Schedulers.newThread()) 出发,看看这个代码的背后,到底发生了什么。

subscribeOn

这里我们先不管Schedulers.newThread() 是什么鬼,首先看看这个subscribeOn()方法。

Observable.java— subscribeOn(Scheduler scheduler)

1
2
3
4
public final Observable<T> subscribeOn(Scheduler scheduler) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}

可以看到,这个方法需要一个Scheduler 类型的参数。

RxJavaPlugins.java— onAssembly(@NonNull Observable source)

1
2
3
4
5
6
7
public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
Function<? super Observable, ? extends Observable> f = onObservableAssembly;
if (f != null) {
return apply(f, source);
}
return source;
}

O(∩_∩)O哈哈~,是不是觉得似曾相识,和create操作符一个套路呀。因此,observeOn也可以简化如下:

1
new ObservableSubscribeOn<T>(this, Schedulers.newThread());

这里你也许会有疑问,这个this是什么呢?其实这个this就是Observable,具体到上面的代码来说就是ObservableCreate,总之就是一个具体的Observable。

接着看ObservableSubscribeOn 这个类

1
2
3
public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
}

看一下 AbstractObservableWithUpstream.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16

abstract class AbstractObservableWithUpstream<T, U> extends Observable<U> implements HasUpstreamObservableSource<T> {

/** The source consumable Observable. */
protected final ObservableSource<T> source;

AbstractObservableWithUpstream(ObservableSource<T> source) {
this.source = source;
}

@Override
public final ObservableSource<T> source() {
return source;
}

}

再看一下 HasUpstreamObservableSource.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
/**
* Interface indicating the implementor has an upstream ObservableSource-like source available
* via {@link #source()} method.
*
* @param <T> the value type
*/
public interface HasUpstreamObservableSource<T> {
/**
* Returns the upstream source of this Observable.
* <p>Allows discovering the chain of observables.
* @return the source ObservableSource
*/
ObservableSource<T> source();
}

饶了半天,ObservableSubscribeOn 原来和上一篇说的ObservableCreate一样,也是Observable的一个子类。只不过比ObservableCreate多实现了一个接口HasUpstreamObservableSource,这个接口很有意思,他的source()方法返回类型是ObservableSource(还记得这个类的角色吗?)。也就是说ObservableSubscribeOn这个Observable是一个拥有上游的Observable。他有一个非常关键的方法source(),这个方法的返回值就是ObservableSource,也就是说通过调用实现source方法,可以获取到上游的Observeable。AbstractObservableWithUpstream类继承HasUpstreamObservableSource 并实现了这个source方法,返回结果为ObservableSource,是通过其构造函数获取。,那么AbstractObservableWithUpstream的构造函数又是在哪里调用的呢?

我们接着看ObservableSubscribeOn的具体实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
final Scheduler scheduler;

public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
super(source);
this.scheduler = scheduler;
}

@Override
public void subscribeActual(final Observer<? super T> s) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
// observer 调用onSubscribe方法,获取上游的控制权
s.onSubscribe(parent);

parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
}
  • 首先看他的构造函数,参数source就是我们之前提到过的this,scheduler就是Schedulers.newThread()。同时调用了父类AbstractObservableWithUpstream的构造函数,这里结合之前的结论,我们可以确定通过这个构造函数,就创建出来了一个包含上游的ObservableSubscribeOn实例。
  • 再看实现订阅关系的关键方法subscribeActual(),在这里创建了一个SubscribeOnObserver的实例,SubscribeOnObserver 是AtomicReference的子类(保证原子性),同时实现了 Observer接口 和 Disposable 接口;你可以把他理解成一个Observer。

我们之前说过,subscribeActual()是实现上下游之间订阅关系的重要方法。因为只有真正实现了订阅关系,上下游之间才能连接起来。我们看这个方法的最后一句代码。

1
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));

这句代码,可以说就是非常关键,因为从这里开始了一系列的连锁反应。首先看一下SubscribeTask

1
2
3
4
5
6
7
8
9
10
11
12
final class SubscribeTask implements Runnable {
private final SubscribeOnObserver<T> parent;

SubscribeTask(SubscribeOnObserver<T> parent) {
this.parent = parent;
}

@Override
public void run() {
source.subscribe(parent);
}
}

看到这句 **source.subscribe(parent)**,是不是觉得似曾相识呢?

SubscribeTask 实现了是Runnable接口,在其run方法中,定义了一个需要在线程中执行的任务。按照类的继承关系,很明显source 就是ObservableSubscribeOn 的上游Observable,parent是一个Observer。也就是说这个run方法要执行的内容就是实现ObservableSubscribeOn的上游和Observer的订阅。一旦某个线程执行了这个Runnable(SubscribeTask),就会触发了这个run方法,从而实现订阅,而一旦这个订阅实现,那么后面的流程就是上节所说的事情了。

这里可以解答第三个问题了,上游事件是怎么给弄到子线程里去的,这里很明显了,就是直接把订阅方法放在了一个Runnable中去执行,这样就一旦这个Runnable在某个子线程执行,那么上游所有事件只能在这个子线程中执行了。

好了,线程要执行的任务似乎创建完了,下面就接着找看看子线程是怎么创建的。回过头继续看刚才的方法,

1
scheduler.scheduleDirect(new SubscribeTask(parent))

Scheduler.java—-scheduleDirect

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21

public Disposable scheduleDirect(@NonNull Runnable run) {
return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
}


public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
final Worker w = createWorker();
// 对run进行了一次装饰
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

DisposeTask task = new DisposeTask(decoratedRun, w);

w.schedule(task, delay, unit);

return task;
}

@NonNull
// 抽象方法
public abstract Worker createWorker();

首先看一下Worker类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/**
* Sequential Scheduler for executing actions on a single thread or event loop.
* <p>
* Disposing the {@link Worker} cancels all outstanding work and allows resource cleanup.
*/
public abstract static class Worker implements Disposable {

@NonNull
public Disposable schedule(@NonNull Runnable run) {
return schedule(run, 0L, TimeUnit.NANOSECONDS);
}


@NonNull
public abstract Disposable schedule(@NonNull Runnable run, long delay, @NonNull TimeUnit unit);


}

Worker是Scheduler内部的一个静态抽象类,实现了Disposable接口,其schedule()方法也是抽象的。

再看一下DisposeTask

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
static final class DisposeTask implements Runnable, Disposable {
final Runnable decoratedRun;
final Worker w;

Thread runner;

DisposeTask(Runnable decoratedRun, Worker w) {
this.decoratedRun = decoratedRun;
this.w = w;
}

@Override
public void run() {
runner = Thread.currentThread();
try {
decoratedRun.run();
} finally {
dispose();
runner = null;
}
}

@Override
public void dispose() {
if (runner == Thread.currentThread() && w instanceof NewThreadWorker) {
((NewThreadWorker)w).shutdown();
} else {
w.dispose();
}
}

@Override
public boolean isDisposed() {
return w.isDisposed();
}
}

DisposeTask 又是一个Runnable,同时也实现了Disposable接口。可以看到在他的run方法中会执行decoratedRun的run方法,这个decoratedRun其实就是参数中传递进来的run,也就是说,执行了这个DisposeTask的run方法,就会触发SubscribeTask中的run方法,因此,我们就要关注是谁执行了这个DisposeTask。

回到scheduleDirect()方法

1
2
3
4
5
6
7
8
9
10
11
  public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
final Worker w = createWorker();
// 对run进行了一次装饰
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

DisposeTask task = new DisposeTask(decoratedRun, w);

w.schedule(task, delay, unit);

return task;
}

scheduleDirect()方法的实现我们总结一下:

  1. 创建一个Worker对象w,而在Scheduler类中createWorker()方法被定义为抽象方法,因此我们需要去Scheduler的具体实现中了解这个Worker的具体实现。
  2. 对参数run通过RxJavaPlugins进行一次装饰,生成一个decoratedRun的Runnable(通过源码可以发现,其实什么也没干,就是原样返回)
  3. 通过decoratedRun和w生成一个DisposeTask对象task
  4. 通过Worker的schedule方法开始执行这个task。

ε=(´ο`*)))唉,说了这么久,子线程是如何创建的依然不清楚,无论是SubscribeTask还是DisposeTask只是定义会在某个子线程中执行的任务,并不代表子线程已被创建。但是通过以上代码,我们也可以收获一些有价值的结论:

  • 最终的Runnable任务,将由某个具体的Worker对象的scheduler()方法执行。
  • 这个scheduleDirect会返回一个Disposable对象,这样我们就可以通过Observer去控制整个上游的执行了。

好了,到这里对于subscribeOn()方法的分析已经到了尽头,我们找了最终需要运行子任务的对象Worker,而这个Worker是个抽象类,因此我们需要关注Worker的具体实现了。

下面我们就从刚才丢下的Schedulers.newThread() 换个角度来分析,看看能不能找到这个Worker的具体实现。

Schedulers.newThread()

前面说了subscribeOn()方法需要一个Scheduler 类型的参数,然而通过前面的分析我们知道Scheduler是个抽象类,是无法被实例化的。因此,这里就从Schedulers类出发。

1
2
3
4
5
/**
* Static factory methods for returning standard Scheduler instances.
*/
public final class Schedulers {
}

注释很清楚,这个Schedulers就是一个用于生成Scheduler实例的静态工厂。

下面我们就来看看,在这个工厂中newThread() 生成了一个什么样的Scheduler实例。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
   @NonNull
public static Scheduler newThread() {
return RxJavaPlugins.onNewThreadScheduler(NEW_THREAD);
}

NEW_THREAD = RxJavaPlugins.initNewThreadScheduler(new NewThreadTask());

static final class NewThreadTask implements Callable<Scheduler> {
@Override
public Scheduler call() throws Exception {
return NewThreadHolder.DEFAULT;
}
}

static final class NewThreadHolder {
static final Scheduler DEFAULT = new NewThreadScheduler();
}

newThread() 方法经过层层委托处理(最终的创建方式,有点单例模式的意味),最终我们需要的就是一个NewThreadScheduler的实例。

NewThreadScheduler.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public final class NewThreadScheduler extends Scheduler {

final ThreadFactory threadFactory;

private static final String THREAD_NAME_PREFIX = "RxNewThreadScheduler";
private static final RxThreadFactory THREAD_FACTORY;

/** The name of the system property for setting the thread priority for this Scheduler. */
private static final String KEY_NEWTHREAD_PRIORITY = "rx2.newthread-priority";

static {
int priority = Math.max(Thread.MIN_PRIORITY, Math.min(Thread.MAX_PRIORITY,
Integer.getInteger(KEY_NEWTHREAD_PRIORITY, Thread.NORM_PRIORITY)));

THREAD_FACTORY = new RxThreadFactory(THREAD_NAME_PREFIX, priority);
}

public NewThreadScheduler() {
this(THREAD_FACTORY);
}

public NewThreadScheduler(ThreadFactory threadFactory) {
this.threadFactory = threadFactory;
}

@NonNull
@Override
public Worker createWorker() {
return new NewThreadWorker(threadFactory);
}
}

不出所料NewThreadScheduler 是Scheduler的一个子类,在他的静态代码块中构造了一个Priority=5的线程工厂。而在我们最最关注的createWorker()方法中他又用这个线程工厂创建了一个NewThreadWorker 的实例。下面就让我们看看最终的NewThreadWorker 做了些什么工作。

NewThreadWorker.java(节选关键内容)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27

public class NewThreadWorker extends Scheduler.Worker implements Disposable {
private final ScheduledExecutorService executor;

volatile boolean disposed;

public NewThreadWorker(ThreadFactory threadFactory) {
executor = SchedulerPoolFactory.create(threadFactory);
}

@NonNull
@Override
public Disposable schedule(@NonNull final Runnable run) {
return schedule(run, 0, null);
}



@Override
public void dispose() {
if (!disposed) {
disposed = true;
executor.shutdownNow();
}
}

}

众里寻他千百度,终于找到了Worker的实现了,同时再一次不出所料的又一次实现了Disposable接口,o(╥﹏╥)o。

在其构造函数中,通过NewThreadScheduler中提供的线程工厂threadFactory创建了一个ScheduledExecutorService。

ScheduledExecutorService.java —create

1
2
3
4
5
6
7
8
9

public static ScheduledExecutorService create(ThreadFactory factory) {
final ScheduledExecutorService exec = Executors.newScheduledThreadPool(1, factory);
if (PURGE_ENABLED && exec instanceof ScheduledThreadPoolExecutor) {
ScheduledThreadPoolExecutor e = (ScheduledThreadPoolExecutor) exec;
POOLS.put(e, exec);
}
return exec;
}

用大名鼎鼎的Executors(Executor的工具类),创建了一个核心线程为1的线程。

至此,我们终于找到了第一个问题的答案,子线程是谁如何创建的;在NewThreadScheduler的createWorker()方法中,通过其构建好的线程工厂,在Worker实现类的构造函数中创建了一个ScheduledExecutorService的实例,是通过SchedulerPoolFactory创建的。

同时可以看到,通过执行dispose 方法,可以使用ScheduledExecutorService的shutdown()方法,停止线程的执行。

线程已经创建好了,下面就来看看到底是谁启动了这个线程。前面我们说过,Worker的schedule()方法如果执行了,就会执行我们定义好的Runnable,通过这个Runnable中run方法的执行,就可以实现上下游订阅关系。下面就来看看这个scheduler()方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
@NonNull
@Override
public Disposable schedule(@NonNull final Runnable action, long delayTime, @NonNull TimeUnit unit) {
if (disposed) {
return EmptyDisposable.INSTANCE;
}
return scheduleActual(action, delayTime, unit, null);
}

@NonNull
public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);

if (parent != null) {
if (!parent.add(sr)) {
return sr;
}
}

Future<?> f;
try {
if (delayTime <= 0) {
f = executor.submit((Callable<Object>)sr);
} else {
f = executor.schedule((Callable<Object>)sr, delayTime, unit);
}
sr.setFuture(f);
} catch (RejectedExecutionException ex) {
if (parent != null) {
parent.remove(sr);
}
RxJavaPlugins.onError(ex);
}

return sr;
}

到这里,已经很明显了,在schedulerActual方法中,会通过刚才创建好的子线程对象executor通过submit或schedule执行一个Runnable任务(虽然这个Runnable对象再一次经过了各种装饰和包装,但其本质没有发生变化),并将执行结果封装后返回。而这个Runnable对象追根溯源来说,就是我们在ObservableSubscribeOn类中创建的一个SubscribeTask对象。因此,当这个子线程开始运行的时候就是执行SubscribeTask中run()方法的时机;一旦这个run方法执行,那么

1
source.subscribe(parent)

这句最关键的代码就开始执行了,一切的一切又回到了我们上一篇那熟悉的流程了。

好了,按照上面的流程捋下来,感觉还是有点分散,那么就用UML图看看整体的结构。

我们看最下面的ObservableSubscribeOn,他是subscribeOn 返回的Observable对象,他持有一个Scheduler 实例的引用,而这个Scheduler实例就是NewThreadScheduler(即Schedulers.newThreade())的一个实例。ObservableSubscribeOn 的subscribeActual方法,会触发NewThreadScheduler去执行SubscribeTask中定义的任务,而这个具体的任务又将由Worker类创建的子线程去执行。这样就把上游事件放到了一个子线程中实现。

至于最后一个问题,**多次用 subscribeOn 指定上游线程为什么只有第一次有效?**,看完通篇其实也很好理解了,因为上游Observable只有一个任务,就是subscribe(准确的来说是subscribeActual()),而subscribeOn 要做的事情就是把上游任务切换到一个指定线程里,那么一旦被切换到了某个指定的线程里,后面的切换不就是没有意义了吗。

好了,至此上游事件切换到子线程的过程我们就明白了。下游事件又是如何切换的且听下回分解,本来想一篇写完的,结果发现越写越多,只能分成两篇了!!!o(╯□╰)o。

写在后面的话

关于Disposable

在RxJava的分析中,我们经常会遇到Disposable这个单词,确切的说是接口,这里简单说一说这个接口。

1
2
3
4
5
6
7
/**
* Represents a disposable resource.
*/
public interface Disposable {
void dispose();
boolean isDisposed();
}

我们知道,在Java中,类实现某个接口,通俗来说就是代表这个类多了一项功能,比如一个类实现Serializable接口,代表这个类是可以序列化的。这里Disposable也是代表一种能力,这个能力就是Disposable,就是代表一次性的,用后就丢弃的,比如一次性筷子,还有那啥。

在RxJava中很多类都实现了这个接口,这个接口有两个方法,isDisposed()顾名思义返回当前类是否被抛弃,dispose()就是主动抛弃。因此,所有实现了这个接口的类,都拥有了这样一种能力,就是可以判断自己是否被抛弃,同时也可以主动抛弃自己。

上一篇我们说了,Observer通过onSubscribe(@NonNull Disposable d),会获得一个Disposable,这样就有能力控制上游的事件发送了。这样,我们就不难理解,为什么那么多类实现了这个接口,因为下游获取到的是一个拥有Disposable的对象,而一旦拥有了一个这样的对象,那么就可以通过下游控制上游了。可以说,这是RxJava对常规的观察者模式所做的最给力的改变。

关于各种ObservableXXX ,subscribeXXX,ObserverXXX

在查看RxJava的源码时,可能很多人都和我一样,有一个巨大的困扰,就是这些类的名字好他妈难记,感觉长得都差不多,关键念起来好像也差不多。但其实本质上来说,RxJava对类的命名还是非常规范的,只是我们不太习惯而已。按照英文单词翻译:

  • Observable 可观察的(主题 ,上游)
  • Observer 观察者 (订阅者,下游)
  • Subscribe 订阅

其实就这么三个主语,其他的什么ObservableCreate,ObservableSubscribeOn,AbstractObservableWithUpstream,还有上面提到的Disposable,都是对各种各样的Observable和Observer的变形和修饰结果,只要理解这个类的核心含义是什么,就不会被这些名字搞晕了。

RxJava 可以说是博大精深,以上所有分析完全是个人平时使用时的总结与感悟,有任何错误之处,还望各位读者提出,共同进步。

关于RxJava 这里墙裂推荐一篇文章一篇不太一样的RxJava介绍,感觉是自扔物线那篇之后,对RxJava思想感悟最深的一篇了。对RxJava 有兴趣的同学,可以多度几遍,每次都会有收获!!


加个鸡腿呗.