目录
1.1 简单Observable.create()创建调用流程

上面的这个流程图是下面这段代码调用流程图
- Observable.create(object :ObservableOnSubscribe
{ - override fun subscribe(emitter: ObservableEmitter<String>) {
- emitter.onNext("a")
- }
-
- }).subscribe({
- Log.d(TAG,"======== onNext value = $it")
- },{
- Log.d(TAG,"======== onError ")
- },{
- Log.d(TAG,"======== onComplete")
- },{
- Log.d(TAG,"======== onSubscribe")
- })

- Observable.create(object :ObservableOnSubscribe<Int>{
- override fun subscribe(emitter: ObservableEmitter<Int>) {
- emitter.onNext(1)
- }
-
- }).map { result->
- result.toString()+"map转换"
-
- }.subscribe({
- Log.d(TAG,"======== onNext value = ${it.toString()}")
- },{
- Log.d(TAG,"======== onError ")
- },{
- Log.d(TAG,"======== onComplete")
- },{
- Log.d(TAG,"======== onSubscribe")
- })

- Observable.create(object : ObservableOnSubscribe<Int> {
- override fun subscribe(emitter: ObservableEmitter<Int>) {
- emitter.onNext(1)
- }
-
- }).flatMap(object : Function<Int, ObservableSource<out String>> {
- override fun apply(t: Int): ObservableSource<out String> {
- return Observable.just("${t.toString()}flatmap")
- }
-
- }).subscribe({
- Log.d(TAG, "======== onNext value = ${it.toString()}")
- }, {
- Log.d(TAG, "======== onError ")
- }, {
- Log.d(TAG, "======== onComplete")
- }, {
- Log.d(TAG, "======== onSubscribe")
- })

- Observable.create(object : ObservableOnSubscribe<Int> {
- override fun subscribe(emitter: ObservableEmitter<Int>) {
- Log.d(TAG, "======== subscribe 当前线程是${Thread.currentThread().name} ")
-
- emitter.onNext(1)
- }
-
- }).subscribeOn(Schedulers.io())/* .observeOn(AndroidSchedulers.mainThread())*/
- .subscribeOn(Schedulers.newThread())
- .subscribe(MyObserver())
通过多次切换订阅线程,发现只有第一次生效,
看下日志:
D ======== setOnObservableAssembly apply
2023-10-09 20:30:20.440 28448-28448 com.exampl...deActivity com.example.myapplication D ======== setOnObservableAssembly apply
2023-10-09 20:31:53.456 28448-28448 com.exampl...deActivity com.example.myapplication D ======== setOnObservableAssembly apply
2023-10-09 20:32:41.050 28448-28448 com.exampl...deActivity com.example.myapplication D ======== onSubscribe 当前线程是main
根本原因是在订阅的时候

会根据这个订阅源不断的去找上个源,那么到这里就明白了,当前切换线上的会被新的订阅源替代,所以就是为什么多次切换线程只有第一次生效原因

对于observeOn多次切换只有最后一次生效,根本原因是

在ObservableObserveOn对下里面,每次调用onNext的时候都会进行一次线程切换,如果还有后续的Observe,那么就会继续调用这个对应的Observe的onNext方法,虽然每调用一次当前的线程会切换一次,但是最终的消费地方是在业务定义的那个Observe,这个是业务的地方,也就是我们业务真正关系的地方的线程切换
- Observable.create(object : ObservableOnSubscribe<Int> {
- override fun subscribe(emitter: ObservableEmitter<Int>) {
- Log.d(TAG, "======== subscribe 当前线程是${Thread.currentThread().name} ")
-
- emitter.onNext(1)
- }
-
- }).observeOn(AndroidSchedulers.mainThread())
- .observeOn(Schedulers.io())
- .subscribe(MyObserver())
对于这种多次切换的时候,那么看日志
======== setOnObservableAssembly apply
2023-10-10 20:08:28.113 10087-10087 com.exampl...deActivity com.example.myapplication D ======== setOnObservableAssembly apply
2023-10-10 20:08:28.117 10087-10087 com.exampl...deActivity com.example.myapplication D ======== onSubscribe 当前线程是main
2023-10-10 20:08:28.117 10087-10087 com.exampl...deActivity com.example.myapplication D ======== subscribe 当前线程是main
2023-10-10 20:08:28.301 10087-10132 com.exampl...deActivity com.example.myapplication D ======== onNext value = 1 当前线程是RxCachedThreadScheduler-1
最后左右一次切换生效到最终的业务上