• 【Rxjava详解】(一)观察者模式的拓展


    RxJava引入

    在介绍RxJava之前先说一下Rx。全称是Reactive Extensions,直译过来就是响应式扩展

    Rx基于观察者模式,它是一种编程模型,目标是提供一致的编程接口,帮助开发者更方便的处理异步数据流ReactiveX.io给的定义是,Rx是一个使用可观察数据流进行异步编程的编程接口,ReactiveX结合了观察者模式、迭代器模式和函数式编程的精华。Rx已经渗透到了各个语言中,有RxJavaRxJSRxSwift等等

    总结一下RxJava的作用就是:异步

    但是RxJava的好处是简洁。异步操作很关键的一点是程序的简洁性,因为在调度过程比较复杂的情况下,异步代码经常会既难写也难被读懂。 Android的AsyncTaskHandler其实都是为了让异步代码更加简洁。虽然RxJava的优势也是简洁,但它的简洁的与众不同之处在于,随着程序逻辑变得越来越复杂,它依然能够保持简洁。

    扩展的观察者模式

    RxJava的异步实现,是通过一种扩展的观察者模式来实现的。

    观察者模式面向的需求是:A对象(观察者)对B对象(被观察者)的某种变化高度敏感,需要在B变化的一瞬间做出反应。

    观察者不需要时刻盯着被观察者(例如A不需要每过2ms就检查一次B的状态),而是采用注册(Register)或者称为订阅(Subscribe)的方式,告诉被观察者:我需要你的某某状态,你要在它变化的时候通知我。

    通用的观察者模式:

    image

    RxJava作为一个工具库,使用的就是通用形式的观察者模式。

    RxJava的观察者模式

    RxJava的基本概念:

    • Observable(可观察者,即被观察者):产生事件,例如去饭店吃饭的顾客。
    • Observer(观察者):接收事件,并给出响应动作,例如去饭店吃饭的厨房,会接受事件,并给出相应。
    • subscribe()(订阅):连接被观察者与观察者,例如去饭店吃饭的服务员。
      ObservableObserver通过subscribe() 方法实现订阅关系,从而Observable可以在需要的时候发出事件来通知Observer
    • Event(事件):被观察者与观察者沟通的载体,例如顾客点的菜。

    与传统观察者模式不同,RxJava的事件回调方法除了普通事件onNext()(相当于onClick()/onEvent())之外,还定义了两个特殊的事件:onCompleted()onError():

    但是RxJava与传统的观察者设计模式有一点明显不同,那就是如果一个Observerble没有任何的的Subscriber,那么这个Observable是不会发出任何事件的。

    • onCompleted(): 事件队列完结。

      RxJava不仅把每个事件单独处理,还会把它们看做一个队列。RxJava规定,当不会再有新的onNext()发出时,需要触发onCompleted() 方法作为标志。

    • onError(): 事件队列异常。
      在事件处理过程中出异常时,onError()会被触发,同时队列自动终止,不允许再有事件发出。

    • 在一个正确运行的事件序列中, onCompleted()onError()有且只有一个,并且是事件序列中的最后一个。需要注意的是onCompleted()onError() 二者也是互斥的,即在队列中调用了其中一个,就不应该再调用另一个。

    RxJava的观察者模式大致如下图:

    image

    基本实现

    基于上面的概念, RxJava的基本实现主要有三点:

    • 创建Observable

      Observable即被观察者,它决定什么时候触发事件以及触发怎样的事件RxJava使用Observable.create()方法来创建一个Observable,并为它定义事件触发规则

    • 创建Observer

      观察者,它决定事件触发的时候将有怎样的行为

      RxJava中的Observer接口的实现方式:

      Observer<String> observer = new Observer<String>() {
          @Override
          public void onNext(String s) {
              Log.d("xoliu", "Item: " + s);
          }
       
          @Override
          public void onCompleted() {
              Log.d("xoliu", "Completed!");
          }
       
          @Override
          public void onError(Throwable e) {
              Log.d("xoliu", "Error!");
           }
      };
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16

      RxJava还内置了一个实现了Observer的抽象类:Subscriber

      SubscriberObserver接口进行了一些扩展,但他们的基本使用方式是完全一样的。

      Subscriber<String> subscriber = new Subscriber<String>() {
          @Override
          public void onNext(String s) {
              Log.d(tag, "Item: " + s);
          }
       
          @Override
          public void onCompleted() {
              Log.d(tag, "Completed!");
          }
       
          @Override
           public void onError(Throwable e) {
              Log.d(tag, "Error!");
          }
      };
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16

      不仅基本使用方式一样,实质上,在RxJavasubscribe()过程中,Observer也总是会先被转换成一个Subscriber再使用。所以如果你只想使用基本功能,选择ObserverSubscriber是完全一样的。它们的区别对于使用者来说主要有两点:

      • onStart(): 这是Subscriber增加的方法。它会在subscribe()刚开始而事件还未发送之前被调用,可以用于做一些准备工作,例如数据的清零或重置。这是一个可选方法,默认情况下它的实现为空。需要注意的是,如果对准备工作的线程有要求(例如弹出一个显示进度的对话框,这必须在主线程执行), onStart()就不适用了,因为它总是在subscribe() 所发生的线程被调用,而不能指定线程。要在指定的线程来做准备工作,可以使用doOnSubscribe()方法,具体可以在后面的文中看到。

      • unsubscribe(): 这是Subscriber所实现的另一个接口Subscription的方法,用于取消订阅。在这个方法被调用后,Subscriber 将不再接收事件。一般在这个方法调用前,可以使用isUnsubscribed()先判断一下状态。

        unsubscribe()这个方法很重要,因为在subscribe()之后,Observable会持有 Subscriber的引用,这个引用如果不能及时被释放,将有内存泄露的风险。所以最好保持一个原则:要在不再使用的时候尽快在合适的地方(例如onPause()、onStop()等方法中)调用unsubscribe()来解除引用关系,以避免内存泄露的发生。

    • 调用subscribe()方法(订阅)

      创建了一个ObservableObserver之后,再用subscribe()方法将它们联结起来:

      observable.subscribe(observer);  
      // 或者:
      observable.subscribe(subscriber);
      
      • 1
      • 2
      • 3

      subscribe()这个方法有点怪:它看起来是observalbe订阅了observer/subscriber而不是observer/subscriber订阅了observalbe,这让人读起来有点别扭

    RxJava入门示例

    一个Observable可以发出零个或者多个事件,知道事件结束或者出错。每发出一个事件,就会调用它的Subscriber的onNext()方法,最后调用Subscriber.onComplete()或者Subscriber.onError()结束。

    // 创建被观察者、数据源
    Observable<String> observable = Observable.create(new Observable.OnSubscribe<String>() {
        @Override
        public void call(Subscriber<? super String> subscriber) {
            // 这里传入了一个 OnSubscribe 对象作为参数。OnSubscribe 会被存储在返回的 Observable 对象中,它的作用相当于一个计划表,当 Observable      
            // 被订阅的时候,OnSubscribe 的 call() 方法会自动被调用,事件序列就会依照设定依次触发(观察者Subscriber 将会被调用三次 onNext() 和一次 onCompleted()
            // 这样,由被观察者调用了观察者的回调方法,就实现了由被观察者向观察者的事件传递,即观察者模式。
            subscriber.onNext("Hello ");
            subscriber.onNext("World !");
            subscriber.onCompleted();//结束
        }
    });
    // 创建观察者
    Subscriber<String> subscriber = new Subscriber<String>() {
        @Override
        public void onCompleted() {
            Log.i("xoliu", "onCompleted");
        }
    
        @Override
        public void onError(Throwable e) {
            Log.i("xoliu", "onError");
        }
    
        @Override
        public void onNext(String s) {
            Log.i("xoliu", "onNext : " + s);
        }
    };
    // 订阅
    observable.subscribe(subscriber);
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31

    一旦subscriber订阅了observableobservable就会调用subscriber对象的onNextonComplete方法,subscriber就会打印出Hello World.

    subscriber(Subscriber subscriber)做了3件事:

    • 调用Subscriber.onStart()是一个准备方法。
    • 调用Observable对象中的onSubscribe.call(Subscriber)。在这里,事件发送的逻辑开始运行。从这也可以看出,在RxJava中,Observable 并不是在创建的时候就立即开始发送事件,而是在它被订阅的时候,即当subscribe()方法执行的时候。
    • 将传入的Subscriber作为Subscription返回。这是为了方便unsubscribe().

    整个过程中对象间的关系如下图:

    image

    RxJava内置了很多简化创建Observable对象的函数,

    • Observable.just()用来创建只发出一个事件就结束的Observable对象
    Observable<String> observable = Observable.just("Fuck u ", "World !");
    
    • 1

    接下来看看如何简化Subscriber,上面的例子中,我们其实并不关心onComplete()onError,我们只需要在onNext的时候做一些处理,这时候就可以使用Action1类。

    Action

    什么是Action
    Action是RxJava 的一个接口,常用的有Action0Action1。虽然Action0Action1在API中使用最广泛,但RxJava是提供了多个ActionX形式的接口(例如Action2, Action3)的,它们可以被用以包装不同的无返回值的方法。

    • Action0: 它只有一个方法 call(),这个方法是无参无返回值的;由于 onCompleted() 方法也是无参无返回值的,因此 Action0 可以被当成一个包装对象,将 onCompleted() 的内容打包起来将自己作为一个参数传入 subscribe() 以实现不完整定义的回调。
    • Ation1:它同样只有一个方法 call(T param),这个方法也无返回值,但有一个参数;与 Action0 同理,由于 onNext(T obj)onError(Throwable error) 也是单参数无返回值的,因此 Action1 可以将 onNext(obj)onError(error) 打包起来传入 subscribe() 以实现不完整定义的回调
    Action1<String> action1 = new Action1<String>() {
        @Override
        public void call(String s) {
            Log.i("xoliu", "Action1 call : " + s);
        }
    };
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    Observable.subscribe()方法有一个重载版本,接受三个Action1类型的参数

    image

    所以上面的代码最终可以写成这样:

    Observable.just("Hello ", "World !").subscribe(new Action1<String>() {
        @Override
        public void call(String s) {
            Log.i("xoliu", "call : " + s);
        }
    });
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    这里顺便多提一些subscribe()的多个Action参数:

    Action1<String> onNextAction = new Action1<String>() {
        // onNext()
        @Override
        public void call(String s) {
            Log.d(tag, s);
        }
    };
    Action1<Throwable> onErrorAction = new Action1<Throwable>() {
        // onError()
        @Override
        public void call(Throwable throwable) {
            // Error handling
        }
    };
    Action0 onCompletedAction = new Action0() {
        // onCompleted()
        @Override
        public void call() {
            Log.d(tag, "completed");
        }
    };
    
    observable.subscribe(onNextAction, onErrorAction, onCompletedAction);
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
  • 相关阅读:
    GPGGA NTRIP RTCM 笔记
    Spring是一个怎样的体系结构呢?
    1358:中缀表达式值(expr)
    经济型EtherCAT运动控制器(八):轴参数与运动指令
    MySQL8 Group By 新特性
    基于Springboot实现商品进销存管理系统
    【Linux Shell脚本攻略】第1章 小试牛刀
    前端项目练习(练习-005-webpack-03)
    kubectl声明式资源管理命令
    钢铁企业智慧制造解决方案及典型应用----工业软件讲坛第六次讲座
  • 原文地址:https://blog.csdn.net/weixin_73871834/article/details/134538826