RxJava的本质上能够了解为一个异步操作库,能用十分简略的逻辑去处理繁琐的异步操作库。所以,它能在必定程度上代替项目中的handler、AsyncTask等等。有的时分,开源结构光是会用是不行的,想做进一步扩展就还得了解其原理。本文旨在梳理RxJava3源码流程,如有不当之处,请纵情指出。如需了解使用办法等,点此直达
调查者形式概念
调查者规划形式是RxJava中的一个核心规划形式。举个简单了解的比如,前段时间最火的世界杯,决赛阿根廷VS法兰西。

很多球迷熬夜看球。这时你能够把这场决赛当作被调查者,而咱们这些熬夜看球的球迷呢便是调查者,每逢梅西或者姆巴佩进球的时分,球迷就会欢呼,这就类似调查者形式中数据的改动。当然,梅西球迷看到姆巴佩进球不必定会有反响,就像姆巴佩球迷看到梅西进球相同,反响不会很大(调查数据不同)。
在APP开发进程中也会发生类似情况,假如你需求关心一个目标的数据,同时页面上的UI状态也会跟这个目标所绑定,则咱们在这个目标发生改动的时分,咱们就需求通知一切页面去做相应改动,这便是调查者形式。
再说个不是很恰当的比如,咱们关于微信公众号,当微信公众号发生消息推送的时分,咱们会去重视去看他,假如对其内容感兴趣,咱们会去点开它、阅览它。这也是一种调查者形式,所以能够说:假如A目标对B目标的数据十分灵敏,当B目标发生改动的一会儿,A目标要做出反响,这时分A目标便是调查者,B目标便是被调查者。同时,能够多个调查者对应一个被调查者,就像一个公众号能够有很多人重视相同。
调查者形式便是咱们众多的调查者对咱们的被调查者的数据高度灵敏改动之后的反响,这是一种多对一的联系,多个调查者对应一个被调查者。
下面是调查者形式的类图:

RxJava中的调查者形式
RxJava中的调查者形式少不了以下内容:1、被调查者;2、调查者;3、订阅。
一 、 被调查者 :
Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Throwable {
emitter.onNext("something");
}
});
上述代码是一个最简略的被调查者,可见这一切的起始都是create(),咱们先看该办法的完结:

Objects.requireNonNull(source, "source is null");
作用仅是非空判别,假如是空则抛出反常,

return RxJavaPlugins.onAssembly(new ObservableCreate<>(source));
在这儿RxJavaPlugins.onAssembly()是全局 hook 办法,为什么这样说呢,查看其他被调查者目标声明的办法时,不难发现终究仍是调用的这个办法,如:




所以需求重视的创立进程在传入RxJavaPlugins.onAssembly()的参数中,经过ObservableCreate目标生成的自界说source:


二 、 调查者 :
public interface Observer<@NonNull T> {
/**
* Provides the {@link Observer} with the means of cancelling (disposing) the
* connection (channel) with the {@link Observable} in both
* synchronous (from within {@link #onNext(Object)}) and asynchronous manner.
* @param d the {@link Disposable} instance whose {@link Disposable#dispose()} can
* be called anytime to cancel the connection
* @since 2.0
*/
void onSubscribe(@NonNull Disposable d);
/**
* Provides the {@link Observer} with a new item to observe.
* <p>
* The {@link Observable} may call this method 0 or more times.
* <p>
* The {@code Observable} will not call this method again after it calls either {@link #onComplete} or
* {@link #onError}.
*
* @param t
* the item emitted by the Observable
*/
void onNext(@NonNull T t);
/**
* Notifies the {@link Observer} that the {@link Observable} has experienced an error condition.
* <p>
* If the {@code Observable} calls this method, it will not thereafter call {@link #onNext} or
* {@link #onComplete}.
*
* @param e
* the exception encountered by the Observable
*/
void onError(@NonNull Throwable e);
/**
* Notifies the {@link Observer} that the {@link Observable} has finished sending push-based notifications.
* <p>
* The {@code Observable} will not call this method if it calls {@link #onError}.
*/
void onComplete();
}
源码中Observer是个接口。创立调查者便是创立这个接口的实例,而接口中的办法也便是直接暴露给用户。
三、订阅
这儿要要点重视Observable与Observer订阅的进程,都在subscribe()里。调用Observable的subscribe(),其办法源码如下:



CreateEmitter<T> parent = new CreateEmitter<>(observer);
生成一个发射器,传入参数是咱们自界说的调查者。

而在创立新发射器之后、在source.subscribe(…)之前履行了observer.onSubscribe(parent);



两种调查者的差异
两点不同:
在标准的调查者规划形式中,是一个“被调查者”,多个“调查者”,而且需求“被调查者”宣布改动通知后,一切的“调查者”才干调查到;
在RxJava调查者规划形式中,是多个“被调查者”,一个“调查者”,而且需求起点和结尾在“订阅”一次后,才宣布改动通知,结尾(调查者)才干调查到。

map 操作符源码流程
map操作符能直接对发射出来的事情进行处理而且发生新的事情,然后再次发射,下面是一个很简略的代码比如:
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Throwable {
}
}).map(new Function<String, Bitmap>() {
@Override
public Bitmap apply(String s) throws Throwable {
return null;
}
}).subscribe(new Observer<Bitmap>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
}
@Override
public void onNext(@NonNull Bitmap bitmap) {
}
@Override
public void onError(@NonNull Throwable e) {
}
@Override
public void onComplete() {
}
});
仍是了解的链式调用,了解的配方。仍是三个切入点:被调查者、调查者、订阅。其间被调查者构建流程已在上面讲过,这儿不赘述。终究仍是来到了map():







RxJava的操作符流程知晓Observable和map流程即可,其他操作符流程都可从中触类旁通。
线程切换
除了调查者形式,RxJava中另一个重要的核心点便是线程切换了。在RxJava中完结线程切换的主要是subscribeOn()和observerOn()。
subscribeOn 源码解析
作用一句话归纳,给subscribeOn()上面的代码分配线程。例如下述示例:
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Throwable {
emitter.onNext("test");
}
})
.subscribeOn(Schedulers.io())
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
}
@Override
public void onNext(@NonNull String s) {
}
@Override
public void onError(@NonNull Throwable e) {
}
@Override
public void onComplete() {
}
});
代码很简略,这儿将发射器发射“test”事情丢到了io线程处理。这回咱们先看下Schedulers.io()中的源码。














持续调查原代码的subscribeOn()









而终究这个Task交给w.schedule()处理。而w,即worker目标,在战略是Scherdulers.IO()时,则是调用IoScheduler中的worker目标的schedule():






observeOn源码解析
作用一句话归纳,observeOn的作用是给observeOn()下面的代码分配线程。在源代码中咱们参加observeOn(AndroidSchedulers.mainThread())的设置:







而这个主线程handler又是在哪里起作用的呢?这儿要看HandlerWorker调用的HandlerScheduler()中的schedule()(详细怎样调用到此处,后边说observeOn的流程会提到):










void drainNormal() {
int missed = 1;
final SimpleQueue<T> q = queue;
final Observer<? super T> a = downstream;
for (;;) {
if (checkTerminated(done, q.isEmpty(), a)) {
return;
}
for (;;) {
boolean d = done;
T v;
try {
... //省略
}
boolean empty = v == null;
if (checkTerminated(d, empty, a)) {
return;
}
if (empty) {
break;
}
a.onNext(v);
}
missed = addAndGet(-missed);
if (missed == 0) {
break;
}
}
}
可看到终究代码经过a.onNext(v)将成果传递出去,而HandlerScheduler.schedule()则将成果事情切换到主线程,终究的成果在主线程中传递。
总结
1、Worker便是RxJava完结线程切换的要害,以ObserveOn为例,在履行subscribe时,会创立Woker并传入ObserveOnObserver,在终究履行onNext等回调时,会调用Worker的schedule办法来切换线程。
2、咱们知道subscribeOn只有榜首次调用起作用,而ObserveOn每一次调用都起作用。那是因为每一次调用subscribeOn其实便是在包装一次observer与observable,不管包装多少次,都会以最里边一层也便是榜首次调用subscribeOn那一层为主,所以只有榜首次起作用。而ObserveOn是在subscribe后包装了observer,在observer的onXXX()的schedule()中进行的线程转化,所以每一次调用都有作用。