• 合并RxJava的Observable数据流


    本文讨论几种不同方式合并RxJava的Observable数据流。

    Observable介绍

    Observable 序列,或简单称为Observable,表示异步数据流。这些概念遵循基于观察者模式,在该模式中,一个叫做观察者的对象订阅了Observable发出的数据。要使用RxJava需要增加依赖:

    
        io.reactivex
        rxjava
        1.2.5
    
    

    订阅是无阻塞的,因为观察者会对Observable未来发出的任何消息做出响应,这也又促进了并发性。下面是简单RxJava示例:

    Observable
      .from(new String[] { "John", "Doe" })
      .subscribe(name -> System.out.println("Hello " + name))
    

    合并Observable数据

    使用响应式框架编程,常见场景是合并不同的Observable数据。举例,web应用中可能需要获得两组相互独立的异步数据流。为了避免等待前面数据流完成才请求下一个数据流,我们可以同时调用,然后订阅合并两个数据流。本节讨论几种不同方式合并多个Observable数据,并区别它们之间的差异。

    Merge

    使用Merge操作可以合并多个Observable数据为一个输出结果,示例代码如下:

    @Test
    public void givenTwoObservables_whenMerged_shouldEmitCombinedResults() {
        TestSubscriber testSubscriber = new TestSubscriber<>();
    
        Observable.merge(
          Observable.from(new String[] {"Hello", "World"}),
          Observable.from(new String[] {"I love", "RxJava"})
        ).subscribe(testSubscriber);
    
        testSubscriber.assertValues("Hello", "World", "I love", "RxJava");
    }
    

    MergeDelayError

    mergeDelayError 方法与merge功能一致,但如果在合并过程中有错误发生,可以忽略错误继续合并,最后传播错误异常:

    @Test
    public void givenMutipleObservablesOneThrows_whenMerged_thenCombineBeforePropagatingError() {
        TestSubscriber testSubscriber = new TestSubscriber<>();
            
        Observable.mergeDelayError(
          Observable.from(new String[] { "hello", "world" }),
          Observable.error(new RuntimeException("Some exception")),
          Observable.from(new String[] { "rxjava" })
        ).subscribe(testSubscriber);
    
        testSubscriber.assertValues("hello", "world", "rxjava");
        testSubscriber.assertError(RuntimeException.class);
    }
    

    上面示例输出结果,与没有错发发生结果一致:

    hello
    world
    rxjava
    

    注意,如果使用 merge 代替 mergeDelayError, 则字符串rxjava不会发出,因为merge遇到错误会立刻停止Observable数据流。

    zip

    zip 方法组合两个序列为成对(pair)数据序列:

    @Test
    public void givenTwoObservables_whenZipped_thenReturnCombinedResults() {
        List zippedStrings = new ArrayList<>();
    
        Observable.zip(
          Observable.from(new String[] { "Simple", "Moderate", "Complex" }), 
          Observable.from(new String[] { "Solutions", "Success", "Hierarchy"}),
          (str1, str2) -> str1 + " " + str2).subscribe(zippedStrings::add);
            
        assertThat(zippedStrings).isNotEmpty();
        assertThat(zippedStrings.size()).isEqualTo(3);
        assertThat(zippedStrings).contains("Simple Solutions", "Moderate Success", "Complex Hierarchy");
    }
    

    Zip With Interval

    下面示例给zip方法增加interrval参数,可以有效延迟第一个数据流推送数据元素:

    @Test
    public void givenAStream_whenZippedWithInterval_shouldDelayStreamEmmission() {
        TestSubscriber testSubscriber = new TestSubscriber<>();
            
        Observable data = Observable.just("one", "two", "three", "four", "five");
        Observable interval = Observable.interval(1L, TimeUnit.SECONDS);
            
        Observable
          .zip(data, interval, (strData, tick) -> String.format("[%d]=%s", tick, strData))
          .toBlocking().subscribe(testSubscriber);
            
        testSubscriber.assertCompleted();
        testSubscriber.assertValueCount(5);
        testSubscriber.assertValues("[0]=one", "[1]=two", "[2]=three", "[3]=four", "[4]=five");
    }
    

    总结

    本文介绍几个合并RxJava的Observable数据流的方法。你还可以学习通过官方文档学习更多的方法:combineLatest, join, groupJoin, switchOnNext等。

  • 相关阅读:
    力扣每日一题:790. 多米诺和托米诺平铺 【dp动态规划】
    C++ set 的使用
    Session
    初识JavaScript
    SQL之数据库连接
    2023-2024 年适用于 Windows 电脑的顶级视频录制软件
    解决安装 RabbitMQ 安装不成功的问题
    【T+】win10/win11系统安装畅捷通T+Cloud专属云18.0
    【ArcGIS】属性表导出及乱码问题
    Windows上安装 RabbitMQ 教程
  • 原文地址:https://blog.csdn.net/neweastsun/article/details/127104394