引入库:
api 'io.reactivex.rxjava2:rxjava:2.1.3'
- public class MainActivity extends AppCompatActivity {
-
- @Override
- protected void onCreate(Bundle savedInstanceState) {
- super.onCreate(savedInstanceState);
- setContentView(R.layout.activity_main);
- Observable.create(new ObservableOnSubscribe
() {//被观察者 - @Override
- public void subscribe(ObservableEmitter
e) throws Exception { - e.onNext("hello");
- }
- }).map(new Function
() {//一个是传入参数类型,一个是返回参数类型 - @Override
- public String apply(String s) throws Exception {
- return "ok";
- }
- }).map(new Function
() {//新的变化 - @Override
- public Integer apply(String s) throws Exception {
- return 100;
- }
- }).subscribe(new Consumer
() {//观察者 - @Override
- public void accept(Integer integer) throws Exception {
- Log.d("lzy", integer + "");
- }
- });
- }
-
- }
结果:
被观察者经过一条链路的转化传递给观察者,是一种基于事件流的链式调用。
从使用上来看RxJava是基于观察者模式设计的,那么就来分析一下其中的原理吧。
我们先跳过变换流程,简化一下RxJava使用:
- Observable.create(new ObservableOnSubscribe
() { - @Override
- public void subscribe(ObservableEmitter
e) throws Exception { - e.onNext("lzy真帅");
- }
- }).subscribe(new Observer
() { - @Override
- public void onSubscribe(Disposable d) {
-
- }
-
- @Override
- public void onNext(String value) {
- Log.e("lzy", value);
- }
-
- @Override
- public void onError(Throwable e) {
-
- }
-
- @Override
- public void onComplete() {
-
- }
- });
输出结果:
2022-09-21 03:47:05.925 5251-5251/com.example.retrofit_demo E/lzy: lzy真帅
这样我们就将使用变成了最简单的被观察者,观察者,订阅。
首先来看观察者:
- public interface Observer
{ -
- void onSubscribe(Disposable d);
-
- void onNext(T value);
-
- void onError(Throwable e);
-
- void onComplete();
-
- }
这个观察者采用了匿名内部类的方式,所以观察者的实现很简单,就是一个实现了Observer接口的对象。
再来看subscribe做了什么事情:
- public final void subscribe(Observer super T> observer) {
- ···
- try {
- ···
- subscribeActual(observer);//1
- } catch (NullPointerException e) { // NOPMD
- throw e;
- } catch (Throwable e) {
- ···
- }
- }
subscribe里面调用了subscribeActual:
protected abstract void subscribeActual(Observer super T> observer);
我们发现这是一个抽象方法,那么具体的实现类是什么呢?
我们来看Observable.create:
- public static
Observable create(ObservableOnSubscribe source) { - ObjectHelper.requireNonNull(source, "source is null");
- return RxJavaPlugins.onAssembly(new ObservableCreate
(source)); - }
-
- public static
Observable onAssembly(Observable source) { - Function
f = onObservableAssembly; - if (f != null) {
- return apply(f, source);
- }
- return source;
- }
create方法返回了一个ObservableCreate对象:
- public final class ObservableCreate
extends Observable { -
- ···
-
- final ObservableOnSubscribe
source; -
- public ObservableCreate(ObservableOnSubscribe
source) { - this.source = source;
- }
-
- @Override
- protected void subscribeActual(Observer super T> observer) {
- CreateEmitter
parent = new CreateEmitter(observer);//1 - observer.onSubscribe(parent);//2
-
- try {
- source.subscribe(parent);//3
- } catch (Throwable ex) {
- Exceptions.throwIfFatal(ex);
- parent.onError(ex);
- }
- }
- ···
-
- }
我们注意注释1处:对observer进行了一次封装,然后就调用了observer的onSubscribe方法,所以回调中的几个方法这个方法最先调用。接着看注释3:
- public interface ObservableOnSubscribe
{ -
- void subscribe(ObservableEmitter
e) throws Exception; - }
create方法传入的是一个实现这个接口的对象,调用实现类的subscribe方法:
- Observable.create(new ObservableOnSubscribe
() { - @Override
- public void subscribe(ObservableEmitter
e) throws Exception { - e.onNext("lzy真帅");//调用
- }
- })
而parent类型的定义是这样的:
- static final class CreateEmitter
- extends AtomicReference
- implements ObservableEmitter
, Disposable { -
-
- private static final long serialVersionUID = -3434801548987643227L;
-
- final Observer super T> observer;
-
- CreateEmitter(Observer super T> observer) {
- this.observer = observer;
- }
-
- @Override
- public void onNext(T t) {
- if (t == null) {
- onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
- return;
- }
- if (!isDisposed()) {
- observer.onNext(t);//1
- }
- }
-
- @Override
- public void onError(Throwable t) {
- if (t == null) {
- t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
- }
- if (!isDisposed()) {
- try {
- observer.onError(t);
- } finally {
- dispose();
- }
- } else {
- RxJavaPlugins.onError(t);
- }
- }
-
- @Override
- public void onComplete() {
- if (!isDisposed()) {
- try {
- observer.onComplete();
- } finally {
- dispose();
- }
- }
- }
-
- @Override
- public void setDisposable(Disposable d) {
- DisposableHelper.set(this, d);
- }
-
- @Override
- public void setCancellable(Cancellable c) {
- setDisposable(new CancellableDisposable(c));
- }
-
- @Override
- public ObservableEmitter
serialize() { - return new SerializedEmitter
(this); - }
-
- @Override
- public void dispose() {
- DisposableHelper.dispose(this);
- }
-
- @Override
- public boolean isDisposed() {
- return DisposableHelper.isDisposed(get());
- }
- }
关注一下注释1,整个调用流程就此实现。
我们放一张图总结一下整个调用流程:

我们最基本的流程了解了以后,继续拓展变换流程:
- Observable.create(new ObservableOnSubscribe
() { - @Override
- public void subscribe(ObservableEmitter
e) throws Exception { - e.onNext("lzy真帅");
- }
- }).map(new Function
() { - @Override
- public String apply(String s) throws Exception {
- return "你也很帅!";
- }
- }).map(new Function
() { - @Override
- public String apply(String s) throws Exception {
- return "大家好 我是大帅哥!";
- }
- }).subscribe(new Observer
() { - @Override
- public void onSubscribe(Disposable d) {
-
- }
-
- @Override
- public void onNext(String value) {
- Log.e("lzy", value);
- }
-
- @Override
- public void onError(Throwable e) {
-
- }
-
- @Override
- public void onComplete() {
-
- }
- });
打印结果:
2022-09-21 04:50:06.519 5606-5606/com.example.retrofit_demo E/lzy: 大家好 我是大帅哥!
我们从上向下来分析:
- Observable.create(new ObservableOnSubscribe
() { - @Override
- public void subscribe(ObservableEmitter
e) throws Exception { - e.onNext("lzy真帅");
- }
- })
这段代码我们前面已经分析过来,再来复习一遍:
- public static
Observable create(ObservableOnSubscribe source) { - ObjectHelper.requireNonNull(source, "source is null");
- return RxJavaPlugins.onAssembly(new ObservableCreate
(source)); - }
create方法传入一个实现ObservableOnSubscribe接口的对象并返回一个ObservableCreate。
然后来看map方法:
- public final
Observable map(Function super T, ? extends R> mapper) { - ObjectHelper.requireNonNull(mapper, "mapper is null");
- return RxJavaPlugins.onAssembly(new ObservableMap
(this, mapper)); - }
我们只要关注一下这个this是什么?很明显就是调用这个map方法的对象,所以第一次这个this是
一个ObservableCreate对象。我们再来细看一下这个ObservableMap:
- public final class ObservableMap
extends AbstractObservableWithUpstream { - final Function super T, ? extends U> function;
-
- public ObservableMap(ObservableSource
source, Function super T, ? extends U> function) { - super(source);
- this.function = function;
- }
-
- @Override
- public void subscribeActual(Observer super U> t) {
- source.subscribe(new MapObserver
(t, function)); - }
-
-
- static final class MapObserver
extends BasicFuseableObserver { - final Function super T, ? extends U> mapper;
-
- MapObserver(Observer super U> actual, Function super T, ? extends U> mapper) {
- super(actual);
- this.mapper = mapper;
- }
-
- @Override
- public void onNext(T t) {
- if (done) {
- return;
- }
-
- if (sourceMode != NONE) {
- actual.onNext(null);
- return;
- }
-
- U v;
-
- try {
- v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
- } catch (Throwable ex) {
- fail(ex);
- return;
- }
- actual.onNext(v);
- }
-
- @Override
- public int requestFusion(int mode) {
- return transitiveBoundaryFusion(mode);
- }
-
- @Override
- public U poll() throws Exception {
- T t = qs.poll();
- return t != null ? ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.") : null;
- }
- }
- }
再来看:
- abstract class AbstractObservableWithUpstream
extends Observable implements HasUpstreamObservableSource { -
- /** The source consumable Observable. */
- protected final ObservableSource
source; -
- /**
- * Constructs the ObservableSource with the given consumable.
- * @param source the consumable Observable
- */
- AbstractObservableWithUpstream(ObservableSource
source) { - this.source = source;
- }
-
- @Override
- public final ObservableSource
source() { - return source;
- }
-
- }
所以ObservableMap也是一个Observable具体实现类。第二次调用map方法也是一样的,不同的是source对象变了,第一次是ObservableCreate类型的,第二次是ObservableMap类型的。
好了,这段流程先放一边,来看subscribe,我们来看ObservableMap的方法:
- public void subscribeActual(Observer super U> t) {
- source.subscribe(new MapObserver
(t, function)); - }
前面说过,第二层source其实是一个ObservableMap,我们发现这个类里面并没有subscribe方法,于是去父类去找,我们通过前面源码分析知道,ObservableMap的也是一个Observable,而前面分析过这个方法:
- public final void subscribe(Observer super T> observer) {
- ···
- subscribeActual(observer);
- ···
- }
第二层直接将观察者传进去,封装成一个MapObserver,但是第二层调用的时候这个source其实是第一层传过来的ObservableMap对象,调用了第一层ObservableMap对象的subscribe方法,而第一层的source其实是一个ObservableCreate对象,而且观察者又被封装了一层。
最终触发:
- bservable.create(new ObservableOnSubscribe
() { - @Override
- public void subscribe(ObservableEmitter
e) throws Exception { - e.onNext("lzy真帅");
- }
- })
这个流程我们上面分析过,就不再赘述。其实就是调用了二级包裹的onNext方法。
- static final class MapObserver
extends BasicFuseableObserver { - final Function super T, ? extends U> mapper;
-
- MapObserver(Observer super U> actual, Function super T, ? extends U> mapper) {
- super(actual);
- this.mapper = mapper;
- }
-
- @Override
- public void onNext(T t) {
- if (done) {
- return;
- }
-
- if (sourceMode != NONE) {
- actual.onNext(null);
- return;
- }
-
- U v;
-
- try {
- v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");//1
- } catch (Throwable ex) {
- fail(ex);
- return;
- }
- actual.onNext(v);//2
- }
注释1执行二级包裹的apply方法,t是前面传过来的。注释二这个actual很有意思,我们接着看:
- public abstract class BasicFuseableObserver
implements Observer, QueueDisposable { -
- /** The downstream subscriber. */
- protected final Observer super R> actual;
-
- ···
- public BasicFuseableObserver(Observer super R> actual) {
- this.actual = actual;
- }
-
- ···}
作为二级包裹来说这其实就是一级包裹,执行一级包裹的onNext,然后一级包裹再去执行真正观察者的onNext。注意一下这个注释2中的参数v,其实就是上一层的结果。
分析完RxJava的核心原理,不得不去感叹设计的巧妙,这里面的思想值得每一个开发者去思考,去领会,去融会贯通。