• rxjava 工作原理分析 调用链分析


    看代码

    以Rxjava的ObservableSubscribeOn举例

    public ObservableSubscribeOn(ObservableSource source, Scheduler scheduler) {
            super(source);//此source会保存到成员变量,变量的定义在它的父类
            this.scheduler = scheduler;
        }
    
    • 1
    • 2
    • 3
    • 4

    ObservableSource source是我们用rxjava调用Observable.create、map、subscribeOn等时,由rxjava自动创建的对象所传入的,其实就是调用方的Observable,,如下:

    public final Observable subscribeOn(@NonNull Scheduler scheduler) {
            Objects.requireNonNull(scheduler, "scheduler is null");
            //this就是调用方的对象
            return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<>(this, scheduler));
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5

    this由ObservableSubscribeOn构造传入,相当于上游的Observable传递到下游的Observable(ObservableSubscribeOn),也就是下游的Observable会持有上游的Observable对象。

    当最后调用subscribe,也就是Observable.subscribe开始订阅时,最终会调用subscribeActual,如下

    //此函数是所有Observable都会有的方法,是由rxjava实现
    public final void subscribe(@NonNull Observer observer) {
            Objects.requireNonNull(observer, "observer is null");
            try {
                observer = RxJavaPlugins.onSubscribe(this, observer);
    
                Objects.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");
    
                subscribeActual(observer);//真正调用的函数
            } catch (NullPointerException e) { // NOPMD
                throw e;
            } catch (Throwable e) {
                Exceptions.throwIfFatal(e);
                // can't call onError because no way to know if a Disposable has been set or not
                // can't call onSubscribe because the call might have set a Subscription already
                RxJavaPlugins.onError(e);
    
                NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
                npe.initCause(e);
                throw npe;
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22

    再看一下subscribeActual方法,我还是以ObservableSubscribeOn为例,方便跟踪代码跟讲解

    public void subscribeActual(final Observer observer) {
            final SubscribeOnObserver parent = new SubscribeOnObserver<>(observer);
    
            observer.onSubscribe(parent);
            //此处就是调用的上游的Observable,scheduler.scheduleDirect会调用SubscribeTask类里面的run方法
            parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    //是个Runnable,可以由线程池调用的,切换线程原理就是如此
    final class SubscribeTask implements Runnable {
            private final SubscribeOnObserver parent;
    
            SubscribeTask(SubscribeOnObserver parent) {
                this.parent = parent;
            }
    
            @Override
            public void run() {
                //source是上游的Observable
                source.subscribe(parent);
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    总结

    Observable里的subscribe方法都会调用上游的(Observable的subscribe),最终会调用到Observable.create或者Observable.just的subscribeActual方法。
    
    • 1

    继续以Observable.create来说,方便理解

    看一下create调用

    //kotlin代码,{}大括号就是我们开发者实例化的Observable对象
    Observable.create {
                it.onNext("")
                it.onComplete()
            }
    
    • 1
    • 2
    • 3
    • 4
    • 5

    然后rxjava会再给包装一下,如下

    //source就是我们自己new的对象
    public static  Observable create(@NonNull ObservableOnSubscribe source) {
            Objects.requireNonNull(source, "source is null");
            return RxJavaPlugins.onAssembly(new ObservableCreate<>(source));
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    //这个构造传进来的source其实是最顶层的Observable,是由我们开发者自行创建实例的
    public ObservableCreate(ObservableOnSubscribe source) {
            this.source = source;
        }
    
    • 1
    • 2
    • 3
    • 4

    如下,看一下ObservableCreate类的subscribeActual方法

    protected void subscribeActual(Observer observer) {
            CreateEmitter parent = new CreateEmitter<>(observer);
            observer.onSubscribe(parent);
    
            try {
                //source是开发者创建的对象(ObservableOnSubscribe的匿名函数,会实现subscribe方法,就是下面调用的)
                source.subscribe(parent);//123
            } catch (Throwable ex) {
                Exceptions.throwIfFatal(ex);
                parent.onError(ex);
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    上面的source.subscribe(parent);//123,其中subscribe其实是调用的下面代码中大括号里的内容,因为参数是(ObservableEmitter emitter),所以上面实例化一个CreateEmitter()

    //kotlin代码,{}大括号就是我们开发者实例化的Observable对象
    Observable.create {
                it.onNext("")
                it.onComplete()
            }
    
    • 1
    • 2
    • 3
    • 4
    • 5

    it.onNext(“”)其实就是调用CreateEmitter()对象里面的onNext

    public void onNext(T t) {
                if (!isDisposed()) {
                    //observer其实是下游的observer对象,由subscribe参数传给subscribeActual,再由new CreateEmitter<>(observer)构造传入
                    observer.onNext(t);
                }
            }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    依据上面代码可以看出,observer.onNext(t);是由上游的observer调用下游的observer里的onNext,下面理解一下

    首先每个Observable的子类(比如:ObservableSubscribeOn)都会有一个由Emitter或者Observer实现的一个类(比如:SubscribeOnObserver、CreateEmitter),它们都有onNext等方法。
    rxjava结尾会调用subscribe,参数是一个Observer,如果参数是Consumer的话,rxjava也会帮忙包装成Observer的,如下

    public final Disposable subscribe(@NonNull Consumer onNext, @NonNull Consumer onError,
                @NonNull Action onComplete) {
                    //包装类
            LambdaObserver ls = new LambdaObserver<>(onNext, onError, onComplete, Functions.emptyConsumer());
    
            subscribe(ls);
    
            return ls;
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    rxjava结尾会调用subscribe的参数Observer是开发实例化的最下游的Observer,其中subscribe的调用者是属于最下游的Observable,它会将subscribe的参数Observer保存起来,并传给上游,如下:

    public void subscribeActual(final Observer observer) {
            final SubscribeOnObserver parent = new SubscribeOnObserver<>(observer);
    
            observer.onSubscribe(parent);
    
            parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
        }
    
    final class SubscribeTask implements Runnable {
            private final SubscribeOnObserver parent;
    
            SubscribeTask(SubscribeOnObserver parent) {
                this.parent = parent;
            }
    
            @Override
            public void run() {
                source.subscribe(parent);
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20

    SubscribeTask里面执行source.subscribe(parent);
    parent是下游的Observer,通过subscribe参数传给上游,所以上面在create里面执行onNext时,里面会继续调用下游的onNext,如下

    public void onNext(T t) {
                if (!isDisposed()) {
                    //observer就是由subscribe参数传进来的
                    observer.onNext(t);
                }
            }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    总结:onNext(T)数据是由上向下传递的。
    上面也可以看出来为什么subscribeOn切换线程时,只有第一次有效,看下面伪代码

    subscribe{//最后调用的,最下游
        thread{
            subscribe{
                subscribe{
                    thread{
                        subscribe({//最上游
                            it.onNext()//开发者调用的,不是最下游匿名函数的onNext
                            it.onComplete()
                        })
                    }
                }
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
  • 相关阅读:
    开源的远程桌面软件RustDesk
    java毕业设计参考文献基于SSM学生成绩管理系统
    STC单片机23——T2定时器的使用
    RDD编程_第五章笔记
    macOS 安装brew
    Android intent的一些小使用
    “编辑微信小程序与后台数据交互与微信小程序wxs的使用“
    PHP自增构造_GET
    【突发】国内大量家用路由器网络访问异常和流量劫持事件分析
    激光焊接汽车尼龙塑料配件透光率测试仪
  • 原文地址:https://blog.csdn.net/buyaoshitududongwo/article/details/126027370