• RxJava(三)-合并操作符


    1.concat和concatArray
      是将多个被观察者合并后,发射出去。

    1. Observable o1 = Observable.just(1,2,3);
    2. Observable o2 = Observable.just("a","b","c");
    3. Observable.concat(o1,o2).subscribe(new Observer() {
    4. @Override
    5. public void onSubscribe(@NonNull Disposable d) {
    6. }
    7. @Override
    8. public void onNext(@NonNull Object o) {
    9. Log.e("rxjava","onNext "+o.toString());
    10. }
    11. @Override
    12. public void onError(@NonNull Throwable e) {
    13. }
    14. @Override
    15. public void onComplete() {
    16. }
    17. });
    18. 执行结果:
    19. rxjava: onNext 1
    20. rxjava: onNext 2
    21. rxjava: onNext 3
    22. rxjava: onNext a
    23. rxjava: onNext b
    24. rxjava: onNext c

    concat和concatArray都是返回的new ObservableConcatMap(fromArray(sources), ...)对象,接收sources对象是一个ObservableSource数组。
    fromArray(sources),就是传递进来的被观察者集合
    在调用concat时,在RxJava内部调用的还是concatArray
    2.merge和mergeArray
      将多个被观察者合并后发射出去

    1. List list = new ArrayList();
    2. list.add(1);
    3. list.add(2);
    4. list.add(3);
    5. Observable o1 = Observable.fromIterable(list);
    6. Observable o2 = Observable.just("a","b","c");
    7. Observable.merge(o1,o2).subscribe(new Observer() {
    8. @Override
    9. public void onSubscribe(@NonNull Disposable d) {
    10. }
    11. @Override
    12. public void onNext(@NonNull Object o) {
    13. Log.e("rxjava","onNext "+o.toString());
    14. }
    15. @Override
    16. public void onError(@NonNull Throwable e) {
    17. }
    18. @Override
    19. public void onComplete() {
    20. }
    21. });
    22. 执行结果:
    23. rxjava: onNext 1
    24. rxjava: onNext 2
    25. rxjava: onNext 3
    26. rxjava: onNext a
    27. rxjava: onNext b
    28. rxjava: onNext c

    merge和mergeArray调用的都是:
    fromArray(sources).flatMap((Function)Functions.identity(), sources.length);
    它的合并是通过flatMap实现的。
    3.zip
      是将多个被观察者合并后,再转换新的对象返回给链上下游的观察者和被观察者。

    1. bservable o1 = Observable.just(1,2,3);
    2. Observable o2 = Observable.just("a","b","c");
    3. Observable.zip(o1, o2, new BiFunction<Integer, String, String>() {
    4. @NonNull
    5. @Override
    6. public String apply(@NonNull Integer integer, @NonNull String string) throws Exception {
    7. return integer+"_"+string;
    8. }
    9. }).subscribe(new Observer<String>() {
    10. @Override
    11. public void onSubscribe(@NonNull Disposable d) {
    12. }
    13. @Override
    14. public void onNext(@NonNull String o) {
    15. Log.e("rxjava","onNext "+o);
    16. }
    17. @Override
    18. public void onError(@NonNull Throwable e) {
    19. }
    20. @Override
    21. public void onComplete() {
    22. }
    23. });
    24. 执行结果:
    25. rxjava: onNext 1_a
    26. rxjava: onNext 2_b
    27. rxjava: onNext 3_c

    zip返回的是ObservableZip(ObservableSource[] sources,Function zipper)对象,sources就是合并后的Observable。

    zipper 就是传递进来的BiFunction类,通过apply函数,返回合并后的结果,并将结果传递给下游的对象。
     

  • 相关阅读:
    SwiftUI4.0有趣的动画升级:新iOS16视图内容过渡动画
    小常识:琢磨一下淘宝上手机换套餐的生意
    【HMS core】【FAQ】Analytics Kit、Push Kit典型问题合集3
    Sentinel集成Nacos对流控与降级规则的持久化
    整理的最新版的K8S安装教程,看完还不会,请你吃瓜
    如何选择正确的需求管理工具和软件
    【高并发】不得不说的线程池与ThreadPoolExecutor类浅析
    【Swift 60秒】59 - Closures: Summary
    tensorflow安装踩坑总结
    485modbus转profinet网关在混料配料输送系统应用博图配置案例
  • 原文地址:https://blog.csdn.net/niuyongzhi/article/details/126925288