使用一个函数从头开始创建一个Observable
你可以使用create操作符从头开始创建一个Observable,给这个操作符传递一个接受观察者作为参数的函数,
编写这个函数让它的行为表现为一个Observable--恰当的调用观察者的onNext, onError 和 onCompleted方法;
一个形式正确的有限Observable必须尝试调用观察者的onCompleted()正好一次或者它的onError()正好一次,
且此后不能再调用观察者的任何其他方法;RxJava将这个操作符实现为create方法,要先检查一下观察者的isDispose状态,
以便在没有观察者的时候,让你的Observable停止发射数据,防止运行昂贵的运算。
示例代码:
- Disposable disposable;
- public static void useCreate(){
- disposable = Observable.create(new ObservableOnSubscribe
() { - @Override
- public void subscribe(ObservableEmitter
emitter) throws Exception { - try {
- if (!emitter.isDisposed()) {
- emitter.onNext("Hello");
- emitter.onNext("Hi");
- emitter.onNext("Aloha");
- emitter.onComplete();
- }
- } catch (Exception e) {
- emitter.onError(e);
- }
- }
- }).subscribe(new Consumer
() { - @Override
- public void accept(String s) throws Exception {
- System.out.println("onNext-> " + s);
- }
- }, new Consumer
() { - @Override
- public void accept(Throwable throwable) throws Exception {
- System.out.println("onError-> " + throwable.getMessage());
- }
- }, new Action() {
- @Override
- public void run() throws Exception {
- System.out.println("onComplete.");
- }
- });
- }
- @Override
- protected void onDestroy() {
- super.onDestroy();
- if (disposable!=null && !disposable.isDisposed()) {
- disposable.dispose();
- }
- }
- //运行结果
- onNext-> Hello
- onNext-> Hi
- onNext-> Aloha
- onComplete.
-
-
创建一个发射指定值的Observable
将单个数据转换为一个会发送这些对象的Observable
just 类似于 from,但是 from 会将数组或 Iterable 的数据取出然后逐个发射,而 just 只是简单地原样发射,将数组或 Iterable 当作单个数据。它可以接受一至十个参数,返回一个按参数列表顺序发射这些数据的 Observable
注意:如果你传递null给Just,它会返回一个发射null值的Observable。不要误认为它会返回一个空Observable(完全不发射任何数据的Observable),如果需要空Observable你应该使用Empty操作符。
- private static void testJust() {
- Disposable subscribe = Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
- .subscribe(integer -> System.out.println("Next: " + integer),
- throwable -> System.out.println("Error: " + throwable.getMessage()),
- () -> System.out.println("complete()"));
- }
- //执行结果
- Next: 1
- Next: 2
- Next: 3
- Next: 4
- Next: 5
- Next: 6
- Next: 7
- Next: 8
- Next: 9
- Next: 10
- complete()
- private static void testJust2() {
- List
s = Arrays.asList("Java", "Android", "Ruby", "Ios", "Swift"); - Disposable subscribe = Observable.just(s)
- .subscribe(strings -> {
- for (String s1 : strings) {
- System.out.println("Next: " + s1);
- }
- }, throwable -> System.out.println("Error: " + throwable.getMessage()),
- () -> System.out.println("complete()"));
- }
- //执行结果
- Next: Java
- Next: Android
- Next: Ruby
- Next: Ios
- Next: Swift
- complete()
-
-
将其他的对象或数据结构类型转换为Observable
当我们使用Observable时,如果要处理的数据都可以转换成Observables.而不是需要混合使用Observables和其他类型的数据,
会非常方便,这让我们在数据流的整个生命周期中,可以使用一组统一的操作符来管理它们。
例如,Iterable可以看成同步的Observable,Future可以看成总是只发射单个数据的Observable,通过显式地将那些数据转换为Observables,我们可以像使用Observable样与它们交互;对于Iterable和数组,产生的Observable会发射Iterable或数组的每一项数据。因此,大部分Rx实现都提供了将特定语言的对象和数据结构转换为Observables的方法。
1> fromArray
这个方法和just类似,只不过fromArray可以传入多于10个的变量,并可传入数组。
- private static void testFromArray() {
- List<String> s = Arrays.asList("Java", "Android", "Ruby", "Ios", "Swift");
- Observable.fromArray(s).subscribe(strings -> {
- for (String s1 : strings) {
- System.out.println("Next: " + s1);
- }
- });
- }
- //执行结果
- Next: Java
- Next: Android
- Next: Ruby
- Next: Ios
- Next: Swift
-
-
2>fromIterable
直接发送一个List 集合数据给观察者,产生的Observable会发射数组的每一项数据
- private void testFromIterable() {
- List
s = Arrays.asList("Java", "Android", "Ruby", "Ios", "Swift"); - Observable.fromIterable(s).subscribe(
- s1 -> System.out.println("Next: " + s1),
- throwable -> System.out.println("Error: " + throwable.getMessage()),
- () -> System.out.println("onComplete"));
- }
- //执行结果
- Next: Java
- Next: Android
- Next: Ruby
- Next: Ios
- Next: Swift
- onComplete
3>fromFuture
参数中的Future是java.util.concurrent中的Future,Future的作用是增加了cancel()等方法操作Callable,它可以通过get()方法来获取Callable返回的值
- private void testFromFuture() {
- ExecutorService executorService = Executors.newCachedThreadPool();
- Future
future = executorService.submit(new MyCallable()); - Observable.fromFuture(future).subscribe(s -> System.out.println("Next: " + s));
- //fromFuture方法有一个可接收两个可选参数的版本,分别指定超时时长和时间单位,如果过了指定的时间,
- //Future 还没返回一个值,那么这个Observable就会发射错误通知并终止,
- Observable.fromFuture(future,1, TimeUnit.SECONDS).subscribe(s -> System.out.println("Next: " + s)
- , throwable -> System.out.println(throwable.getMessage()),
- () -> System.out.println("onCompleted"));
- }
- private class MyCallable implements Callable<String> {
- @Override
- public String call() throws Exception {
- System.out.println("模拟一些耗时的任务...");
- Thread.sleep(2000L);
- return "Ok";
- }
- }
- //执行结果
- 模拟一些耗时的任务...
- Next: Ok
1>empty
创建一个不发射任何数据但是正常终止的Observable
2>never
创建一个不发射数据也不终止的Observable
3>throw
创建一个不发射数据以一个错误终止的Observable
这三个操作符生成的Observable行为非常特殊和受限。测试的时候很有用,有时候也用于结合其它的Observables,或者作为其它需要Observable的操作符的参数。
RxJava将这些操作符实现为 empty,never和error。error操作符需要一个Throwable参数,你的Observable会以此终止。这些操作符默认不在任何特定的调度器上执行,但是empty和error有一个可选参数是Scheduler,如果你传递了Scheduler参数,它们会在这个调度器上发送通知。
直到有观察者订阅时才创建 Observable ,并且为每个观察者创建一个全新的 Observable
defer 操作符会一直等待直到有观察者订阅它,然后它使用 Observable 工厂方法生成一个 Observable。它对每个观察者都这样做,因此尽管每个订阅者都以为自己订阅的是同一个 Observable ,但事实上每个订阅者获取的是它们自己单独的数据序列。
在某些情况下,直到最后一分钟(订阅发生时)才生成 Observable ,以确保 Observable 包含最新的数据。
- private void testDefer() {
- //返回一个Observable
- Observable<String> observable = Observable.defer(new Callable
extends String>>() { - @Override
- public ObservableSource extends String> call() throws Exception {
- return Observable.just("hello defer");
- }
- });
- observable.subscribe(new Consumer<String>() {
- @Override
- public void accept(String s) throws Exception {
- System.out.println("Next: "+s);
- }
- });
- }
- //
- Next: hello defer
创建在一个指定延迟之后发射单个数据的Observable
timer 操作符创建一个在给定的时间段之后返回一个特殊值的Observable
timer 返回一个Observable, 它在延迟一段给定的时间后发射一个简单的数字0,timer操作符默认在computation调度器上执行。
- Observable.timer(2, TimeUnit.SECONDS)
- .subscribe(new Consumer
() { - @Override
- public void accept(Long aLong) throws Exception {
- // 2秒后打印
- Log.d(TAG, "Next: " + aLong);
- }
- });
-
-
- // 执行结果
- Next: 0
创建一个按固定的时间间隔发射一个无限递增的整数序列的Observable。
interval接受一个表示时间间隔的参数和一个表示时间单位的参数,interval默认在computation调度器上执行。
- Disposable subscribe = Observable.interval(1, TimeUnit.SECONDS)
- .subscribe(new Consumer
() { - @Override
- public void accept(Long aLong) throws Exception {
- String msg = "Next" + aLong;
- Log.d(TAG, msg);
- tv.setText(msg);
- }
- });
创建一个发射指定范围的整数序列的Observable
它接受两个参数,一个是范围的起始值,一是范围的数据的数目,如果将第二个值设置为0,将导致Observable不发射任何数据(若设置为负数会报异常)。rang默认不在任何特定的调度器上执行,有一个变体可通过可选参数指定Scheduler.
- Observable.range(2, 5).subscribe(new Consumer<Integer>() {
- @Override
- public void accept(Integer integer) throws Exception {
- System.out.println(integer);
- }
- });
- // 执行结果
- 2
- 3
- 4
- 5
- 6
它不是创建一个Observable,而是重复发射原始Observable的数据序列,这个序列或者是无限的或者通过repeat(n)指定重复次数
repeat重复地发射数据,某些实现允许你重复的发射某个数据序列,还有一些允许你限制重复的次数;
- Observable.just("hello","repeat").repeat(3).subscribe(new Consumer<String>() {
- @Override
- public void accept(String s) throws Exception {
- System.out.println("Next" + s);
- }
- });
- //执行结果
- Next: hello
- Next: repeat
- Next: hello
- Next: repeat
- Next: hello
- Next: repeat