RxJava
是一个 基于事件流、实现异步操作的库
用于实现异步操作,类似于 Android
中的 AsyncTask
、Handler+
new Thread的作用
由于 RxJava
的使用方式是:基于事件流的链式调用,所以使得 RxJava
:
更重要的是,随着程序逻辑的复杂性提高,它依然能够保持简洁 & 优雅
Rxjava
原理 基于 一种扩展的观察者模式
Rxjava
的扩展观察者模式中有4个角色:
角色 | 作用 | 类比 |
---|---|---|
被观察者(Observable) | 产生事件 | 顾客 |
观察者(Observer) | 接收事件,并给出响应动作 | 厨房 |
订阅(Subscribe) | 连接 被观察者 & 观察者 | 服务员 |
事件(Event) | 被观察者 & 观察者 沟通的载体 | 菜式 |
导包
- implementation 'io.reactivex.rxjava2:rxjava:2.1.3'
- implementation 'io.reactivex.rxjava2:rxandroid:2.0.1'
- implementation 'com.jakewharton.rxbinding2:rxbinding:2.2.0'
三种方式:Observable.create、Observable.just、Observable.fromArray
- public void buyFood(){
- Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
- @Override
- public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception {
- e.onNext("potato");
- e.onNext("tomato");
- e.onNext("noodles");
- e.onComplete();
- }
- });
-
- Observable<String> observable1 = Observable.just("potato","tomato","noodles");
- String [] foods = new String[]{"potato","tomato","noodles"};
- Observable<String> observable2 = Observable.fromArray(foods);
- }
使用observer或者subscriber
- Observer<String> mObserver = new Observer<String>() {
- @Override
- public void onSubscribe(@NonNull Disposable d) {
-
- }
-
- @Override
- public void onNext(@NonNull String s) {
-
- }
-
- @Override
- public void onError(@NonNull Throwable e) {
-
- }
-
- @Override
- public void onComplete() {
-
- }
- };
- Subscriber<String> mStringSubscriber = new Subscriber<String>() {
- @Override
- public void onSubscribe(Subscription s) {
-
- }
-
- @Override
- public void onNext(String s) {
-
- }
-
- @Override
- public void onError(Throwable t) {
-
- }
-
- @Override
- public void onComplete() {
-
- }
- };
特别注意:2种方法的区别,即Subscriber 抽象类与Observer 接口的区别
// 相同点:二者基本使用方式完全一致(实质上,在RxJava的 subscribe 过程中,Observer总是会先被转换成Subscriber再使用)
// 不同点:Subscriber抽象类对 Observer 接口进行了扩展,新增了两个方法:
// 1\. onStart():在还未响应事件前调用,用于做一些初始化工作
// 2\. unsubscribe():用于取消订阅。在该方法被调用后,观察者将不再接收 & 响应事件
// 调用该方法前,先使用 isUnsubscribed() 判断状态,确定被观察者Observable是否还持有观察者Subscriber的引用,如果引用不能及时释放,就会出现内存泄露
observable.subscribe(observer);
- Observable.create(new ObservableOnSubscribe
() { - @Override
- public void subscribe(@NonNull ObservableEmitter
e) throws Exception { - e.onNext("potato");
- e.onNext("tomato");
- e.onNext("noodles");
- e.onComplete();
- }
- }).subscribe(new Observer
() { - @Override
- public void onSubscribe(@NonNull Disposable d) {
-
- }
-
- @Override
- public void onNext(@NonNull String s) {
-
- }
-
- @Override
- public void onError(@NonNull Throwable e) {
-
- }
-
- @Override
- public void onComplete() {
-
- }
- });
observeOn 作用于该操作符之后直到出现新的observeOn操作符
subscribeOn 作用于该操作符之前的 Observable 的创建操符作以及 doOnSubscribe 操作符 ,换句话说就是 doOnSubscribe 以及 Observable 的创建操作符总是被其之后最近的 subscribeOn 控制
实例
- //将字符串转换成double类型
- String path = "12.3";
- public void test(){
- //输入事件
- Observable.just(path)
- //对事件进行处理
- .map(new Function<String, Double>() {
- @Override
- public Double apply(@NonNull String s) throws Exception {
- return Double.parseDouble(s);
- }
- })
- //指定被观察者执行线程
- .subscribeOn(Schedulers.io())
- //指定观察者执行线程
- .observeOn(AndroidSchedulers.mainThread())
- //订阅观察者
- .subscribe(
- new Observer<Double>() {
- @Override
- public void onSubscribe(@NonNull Disposable d) {
-
- }
-
- @Override
- public void onNext(@NonNull Double aDouble) {
-
- }
-
- @Override
- public void onError(@NonNull Throwable e) {
-
- }
-
- @Override
- public void onComplete() {
-
- }
- });
- }
Rxjava中的Scheduler
相当于线程控制器,Rxjava通过它来指定每一段代码应该运行在什么样的线程。Rxjava提供了5种调度器:
另外,Android还有一个专用的AndroidSchedulers.mainThread()
指定操作将在Android主线程运行。Rxjava通过subscribeOn()
和observeOn()
两个方法来对线程进行控制,subscribeOn()
指定subscribe()
时间发生的线程,也就是事件产生的线程,observeOn()
指定Subscriber
做运行的线程,也就是消费事件的线程。
Schedulers.io() 用于I/O操作,比如:读写文件,数据库,网络交互等等。行为模式和newThread()
差不多,重点需要注意的是线程池是无限制的,大量的I/O调度操作将创建许多个线程并占用内存
Schedulers.computation()计算工作默认的调度器,这个计算指的是 CPU 密集型计算,即不会被 I/O 等操作限制性能的操作,例如图形的计算。这个 Scheduler 使用的固定的线程池,大小为 CPU 核数。
Schedulers.immediate() 这个调度器允许你立即在当前线程执行你指定的工作。这是默认的Scheduler
。
Schedulers.newThread() 它为指定任务启动一个新的线程。
Schedulers.trampoline()
当我们想在当前线程执行一个任务时,并不是立即,我们可以用trampoline()
将它入队。这个调度器将会处理它的队列并且按序运行队列中每一个任务。
- private void test3() {
- RxView.clicks(控件).throttleFirst(1000, TimeUnit.MILLISECONDS).subscribe(new Observer
- @Override
- public void onSubscribe(@NonNull Disposable d) {
-
- }
-
- @Override
- public void onNext(@NonNull Object o) {
- Log.e(TAG, "onNext: 响应事件....." );
- }
-
- @Override
- public void onError(@NonNull Throwable e) {
-
- }
-
- @Override
- public void onComplete() {
-
- }
- });