本文以下面代码为例逐步解析

Observable.just("数据源")
                .map(new Function<String, Integer>() {
                    @Override
                    public Integer apply(String s) throws Exception {
                        return 1;
                    }
                })
                .filter(integer -> {
                    return integer == 1;
                })
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Observer<Object>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                    }
                    @Override
                    public void onNext(Object o) {
                    }
                    @Override
                    public void onError(Throwable e) {
                    }
                    @Override
                    public void onComplete() {
                    }
                });
    }

从just开端

 public static <T> Observable<T> just(T item) {
        ObjectHelper.requireNonNull(item, "item is null");
        return RxJavaPlugins.onAssembly(new ObservableJust<T>(item));
    }

回来了一个将传入的参数封装成了一个 ObservableJust目标 其他的Rxjava创立操作符类似:比如create(), just(),fromArray(),fromIterable(),timer(),interval()等

ObservableJust类


public final class ObservableJust<T> extends Observable<T> implements ScalarCallable<T> {
    private final T value;
    public ObservableJust(final T value) {  //将传入的参数赋值给value
        this.value = value;
    }
   //要点办法  稍后看
    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        ScalarDisposable<T> sd = new ScalarDisposable<T>(observer, value);
        observer.onSubscribe(sd);
        sd.run();
    }
    @Override
    public T call() {
        return value;
    }
}

map办法

因为just办法回来了一个ObservableJust目标,所以调用链的map办法调用的ObservableJust目标的map办法 可是咱们看到ObservableJust类中并没有map办法,所以去看他的父类Observable

    public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
        ObjectHelper.requireNonNull(mapper, "mapper is null");
        return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
    }

在他的父类Observable中看到,map()依然是回来了一个ObservableMap目标,这个目标将当时目标(也便是上一步的ObservableJust目标)和map()传入的参数一起封装了起来 从上面的调用链来看便是这一段代码:

ObservableMap类

public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
    final Function<? super T, ? extends U> function;
    public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
        super(source); //这儿的source也便是上一步的ObservableJust目标
        this.function = function; //这儿的function便是map便是map()传入的参数
    }
    //这个办法相同待会剖析
    @Override
    public void subscribeActual(Observer<? super U> t) {
        source.subscribe(new MapObserver<T, U>(t, function));
    }
            @Override
        public void onNext(T t) {
            if (done) {
                return;
            }
            if (sourceMode != NONE) {
                downstream.onNext(null);
                return;
            }
            U v;
            try {
                v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
            } catch (Throwable ex) {
                fail(ex);
                return;
            }
            downstream.onNext(v);
        }
    }
 这时候发现ObservableMap和上面的ObservableJust类相同,都完成了subscribeActual()

filter办法

接着持续剖析调用链上的办法filter,相同咱们去ObservableMap父类里去找这个办法,他的父类AbstractObservableWithUpstream里面没有这个办法,可是AbstractObservableWithUpstream跟ObservableJust相同承继自Observable

public final Observable<T> filter(Predicate<? super T> predicate) {
        ObjectHelper.requireNonNull(predicate, "predicate is null");
        return RxJavaPlugins.onAssembly(new ObservableFilter<T>(this, predicate));
    }
  看到没 filter和前两个办法还是相同的套路,回来了一个ObservableFilter目标,不出意外这个ObservableFilter里面肯定也有一个subscribeActual办法,并且也是直接或许直接承继自Observable
public final class ObservableFilter<T> extends AbstractObservableWithUpstream<T, T> {
    final Predicate<? super T> predicate;
    public ObservableFilter(ObservableSource<T> source, Predicate<? super T> predicate) {
        super(source); 
        this.predicate = predicate;
    }
    @Override
    public void subscribeActual(Observer<? super T> observer) {
        source.subscribe(new FilterObserver<T>(observer, predicate));
    }
    static final class FilterObserver<T> extends BasicFuseableObserver<T, T> {
        final Predicate<? super T> filter;
        FilterObserver(Observer<? super T> actual, Predicate<? super T> filter) {
            super(actual);
            this.filter = filter;
        }
        @Override
        public void onNext(T t) {
            if (sourceMode == NONE) {
                boolean b;
                try {
                    b = filter.test(t);
                } catch (Throwable e) {
                    fail(e);
                    return;
                }
                if (b) {
                    downstream.onNext(t);
                }
            } else {
                downstream.onNext(null);
            }
        }
}
一模相同的套路

subscribeOn

 public final Observable<T> subscribeOn(Scheduler scheduler) {
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
    }
public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
    final Scheduler scheduler;
    public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
        super(source); 
        this.scheduler = scheduler;
    }
    @Override
    public void subscribeActual(final Observer<? super T> observer) {
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(observer);
        observer.onSubscribe(parent);
        parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
    }
    static final class SubscribeOnObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable {
        private static final long serialVersionUID = 8094547886072529208L;
        final Observer<? super T> downstream;
        final AtomicReference<Disposable> upstream;
        SubscribeOnObserver(Observer<? super T> downstream) {
            this.downstream = downstream;
            this.upstream = new AtomicReference<Disposable>();
        }
        @Override
        public void onSubscribe(Disposable d) {
            DisposableHelper.setOnce(this.upstream, d);
        }
        @Override
        public void onNext(T t) {
            downstream.onNext(t);
        }
        @Override
        public void onError(Throwable t) {
            downstream.onError(t);
        }
        @Override
        public void onComplete() {
            downstream.onComplete();
        }
        @Override
        public void dispose() {
            DisposableHelper.dispose(upstream);
            DisposableHelper.dispose(this);
        }
        @Override
        public boolean isDisposed() {
            return DisposableHelper.isDisposed(get());
        }
        void setDisposable(Disposable d) {
            DisposableHelper.setOnce(this, d);
        }
    }
    final class SubscribeTask implements Runnable {
        private final SubscribeOnObserver<T> parent;
        SubscribeTask(SubscribeOnObserver<T> parent) {
            this.parent = parent;
        }
        @Override
        public void run() {
            source.subscribe(parent);
        }
    }
}

这个subscribeOn用于切换上游线程: 主要是这一句parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent))); scheduler便是咱们传入的Schedulers.io(),上面代码能够看到SubscribeTask是一个Runnable,run()里调用的sourcesource.subscribe(parent),还记得source吗,source便是调用链上一步回来的目标,也便是上一步的 ObservableFilter; 去看看Schedulers.io()回来的是个什么类

    public static Scheduler io() {
        return RxJavaPlugins.onIoScheduler(IO);
    }
     看到他回来的是一个Scheduler,去Scheduler中找scheduleDirect
  @NonNull
    public Disposable scheduleDirect(@NonNull Runnable run) {
        return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
    }
    public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
        final Worker w = createWorker();
        final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
        DisposeTask task = new DisposeTask(decoratedRun, w);
        w.schedule(task, delay, unit);
        return task;
    }
    持续往下追踪会发现终究将这个Runable通过各种封装,终究提交到一个线程池(ScheduledExecutorService)中去履行使命,这样就完成了SubscribeOn上游数据源代码的线程切换

至于下流代码线程切换来看ObserveOn

  public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        ObjectHelper.verifyPositive(bufferSize, "bufferSize");
        return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
    }
public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
    final Scheduler scheduler;
    final boolean delayError;
    final int bufferSize;
    public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
        super(source);
        this.scheduler = scheduler;
        this.delayError = delayError;
        this.bufferSize = bufferSize;
    }
    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        if (scheduler instanceof TrampolineScheduler) {
            source.subscribe(observer);
        } else {
            Scheduler.Worker w = scheduler.createWorker();
            source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
        }
    }
}

ObserveOnObserver类:

mplements Observer<T>, Runnable {
        private static final long serialVersionUID = 6576896619930983584L;
        final Observer<? super T> downstream;
        final Scheduler.Worker worker;
        final boolean delayError;
        final int bufferSize;
        SimpleQueue<T> queue;
        Disposable upstream;
        Throwable error;
        volatile boolean done;
        volatile boolean disposed;
        int sourceMode;
        boolean outputFused;
        ObserveOnObserver(Observer<? super T> actual, Scheduler.Worker worker, boolean delayError, int bufferSize) {
            this.downstream = actual;
            this.worker = worker;
            this.delayError = delayError;
            this.bufferSize = bufferSize;
        }
        @Override
        public void onSubscribe(Disposable d) {
            if (DisposableHelper.validate(this.upstream, d)) {
                this.upstream = d;
                if (d instanceof QueueDisposable) {
                    @SuppressWarnings("unchecked")
                    QueueDisposable<T> qd = (QueueDisposable<T>) d;
                    int m = qd.requestFusion(QueueDisposable.ANY | QueueDisposable.BOUNDARY);
                    if (m == QueueDisposable.SYNC) {
                        sourceMode = m;
                        queue = qd;
                        done = true;
                        downstream.onSubscribe(this);
                        schedule();
                        return;
                    }
                    if (m == QueueDisposable.ASYNC) {
                        sourceMode = m;
                        queue = qd;
                        downstream.onSubscribe(this);
                        return;
                    }
                }
                queue = new SpscLinkedArrayQueue<T>(bufferSize);
                downstream.onSubscribe(this);
            }
        }
        @Override
        public void onNext(T t) {
            if (done) {
                return;
            }
            if (sourceMode != QueueDisposable.ASYNC) {
                queue.offer(t);
            }
            schedule();
        }
        @Override
        public void onError(Throwable t) {
            if (done) {
                RxJavaPlugins.onError(t);
                return;
            }
            error = t;
            done = true;
            schedule();
        }
        @Override
        public void onComplete() {
            if (done) {
                return;
            }
            done = true;
            schedule();
        }
        @Override
        public void dispose() {
            if (!disposed) {
                disposed = true;
                upstream.dispose();
                worker.dispose();
                if (!outputFused && getAndIncrement() == 0) {
                    queue.clear();
                }
            }
        }
        @Override
        public boolean isDisposed() {
            return disposed;
        }
        void schedule() {
            if (getAndIncrement() == 0) {
                worker.schedule(this);
            }
        }
        void drainNormal() {
            int missed = 1;
            final SimpleQueue<T> q = queue;
            final Observer<? super T> a = downstream;
            for (;;) {
                if (checkTerminated(done, q.isEmpty(), a)) {
                    return;
                }
                for (;;) {
                    boolean d = done;
                    T v;
                    try {
                        v = q.poll();
                    } catch (Throwable ex) {
                        Exceptions.throwIfFatal(ex);
                        disposed = true;
                        upstream.dispose();
                        q.clear();
                        a.onError(ex);
                        worker.dispose();
                        return;
                    }
                    boolean empty = v == null;
                    if (checkTerminated(d, empty, a)) {
                        return;
                    }
                    if (empty) {
                        break;
                    }
                    a.onNext(v);
                }
                missed = addAndGet(-missed);
                if (missed == 0) {
                    break;
                }
            }
        }
        void drainFused() {
            int missed = 1;
            for (;;) {
                if (disposed) {
                    return;
                }
                boolean d = done;
                Throwable ex = error;
                if (!delayError && d && ex != null) {
                    disposed = true;
                    downstream.onError(error);
                    worker.dispose();
                    return;
                }
                downstream.onNext(null);
                if (d) {
                    disposed = true;
                    ex = error;
                    if (ex != null) {
                        downstream.onError(ex);
                    } else {
                        downstream.onComplete();
                    }
                    worker.dispose();
                    return;
                }
                missed = addAndGet(-missed);
                if (missed == 0) {
                    break;
                }
            }
        }
        @Override
        public void run() {
            if (outputFused) {
                drainFused();
            } else {
                drainNormal();
            }
        }
        boolean checkTerminated(boolean d, boolean empty, Observer<? super T> a) {
            if (disposed) {
                queue.clear();
                return true;
            }
            if (d) {
                Throwable e = error;
                if (delayError) {
                    if (empty) {
                        disposed = true;
                        if (e != null) {
                            a.onError(e);
                        } else {
                            a.onComplete();
                        }
                        worker.dispose();
                        return true;
                    }
                } else {
                    if (e != null) {
                        disposed = true;
                        queue.clear();
                        a.onError(e);
                        worker.dispose();
                        return true;
                    } else
                    if (empty) {
                        disposed = true;
                        a.onComplete();
                        worker.dispose();
                        return true;
                    }
                }
            }
            return false;
        }
        @Override
        public int requestFusion(int mode) {
            if ((mode & ASYNC) != 0) {
                outputFused = true;
                return ASYNC;
            }
            return NONE;
        }
        @Nullable
        @Override
        public T poll() throws Exception {
            return queue.poll();
        }
        @Override
        public void clear() {
            queue.clear();
        }
        @Override
        public boolean isEmpty() {
            return queue.isEmpty();
        }
    }

不想看代码直接总结,从ObserveOnObserver类中发现他的onSubscribe,onNext,onError,OnNext办法都调用了schedule(),追踪schedule()发现,终究相同是把使命交给了线程池处理,在本例子中因为传递的是AndroidSchedulers.mainThread(),所以下流是切换到主线程履行,这儿是用了Handler将使命提交给主线程

final class HandlerScheduler extends Scheduler {
    private final Handler handler;
    private final boolean async;
    HandlerScheduler(Handler handler, boolean async) {
        this.handler = handler;
        this.async = async;
    }
    @Override
    @SuppressLint("NewApi") // Async will only be true when the API is available to call.
    public Disposable scheduleDirect(Runnable run, long delay, TimeUnit unit) {
        if (run == null) throw new NullPointerException("run == null");
        if (unit == null) throw new NullPointerException("unit == null");
        run = RxJavaPlugins.onSchedule(run);
        ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
        Message message = Message.obtain(handler, scheduled);
        if (async) {
            message.setAsynchronous(true);
        }
        handler.sendMessageDelayed(message, unit.toMillis(delay));
        return scheduled;
    }
}

Rxjava调用链解析

终于到了终究一步suscribe

这儿调用的是Observable的subscribe

 public final void subscribe(Observer<? super T> observer) {
        ObjectHelper.requireNonNull(observer, "observer is null");
        try {
            observer = RxJavaPlugins.onSubscribe(this, observer);
            ObjectHelper.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");
            subscribeActual(observer);//要点看这儿
        } catch (NullPointerException e) { // NOPMD
            throw e;
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
            // can't call onError because no way to know if a Disposable has been set or not
            // can't call onSubscribe because the call might have set a Subscription already
            RxJavaPlugins.onError(e);
            NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
            npe.initCause(e);
            throw npe;
        }
    }

看到subscribeActual()办法没,原来subscribe()里会调用subscribeActual;

现在往回追溯:

在subscribe办法中会调用当时目标的subscribeActual(),所以往回追溯他首先会去调ObservableObserveOn的subscribeActual(),参数便是终究传入的Observer

回忆一下ObservableObserveOn的subscribeActual()

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        if (scheduler instanceof TrampolineScheduler) {
            source.subscribe(observer);
        } else {
            Scheduler.Worker w = scheduler.createWorker();
            source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
        }
    }

持续将Observer封装成ObserveOnObserver,然后调用source.subcribe(),source还记得吧,便是调用链上一步的回来的目标,也便是ObservableSubscribeOn,这个类没有完成subscribe,可是他的父类有这个办法,那不便是Observable的subcribe()吗?是的,也便是跟调用链终究一步调用的subcribe()是同一个办法,只不过他的参数是基于下流的参数的进一步封装,那么相同我他会调用到susscribeActual()

 @Override
    public void subscribeActual(final Observer<? super T> observer) {  //这儿的Observer便是将下流封装后的Observer
        //将oberser持续封装
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(observer);
        observer.onSubscribe(parent);
//通过方才的剖析 这儿是将使命交给线程池处理,所以去看SubscribeTask的run()
        parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
    }
// SubscribeTask的run()
        @Override
        public void run() {
            source.subscribe(parent); //相同持续调用source.subscribe,那么他也是同意是调用到调用链上一步回来目标的subscribeActual(),,也便是ObservableFilter目标目标
        }

不出意外ObservableFilter目标里也是将Observer持续封装,然后调用source.subscribe

  @Override
    public void subscribeActual(Observer<? super T> observer) {
        source.subscribe(new FilterObserver<T>(observer, predicate));
    }

现在来到了第一步ObservableJust的subscribeActual():

  @Override
    protected void subscribeActual(Observer<? super T> observer) {
        ScalarDisposable<T> sd = new ScalarDisposable<T>(observer, value); //将Observer和value进行封装,value便是咱们第一步传入的数据源了
        observer.onSubscribe(sd);
        sd.run();
    }
    //ScalarDisposable的run办法
        @Override
        public void run() {
            if (get() == START && compareAndSet(START, ON_NEXT)) {
                observer.onNext(value);   //这儿开端把数据源往下流传, value指数据源  observer便是下流一步一步封装的Observer啦
                if (get() == ON_NEXT) {
                    lazySet(ON_COMPLETE);
                    observer.onComplete();
                }
            }
        }

还记得回溯时封装的那些Observer吗?别离是MapObserver,FilterObserver,SubscribeOnObserver,ObserveOnObserver以及调用链上终究一步咱们自己自定义的Observer 别离再看他们的onNext(),其他办法套路一致

MapObserver:


        @Override
        public void onNext(T t) {
            if (done) {
                return;
            }
            if (sourceMode != NONE) {
                downstream.onNext(null);
                return;
            }
            U v;
            try {
            //处理数据源,将数据源转换成想要的类型
                v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
            } catch (Throwable ex) {
                fail(ex);
                return;
            }
            // 持续调用下流Observer的onNext
            downstream.onNext(v);
        }

FilterObserver

 @Override
        public void onNext(T t) {
            if (sourceMode == NONE) {
                boolean b;
                try {
                // 数据判别
                    b = filter.test(t);
                } catch (Throwable e) {
                    fail(e);
                    return;
                }
                //满足过滤条件持续调用下流onNext
                if (b) {
                    downstream.onNext(t);
                }
            } else {
                downstream.onNext(null);
            }
        }

SubscribeOnObserver

因为subscribeOn只是起到切换上游线程的效果,所以对下流他不做任何操作,持续调用下流的onNext

     @Override
        public void onNext(T t) {
            downstream.onNext(t);
        }

ObserveOnObserver:

    @Override
        public void onNext(T t) {
            if (done) {
                return;
            }
            if (sourceMode != QueueDisposable.ASYNC) {
                queue.offer(t);
            }
            schedule();  //切换下流线程,将使命交给线程池或许主线程handler,然后调用下流onNext
        }

终究就调到咱们自定义的onNext()啦,整个流程就完毕了

总结一下

Rxjava的链式调用整个流程便是从下到上,由上而下 每一步的操作符都是将上游目标作为source封装成新的Observable,然后持续往下传递,直到终究的subsribe办法反向开端调用source.subscribe然后调用到每个soource目标的subscriActual(),每一步的subscribActual()又会将下流传递来的Observer一步步封装,直到传递到最上游,在最上游开端再一步步调用封装好的Observe的相关办法,这样就完成了将数据源传递到下流。

切换上游线程: 创立一个Task,承继自Runable,在Runable的run()里调用source.subscribe(),然后将这个Runable进一步封装,根据传递的参数创立对应的线程池或许主线程Handler,将Runable提交给线程池或许Handler去履行

切换下流线程: 封装的Observer的onSubscribe,onNext,onError,OnNext办法都调用了schedule(),追踪schedule()发现,终究相同是把使命交给了线程池处理,在本例子中因为传递的是AndroidSchedulers.mainThread(),所以下流是切换到主线程履行