• Rxjava学习(一)简单分析Rxjava调用流程


    本篇以Rxjava最简短的调用流程为例来分析,下面是要分析的实例代码:

    1. Observable.create(new ObservableOnSubscribe() {
    2. @Override
    3. public void subscribe(ObservableEmitter e) throws Exception {
    4. e.onNext("hahaha"); //执行二
    5. e.onComplete(); //执行四
    6. //e.tryOnError(new Throwable("error")); //执行四
    7. }
    8. }).subscribe(
    9. new Observer() {
    10. @Override
    11. public void onSubscribe(Disposable d) {
    12. disposable = d; //执行1
    13. }
    14. @Override
    15. public void onNext(String s) {
    16. //执行三
    17. }
    18. @Override
    19. public void onError(Throwable e) {
    20. //执行五
    21. }
    22. @Override
    23. public void onComplete() {
    24. //执行五
    25. }
    26. });

    无论是看官方文档还是自己打log验证,都会得出以上一二三四五的执行步骤(步骤四会在onError()和onComplete()选其一执行)。开始接触遇到这样的代码设计多多少少会感觉到有点反人类啊?当时我的第一感觉先不管设计合不合理,我就想知道它是怎么执行下来的?

    很明显能发现这是链式调用,链式调用的核心就在于每个方法都返回相同的对象,当先省略掉其中的回调细节,我们可能会看的更清晰...

    按照从左往右的顺序以及()优先级,代码的执行顺序如上图所示,接下来对每一步进行分析:

    1、创建被观察者对象

    1. Observable.create(
    2. //创建ObservableOnSubscribe接口的实例对象,
    3. new ObservableOnSubscribe() {
    4. @Override
    5. public void subscribe(ObservableEmitter e) throws Exception {
    6. e.onNext("hahaha");
    7. e.onComplete();
    8. //e.tryOnError(new Throwable("error")); //执行四
    9. }
    10. }
    11. )
    1. //Observable.java
    2. //将上一步创建的ObservableOnSubscribe接口的实例对象传入
    3. public static Observable create(ObservableOnSubscribe source) {
    4. ObjectHelper.requireNonNull(source, "source is null");
    5. //创建ObservableCreate对象
    6. return RxJavaPlugins.onAssembly(new ObservableCreate(source));
    7. }
    8. //ObservableCreate.java
    9. public final class ObservableCreate extends Observable {
    10. //将出入的ObservableOnSubscribe实例进行保存
    11. final ObservableOnSubscribe source;
    12. public ObservableCreate(ObservableOnSubscribe source) {
    13. this.source = source;
    14. }
    15. ...
    16. }
    1. //RxJavaPlugins.java
    2. //将上一步创建的ObservableCreate对象传入(ObservableCreate继承自Observable)
    3. public static Observable onAssembly(@NonNull Observable source) {
    4. //这部分是用于对Rxjava所有操作符的监听,本例中没有设置,属于干扰项不用看
    5. Functionsuper Observable, ? extends Observable> f = onObservableAssembly;
    6. if (f != null) {
    7. return apply(f, source);
    8. }
    9. return source;
    10. }

    到此被观察者对象创建完毕,即 ObservableCreate对象,它内部持有ObservableOnSubscribe对象。

    2.创建观察者对象

    1. new Observer() {
    2. @Override
    3. public void onSubscribe(Disposable d) {
    4. disposable = d;
    5. }
    6. @Override
    7. public void onNext(String s) {}
    8. @Override
    9. public void onError(Throwable e) {}
    10. @Override
    11. public void onComplete() {}
    12. }

    这一步比较简单,只是创建了一个Observer接口对象。

    3、执行subscribe(),将以上两者关联(订阅)

    1. //Observerable.java
    2. //将观察者对象传入
    3. public final void subscribe(Observersuper T> observer) {
    4. try {
    5. ...
    6. //此方法是抽象方法,我们创建的被观察者对象是ObservableCreate,所以到此类中找方法实现
    7. subscribeActual(observer);
    8. ...
    9. } catch (NullPointerException e) { // NOPMD} catch (Throwable e) {}}
    10. }
    1. //ObservableCreate.java
    2. //传入观察者对象
    3. protected void subscribeActual(Observersuper T> observer) {
    4. //创建发射器,这里将观察者传入CreateEmitter的构造参数
    5. CreateEmitter parent = new CreateEmitter(observer);
    6. //调用观察者的onSubscribe(),即开始所说的 --- 执行一
    7. observer.onSubscribe(parent);
    8. try {
    9. //source 是 ObservableOnSubscribe 对象,在此调用到 --- 执行二
    10. source.subscribe(parent);
    11. } catch (Throwable ex) {
    12. Exceptions.throwIfFatal(ex);
    13. parent.onError(ex);
    14. }
    15. }

    回顾一下执行二处的代码:

    1. Observable.create(new ObservableOnSubscribe() {
    2. @Override
    3. public void subscribe(ObservableEmitter e) throws Exception {
    4. Log.e("aaaaa:", "subscribe()");
    5. e.onNext("hahaha"); //执行二
    6. e.onComplete(); //执行四
    7. //e.tryOnError(new Throwable("error"));
    8. }
    9. })
    1. //ObservableCreate.java ObservableCreate.CreateEmitter内部类
    2. public void onNext(T t) {
    3. if (!isDisposed()) {
    4. //observer为构造函数中传入的观察者对象
    5. observer.onNext(t); //这里调用到--- 执行三
    6. }
    7. }

    到此被观察者中发射的onNext()事件流程执行完毕,然后继续看执行四处发射了onComplete()或onError()事件。

    1. //ObservableCreate.java CreateEmitter内部类
    2. @Override
    3. public void onComplete() {
    4. if (!isDisposed()) {
    5. try {
    6. //调用观察者的onComplete()
    7. observer.onComplete();
    8. } finally {
    9. dispose();
    10. }
    11. }
    12. }

    到此,调用到执行五,整个调用流程执行完毕。 

    总结:分析rxjava的调用流程,最重要的就是一定要弄清每个方法(或者说是操作符)传入的或涉及到的observable或observer对象的真实类型,rxjava中涉及很多重载和重写,看错对象的类型就会转晕在rxjava的源码中。

  • 相关阅读:
    python 字典dict和列表list的读取速度问题, range合并
    spring security(二)--授权
    基于Springboot+Vue开发前后端端分离农产品进销存系统
    R语言的物种气候生态位动态量化与分布特征模拟实践技术
    Jaya算法在电力系统最优潮流计算中的应用(创新点)【Matlab代码实现】
    MySQL高可用搭建方案之(MHA)
    从0到1设计通用数据大屏搭建平台
    Android ThreadLocal
    LeetCode精选200道--双指针篇
    【LeetCode】118. 杨辉三角
  • 原文地址:https://blog.csdn.net/qq_27246079/article/details/127442152