过滤操作符主要用于对事件数据的筛选过滤,只返回满足我们条件的数据。

通过设置指定的条件,仅发送符合条件的事件
filter(Func1)用来过滤观测序列中我们不想要的值,制返回满足条件的值
- Observable.create((ObservableOnSubscribe<Integer>) emitter -> {
- //1.发送3个事件
- emitter.onNext(1);
- emitter.onNext(2);
- emitter.onNext(3);
- }).filter(integer -> {
- //2.根据返回的integer值,采用filter对被观察者发送的事件进行过滤 & 筛选
- //a.返回true, 则继续发送
- //b.返回false, 则不发送(即过滤)
- return integer <3; // 本例中过滤了整数≥3的数据并重写发送给观察者
- }).subscribe(integer -> {
- //3.接收最新被观察者发送来的数据
- System.out.println("最终接收的数据: " + integer);
- });
- //执行结果
- 最终接收的数据: 1
- 最终接收的数据: 2
它是filter的特殊形式,用于过滤 特定数据类型 的数据
- Observable.just(1, "3", true, 0.2f).ofType(Float.class) //筛选出浮点型数据
- .subscribe(aFloat -> System.out.println("获取到的浮点型事件元素是: " + aFloat));
- //执行结果
- 获取到的浮点型事件元素是: 0.2
skip(int)让我们可以忽略Observable发射的前n项数据。只保留之后的数据。
使用 skipLast() 操作符修改原始Observable,你可以忽略Observable'发射的后N项数据,只保留前面的数据。
- //使用1: 根据顺序跳过数据项
- Observable.just(1, "3", true, 0.2f)
- .skip(1) //跳过前1项
- .skipLast(2) //跳过后2项
- .subscribe(element -> System.out.println("获取到的事件元素是: " + element));
- //执行结果
- 获取到的事件元素是: 3
-
-
- // 使用2:根据时间跳过数据项
- // 发送事件特点:发送数据0-5,每隔1s发送一次,每次递增1;第1次发送延迟0s
- Observable.intervalRange(0, 5, 0, 1, TimeUnit.SECONDS)
- .skip(1, TimeUnit.SECONDS) // 跳过第1s发送的数据
- .skipLast(1, TimeUnit.SECONDS) // 跳过最后1s发送的数据
- .subscribe(along -> System.out.println("Time 获取到的整型事件元素是: " + along));
- try {
- Thread.sleep(8000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- //运行结果
- Time 获取到的整型事件元素是: 2
- Time 获取到的整型事件元素是: 3
过滤事件序列中重复的事件 / 连续重复的事件
distinct()的过滤规则是只允许还没有发射过的数据通过,所有重复的数据项都只会发射一次。
参数中的Func1中的call方法会根据Observable发射的值生成一个Key,然后比较这个key来判断两个数据是不是相同;如果判定为重复则会和distinct()一样过滤掉重复的数据项。
- Observable.just(1, 2, 3, 1, 4)
- .distinct()
- .subscribe(element -> System.out.println("获取到的事件元素是: " + element));
- //执行结果
- 获取到的事件元素是: 1
- 获取到的事件元素是: 2
- 获取到的事件元素是: 3
- 获取到的事件元素是: 4
distinctUntilChanged()它只判定Observable发射的当前数据项和前一个数据项是否相同。
- Observable.just(1, 2, 3, 1, 4)
- .distinctUntilChanged()
- .subscribe(element -> System.out.println("获取到的事件元素是: " + element));
- //执行结果
- 获取到的事件元素是: 1
- 获取到的事件元素是: 2
- 获取到的事件元素是: 3
- 获取到的事件元素是: 1
- 获取到的事件元素是: 4
通过设置指定的事件数量,仅发送特定数量的事件
take(int)用一个整数n作为一个参数,只返回原始的序列中前N项数据,然后发射完成通知,忽略剩余的数据。
- Observable.just(1, 2, 3, 1, 4)
- .take(1)
- .subscribe(element -> System.out.println("获取到的事件元素是: " + element));
- //执行结果
- 获取到的事件元素是: 1
takeLast(int)用一个整数n作为一个参数,只返回原始的序列中后N项数据,然后发射完成通知,忽略前面的数据。
- Observable.just(1, 2, 3, 1, 4)
- .takeLast(2)
- .subscribe(element -> System.out.println("获取到的事件元素是: " + element));
- //执行结果
- 获取到的事件元素是: 1
- 获取到的事件元素是: 4
通过设置指定的时间,仅发送在该时间内的事件
在某段时间内,只发送该段时间内第1次事件 / 最后1次事件
如,1段时间内连续点击按钮,但只执行第1次的点击操作
- //在某一时段内,只发送该段时间内第1次事件
- Observable.create((ObservableOnSubscribe<Integer>) e -> {
- //隔段事件发送时间
- e.onNext(1);
- Thread.sleep(500);
- e.onNext(2);
- Thread.sleep(400);
- e.onNext(3);
- Thread.sleep(300);
- e.onNext(4);
- Thread.sleep(300);
- e.onNext(5);
- Thread.sleep(500);
- }).throttleFirst(1, TimeUnit.SECONDS) //每1秒采用第一条数据
- .subscribe(element -> System.out.println("获取到的事件元素是: " + element));
- //执行结果
- 获取到的事件元素是: 1
- 获取到的事件元素是: 4
- Observable.create(new ObservableOnSubscribe<Integer>() {
- @Override
- public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
- //隔段事件发送时间
- e.onNext(1);
- Thread.sleep(500);
- e.onNext(2);
- Thread.sleep(400);
- e.onNext(3);
- Thread.sleep(300);
- e.onNext(4);
- Thread.sleep(300);
- e.onNext(5);
- Thread.sleep(500);
- }
- }).throttleLast(1, TimeUnit.SECONDS) //每1秒采用最后1条数据
- .subscribe(element -> System.out.println("获取到的事件元素是: " + element));
- //执行结果
- 获取到的事件元素是: 3
- 获取到的事件元素是: 5
在某段时间内,只发送该段时间内最新(最后)1次事件,与 throttleLast() 操作符类似,仅需要把上文的 throttleLast() 改成 Sample() 操作符即可;
发送数据事件时,若2次发送事件的间隔<指定时间,就会丢弃前一次的数据,直到指定时间内都没有新数据发射时才会发送后一次的数据debounce(long, TimeUnit)过滤掉了由Observable发射的速率过快的数据;如果在一个指定的时间间隔过去了仍旧没有发射一个,那么它将发射最后的那个。通常我们用来结合RxBing(Jake Wharton大神使用RxJava封装的Android UI组件)使用,防止button重复点击.
- //在某一时段内,只发送该段时间内最后1次事件
- Observable.create(new ObservableOnSubscribe
() { - @Override
- public void subscribe(@NonNull ObservableEmitter
e) throws Exception { - //隔段事件发送时间
- // 隔段事件发送时间
- e.onNext(1);
- Thread.sleep(500);
- e.onNext(2); // 1和2之间的间隔小于指定时间1s,所以前1次数据(1)会被抛弃,2会被保留
- Thread.sleep(1500); // 因为2和3之间的间隔大于指定时间1s,所以之前被保留的2事件将发出
- e.onNext(3);
- Thread.sleep(1500); // 因为3和4之间的间隔大于指定时间1s,所以3事件将发出
- e.onNext(4);
- Thread.sleep(500); // 因为4和5之间的间隔小于指定时间1s,所以前1次数据(4)会被抛弃,5会被保留
- e.onNext(5);
- Thread.sleep(500); // 因为5和6之间的间隔小于指定时间1s,所以前1次数据(5)会被抛弃,6会被保留
- e.onNext(6);
- Thread.sleep(1500); // 因为6和Complete实践之间的间隔大于指定时间1s,所以之前被保留的6事件将发出
- e.onComplete();
- }
- }).debounce(1, TimeUnit.SECONDS) //每1秒采用最后1条数据 换成 throttleWithTimeout
- .subscribe(element -> System.out.println("获取到的事件元素是: " + element));
-
-
- //执行结果
- 获取到的事件元素是: 2
- 获取到的事件元素是: 3
- 获取到的事件元素是: 6
仅选取第1个元素 / 最后一个元素
- Observable.just(1, 2, 3, 4)
- .firstElement() //获取第一个元素 .lastElement 获取最后一个元素
- .subscribe(element -> System.out.println("获取到的事件元素是: " + element));
elementAt(int)用来获取元素Observable发射的事件序列中的第n项数据,并当做唯一的数据发射出去
- Observable.just(1, 2, 3, 4)
- .elementAt(2)
- .subscribe(element -> System.out.println("获取到的事件元素是: " + element));
在elementAt()的基础上,当出现越界情况(即获取的位置索引 > 发送事件序列长度)时,即抛出异常
- Observable.just(1, 2, 3, 4)
- .elementAtOrError(5)
- .subscribe(element -> System.out.println("获取到的事件元素是: " + element));
- //返回结果
- io.reactivex.exceptions.OnErrorNotImplementedException