Single.just(777)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(getSingleObserver(tag));
1、just(777) ,声明数据处理器
class SingleJust<T>(private val value: T) : Single<T>() {
override fun subscribeActual(observer: SingleObserver<in T>) {
observer.onSubscribe(Disposable.disposed())
observer.onSuccess(value)
}
}
2、subscribeOn、observeOn,声明处理的观察者
class SingleSubscribeOn<T>(
private val source: SingleSource<out T>,
private val scheduler: Scheduler
) : Single<T>() {
override fun subscribeActual(observer: SingleObserver<in T>) {
val parent = SubscribeOnObserver(observer, source)
observer.onSubscribe(parent)
val f = scheduler.scheduleDirect(parent)
parent.task.replace(f)
}
internal class SubscribeOnObserver<T>(
private val downstream: SingleObserver<in T>,
private val source: SingleSource<out T>
) :
AtomicReference<Disposable>(), SingleObserver<T>, Disposable,
Runnable {
val task = SequentialDisposable()
override fun onSubscribe(d: Disposable) {
DisposableHelper.setOnce(this, d)
}
override fun onSuccess(t: T) {
downstream.onSuccess(t)
}
override fun onError(e: Throwable) {
downstream.onError(e)
}
override fun dispose() {
DisposableHelper.dispose(this)
task.dispose()
}
override fun isDisposed() = DisposableHelper.isDisposed(get())
override fun run() {
source.subscribe(this)
}
}
}
class SingleObserveOn<T>(
private val source: SingleSource<out T>,
private val scheduler: Scheduler
) : Single<T>() {
override fun subscribeActual(observer: SingleObserver<in T>) {
source.subscribe(ObserveOnSingleObserver(observer, scheduler))
}
internal class ObserveOnSingleObserver<T>(
private val downstream: SingleObserver<in T>,
private val scheduler: Scheduler
) :
AtomicReference<Disposable>(), SingleObserver<T>, Disposable,
Runnable {
var value: T? = null
var error: Throwable? = null
override fun onSubscribe(d: Disposable) {
if (DisposableHelper.setOnce(this, d)) {
downstream.onSubscribe(this)
}
}
override fun onSuccess(t: T) {
this.value = t
val d = scheduler.scheduleDirect(this)
DisposableHelper.replace(this, d)
}
override fun onError(e: Throwable) {
error = e
val d = scheduler.scheduleDirect(this)
DisposableHelper.replace(this, d)
}
override fun run() {
val ex = error
if (ex != null) {
downstream.onError(ex)
} else {
downstream.onSuccess(value!!)
}
}
override fun dispose() {
DisposableHelper.dispose(this)
}
override fun isDisposed() = DisposableHelper.isDisposed(get())
}
}
1、执行subscribe,老规矩,通通回调到 subscribeActual 的具体实现
未完待续