• RxJava 一篇文章就够了


    使用篇

    引入库:

    api 'io.reactivex.rxjava2:rxjava:2.1.3'
    1. public class MainActivity extends AppCompatActivity {
    2. @Override
    3. protected void onCreate(Bundle savedInstanceState) {
    4. super.onCreate(savedInstanceState);
    5. setContentView(R.layout.activity_main);
    6. Observable.create(new ObservableOnSubscribe() {//被观察者
    7. @Override
    8. public void subscribe(ObservableEmitter e) throws Exception {
    9. e.onNext("hello");
    10. }
    11. }).map(new Function() {//一个是传入参数类型,一个是返回参数类型
    12. @Override
    13. public String apply(String s) throws Exception {
    14. return "ok";
    15. }
    16. }).map(new Function() {//新的变化
    17. @Override
    18. public Integer apply(String s) throws Exception {
    19. return 100;
    20. }
    21. }).subscribe(new Consumer() {//观察者
    22. @Override
    23. public void accept(Integer integer) throws Exception {
    24. Log.d("lzy", integer + "");
    25. }
    26. });
    27. }
    28. }

    结果:

    D/lzy: 100

    被观察者经过一条链路的转化传递给观察者,是一种基于事件流的链式调用。

    原理篇

    从使用上来看RxJava是基于观察者模式设计的,那么就来分析一下其中的原理吧。

    我们先跳过变换流程,简化一下RxJava使用:

    1. Observable.create(new ObservableOnSubscribe() {
    2. @Override
    3. public void subscribe(ObservableEmitter e) throws Exception {
    4. e.onNext("lzy真帅");
    5. }
    6. }).subscribe(new Observer() {
    7. @Override
    8. public void onSubscribe(Disposable d) {
    9. }
    10. @Override
    11. public void onNext(String value) {
    12. Log.e("lzy", value);
    13. }
    14. @Override
    15. public void onError(Throwable e) {
    16. }
    17. @Override
    18. public void onComplete() {
    19. }
    20. });

    输出结果:

    2022-09-21 03:47:05.925 5251-5251/com.example.retrofit_demo E/lzy: lzy真帅
    

    这样我们就将使用变成了最简单的被观察者,观察者,订阅。

    首先来看观察者:

    1. public interface Observer {
    2. void onSubscribe(Disposable d);
    3. void onNext(T value);
    4. void onError(Throwable e);
    5. void onComplete();
    6. }

    这个观察者采用了匿名内部类的方式,所以观察者的实现很简单,就是一个实现了Observer接口的对象。

    再来看subscribe做了什么事情:

    1. public final void subscribe(Observersuper T> observer) {
    2. ···
    3. try {
    4. ···
    5. subscribeActual(observer);//1
    6. } catch (NullPointerException e) { // NOPMD
    7. throw e;
    8. } catch (Throwable e) {
    9. ···
    10. }
    11. }

    subscribe里面调用了subscribeActual:

    protected abstract void subscribeActual(Observersuper T> observer);

    我们发现这是一个抽象方法,那么具体的实现类是什么呢?

    我们来看Observable.create:

    1. public static Observable create(ObservableOnSubscribe source) {
    2. ObjectHelper.requireNonNull(source, "source is null");
    3. return RxJavaPlugins.onAssembly(new ObservableCreate(source));
    4. }
    5. public static Observable onAssembly(Observable source) {
    6. Function f = onObservableAssembly;
    7. if (f != null) {
    8. return apply(f, source);
    9. }
    10. return source;
    11. }

    create方法返回了一个ObservableCreate对象:

    1. public final class ObservableCreate extends Observable {
    2. ···
    3. final ObservableOnSubscribe source;
    4. public ObservableCreate(ObservableOnSubscribe source) {
    5. this.source = source;
    6. }
    7. @Override
    8. protected void subscribeActual(Observersuper T> observer) {
    9. CreateEmitter parent = new CreateEmitter(observer);//1
    10. observer.onSubscribe(parent);//2
    11. try {
    12. source.subscribe(parent);//3
    13. } catch (Throwable ex) {
    14. Exceptions.throwIfFatal(ex);
    15. parent.onError(ex);
    16. }
    17. }
    18. ···
    19. }

    我们注意注释1处:对observer进行了一次封装,然后就调用了observer的onSubscribe方法,所以回调中的几个方法这个方法最先调用。接着看注释3:

    1. public interface ObservableOnSubscribe {
    2. void subscribe(ObservableEmitter e) throws Exception;
    3. }

    create方法传入的是一个实现这个接口的对象,调用实现类的subscribe方法:

    1. Observable.create(new ObservableOnSubscribe() {
    2. @Override
    3. public void subscribe(ObservableEmitter e) throws Exception {
    4. e.onNext("lzy真帅");//调用
    5. }
    6. })

    而parent类型的定义是这样的:

    1. static final class CreateEmitter
    2. extends AtomicReference
    3. implements ObservableEmitter, Disposable {
    4. private static final long serialVersionUID = -3434801548987643227L;
    5. final Observersuper T> observer;
    6. CreateEmitter(Observersuper T> observer) {
    7. this.observer = observer;
    8. }
    9. @Override
    10. public void onNext(T t) {
    11. if (t == null) {
    12. onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
    13. return;
    14. }
    15. if (!isDisposed()) {
    16. observer.onNext(t);//1
    17. }
    18. }
    19. @Override
    20. public void onError(Throwable t) {
    21. if (t == null) {
    22. t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
    23. }
    24. if (!isDisposed()) {
    25. try {
    26. observer.onError(t);
    27. } finally {
    28. dispose();
    29. }
    30. } else {
    31. RxJavaPlugins.onError(t);
    32. }
    33. }
    34. @Override
    35. public void onComplete() {
    36. if (!isDisposed()) {
    37. try {
    38. observer.onComplete();
    39. } finally {
    40. dispose();
    41. }
    42. }
    43. }
    44. @Override
    45. public void setDisposable(Disposable d) {
    46. DisposableHelper.set(this, d);
    47. }
    48. @Override
    49. public void setCancellable(Cancellable c) {
    50. setDisposable(new CancellableDisposable(c));
    51. }
    52. @Override
    53. public ObservableEmitter serialize() {
    54. return new SerializedEmitter(this);
    55. }
    56. @Override
    57. public void dispose() {
    58. DisposableHelper.dispose(this);
    59. }
    60. @Override
    61. public boolean isDisposed() {
    62. return DisposableHelper.isDisposed(get());
    63. }
    64. }

    关注一下注释1,整个调用流程就此实现。

    我们放一张图总结一下整个调用流程:

     我们最基本的流程了解了以后,继续拓展变换流程:

    1. Observable.create(new ObservableOnSubscribe() {
    2. @Override
    3. public void subscribe(ObservableEmitter e) throws Exception {
    4. e.onNext("lzy真帅");
    5. }
    6. }).map(new Function() {
    7. @Override
    8. public String apply(String s) throws Exception {
    9. return "你也很帅!";
    10. }
    11. }).map(new Function() {
    12. @Override
    13. public String apply(String s) throws Exception {
    14. return "大家好 我是大帅哥!";
    15. }
    16. }).subscribe(new Observer() {
    17. @Override
    18. public void onSubscribe(Disposable d) {
    19. }
    20. @Override
    21. public void onNext(String value) {
    22. Log.e("lzy", value);
    23. }
    24. @Override
    25. public void onError(Throwable e) {
    26. }
    27. @Override
    28. public void onComplete() {
    29. }
    30. });

    打印结果:

    2022-09-21 04:50:06.519 5606-5606/com.example.retrofit_demo E/lzy: 大家好 我是大帅哥!

    我们从上向下来分析:

    1. Observable.create(new ObservableOnSubscribe() {
    2. @Override
    3. public void subscribe(ObservableEmitter e) throws Exception {
    4. e.onNext("lzy真帅");
    5. }
    6. })

    这段代码我们前面已经分析过来,再来复习一遍:

    1. public static Observable create(ObservableOnSubscribe source) {
    2. ObjectHelper.requireNonNull(source, "source is null");
    3. return RxJavaPlugins.onAssembly(new ObservableCreate(source));
    4. }

    create方法传入一个实现ObservableOnSubscribe接口的对象并返回一个ObservableCreate。

    然后来看map方法:

    1. public final Observable map(Functionsuper T, ? extends R> mapper) {
    2. ObjectHelper.requireNonNull(mapper, "mapper is null");
    3. return RxJavaPlugins.onAssembly(new ObservableMap(this, mapper));
    4. }

    我们只要关注一下这个this是什么?很明显就是调用这个map方法的对象,所以第一次这个this是

    一个ObservableCreate对象。我们再来细看一下这个ObservableMap:

    1. public final class ObservableMap extends AbstractObservableWithUpstream {
    2. final Functionsuper T, ? extends U> function;
    3. public ObservableMap(ObservableSource source, Functionsuper T, ? extends U> function) {
    4. super(source);
    5. this.function = function;
    6. }
    7. @Override
    8. public void subscribeActual(Observersuper U> t) {
    9. source.subscribe(new MapObserver(t, function));
    10. }
    11. static final class MapObserver extends BasicFuseableObserver {
    12. final Functionsuper T, ? extends U> mapper;
    13. MapObserver(Observersuper U> actual, Functionsuper T, ? extends U> mapper) {
    14. super(actual);
    15. this.mapper = mapper;
    16. }
    17. @Override
    18. public void onNext(T t) {
    19. if (done) {
    20. return;
    21. }
    22. if (sourceMode != NONE) {
    23. actual.onNext(null);
    24. return;
    25. }
    26. U v;
    27. try {
    28. v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
    29. } catch (Throwable ex) {
    30. fail(ex);
    31. return;
    32. }
    33. actual.onNext(v);
    34. }
    35. @Override
    36. public int requestFusion(int mode) {
    37. return transitiveBoundaryFusion(mode);
    38. }
    39. @Override
    40. public U poll() throws Exception {
    41. T t = qs.poll();
    42. return t != null ? ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.") : null;
    43. }
    44. }
    45. }

    再来看:

    1. abstract class AbstractObservableWithUpstream extends Observable implements HasUpstreamObservableSource {
    2. /** The source consumable Observable. */
    3. protected final ObservableSource source;
    4. /**
    5. * Constructs the ObservableSource with the given consumable.
    6. * @param source the consumable Observable
    7. */
    8. AbstractObservableWithUpstream(ObservableSource source) {
    9. this.source = source;
    10. }
    11. @Override
    12. public final ObservableSource source() {
    13. return source;
    14. }
    15. }

    所以ObservableMap也是一个Observable具体实现类。第二次调用map方法也是一样的,不同的是source对象变了,第一次是ObservableCreate类型的,第二次是ObservableMap类型的。

    好了,这段流程先放一边,来看subscribe,我们来看ObservableMap的方法:

    1. public void subscribeActual(Observersuper U> t) {
    2. source.subscribe(new MapObserver(t, function));
    3. }

    前面说过,第二层source其实是一个ObservableMap,我们发现这个类里面并没有subscribe方法,于是去父类去找,我们通过前面源码分析知道,ObservableMap的也是一个Observable,而前面分析过这个方法:

    1. public final void subscribe(Observersuper T> observer) {
    2. ···
    3. subscribeActual(observer);
    4. ···
    5. }

    第二层直接将观察者传进去,封装成一个MapObserver,但是第二层调用的时候这个source其实是第一层传过来的ObservableMap对象,调用了第一层ObservableMap对象的subscribe方法,而第一层的source其实是一个ObservableCreate对象,而且观察者又被封装了一层。

    最终触发:

    1. bservable.create(new ObservableOnSubscribe() {
    2. @Override
    3. public void subscribe(ObservableEmitter e) throws Exception {
    4. e.onNext("lzy真帅");
    5. }
    6. })

    这个流程我们上面分析过,就不再赘述。其实就是调用了二级包裹的onNext方法。

    1. static final class MapObserver extends BasicFuseableObserver {
    2. final Functionsuper T, ? extends U> mapper;
    3. MapObserver(Observersuper U> actual, Functionsuper T, ? extends U> mapper) {
    4. super(actual);
    5. this.mapper = mapper;
    6. }
    7. @Override
    8. public void onNext(T t) {
    9. if (done) {
    10. return;
    11. }
    12. if (sourceMode != NONE) {
    13. actual.onNext(null);
    14. return;
    15. }
    16. U v;
    17. try {
    18. v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");//1
    19. } catch (Throwable ex) {
    20. fail(ex);
    21. return;
    22. }
    23. actual.onNext(v);//2
    24. }

    注释1执行二级包裹的apply方法,t是前面传过来的。注释二这个actual很有意思,我们接着看:

    1. public abstract class BasicFuseableObserver implements Observer, QueueDisposable {
    2. /** The downstream subscriber. */
    3. protected final Observersuper R> actual;
    4. ···
    5. public BasicFuseableObserver(Observersuper R> actual) {
    6. this.actual = actual;
    7. }
    8. ···}

    作为二级包裹来说这其实就是一级包裹,执行一级包裹的onNext,然后一级包裹再去执行真正观察者的onNext。注意一下这个注释2中的参数v,其实就是上一层的结果

    分析完RxJava的核心原理,不得不去感叹设计的巧妙,这里面的思想值得每一个开发者去思考,去领会,去融会贯通。

  • 相关阅读:
    在Windows Sever2012中配置NTP服务器
    自动化测试-Xpath
    Sqoop(二):Hive导出数据到Oracle
    中国芯片金字塔成形,商业化拐点将至
    2022/11/22 [easyx]关于字符和一些背景
    少数人的晚餐-数据
    【晶振专题】案例:晶振供应商提供的晶振匹配测试报告能看出什么?
    这部分代码有没有优化的空间:假如day天数不固定,pd.concat则也不固定?
    常用黄芪泡水喝,身体能得到什么?学会搭配,养生效果或会翻倍
    检测设备是否插入有线网
  • 原文地址:https://blog.csdn.net/qq_36428821/article/details/126964870