• RxJava(三)-合并操作符


    1.concat和concatArray
      是将多个被观察者合并后,发射出去。

    1. Observable o1 = Observable.just(1,2,3);
    2. Observable o2 = Observable.just("a","b","c");
    3. Observable.concat(o1,o2).subscribe(new Observer() {
    4. @Override
    5. public void onSubscribe(@NonNull Disposable d) {
    6. }
    7. @Override
    8. public void onNext(@NonNull Object o) {
    9. Log.e("rxjava","onNext "+o.toString());
    10. }
    11. @Override
    12. public void onError(@NonNull Throwable e) {
    13. }
    14. @Override
    15. public void onComplete() {
    16. }
    17. });
    18. 执行结果:
    19. rxjava: onNext 1
    20. rxjava: onNext 2
    21. rxjava: onNext 3
    22. rxjava: onNext a
    23. rxjava: onNext b
    24. rxjava: onNext c

    concat和concatArray都是返回的new ObservableConcatMap(fromArray(sources), ...)对象,接收sources对象是一个ObservableSource数组。
    fromArray(sources),就是传递进来的被观察者集合
    在调用concat时,在RxJava内部调用的还是concatArray
    2.merge和mergeArray
      将多个被观察者合并后发射出去

    1. List list = new ArrayList();
    2. list.add(1);
    3. list.add(2);
    4. list.add(3);
    5. Observable o1 = Observable.fromIterable(list);
    6. Observable o2 = Observable.just("a","b","c");
    7. Observable.merge(o1,o2).subscribe(new Observer() {
    8. @Override
    9. public void onSubscribe(@NonNull Disposable d) {
    10. }
    11. @Override
    12. public void onNext(@NonNull Object o) {
    13. Log.e("rxjava","onNext "+o.toString());
    14. }
    15. @Override
    16. public void onError(@NonNull Throwable e) {
    17. }
    18. @Override
    19. public void onComplete() {
    20. }
    21. });
    22. 执行结果:
    23. rxjava: onNext 1
    24. rxjava: onNext 2
    25. rxjava: onNext 3
    26. rxjava: onNext a
    27. rxjava: onNext b
    28. rxjava: onNext c

    merge和mergeArray调用的都是:
    fromArray(sources).flatMap((Function)Functions.identity(), sources.length);
    它的合并是通过flatMap实现的。
    3.zip
      是将多个被观察者合并后,再转换新的对象返回给链上下游的观察者和被观察者。

    1. bservable o1 = Observable.just(1,2,3);
    2. Observable o2 = Observable.just("a","b","c");
    3. Observable.zip(o1, o2, new BiFunction<Integer, String, String>() {
    4. @NonNull
    5. @Override
    6. public String apply(@NonNull Integer integer, @NonNull String string) throws Exception {
    7. return integer+"_"+string;
    8. }
    9. }).subscribe(new Observer<String>() {
    10. @Override
    11. public void onSubscribe(@NonNull Disposable d) {
    12. }
    13. @Override
    14. public void onNext(@NonNull String o) {
    15. Log.e("rxjava","onNext "+o);
    16. }
    17. @Override
    18. public void onError(@NonNull Throwable e) {
    19. }
    20. @Override
    21. public void onComplete() {
    22. }
    23. });
    24. 执行结果:
    25. rxjava: onNext 1_a
    26. rxjava: onNext 2_b
    27. rxjava: onNext 3_c

    zip返回的是ObservableZip(ObservableSource[] sources,Function zipper)对象,sources就是合并后的Observable。

    zipper 就是传递进来的BiFunction类,通过apply函数,返回合并后的结果,并将结果传递给下游的对象。
     

  • 相关阅读:
    力扣labuladong——一刷day44
    F. MST Unification
    【python】python中字符串简单介绍及相关操作
    Java不能操作内存?Unsafe了解一下
    element ui框架(vuex3使用)
    JNDIExploit-1.2-SNAPSHOT.jar工具在log4j漏洞复现中的使用
    高压放大器使用方法介绍
    swift UITextField 设置leftView不生效
    Spring【注解实现IOC(@Component、@Repository、@Service、@Controller)】(三)-全面详解(学习总结---从入门到深化)
    Linux块设备缓存Bcache使用
  • 原文地址:https://blog.csdn.net/niuyongzhi/article/details/126925288