• 6. 线程调度 与 重试机制


    一. 线程调度

    1.subscribeOn()

    指定Observable自身在哪个调度器上执行,它指示Observable在一个指定的调度器上给观察者发通知决定上游线程

    2.observeOn()

    指定Observable在一个特定的调度器上发送通知给观察者决定下游线程 (调用观察者的onNext,onCompleted,onError方法)。

    • subscribeOn 给上游代码分配线程,上游切换到哪个线程,下游要是不改的话,rxJava就在这个线程一直运行,整个rxjava中严格说来真正只有一个上游,就是产生数据的位置,如just/creat其他任何变换 和 操作符, 注册都是下游。所以subscribeOn只有第一次切换有效作用范围也是最小的,就just/creat。

    • observeOn 给下游代码分配线程,基本上操作符都会生成一个新的observable处理,和之前的observable关联(其实也就是注册到之前的observable),所以在一个操作范围来看,前一个observable发送数据给我,算是上游,我这个操作符消费数据,产生新的observable算是下游,所以observeOn可以多次切换它之后的操作符的线程。

    1. Observable.create(new ObservableOnSubscribe() {
    2. @Override
    3. public void subscribe(@NonNull ObservableEmitter emitter) throws Exception {
    4. Log.d(TAG, "数据源" + Thread.currentThread().getName());//computation
    5. emitter.onNext("测试线程调度流程");
    6. }
    7. }).subscribeOn(Schedulers.computation()).map(new Function() {
    8. @Override
    9. public String apply(@NonNull String s) throws Exception {
    10. Log.d(TAG, "第1次变化" + Thread.currentThread().getName()); //computation
    11. return s;
    12. }
    13. }).subscribeOn(AndroidSchedulers.mainThread()).map(new Function() {
    14. @Override
    15. public String apply(@NonNull String s) throws Exception {
    16. Log.d(TAG, "第2次变化" + Thread.currentThread().getName());//main
    17. return s;
    18. }
    19. }).observeOn(Schedulers.io()).map(new Function() {
    20. @Override
    21. public String apply(@NonNull String s) throws Exception {
    22. Log.d(TAG, "第3次变化" + Thread.currentThread().getName());//computation
    23. return s;
    24. }
    25. }).observeOn(AndroidSchedulers.mainThread()).map(new Function() {
    26. @Override
    27. public String apply(@NonNull String s) throws Exception {
    28. Log.d(TAG, "第4次变化" + Thread.currentThread().getName()); //main
    29. return s;
    30. }
    31. }).subscribe(new Consumer() {
    32. @Override
    33. public void accept(String s) throws Exception {
    34. Log.d(TAG, "监听者" + Thread.currentThread().getName()); //main
    35. }
    36. });
    37. //执行结果
    38. 2022-02-06 12:25:17.902 9891-10020/? D/KtActivity: 数据源RxComputationThreadPool-1
    39. 2022-02-06 12:25:17.902 9891-10020/? D/KtActivity: 第1次变化RxComputationThreadPool-1
    40. 2022-02-06 12:25:17.902 9891-10020/? D/KtActivity: 第2次变化RxComputationThreadPool-1
    41. 2022-02-06 12:25:17.903 9891-10021/? D/KtActivity: 第3次变化RxCachedThreadScheduler-1
    42. 2022-02-06 12:25:17.903 9891-9891/? D/KtActivity: 第4次变化main
    43. 2022-02-06 12:25:17.903 9891-9891/? D/KtActivity: 监听者main

    二. 错误重试

    retrywhen()

    观察它的结果再决定是不是要重新订阅原始的Observable。如果这个Observable发射了一项数据,它就重新订阅,如果这个Observable发射的是onError通知,它就将这个通知传递给观察者然后终止。

    1. var count = 0
    2. @SuppressLint("CheckResult")
    3. private fun testRetryWhen() {
    4. //模拟网络错误 错误重试: 3次
    5. Observable.create<Int> { e ->
    6. count++
    7. Log.d(TAG, "onCreate: 数据源 count = $count")
    8. e.onNext(count)
    9. }.flatMap {
    10. if (it < 4) { //模拟网络错误
    11. Observable.error(Exception())
    12. } else {
    13. Observable.just(it)
    14. }
    15. }.retryWhen(object : Function, ObservableSource<*>?> {
    16. private var mRetryCount = 0
    17. override fun apply(throwableObservable: Observable<Throwable>): ObservableSource<*>? {
    18. return throwableObservable.flatMap { throwable: Throwable ->
    19. mRetryCount++
    20. if (mRetryCount <= 3) {
    21. Log.d(TAG, "获取数据失败重试第" + mRetryCount + "次错误-> $throwable")
    22. }
    23. if (throwable is Exception && mRetryCount <= 3) {
    24. return@flatMap Observable.timer(500, TimeUnit.MILLISECONDS)
    25. } else {
    26. return@flatMap Observable.error<Int>(throwable)
    27. }
    28. }
    29. }
    30. }).subscribe(
    31. { s -> Log.d(TAG, "onNext: 获取到的数据 $s") },
    32. { throwable -> Log.d(TAG, "onError: ${throwable.message}") })
    33. { Log.d(TAG, "onComplete") }
    34. }
    35. //执行结果
    36. 2022-02-06 14:20:16.680 20551-20551/com.xzh.cdemo D/KtActivity: onCreate: 数据源 count = 1
    37. 2022-02-06 14:20:16.681 20551-20551/com.xzh.cdemo D/KtActivity: 获取数据失败重试第1次错误-> java.lang.Exception
    38. 2022-02-06 14:20:17.191 20551-20727/com.xzh.cdemo D/KtActivity: onCreate: 数据源 count = 2
    39. 2022-02-06 14:20:17.192 20551-20727/com.xzh.cdemo D/KtActivity: 获取数据失败重试第2次错误-> java.lang.Exception
    40. 2022-02-06 14:20:17.693 20551-20728/com.xzh.cdemo D/KtActivity: onCreate: 数据源 count = 3
    41. 2022-02-06 14:20:17.694 20551-20728/com.xzh.cdemo D/KtActivity: 获取数据失败重试第3次错误
    42. 2022-02-06 14:20:18.197 20551-20729/com.xzh.cdemo D/KtActivity: onCreate: 数据源 count = 4
    43. 2022-02-06 14:20:18.199 20551-20729/com.xzh.cdemo D/KtActivity: onNext: 获取到的数据 4

  • 相关阅读:
    [C# ] 执行ping命令,获得网络状态
    CCF刷题计划——训练计划(反向拓扑排序)
    Ubuntu LTS 坚持 10 年更新不动摇
    pdffactory pro 8中文破解版
    JVM运行参数之-X和-XX参数
    git配置1-不同的项目使用不同用户名或邮箱
    四级单词大全o-z
    Docker全解
    北京十大靠谱律师事务所最新排名(2022前十推荐)
    各大算法平台刷题数量统计网站
  • 原文地址:https://blog.csdn.net/x910131/article/details/126071061