类型 | 作用 | 特点 | 适用场景 | |
组合多个被观察者 | concat() / concatArray() | 组合多个被观察者一起发送数据, 合并后按发送顺序串行执行; | 二者区别:组合被观察者数量,即: concat() 组合被观察者数量≤4个 concatArray()则可>4个。 | 读取缓存数据, 合并数据源 并 同时展示 联合处理
|
merge() / mergeArray() | 组合多个被观察者一起发送数据, 合并后按时间线并行执行 | 二者区别:组合被观察者数量,即: merge()组合被观察者数量≤4个 mergeArray()则可>4个。 | ||
concatDelayError() mergeDelayError() | 将错误事件延迟到发送事件完毕后 再继续执行; |
| ||
合并多个事件 | zip() | 合并多个观察者发送的事件,生成一个新的事件序列(组合过的事件序列),并最终发送。 | 事件的组合方式=严格按照原先事件顺序进行对位合并,最终合并的事件数量 = 多Observable中数量最少的数量 | |
combineLatest() | 当两个观察者中任何一个发送了数据后,将先发送了数据的观察者的最新数据 与另一观察者发送的每个数据结合,最终基于该函数的结构发送数据 | 与 Zip()区别: Zip(): 按个数合并,即1对1合并; CombineLates():按同一时间点合并 | ||
combineLaststDelayError | 类似concatDelayError 错误处理 |
| ||
reduce() | 把被观察者需要发送的事件聚合成一个事件,并发送 | 聚合的逻辑根据需求撰写, 但本质 = 前2个数据聚合,然后与后1个数据继续聚合,以此类推. | ||
collect() | 将观察者发送的数据事件收集到一个数据结构里. | |||
发送前追加发送事件 | startWith() | 在一个被观察者发送事件前追加发送一些数据; | 追加数据顺序 = 后调用先追加 | |
startWithArray() | 在一个观察者发送事件前,追加发送一个新的被观察者。 | |||
统计发送事件数量 | count() | 统计被观察者发送事件的数量 | 返回结果 = Long类型 |
组合多个被观察者一起发送数据,合并后按发送顺序串行执行,区别:组合被观察者数量,concat() ≤4个, concatArray()>4个。
- Observable.concat(Observable.just(1),
- Observable.just(2),
- Observable.just(3))
- .subscribe(new Consumer<Integer>() {
- @Override
- public void accept(Integer integer) throws Exception {
- System.out.println("收到事件"+integer);
- }
- });
- //执行结果
- 收到事件1
- 收到事件2
- 收到事件3
组合多个被观察者一起发送数据,合并后按时间线并行执行
- Observable.merge(Observable.just(1),
- Observable.just(2),
- Observable.just(3))
- .subscribe(new Consumer<Integer>() {
- @Override
- public void accept(Integer integer) throws Exception {
- System.out.println("收到事件"+integer);
- }
- });
- //执行结果
- 收到事件1
- 收到事件2
- 收到事件3
将错误事件延迟到发送事件完毕后再继续执行;
- Observable.concatArrayDelayError(
- Observable.create(new ObservableOnSubscribe<Integer>() {
- @Override
- public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
-
-
- emitter.onNext(1);
- emitter.onNext(2);
- emitter.onNext(3);
- emitter.onError(new NullPointerException()); // 发送Error事件,因为无使用concatDelayError,所以第2个Observable将不会发送事件
- emitter.onComplete();
- }
- }),
- Observable.just(4, 5, 6))
- .subscribe(new Observer<Integer>() {
- @Override
- public void onSubscribe(Disposable d) {
-
-
- }
- @Override
- public void onNext(Integer value) {
- System.out.println("接收到了事件"+ value );
- }
-
-
- @Override
- public void onError(Throwable e) {
- System.out.println("对Error事件作出响应" );
- }
-
-
- @Override
- public void onComplete() {
- System.out.println("对Complete事件作出响应");
- }
- });
-
合并多个观察者发送的事件,生成一个新的事件序列(组合过的事件序列),并最终发送。zip操作符返回一个Obversable,
它使用这个函数按顺序结合两个或多个Observables发射的数据项,然后它发射这个函数返回的结果。
它按照严格的顺序应用这个函数。它只发射与发射数据项最少的那个Observable一样多的数据。
zip的最后一个参数接受每个Observable发射的一项数据,返回被压缩后的数据,
它可以接受一到九个参数:一个Observable序列,或者一些发射Observable的Observables。
- Observable.zip(Observable.just(1, 2, 3),
- Observable.just(4, 5),
- new BiFunction<Integer, Integer, Integer>() {
- @NonNull
- @Override
- public Integer apply(@NonNull Integer integer, @NonNull Integer integer2) throws Exception {
- return integer + integer2;
- }
- })
- .subscribe(new Consumer<Integer>() {
- @Override
- public void accept(Integer integer) throws Exception {
- System.out.println("Next=>" + integer);
- }
- });
- //执行结果
- Next=>5
- Next=>7
combineLatest 操作符的行为类似于 zip,但是只有当原始的 Observable 中的每一个都发射了一条数据时 zip 才发射数据,而 combineLatest 是当原始的 Observable 中任意一个发射了数据时就发射一条数据。当原始 Observable 的任何一个发射了一条数据时, combineLatest 使用一个函数结合它们最近发射的数据,然后发射这个函数的返回值。
- Observable.combineLatest(
- Observable.just(1, 3, 5),
- Observable.just(2, 4, 6),
- new BiFunction<Integer, Integer, Integer>() {
- @Override
- public Integer apply(Integer integer, Integer integer2) throws Exception {
- Log.d(TAG, "integer: " + integer + " ## integer2: " + integer2);
- return integer + integer2;
- }
- }
- ).subscribe(new Consumer<Integer>() {
- @Override
- public void accept(Integer integer) throws Exception {
- Log.d(TAG, "Next: " + integer);
- }
- }, new Consumer<Throwable>() {
- @Override
- public void accept(Throwable throwable) throws Exception {
- Log.d(TAG, "Error: " + throwable);
- }
- }, new Action() {
- @Override
- public void run() throws Exception {
- Log.d(TAG, "Complete.");
- }
- });
-
-
-
-
- // 执行结果
- integer: 5 ## integer2: 2
- Next: 7
- integer: 5 ## integer2: 4
- Next: 9
- integer: 5 ## integer2: 6
- Next: 11
- Complete.
在一个被观察者发送事件前,追加发送一些数据 / 一个新的被观察者
- Observable.just(1, 2, 3)
- .startWith(0)
- .startWithArray(4, 5, 6)
- .subscribe(new Consumer<Integer>() {
- @Override
- public void accept(Integer integer) throws Exception {
- System.out.println("发送的事件数量=>" + integer);
- }
- });
- //执行结果
- 发送的事件数量=>4
- 发送的事件数量=>5
- 发送的事件数量=>6
- 发送的事件数量=>0
- 发送的事件数量=>1
- 发送的事件数量=>2
- 发送的事件数量=>3
统计被观察者发送事件的数量
- Observable.just(1, 2, 3)
- .count()
- .subscribe(new Consumer
() { - @Override
- public void accept(Long aLong) throws Exception {
- System.out.println("发送的事件数量=>" + aLong);
- }
- });
- // 执行结果
- 发送的事件数量=>3
amb :给定多个 Observable ,只让第一个发射数据的 Observable 发射全部数据
defaultlfEmpty :发射来自原始 Observable 的数据,如果原始 Observable 没有发射数据,则发射一个默认数据
skipUntil :丢弃原始 Observable 发射的数据,直到第二个 Observable 发射了一个数据,然后发射原始 Observable 的剩余数据
skipWhile :丢弃原始 Observable 发射的数据,直到一个特定的条件为假,然后发射原始 Observable 剩余的数据
takeUntil :发射来自原始 Observable 的数据,直到第二个 Observable 发射了一个数据或一个通知
takeWhile and takeWhileWithIndex:发射原始 Observable 的数据,直到一个特定的条件为真,然后跳过剩余的数据
RxJava 的布尔操作符主要包括:
all :判断是否所有的数据项都满足某个条件
contains :判断 Observable 是否会发射一个指定的值
exists and isEmpty :判断 Observable 是否发射了一个值
sequenceEqual :判断两个 Observables 发射的序列是否相等
参考资料: 条件操作符