• 5. RxJava合并条件操作符


    一.合并操作符

    类型

    作用

    特点

    适用场景

    组合多个被观察者

    concat() / concatArray()

    组合多个被观察者一起发送数据,

    合并后按发送顺序串行执行;

    二者区别:组合被观察者数量,即:

    concat() 组合被观察者数量≤4个

    concatArray()则可>4个。

    读取缓存数据,

    合并数据源同时展示

    联合处理

    merge() / mergeArray()

    组合多个被观察者一起发送数据,

    合并后按时间线并行执行

    二者区别:组合被观察者数量,即:

    merge()组合被观察者数量≤4个

    mergeArray()则可>4个。

    concatDelayError()

    mergeDelayError()

    将错误事件延迟到发送事件完毕后

    再继续执行;

    合并多个事件

    zip()

    合并多个观察者发送的事件,生成一个新的事件序列(组合过的事件序列),并最终

    事件的组合方式=严格按照原先事件顺序进行对位合并,最终合并的事件数量 = 多Observable中数量最少的数量

    combineLatest()

    当两个观察者中任何一个发送了数据后,将先发送了数据的观察者的最新数据 与另一观察者发送的每个数据结合,最终基于该函数的结构发送数据

    与 Zip()区别:

    Zip(): 按个数合并,即1对1合并;

    CombineLates():按同一时间点合并

    combineLaststDelayError

    类似concatDelayError 错误处理

    reduce()

    把被观察者需要发送的事件聚合成一个事件,并发送

    聚合的逻辑根据需求撰写, 但本质 = 前2个数据聚合,然后与后1个数据继续聚合,以此类推.

    collect()

    将观察者发送的数据事件收集到一个数据结构里.

    发送前追加发送事件

    startWith()

    在一个被观察者发送事件前追加发送一些数据;

    追加数据顺序 = 后调用先追加

    startWithArray()

    在一个观察者发送事件前,追加发送一个新的被观察者。

    统计发送事件数量

    count()

    统计被观察者发送事件的数量

    返回结果 = Long类型

    1. concat() / concatArray()

    组合多个被观察者一起发送数据,合并后按发送顺序串行执行,区别:组合被观察者数量,concat() ≤4个, concatArray()>4个。

    1. Observable.concat(Observable.just(1),
    2. Observable.just(2),
    3. Observable.just(3))
    4. .subscribe(new Consumer<Integer>() {
    5. @Override
    6. public void accept(Integer integer) throws Exception {
    7. System.out.println("收到事件"+integer);
    8. }
    9. });
    10. //执行结果
    11. 收到事件1
    12. 收到事件2
    13. 收到事件3

    2. merge() / mergeArray()

    组合多个被观察者一起发送数据,合并后按时间线并行执行

    1. Observable.merge(Observable.just(1),
    2. Observable.just(2),
    3. Observable.just(3))
    4. .subscribe(new Consumer<Integer>() {
    5. @Override
    6. public void accept(Integer integer) throws Exception {
    7. System.out.println("收到事件"+integer);
    8. }
    9. });
    10. //执行结果
    11. 收到事件1
    12. 收到事件2
    13. 收到事件3

    3. concatDelayError() / mergeDelayError()

    将错误事件延迟到发送事件完毕后再继续执行;

    1. Observable.concatArrayDelayError(
    2. Observable.create(new ObservableOnSubscribe<Integer>() {
    3. @Override
    4. public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
    5. emitter.onNext(1);
    6. emitter.onNext(2);
    7. emitter.onNext(3);
    8. emitter.onError(new NullPointerException()); // 发送Error事件,因为无使用concatDelayError,所以第2个Observable将不会发送事件
    9. emitter.onComplete();
    10. }
    11. }),
    12. Observable.just(4, 5, 6))
    13. .subscribe(new Observer<Integer>() {
    14. @Override
    15. public void onSubscribe(Disposable d) {
    16. }
    17. @Override
    18. public void onNext(Integer value) {
    19. System.out.println("接收到了事件"+ value );
    20. }
    21. @Override
    22. public void onError(Throwable e) {
    23. System.out.println("对Error事件作出响应" );
    24. }
    25. @Override
    26. public void onComplete() {
    27. System.out.println("对Complete事件作出响应");
    28. }
    29. });

    4. Zip()

    • 合并多个观察者发送的事件,生成一个新的事件序列(组合过的事件序列),并最终。zip操作符返回一个Obversable,

    • 它使用这个函数按顺序结合两个或多个Observables发射的数据项,然后它发射这个函数返回的结果。

    • 它按照严格的顺序应用这个函数。它只发射与发射数据项最少的那个Observable一样多的数据。

    • zip的最后一个参数接受每个Observable发射的一项数据,返回被压缩后的数据,

    • 它可以接受一到九个参数:一个Observable序列,或者一些发射Observable的Observables。

    1. Observable.zip(Observable.just(1, 2, 3),
    2. Observable.just(4, 5),
    3. new BiFunction<Integer, Integer, Integer>() {
    4. @NonNull
    5. @Override
    6. public Integer apply(@NonNull Integer integer, @NonNull Integer integer2) throws Exception {
    7. return integer + integer2;
    8. }
    9. })
    10. .subscribe(new Consumer<Integer>() {
    11. @Override
    12. public void accept(Integer integer) throws Exception {
    13. System.out.println("Next=>" + integer);
    14. }
    15. });
    16. //执行结果
    17. Next=>5
    18. Next=>7

    5. combineLatest()

    combineLatest 操作符的行为类似于 zip,但是只有当原始的 Observable 中的每一个都发射了一条数据时 zip 才发射数据,而 combineLatest 是当原始的 Observable 中任意一个发射了数据时就发射一条数据。当原始 Observable 的任何一个发射了一条数据时, combineLatest 使用一个函数结合它们最近发射的数据,然后发射这个函数的返回值。

    1. Observable.combineLatest(
    2. Observable.just(1, 3, 5),
    3. Observable.just(2, 4, 6),
    4. new BiFunction<Integer, Integer, Integer>() {
    5. @Override
    6. public Integer apply(Integer integer, Integer integer2) throws Exception {
    7. Log.d(TAG, "integer: " + integer + " ## integer2: " + integer2);
    8. return integer + integer2;
    9. }
    10. }
    11. ).subscribe(new Consumer<Integer>() {
    12. @Override
    13. public void accept(Integer integer) throws Exception {
    14. Log.d(TAG, "Next: " + integer);
    15. }
    16. }, new Consumer<Throwable>() {
    17. @Override
    18. public void accept(Throwable throwable) throws Exception {
    19. Log.d(TAG, "Error: " + throwable);
    20. }
    21. }, new Action() {
    22. @Override
    23. public void run() throws Exception {
    24. Log.d(TAG, "Complete.");
    25. }
    26. });
    27. // 执行结果
    28. integer: 5 ## integer2: 2
    29. Next: 7
    30. integer: 5 ## integer2: 4
    31. Next: 9
    32. integer: 5 ## integer2: 6
    33. Next: 11
    34. Complete.

    6. startWith() / startWithArray()

    在一个被观察者发送事件前,追加发送一些数据 / 一个新的被观察者

    1. Observable.just(1, 2, 3)
    2. .startWith(0)
    3. .startWithArray(4, 5, 6)
    4. .subscribe(new Consumer<Integer>() {
    5. @Override
    6. public void accept(Integer integer) throws Exception {
    7. System.out.println("发送的事件数量=>" + integer);
    8. }
    9. });
    10. //执行结果
    11. 发送的事件数量=>4
    12. 发送的事件数量=>5
    13. 发送的事件数量=>6
    14. 发送的事件数量=>0
    15. 发送的事件数量=>1
    16. 发送的事件数量=>2
    17. 发送的事件数量=>3

    7. count()

    统计被观察者发送事件的数量

    1. Observable.just(1, 2, 3)
    2. .count()
    3. .subscribe(new Consumer() {
    4. @Override
    5. public void accept(Long aLong) throws Exception {
    6. System.out.println("发送的事件数量=>" + aLong);
    7. }
    8. });
    9. // 执行结果
    10. 发送的事件数量=>3

    参考资料: 合并操作符 合并操作符2

    二. 条件操作符布尔操作符

    • amb :给定多个 Observable ,只让第一个发射数据的 Observable 发射全部数据

    • defaultlfEmpty :发射来自原始 Observable 的数据,如果原始 Observable 没有发射数据,则发射一个默认数据

    • skipUntil :丢弃原始 Observable 发射的数据,直到第二个 Observable 发射了一个数据,然后发射原始 Observable 的剩余数据

    • skipWhile :丢弃原始 Observable 发射的数据,直到一个特定的条件为假,然后发射原始 Observable 剩余的数据

    • takeUntil :发射来自原始 Observable 的数据,直到第二个 Observable 发射了一个数据或一个通知

    • takeWhile and takeWhileWithIndex:发射原始 Observable 的数据,直到一个特定的条件为真,然后跳过剩余的数据

    • RxJava 的布尔操作符主要包括:

    • all :判断是否所有的数据项都满足某个条件

    • contains :判断 Observable 是否会发射一个指定的值

    • exists and isEmpty :判断 Observable 是否发射了一个值

    • sequenceEqual :判断两个 Observables 发射的序列是否相等

    参考资料: 条件操作符

  • 相关阅读:
    探讨javascript的程序性能
    【day9.30】消息队列实现进程间通信
    ADB 命令结合 monkey 的简单使用,超详细
    JNA java调用dll
    相机位置和朝向计算(世界坐标系下)
    Java面向对象,全程无废话,偏实战
    OpenSM-QoS管理
    【无标题】
    Grub启动Linux引导到BIOS问题的解决
    UI自动化的适用场景,怎么做?
  • 原文地址:https://blog.csdn.net/x910131/article/details/126071044