• rxjava2源码分析


    目录

    一,Observable调用流程

    1.1 简单Observable.create()创建调用流程

    1.2 map操作符

    1.3 flatmap操作符

    1.4 subscribeOn操作符

    1.5 observeOn操作符



    一,Observable调用流程

    1.1 简单Observable.create()创建调用流程

    上面的这个流程图是下面这段代码调用流程图

    1. Observable.create(object :ObservableOnSubscribe{
    2. override fun subscribe(emitter: ObservableEmitter<String>) {
    3. emitter.onNext("a")
    4. }
    5. }).subscribe({
    6. Log.d(TAG,"======== onNext value = $it")
    7. },{
    8. Log.d(TAG,"======== onError ")
    9. },{
    10. Log.d(TAG,"======== onComplete")
    11. },{
    12. Log.d(TAG,"======== onSubscribe")
    13. })
    1.2 map操作符

    1. Observable.create(object :ObservableOnSubscribe<Int>{
    2. override fun subscribe(emitter: ObservableEmitter<Int>) {
    3. emitter.onNext(1)
    4. }
    5. }).map { result->
    6. result.toString()+"map转换"
    7. }.subscribe({
    8. Log.d(TAG,"======== onNext value = ${it.toString()}")
    9. },{
    10. Log.d(TAG,"======== onError ")
    11. },{
    12. Log.d(TAG,"======== onComplete")
    13. },{
    14. Log.d(TAG,"======== onSubscribe")
    15. })
    1.3 flatmap操作符

    1. Observable.create(object : ObservableOnSubscribe<Int> {
    2. override fun subscribe(emitter: ObservableEmitter<Int>) {
    3. emitter.onNext(1)
    4. }
    5. }).flatMap(object : Function<Int, ObservableSource<out String>> {
    6. override fun apply(t: Int): ObservableSource<out String> {
    7. return Observable.just("${t.toString()}flatmap")
    8. }
    9. }).subscribe({
    10. Log.d(TAG, "======== onNext value = ${it.toString()}")
    11. }, {
    12. Log.d(TAG, "======== onError ")
    13. }, {
    14. Log.d(TAG, "======== onComplete")
    15. }, {
    16. Log.d(TAG, "======== onSubscribe")
    17. })
    1.4 subscribeOn操作符

    1. Observable.create(object : ObservableOnSubscribe<Int> {
    2. override fun subscribe(emitter: ObservableEmitter<Int>) {
    3. Log.d(TAG, "======== subscribe 当前线程是${Thread.currentThread().name} ")
    4. emitter.onNext(1)
    5. }
    6. }).subscribeOn(Schedulers.io())/* .observeOn(AndroidSchedulers.mainThread())*/
    7. .subscribeOn(Schedulers.newThread())
    8. .subscribe(MyObserver())

    通过多次切换订阅线程,发现只有第一次生效,

    看下日志:

     D  ======== setOnObservableAssembly apply
    2023-10-09 20:30:20.440 28448-28448 com.exampl...deActivity com.example.myapplication            D  ======== setOnObservableAssembly apply
    2023-10-09 20:31:53.456 28448-28448 com.exampl...deActivity com.example.myapplication            D  ======== setOnObservableAssembly apply
    2023-10-09 20:32:41.050 28448-28448 com.exampl...deActivity com.example.myapplication            D  ======== onSubscribe 当前线程是main

    根本原因是在订阅的时候

    会根据这个订阅源不断的去找上个源,那么到这里就明白了,当前切换线上的会被新的订阅源替代,所以就是为什么多次切换线程只有第一次生效原因

    1.5 observeOn操作符

    对于observeOn多次切换只有最后一次生效,根本原因是

    在ObservableObserveOn对下里面,每次调用onNext的时候都会进行一次线程切换,如果还有后续的Observe,那么就会继续调用这个对应的Observe的onNext方法,虽然每调用一次当前的线程会切换一次,但是最终的消费地方是在业务定义的那个Observe,这个是业务的地方,也就是我们业务真正关系的地方的线程切换

    1. Observable.create(object : ObservableOnSubscribe<Int> {
    2. override fun subscribe(emitter: ObservableEmitter<Int>) {
    3. Log.d(TAG, "======== subscribe 当前线程是${Thread.currentThread().name} ")
    4. emitter.onNext(1)
    5. }
    6. }).observeOn(AndroidSchedulers.mainThread())
    7. .observeOn(Schedulers.io())
    8. .subscribe(MyObserver())

    对于这种多次切换的时候,那么看日志

     ======== setOnObservableAssembly apply
    2023-10-10 20:08:28.113 10087-10087 com.exampl...deActivity com.example.myapplication            D  ======== setOnObservableAssembly apply
    2023-10-10 20:08:28.117 10087-10087 com.exampl...deActivity com.example.myapplication            D  ======== onSubscribe 当前线程是main
    2023-10-10 20:08:28.117 10087-10087 com.exampl...deActivity com.example.myapplication            D  ======== subscribe 当前线程是main 
    2023-10-10 20:08:28.301 10087-10132 com.exampl...deActivity com.example.myapplication            D  ======== onNext value = 1 当前线程是RxCachedThreadScheduler-1

    最后左右一次切换生效到最终的业务上

    二,背压原理分析

  • 相关阅读:
    [足式机器人]Part3机构运动微分几何学分析与综合Ch02-1 平面机构离散运动鞍点综合——【读书笔记】
    ssh使用
    【POJ No. 3368】 最频繁值 Frequent values
    计算物理专题----蒙特卡洛积分实战
    C语言经典题目之字符串逆序
    Vue中给对象添加新属性时,界面不刷新怎么办?
    Visual Studio 2022 启用CodeLens - 程序抬头显示(查看字段、方法、类的引用)
    leetcode 第388场周赛第三题
    猿创征文 | Linux运维工程师的10个日常使用工具分享
    【docker安装Mysql并配置主从复制】
  • 原文地址:https://blog.csdn.net/qq_18757557/article/details/133699415