• RxJava介绍及基本原理


    随着互联网的迅猛发展,Java已成为最广泛应用于后端开发的语言之一。而在处理异步操作和事件驱动编程方面,传统的Java多线程并不总是最佳选择。这时候,RxJava作为一个基于观察者模式、函数式编程和响应式编程理念的库,为我们提供了一种强大而灵活的解决方案。

    简介

    RxJava是 ReactiveX 家族的重要一员, ReactiveXReactive Extensions 的缩写,一般简写为 RxReactiveX官方给Rx的定义是:Rx是一个使用可观察数据流进行异步编程的编程接口。
    在这里插入图片描述

    ReactiveX 不仅仅是一个编程接口,它是一种编程思想的突破,它影响了许多其它的程序库和框架以及编程语言。它拓展了观察者模式,使你能够自由组合多个异步事件,而不需要去关心线程同步,线程安全并发数据以及I/O阻塞

    RxJava在Java环境下使用,它通过Observable(可观测对象)和Subscriber(订阅者)来实现异步编程模型。Observable可以发射出一系列的数据流,而Subscriber则负责处理这些数据流。利用各种操作符,我们可以对数据流进行变换、过滤、合并等操作,从而完成复杂的异步任务。

    GitHub - ReactiveX/RxJava: RxJava – Reactive Extensions for the JVM – a library for composing asynchronous and event-based programs using observable sequences for the Java VM.
    在这里插入图片描述
    结论: RxJava是 ReactiveX 在JVM上的一个实现,ReactiveX使用Observable序列组合异步和基于事件的程序的库;是一个 基于事件流、实现异步操作的库。

    Observables · ReactiveX文档中文翻译

    RxJava 是轻量级的

    RxJava尽力做到非常轻巧。它仅关注Observable的抽象和与之相关的高层函数,实现为一个单独的JAR文件。

    RxJava 是一个多语言实现

    RxJava 支持Java 6或者更新的版本,以及其它的JVM语言如 Groovy, Clojure, JRuby, KotlinScala。RxJava 可用于更多的语言环境,而不仅仅是Java和Scala,而且它致力于尊重每一种JVM语言的习惯。

    RxJava 第三方库

    下面是可与RxJava协作的第三方库:

    使用指南

    你可以在Maven Central http://search.maven.org 找到用于Maven, Ivy, Gradle, SBT和其它构建工具需要的二进制文件和依赖信息.

    Maven示例:

    <dependency>
          <groupId>io.reactivex.rxjava3groupId>
          <artifactId>rxjavaartifactId>
          <version>3.1.7version>
    dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5

    RxJava使用三步曲

    RxJava的使用可以概括为三个步骤:创建 Observable,定义 Observer 处理数据流,最后订阅(Subscribe)Observable。

    创建 Observable

    • 可以直接使用 Observable.just() 方法来创建一个发射固定数据项的 Observable;

    • 也可以通过 Observable.fromIterable() 方法来创建包含多个数据项的 Observable。

    Observable<String> observable = Observable.just("Hello", "World");
    
    • 1

    定义 Observer

    创建一个 Observer 对象并实现它的各个方法。在这些方法中,你可以处理每个发射的数据项、对错误进行处理,或者在数据全部发射完毕时执行一些操作。

    Observer<String> observer = new Observer<String>() {
    	@Override
    	public void onSubscribe(Disposable d) {
    		// 在此方法中进行一些初始化操作或资源管理
    	}
    
    	@Override
    	public void onNext(String s) {
    		// 处理每个发射的数据项
    		System.out.println(s);
    	}
    
    	@Override
    	public void onError(Throwable e) {
    		// 处理发生的异常情况
    	}
    
    	@Override
    	public void onComplete() {
    		// 完成所有的数据发射操作
    	}
    };
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22

    订阅 Observable

    observable.subscribe(observer);
    
    • 1

    RxJava基本原理

    生活例子引入

    用一个生活例子引入,来介绍 RxJava的基本原理: 顾客到饭店吃饭
    在这里插入图片描述

    RxJava原理介绍

    • RxJava原理 基于 一种扩展的观察者模式

    • RxJava的扩展观察者模式中有4个角色:

    角色作用类比
    被观察者(Observable)产生事件顾客
    观察者(Observer)接收事件,并给出响应动作厨房
    订阅(Subscribe)连接 被观察者 & 观察者服务员
    事件(Event)被观察者 & 观察者 沟通的载体

    请结合上述 顾客到饭店吃饭 的生活例子理解:
    在这里插入图片描述
    RxJava原理可总结为:被观察者 (Observable) 通过 订阅(Subscribe) 按顺序发送事件 给观察者 (Observer), 观察者(Observer) 按顺序接收事件 & 作出对应的响应动作。具体如下图:
    在这里插入图片描述

    代码实现

    步骤1:创建被观察者 (**Observable**** )& 生产事件**

    • 即 顾客入饭店 - 坐下餐桌 - 点菜
    // 步骤1:创建被观察者 (Observable )& 生产事件
    // 即 顾客入饭店 - 坐下餐桌 - 点菜
    // 1. 创建被观察者 Observable 对象
    Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
    // create() 是 RxJava 最基本的创造事件序列的方法
    // 此处传入了一个 OnSubscribe 对象参数
    // 当 Observable 被订阅时,OnSubscribe 的 call() 方法会自动被调用,即事件序列就会依照设定依次被触发
    // 即观察者会依次调用对应事件的复写方法从而响应事件
    // 从而实现被观察者调用了观察者的回调方法 & 由被观察者向观察者的事件传递,即观察者模式
    
    // 2. 在复写的subscribe()里定义需要发送的事件
    	@Override
    	public void subscribe(ObservableEmitter<String> emitter) throws Exception {
    		// 通过 ObservableEmitter类对象产生事件并通知观察者
    		// ObservableEmitter类介绍
    		// 2.1 定义:事件发射器
    		// 2.2 作用:定义需要发送的事件 & 向观察者发送事件
    		emitter.onNext("event01");
    		emitter.onNext("event02");
    		emitter.onNext("event03");
    		emitter.onComplete();
    	}
    });
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23

    步骤2:创建观察者 (**Observer**** )并 定义响应事件的行为**

    • 即 开厨房 - 确定对应菜式

    • 发生的事件类型包括:Next事件、Complete事件 & Error事件。具体如下:

    事件类型定义作用使用规则使用方法
    Next普通事件向观察者发送需要响应事件的信号被观聚者可发送无限个Next事件;观察者可接受无限个Next事件onNext()
    Complete表示所有的事件都已经成功完成(RxJava把所有时间当作队列处理)标志 被观察者 不再发送普通事件(Next)当被观察者发送了一个Complete事件后,被观察者在Complete事件后的事件将会继续发送,但观察者收到Complete事件后将不再继续接收任何事件;被观察者可以不发送Complete事件。onComplete()
    Error事件队列异常事件标志 事件处理过程中出现异常(此时队列自动终止,不允许再有事件发出)当被观察者发送了一个Error事件后,被观察者在Error事件后的事件将会继续发送,但观察者收到Error事件后将不再继续接收任何事件;被观察者可以不发送Error事件。onError()
    // 1. 创建观察者 (Observer )对象
    Observer<String> observer = new Observer<String>() {
    	// 2. 创建对象时通过对应复写对应事件方法 从而 响应对应事件
    	// 观察者接收事件前,默认最先调用复写 onSubscribe()
    	@Override
    	public void onSubscribe(Disposable d) {
    
    	}
    	// 当被观察者生产Next事件 & 观察者接收到时,会调用该复写方法 进行响应
    	@Override
    	public void onNext(String value) {
    		System.out.println("对Next事件作出响应" + value);
    	}
    	// 当被观察者生产Error事件& 观察者接收到时,会调用该复写方法 进行响应
    	@Override
    	public void onError(Throwable e) {
    
    	}
    	// 当被观察者生产Complete事件& 观察者接收到时,会调用该复写方法 进行响应
    	@Override
    	public void onComplete() {
    
    	}
    };
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24

    步骤3:通过订阅(**Subscribe**)连接观察者和被观察者

    • 即 顾客找到服务员 - 点菜 - 服务员下单到厨房 - 厨房烹调
    observable.subscribe(observer);
    // 或者 observable.subscribe(subscriber);
    
    • 1
    • 2

    Subject

    来看⼀个⾮常特殊的类型- Subject ,为什么说它特殊呢?原因很简单:它同时充当了Observer和Observable的角色。因为它是一个Observer,它可以订阅一个或多个Observable;又因为它是一个Observable,它可以转发它收到(Observe)的数据,也可以发射新的数据。

    由于一个Subject订阅一个Observable,它可以触发这个Observable开始发射数据(如果那个Observable是"冷"的–就是说,它等待有订阅才开始发射数据)。因此有这样的效果,Subject可以把原来那个"冷"的Observable变成"热"的。

    Subject的种类

    针对不同的场景一共有四种类型的Subject。他们并不是在所有的实现中全部都存在,而且一些实现使用其它的命名约定(例如,在RxScala中Subject被称作PublishSubject)。

    AsyncSubject

    一个AsyncSubject只在原始Observable完成后,发射来自原始Observable的最后一个值。(如果原始Observable没有发射任何值,AsyncObject也不发射任何值)它会把这最后一个值发射给任何后续的观察者。

    AsyncSubject asyncSubject = AsyncSubject.create();
    // 发送事件
    asyncSubject.onNext(1);
    // 订阅
    asyncSubject.subscribe(event -> {
    	System.out.println(event);
    });
    asyncSubject.onNext(3);
    // 再次发送事件
    asyncSubject.onNext(4);
    asyncSubject.onComplete();
    // 只会监听到 事件4
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    在这里插入图片描述
    PublishSubject

    可以不需要初始来进行初始化(也就是可以为空),并且它只会向订阅者发送在订阅之后才接收到的元素。

    // 初始化⼀个PublishSubject
    PublishSubject publishSubject = PublishSubject.create();
    // 发送事件
    publishSubject.onNext(1);
    // 订阅
    publishSubject.subscribe(event -> {
    	System.out.println(event);
    });
    // 再次发送事件
    publishSubject.onNext(2);
    publishSubject.onNext(3);
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 事件1是无法被订阅的,只接受订阅之后的响应
      在这里插入图片描述
      BehaviorSubject

    当观察者订阅BehaviorSubject时,它开始发射原始Observable最近发射的数据(如果此时还没有收到任何数据,它会发射一个默认值),然后继续发射其它任何来自原始Observable的数据。
    在这里插入图片描述

  • 相关阅读:
    Java线程的实现
    图片系列(6)不同版本上 Bitmap 内存分配与回收原理对比
    Worthington丨Worthington 蛋白酶K(Proteinase K)说明书
    【微服务架构组件】Apollo
    【NodeJs入门学习】node服务环境搭建
    【SwiftUI模块】0003、SwiftUI搭建瀑布流-交错网格
    Vue - 虚拟DOM的简单理解
    对于Redis,如何根据业务需求配置是否允许远程访问?
    写好技术书籍
    CMake教程-第 12 步:打包调试和发布
  • 原文地址:https://blog.csdn.net/tuyuan2012/article/details/133768849