• 2. RxJava创建操作符


    1.create

    使用一个函数从头开始创建一个Observable

    你可以使用create操作符从头开始创建一个Observable,给这个操作符传递一个接受观察者作为参数的函数,

    编写这个函数让它的行为表现为一个Observable--恰当的调用观察者的onNext, onError 和 onCompleted方法;

    一个形式正确的有限Observable必须尝试调用观察者的onCompleted()正好一次或者它的onError()正好一次,

    且此后不能再调用观察者的任何其他方法;RxJava将这个操作符实现为create方法,要先检查一下观察者的isDispose状态,

    以便在没有观察者的时候,让你的Observable停止发射数据,防止运行昂贵的运算。

    示例代码:

    1. Disposable disposable;
    2. public static void useCreate(){
    3. disposable = Observable.create(new ObservableOnSubscribe() {
    4. @Override
    5. public void subscribe(ObservableEmitter emitter) throws Exception {
    6. try {
    7. if (!emitter.isDisposed()) {
    8. emitter.onNext("Hello");
    9. emitter.onNext("Hi");
    10. emitter.onNext("Aloha");
    11. emitter.onComplete();
    12. }
    13. } catch (Exception e) {
    14. emitter.onError(e);
    15. }
    16. }
    17. }).subscribe(new Consumer() {
    18. @Override
    19. public void accept(String s) throws Exception {
    20. System.out.println("onNext-> " + s);
    21. }
    22. }, new Consumer() {
    23. @Override
    24. public void accept(Throwable throwable) throws Exception {
    25. System.out.println("onError-> " + throwable.getMessage());
    26. }
    27. }, new Action() {
    28. @Override
    29. public void run() throws Exception {
    30. System.out.println("onComplete.");
    31. }
    32. });
    33. }
    34. @Override
    35. protected void onDestroy() {
    36. super.onDestroy();
    37. if (disposable!=null && !disposable.isDisposed()) {
    38. disposable.dispose();
    39. }
    40. }
    41. //运行结果
    42. onNext-> Hello
    43. onNext-> Hi
    44. onNext-> Aloha
    45. onComplete.

    2.just

    创建一个发射指定值的Observable

    将单个数据转换为一个会发送这些对象的Observable

    just 类似于 from,但是 from 会将数组或 Iterable 的数据取出然后逐个发射,而 just 只是简单地原样发射,将数组或 Iterable 当作单个数据。它可以接受一至十个参数,返回一个按参数列表顺序发射这些数据的 Observable

    注意:如果你传递null给Just,它会返回一个发射null值的Observable。不要误认为它会返回一个空Observable(完全不发射任何数据的Observable),如果需要空Observable你应该使用Empty操作符。

    1. private static void testJust() {
    2. Disposable subscribe = Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
    3. .subscribe(integer -> System.out.println("Next: " + integer),
    4. throwable -> System.out.println("Error: " + throwable.getMessage()),
    5. () -> System.out.println("complete()"));
    6. }
    7. //执行结果
    8. Next: 1
    9. Next: 2
    10. Next: 3
    11. Next: 4
    12. Next: 5
    13. Next: 6
    14. Next: 7
    15. Next: 8
    16. Next: 9
    17. Next: 10
    18. complete()

    1. private static void testJust2() {
    2. List s = Arrays.asList("Java", "Android", "Ruby", "Ios", "Swift");
    3. Disposable subscribe = Observable.just(s)
    4. .subscribe(strings -> {
    5. for (String s1 : strings) {
    6. System.out.println("Next: " + s1);
    7. }
    8. }, throwable -> System.out.println("Error: " + throwable.getMessage()),
    9. () -> System.out.println("complete()"));
    10. }
    11. //执行结果
    12. Next: Java
    13. Next: Android
    14. Next: Ruby
    15. Next: Ios
    16. Next: Swift
    17. complete()

    3.from

    将其他的对象或数据结构类型转换为Observable

    当我们使用Observable时,如果要处理的数据都可以转换成Observables.而不是需要混合使用Observables和其他类型的数据,

    会非常方便,这让我们在数据流的整个生命周期中,可以使用一组统一的操作符来管理它们。

    例如,Iterable可以看成同步的Observable,Future可以看成总是只发射单个数据的Observable,通过显式地将那些数据转换为Observables,我们可以像使用Observable样与它们交互;对于Iterable和数组,产生的Observable会发射Iterable或数组的每一项数据。因此,大部分Rx实现都提供了将特定语言的对象和数据结构转换为Observables的方法。

    1> fromArray

    这个方法和just类似,只不过fromArray可以传入多于10个的变量,并可传入数组。

    1. private static void testFromArray() {
    2. List<String> s = Arrays.asList("Java", "Android", "Ruby", "Ios", "Swift");
    3. Observable.fromArray(s).subscribe(strings -> {
    4. for (String s1 : strings) {
    5. System.out.println("Next: " + s1);
    6. }
    7. });
    8. }
    9. //执行结果
    10. Next: Java
    11. Next: Android
    12. Next: Ruby
    13. Next: Ios
    14. Next: Swift

    2>fromIterable

    直接发送一个List 集合数据给观察者,产生的Observable会发射数组的每一项数据

    1. private void testFromIterable() {
    2. List s = Arrays.asList("Java", "Android", "Ruby", "Ios", "Swift");
    3. Observable.fromIterable(s).subscribe(
    4. s1 -> System.out.println("Next: " + s1),
    5. throwable -> System.out.println("Error: " + throwable.getMessage()),
    6. () -> System.out.println("onComplete"));
    7. }
    8. //执行结果
    9. Next: Java
    10. Next: Android
    11. Next: Ruby
    12. Next: Ios
    13. Next: Swift
    14. onComplete

    3>fromFuture

    参数中的Future是java.util.concurrent中的Future,Future的作用是增加了cancel()等方法操作Callable,它可以通过get()方法来获取Callable返回的值

    1. private void testFromFuture() {
    2. ExecutorService executorService = Executors.newCachedThreadPool();
    3. Future future = executorService.submit(new MyCallable());
    4. Observable.fromFuture(future).subscribe(s -> System.out.println("Next: " + s));
    5. //fromFuture方法有一个可接收两个可选参数的版本,分别指定超时时长和时间单位,如果过了指定的时间,
    6. //Future 还没返回一个值,那么这个Observable就会发射错误通知并终止,
    7. Observable.fromFuture(future,1, TimeUnit.SECONDS).subscribe(s -> System.out.println("Next: " + s)
    8. , throwable -> System.out.println(throwable.getMessage()),
    9. () -> System.out.println("onCompleted"));
    10. }
    11. private class MyCallable implements Callable<String> {
    12. @Override
    13. public String call() throws Exception {
    14. System.out.println("模拟一些耗时的任务...");
    15. Thread.sleep(2000L);
    16. return "Ok";
    17. }
    18. }
    19. //执行结果
    20. 模拟一些耗时的任务...
    21. Next: Ok

    4.empty/never/throw

    1>empty

    创建一个不发射任何数据但是正常终止的Observable

    2>never

    创建一个不发射数据也不终止的Observable

    3>throw

    创建一个不发射数据以一个错误终止的Observable

    这三个操作符生成的Observable行为非常特殊和受限。测试的时候很有用,有时候也用于结合其它的Observables,或者作为其它需要Observable的操作符的参数。

    RxJava将这些操作符实现为 empty,never和error。error操作符需要一个Throwable参数,你的Observable会以此终止。这些操作符默认不在任何特定的调度器上执行,但是empty和error有一个可选参数是Scheduler,如果你传递了Scheduler参数,它们会在这个调度器上发送通知。

    5.defer

    直到有观察者订阅时才创建 Observable ,并且为每个观察者创建一个全新的 Observable

    defer 操作符会一直等待直到有观察者订阅它,然后它使用 Observable 工厂方法生成一个 Observable。它对每个观察者都这样做,因此尽管每个订阅者都以为自己订阅的是同一个 Observable ,但事实上每个订阅者获取的是它们自己单独的数据序列。

    在某些情况下,直到最后一分钟(订阅发生时)才生成 Observable ,以确保 Observable 包含最新的数据。

    1. private void testDefer() {
    2. //返回一个Observable
    3. Observable<String> observable = Observable.defer(new Callable extends String>>() {
    4. @Override
    5. public ObservableSource extends String> call() throws Exception {
    6. return Observable.just("hello defer");
    7. }
    8. });
    9. observable.subscribe(new Consumer<String>() {
    10. @Override
    11. public void accept(String s) throws Exception {
    12. System.out.println("Next: "+s);
    13. }
    14. });
    15. }
    16. //
    17. Next: hello defer

    6.timer

    创建在一个指定延迟之后发射单个数据的Observable

    timer 操作符创建一个在给定的时间段之后返回一个特殊值的Observable

    timer 返回一个Observable, 它在延迟一段给定的时间后发射一个简单的数字0,timer操作符默认在computation调度器上执行。

    1. Observable.timer(2, TimeUnit.SECONDS)
    2. .subscribe(new Consumer() {
    3. @Override
    4. public void accept(Long aLong) throws Exception {
    5. // 2秒后打印
    6. Log.d(TAG, "Next: " + aLong);
    7. }
    8. });
    9. // 执行结果
    10. Next: 0

    7.Interval

    创建一个按固定的时间间隔发射一个无限递增的整数序列的Observable。

    interval接受一个表示时间间隔的参数和一个表示时间单位的参数,interval默认在computation调度器上执行。

    1. Disposable subscribe = Observable.interval(1, TimeUnit.SECONDS)
    2. .subscribe(new Consumer() {
    3. @Override
    4. public void accept(Long aLong) throws Exception {
    5. String msg = "Next" + aLong;
    6. Log.d(TAG, msg);
    7. tv.setText(msg);
    8. }
    9. });

    8.range

    创建一个发射指定范围的整数序列的Observable

    它接受两个参数,一个是范围的起始值,一是范围的数据的数目,如果将第二个值设置为0,将导致Observable不发射任何数据(若设置为负数会报异常)。rang默认不在任何特定的调度器上执行,有一个变体可通过可选参数指定Scheduler.

    1. Observable.range(2, 5).subscribe(new Consumer<Integer>() {
    2. @Override
    3. public void accept(Integer integer) throws Exception {
    4. System.out.println(integer);
    5. }
    6. });
    7. // 执行结果
    8. 2
    9. 3
    10. 4
    11. 5
    12. 6

    9.repeat

    它不是创建一个Observable,而是重复发射原始Observable的数据序列,这个序列或者是无限的或者通过repeat(n)指定重复次数

    repeat重复地发射数据,某些实现允许你重复的发射某个数据序列,还有一些允许你限制重复的次数;

    1. Observable.just("hello","repeat").repeat(3).subscribe(new Consumer<String>() {
    2. @Override
    3. public void accept(String s) throws Exception {
    4. System.out.println("Next" + s);
    5. }
    6. });
    7. //执行结果
    8. Next: hello
    9. Next: repeat
    10. Next: hello
    11. Next: repeat
    12. Next: hello
    13. Next: repeat
  • 相关阅读:
    攻防世界Reverse三星题 zorropub
    前端基础入门之JS BOM简介
    【图论】图深度学习读书笔记:绪论
    idea中mapper直接跳转到xml的插件
    【uni-app】小程序往缓存里添加对象并读取数据
    Steger算法实现结构光光条中心提取(python版本)
    一文搞懂什么是单点登录(SSO)及它的优缺点
    C语言初学者必学必会的C语言必背100代码
    国产AD+全志T3开发案例,为能源电力行业排忧解难!8/16通道
    开源免费缺陷管理工具:对比6款
  • 原文地址:https://blog.csdn.net/x910131/article/details/126070884