本篇以Rxjava最简短的调用流程为例来分析,下面是要分析的实例代码:
- Observable.create(new ObservableOnSubscribe
() { - @Override
- public void subscribe(ObservableEmitter
e) throws Exception { - e.onNext("hahaha"); //执行二
- e.onComplete(); //执行四
- //e.tryOnError(new Throwable("error")); //执行四
- }
- }).subscribe(
- new Observer
() { - @Override
- public void onSubscribe(Disposable d) {
- disposable = d; //执行1
- }
-
- @Override
- public void onNext(String s) {
- //执行三
- }
-
- @Override
- public void onError(Throwable e) {
- //执行五
- }
-
- @Override
- public void onComplete() {
- //执行五
- }
- });
无论是看官方文档还是自己打log验证,都会得出以上一二三四五的执行步骤(步骤四会在onError()和onComplete()选其一执行)。开始接触遇到这样的代码设计多多少少会感觉到有点反人类啊?当时我的第一感觉先不管设计合不合理,我就想知道它是怎么执行下来的?
很明显能发现这是链式调用,链式调用的核心就在于每个方法都返回相同的对象,当先省略掉其中的回调细节,我们可能会看的更清晰...
按照从左往右的顺序以及()优先级,代码的执行顺序如上图所示,接下来对每一步进行分析:
1、创建被观察者对象
- Observable.create(
- //创建ObservableOnSubscribe接口的实例对象,
- new ObservableOnSubscribe
() { - @Override
- public void subscribe(ObservableEmitter
e) throws Exception { - e.onNext("hahaha");
- e.onComplete();
- //e.tryOnError(new Throwable("error")); //执行四
- }
- }
- )
- //Observable.java
-
- //将上一步创建的ObservableOnSubscribe接口的实例对象传入
- public static
Observable create(ObservableOnSubscribe source) { - ObjectHelper.requireNonNull(source, "source is null");
- //创建ObservableCreate对象
- return RxJavaPlugins.onAssembly(new ObservableCreate
(source)); - }
-
-
- //ObservableCreate.java
- public final class ObservableCreate
extends Observable { - //将出入的ObservableOnSubscribe实例进行保存
- final ObservableOnSubscribe
source; -
- public ObservableCreate(ObservableOnSubscribe
source) { - this.source = source;
- }
- ...
- }
- //RxJavaPlugins.java
- //将上一步创建的ObservableCreate对象传入(ObservableCreate继承自Observable)
- public static
Observable onAssembly(@NonNull Observable source) { - //这部分是用于对Rxjava所有操作符的监听,本例中没有设置,属于干扰项不用看
- Function super Observable, ? extends Observable> f = onObservableAssembly;
- if (f != null) {
- return apply(f, source);
- }
- return source;
- }
到此被观察者对象创建完毕,即 ObservableCreate对象,它内部持有ObservableOnSubscribe对象。
2.创建观察者对象
- new Observer
() { - @Override
- public void onSubscribe(Disposable d) {
- disposable = d;
- }
-
- @Override
- public void onNext(String s) {}
-
- @Override
- public void onError(Throwable e) {}
-
- @Override
- public void onComplete() {}
- }
这一步比较简单,只是创建了一个Observer
3、执行subscribe(),将以上两者关联(订阅)
- //Observerable.java
- //将观察者对象传入
- public final void subscribe(Observer super T> observer) {
- try {
- ...
- //此方法是抽象方法,我们创建的被观察者对象是ObservableCreate,所以到此类中找方法实现
- subscribeActual(observer);
- ...
- } catch (NullPointerException e) { // NOPMD} catch (Throwable e) {}}
- }
-
-
- //ObservableCreate.java
- //传入观察者对象
- protected void subscribeActual(Observer super T> observer) {
- //创建发射器,这里将观察者传入CreateEmitter的构造参数
- CreateEmitter
parent = new CreateEmitter(observer); - //调用观察者的onSubscribe(),即开始所说的 --- 执行一
- observer.onSubscribe(parent);
-
- try {
- //source 是 ObservableOnSubscribe 对象,在此调用到 --- 执行二
- source.subscribe(parent);
- } catch (Throwable ex) {
- Exceptions.throwIfFatal(ex);
- parent.onError(ex);
- }
- }
回顾一下执行二处的代码:
- Observable.create(new ObservableOnSubscribe
() { - @Override
- public void subscribe(ObservableEmitter
e) throws Exception { - Log.e("aaaaa:", "subscribe()");
- e.onNext("hahaha"); //执行二
- e.onComplete(); //执行四
- //e.tryOnError(new Throwable("error"));
-
- }
- })
- //ObservableCreate.java ObservableCreate.CreateEmitter内部类
- public void onNext(T t) {
-
- if (!isDisposed()) {
- //observer为构造函数中传入的观察者对象
- observer.onNext(t); //这里调用到--- 执行三
- }
- }
到此被观察者中发射的onNext()事件流程执行完毕,然后继续看执行四处发射了onComplete()或onError()事件。
- //ObservableCreate.java CreateEmitter内部类
- @Override
- public void onComplete() {
- if (!isDisposed()) {
- try {
- //调用观察者的onComplete()
- observer.onComplete();
- } finally {
- dispose();
- }
- }
- }
到此,调用到执行五,整个调用流程执行完毕。
总结:分析rxjava的调用流程,最重要的就是一定要弄清每个方法(或者说是操作符)传入的或涉及到的observable或observer对象的真实类型,rxjava中涉及很多重载和重写,看错对象的类型就会转晕在rxjava的源码中。