• RxJava 复刻简版之四,线程切换


    补充知识点:Rx 即 Reactive Extensions,因此 RxJava 是为 java 扩展的响应式编程库,使数据可以异步地运用

    1、线程类型

    1. Schedulers.io:常用,读写文件/数据库/网络请求
    2. Schedulers.newThread:耗时操作
    3. Schedulers.computation:CUP密集计算,图片压缩/数据格式解析
    4. Schedulers.trampoline:当前线程
    5. AndroidSchedulers.mainThread:主线程,更新UI

    2、例子

            Single.just(777)
                    .subscribeOn(Schedulers.io())
                    .observeOn(AndroidSchedulers.mainThread())
                    .subscribe(getSingleObserver(tag));
    
    • 1
    • 2
    • 3
    • 4

    3、流程

    1、just(777) ,声明数据处理器

    class SingleJust<T>(private val value: T) : Single<T>() {
        override fun subscribeActual(observer: SingleObserver<in T>) {
            observer.onSubscribe(Disposable.disposed())
            observer.onSuccess(value)
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    2、subscribeOn、observeOn,声明处理的观察者

    class SingleSubscribeOn<T>(
        private val source: SingleSource<out T>,
        private val scheduler: Scheduler
    ) : Single<T>() {
        override fun subscribeActual(observer: SingleObserver<in T>) {
            val parent = SubscribeOnObserver(observer, source)
            observer.onSubscribe(parent)
            val f = scheduler.scheduleDirect(parent)
            parent.task.replace(f)
        }
    
        internal class SubscribeOnObserver<T>(
            private val downstream: SingleObserver<in T>,
            private val source: SingleSource<out T>
        ) :
            AtomicReference<Disposable>(), SingleObserver<T>, Disposable,
            Runnable {
            val task = SequentialDisposable()
    
            override fun onSubscribe(d: Disposable) {
                DisposableHelper.setOnce(this, d)
            }
    
            override fun onSuccess(t: T) {
                downstream.onSuccess(t)
            }
    
            override fun onError(e: Throwable) {
                downstream.onError(e)
            }
    
            override fun dispose() {
                DisposableHelper.dispose(this)
                task.dispose()
            }
    
            override fun isDisposed() = DisposableHelper.isDisposed(get())
    
            override fun run() {
                source.subscribe(this)
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    class SingleObserveOn<T>(
        private val source: SingleSource<out T>,
        private val scheduler: Scheduler
    ) : Single<T>() {
        override fun subscribeActual(observer: SingleObserver<in T>) {
            source.subscribe(ObserveOnSingleObserver(observer, scheduler))
        }
    
        internal class ObserveOnSingleObserver<T>(
            private val downstream: SingleObserver<in T>,
            private val scheduler: Scheduler
        ) :
            AtomicReference<Disposable>(), SingleObserver<T>, Disposable,
            Runnable {
            var value: T? = null
            var error: Throwable? = null
    
            override fun onSubscribe(d: Disposable) {
                if (DisposableHelper.setOnce(this, d)) {
                    downstream.onSubscribe(this)
                }
            }
    
            override fun onSuccess(t: T) {
                this.value = t
                val d = scheduler.scheduleDirect(this)
                DisposableHelper.replace(this, d)
            }
    
            override fun onError(e: Throwable) {
                error = e
                val d = scheduler.scheduleDirect(this)
                DisposableHelper.replace(this, d)
            }
    
            override fun run() {
                val ex = error
                if (ex != null) {
                    downstream.onError(ex)
                } else {
                    downstream.onSuccess(value!!)
                }
            }
    
            override fun dispose() {
                DisposableHelper.dispose(this)
            }
    
            override fun isDisposed() = DisposableHelper.isDisposed(get())
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51

    4、分析

    1、执行subscribe,老规矩,通通回调到 subscribeActual 的具体实现
    未完待续

  • 相关阅读:
    rust从0开始写项目-读取配置文件
    MySQL针对数据库-基本操作
    docker离线安装
    【数据结构】链表
    盘点一个使用Python自动化处理GPS、北斗经纬度数据实战(中篇)
    设计模式——2_A 访问者(Visitor)
    springboot(ssm宝鸡文理学院学生成绩动态追踪系统 成绩管理系统Java(code&LW)
    字符串拼接你真的啥都知道了吗
    linux添加环境变量
    将springboot jar应用打包成镜像并在docker运行成容器
  • 原文地址:https://blog.csdn.net/da_ma_dai/article/details/132267618