
指定Observable自身在哪个调度器上执行,它指示Observable在一个指定的调度器上给观察者发通知决定上游线程。

指定Observable在一个特定的调度器上发送通知给观察者决定下游线程 (调用观察者的onNext,onCompleted,onError方法)。

subscribeOn 给上游代码分配线程,上游切换到哪个线程,下游要是不改的话,rxJava就在这个线程一直运行,整个rxjava中严格说来真正只有一个上游,就是产生数据的位置,如just/creat其他任何变换 和 操作符, 注册都是下游。所以subscribeOn只有第一次切换有效作用范围也是最小的,就just/creat。
observeOn 给下游代码分配线程,基本上操作符都会生成一个新的observable处理,和之前的observable关联(其实也就是注册到之前的observable),所以在一个操作范围来看,前一个observable发送数据给我,算是上游,我这个操作符消费数据,产生新的observable算是下游,所以observeOn可以多次切换它之后的操作符的线程。
- Observable.create(new ObservableOnSubscribe
() { - @Override
- public void subscribe(@NonNull ObservableEmitter
emitter) throws Exception { - Log.d(TAG, "数据源" + Thread.currentThread().getName());//computation
- emitter.onNext("测试线程调度流程");
- }
- }).subscribeOn(Schedulers.computation()).map(new Function
() { - @Override
- public String apply(@NonNull String s) throws Exception {
- Log.d(TAG, "第1次变化" + Thread.currentThread().getName()); //computation
- return s;
- }
- }).subscribeOn(AndroidSchedulers.mainThread()).map(new Function
() { - @Override
- public String apply(@NonNull String s) throws Exception {
- Log.d(TAG, "第2次变化" + Thread.currentThread().getName());//main
- return s;
- }
- }).observeOn(Schedulers.io()).map(new Function
() { - @Override
- public String apply(@NonNull String s) throws Exception {
- Log.d(TAG, "第3次变化" + Thread.currentThread().getName());//computation
- return s;
- }
- }).observeOn(AndroidSchedulers.mainThread()).map(new Function
() { - @Override
- public String apply(@NonNull String s) throws Exception {
- Log.d(TAG, "第4次变化" + Thread.currentThread().getName()); //main
- return s;
- }
- }).subscribe(new Consumer
() { - @Override
- public void accept(String s) throws Exception {
- Log.d(TAG, "监听者" + Thread.currentThread().getName()); //main
- }
- });
- //执行结果
- 2022-02-06 12:25:17.902 9891-10020/? D/KtActivity: 数据源RxComputationThreadPool-1
- 2022-02-06 12:25:17.902 9891-10020/? D/KtActivity: 第1次变化RxComputationThreadPool-1
- 2022-02-06 12:25:17.902 9891-10020/? D/KtActivity: 第2次变化RxComputationThreadPool-1
- 2022-02-06 12:25:17.903 9891-10021/? D/KtActivity: 第3次变化RxCachedThreadScheduler-1
- 2022-02-06 12:25:17.903 9891-9891/? D/KtActivity: 第4次变化main
- 2022-02-06 12:25:17.903 9891-9891/? D/KtActivity: 监听者main


观察它的结果再决定是不是要重新订阅原始的Observable。如果这个Observable发射了一项数据,它就重新订阅,如果这个Observable发射的是onError通知,它就将这个通知传递给观察者然后终止。

- var count = 0
- @SuppressLint("CheckResult")
- private fun testRetryWhen() {
- //模拟网络错误 错误重试: 3次
- Observable.create<Int> { e ->
- count++
- Log.d(TAG, "onCreate: 数据源 count = $count")
- e.onNext(count)
- }.flatMap {
- if (it < 4) { //模拟网络错误
- Observable.error(Exception())
- } else {
- Observable.just(it)
- }
- }.retryWhen(object : Function
, ObservableSource<*>?> { - private var mRetryCount = 0
- override fun apply(throwableObservable: Observable<Throwable>): ObservableSource<*>? {
- return throwableObservable.flatMap { throwable: Throwable ->
- mRetryCount++
- if (mRetryCount <= 3) {
- Log.d(TAG, "获取数据失败重试第" + mRetryCount + "次错误-> $throwable")
- }
- if (throwable is Exception && mRetryCount <= 3) {
- return@flatMap Observable.timer(500, TimeUnit.MILLISECONDS)
- } else {
- return@flatMap Observable.error<Int>(throwable)
- }
- }
- }
- }).subscribe(
- { s -> Log.d(TAG, "onNext: 获取到的数据 $s") },
- { throwable -> Log.d(TAG, "onError: ${throwable.message}") })
- { Log.d(TAG, "onComplete") }
- }
- //执行结果
- 2022-02-06 14:20:16.680 20551-20551/com.xzh.cdemo D/KtActivity: onCreate: 数据源 count = 1
- 2022-02-06 14:20:16.681 20551-20551/com.xzh.cdemo D/KtActivity: 获取数据失败重试第1次错误-> java.lang.Exception
- 2022-02-06 14:20:17.191 20551-20727/com.xzh.cdemo D/KtActivity: onCreate: 数据源 count = 2
- 2022-02-06 14:20:17.192 20551-20727/com.xzh.cdemo D/KtActivity: 获取数据失败重试第2次错误-> java.lang.Exception
- 2022-02-06 14:20:17.693 20551-20728/com.xzh.cdemo D/KtActivity: onCreate: 数据源 count = 3
- 2022-02-06 14:20:17.694 20551-20728/com.xzh.cdemo D/KtActivity: 获取数据失败重试第3次错误
- 2022-02-06 14:20:18.197 20551-20729/com.xzh.cdemo D/KtActivity: onCreate: 数据源 count = 4
- 2022-02-06 14:20:18.199 20551-20729/com.xzh.cdemo D/KtActivity: onNext: 获取到的数据 4