public ObservableSubscribeOn(ObservableSource source, Scheduler scheduler) {
super(source);//此source会保存到成员变量,变量的定义在它的父类
this.scheduler = scheduler;
}
ObservableSource source是我们用rxjava调用Observable.create、map、subscribeOn等时,由rxjava自动创建的对象所传入的,其实就是调用方的Observable,,如下:
public final Observable subscribeOn(@NonNull Scheduler scheduler) {
Objects.requireNonNull(scheduler, "scheduler is null");
//this就是调用方的对象
return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<>(this, scheduler));
}
this由ObservableSubscribeOn构造传入,相当于上游的Observable传递到下游的Observable(ObservableSubscribeOn),也就是下游的Observable会持有上游的Observable对象。
当最后调用subscribe,也就是Observable.subscribe开始订阅时,最终会调用subscribeActual,如下
//此函数是所有Observable都会有的方法,是由rxjava实现
public final void subscribe(@NonNull Observer super T> observer) {
Objects.requireNonNull(observer, "observer is null");
try {
observer = RxJavaPlugins.onSubscribe(this, observer);
Objects.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");
subscribeActual(observer);//真正调用的函数
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
// can't call onError because no way to know if a Disposable has been set or not
// can't call onSubscribe because the call might have set a Subscription already
RxJavaPlugins.onError(e);
NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
npe.initCause(e);
throw npe;
}
}
再看一下subscribeActual方法,我还是以ObservableSubscribeOn为例,方便跟踪代码跟讲解
public void subscribeActual(final Observer super T> observer) {
final SubscribeOnObserver parent = new SubscribeOnObserver<>(observer);
observer.onSubscribe(parent);
//此处就是调用的上游的Observable,scheduler.scheduleDirect会调用SubscribeTask类里面的run方法
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
//是个Runnable,可以由线程池调用的,切换线程原理就是如此
final class SubscribeTask implements Runnable {
private final SubscribeOnObserver parent;
SubscribeTask(SubscribeOnObserver parent) {
this.parent = parent;
}
@Override
public void run() {
//source是上游的Observable
source.subscribe(parent);
}
}
Observable里的subscribe方法都会调用上游的(Observable的subscribe),最终会调用到Observable.create或者Observable.just的subscribeActual方法。
看一下create调用
//kotlin代码,{}大括号就是我们开发者实例化的Observable对象
Observable.create {
it.onNext("")
it.onComplete()
}
然后rxjava会再给包装一下,如下
//source就是我们自己new的对象
public static Observable create(@NonNull ObservableOnSubscribe source) {
Objects.requireNonNull(source, "source is null");
return RxJavaPlugins.onAssembly(new ObservableCreate<>(source));
}
//这个构造传进来的source其实是最顶层的Observable,是由我们开发者自行创建实例的
public ObservableCreate(ObservableOnSubscribe source) {
this.source = source;
}
如下,看一下ObservableCreate类的subscribeActual方法
protected void subscribeActual(Observer super T> observer) {
CreateEmitter parent = new CreateEmitter<>(observer);
observer.onSubscribe(parent);
try {
//source是开发者创建的对象(ObservableOnSubscribe的匿名函数,会实现subscribe方法,就是下面调用的)
source.subscribe(parent);//123
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
上面的source.subscribe(parent);//123,其中subscribe其实是调用的下面代码中大括号里的内容,因为参数是(ObservableEmitter emitter),所以上面实例化一个CreateEmitter()
//kotlin代码,{}大括号就是我们开发者实例化的Observable对象
Observable.create {
it.onNext("")
it.onComplete()
}
it.onNext(“”)其实就是调用CreateEmitter()对象里面的onNext
public void onNext(T t) {
if (!isDisposed()) {
//observer其实是下游的observer对象,由subscribe参数传给subscribeActual,再由new CreateEmitter<>(observer)构造传入
observer.onNext(t);
}
}
依据上面代码可以看出,observer.onNext(t);是由上游的observer调用下游的observer里的onNext,下面理解一下
首先每个Observable的子类(比如:ObservableSubscribeOn)都会有一个由Emitter或者Observer实现的一个类(比如:SubscribeOnObserver、CreateEmitter),它们都有onNext等方法。
rxjava结尾会调用subscribe,参数是一个Observer,如果参数是Consumer的话,rxjava也会帮忙包装成Observer的,如下
public final Disposable subscribe(@NonNull Consumer super T> onNext, @NonNull Consumer super Throwable> onError,
@NonNull Action onComplete) {
//包装类
LambdaObserver ls = new LambdaObserver<>(onNext, onError, onComplete, Functions.emptyConsumer());
subscribe(ls);
return ls;
}
rxjava结尾会调用subscribe的参数Observer是开发实例化的最下游的Observer,其中subscribe的调用者是属于最下游的Observable,它会将subscribe的参数Observer保存起来,并传给上游,如下:
public void subscribeActual(final Observer super T> observer) {
final SubscribeOnObserver parent = new SubscribeOnObserver<>(observer);
observer.onSubscribe(parent);
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
final class SubscribeTask implements Runnable {
private final SubscribeOnObserver parent;
SubscribeTask(SubscribeOnObserver parent) {
this.parent = parent;
}
@Override
public void run() {
source.subscribe(parent);
}
}
SubscribeTask里面执行source.subscribe(parent);
parent是下游的Observer,通过subscribe参数传给上游,所以上面在create里面执行onNext时,里面会继续调用下游的onNext,如下
public void onNext(T t) {
if (!isDisposed()) {
//observer就是由subscribe参数传进来的
observer.onNext(t);
}
}
总结:onNext(T)数据是由上向下传递的。
上面也可以看出来为什么subscribeOn切换线程时,只有第一次有效,看下面伪代码
subscribe{//最后调用的,最下游
thread{
subscribe{
subscribe{
thread{
subscribe({//最上游
it.onNext()//开发者调用的,不是最下游匿名函数的onNext
it.onComplete()
})
}
}
}
}
}