RxJava 是一个基于 Java 的响应式编程库,用于处理异步事件流和数据流。它是由 Netflix 开发并开源,现在广泛用于 Android 和 Java 后端开发。RxJava 提供了一种用于组合和处理异步数据的丰富工具集,它的核心思想是将数据流视为一系列事件,以响应事件的方式进行处理。RxJava 提供了丰富的操作符,用于处理和转换数据流。这些操作符可以帮助你执行各种操作,包括过滤、映射、合并、变换等,以便更好地处理异步数据流。
Observable
(可观察对象)和 Observer
(观察者)。Observable
表示一个可观察的数据源,它可以发射数据项,而 Observer
用于订阅并监听这些数据项的变化。subscribeOn
和 observeOn
等操作符,可以指定在哪个线程上执行 Observable 和 Observer 的代码。onErrorReturn
、onErrorResumeNext
等,以确保应用程序能够更健壮地处理异常情况。zip
将多个数据流合并,或使用 map
转换数据项的类型。1. 响应式编程: RxJava 的核心思想是响应式编程,它允许你以一种响应事件的方式来处理数据流。你可以订阅一个数据流,然后定义事件处理的方式。当数据项到达时,它们会触发事件,观察者(Observer)会监听这些事件并执行相应的操作。
2. 基本组件: RxJava 的基本组件包括以下几个部分
3. 操作符: RxJava 提供了大量的操作符,用于操作和转换数据流。这些操作符包括 map
、filter
、flatMap
、zip
、merge
等,允许你根据需要执行各种操作。
4. 异步和并发: RxJava 简化了异步编程,允许你轻松地处理多线程和并发操作。你可以使用 subscribeOn
和 observeOn
操作符来指定代码的执行线程,以避免阻塞主线程。
5. 错误处理: RxJava 提供了多种方式来处理错误,包括 onError
、onErrorReturn
、onErrorResumeNext
等,以确保应用程序能够更健壮地处理异常情况。
6. 背压处理: RxJava 2 引入了背压处理机制,用于处理生产者和消费者之间的速率不匹配问题。背压机制允许 Observable 控制数据的发射速率,以避免内存溢出和资源泄漏。
7. 在 Android 中的应用: RxJava 在 Android 开发中广泛应用于处理异步操作,例如网络请求、数据库访问、UI事件响应等。它简化了异步编程,提高了代码的可读性和可维护性,同时提供了更好的性能和响应性。
8. 流式编程: RxJava 支持链式调用,可以将多个操作符和观察者方法连接在一起,以创建复杂的数据流处理逻辑。这使得代码更具表达力和可读性。
RxJava 的基本用法: \
Observable
,它会发射整数数据。\Observer
,用于订阅这个 Observable
。subscribe
方法将观察者订阅到 Observable
上,观察者会监听 Observable
发射的数据项和事件。 //创建一个 `Observable`,它会发射整数数据。
val observable = Observable.create(ObservableOnSubscribe {
for (i in 1..10) {
it.onNext(i)
}
it.onComplete()
})
var num: Int;
var dd : Disposable? = null
//创建一个观察者 `Observer`,用于订阅这个 `Observable`
val observer = object:Observer{
override fun onSubscribe(d: Disposable) {//当调用订阅时调用此方法
dd = d
}
override fun onNext(t: Int) {//上游发送数据时调用此方法 即当 Observable 发射数据项时调用
num = t
if (num == 5){
dd!!.dispose()
}
Log.d(TAG,"接受上游数据:$t")
}
override fun onError(e: Throwable) {// 当出现错误时调用
}
override fun onComplete() {// 当 Observable 完成时调用
}
}
//使用subscribe方法将观察者订阅到 Observable上,观察者会监听Observable发射的数据项和事件。
observable.subscribe(observer)
也可以使用方法:直接使用函数式编程,把创建后的被观察者通过订阅方法(订阅操作符)把创建观察者作为订阅方法的参数;伪代码:
Observable.create(ObservableOnSubscribe {
//上游发送数据
it.onNext(1)
it.onNext(3)
it.onNext(5)
it.onComplete()
})
.subscribe(object :Observer{
override fun onSubscribe(d: Disposable) {
}
override fun onNext(t: Int) {
//下游接受数据
Log.i(TAG,"接受到上游数据:$t")
}
override fun onError(e: Throwable) {
}
override fun onComplete() {
}
})
Rxjava 的使用核心点就是各种操作符的使用;所以以下介绍一些常用的 RxJava 操作符的详细使用示例:
首先操作符的分类:
创建操作符:
just
:创建一个发射指定数据项的 Observable。fromArray
:从一个数组或可迭代对象中创建一个 Observable。create
:手动创建一个 Observable。range
:创建一个发射特定整数范围的 Observable。转换操作符:
map
:将数据项转换为另一种类型。flatMap
:将每个数据项映射为一个 Observable,然后将这些 Observables 合并成一个数据流。concatMap
:类似于 flatMap
,但保持原始数据项的顺序。buffer
:将数据项分组为列表,并以列表的形式发射。过滤操作符:
filter
:过滤掉不满足条件的数据项。distinct
:过滤掉重复的数据项。take
:仅发射前 N 个数据项。skip
:跳过前 N 个数据项。组合操作符:
merge
:合并多个 Observables 的数据流。zip
:将多个 Observables 的数据项按顺序一对一地合并。combineLatest
:将多个 Observables 最近的数据项合并成一个。辅助操作符:
subscribe
:用于订阅 Observable 并处理数据。observeOn
:指定观察者运行在特定的调度器上。subscribeOn
:指定 Observable 运行在特定的调度器上。debounce
:用于过滤数据流,只保留最新的数据项。delay
:延迟发射数据项。错误处理操作符:
onErrorReturn
:在遇到错误时发射一个默认值。onErrorResumeNext
:在遇到错误时切换到另一个 Observable。retry
:在遇到错误时重试操作。创建型用于创建 Observable(被观察者)实例,它们是构建数据流的起点(上游)。
1. Observable.create
: 这是最通用的创建型操作符。它允许你手动创建一个 Observable,你需要在其中定义数据项的发射逻辑。这个操作符通常用于创建自定义的 Observable。
val observable = Observable.create { emitter ->
emitter.onNext(1) //手动调用发射逻辑
emitter.onNext(2)
emitter.onComplete()//手动调用发射完成的逻辑
}
.subscribe(object :Observer{
override fun onSubscribe(d: Disposable) {
}
override fun onNext(t: String) {
Log.d("Rxjava ","下游接受到上游的数据:" + t)
}
override fun onError(e: Throwable) {
}
override fun onComplete() {
}
})
或者使用Observer的简化版Consumer
.subscribe(object :Consumer{
override fun accept(t: String?) {
Log.d("Rxjava","下游接受到上游的数据:$t")
}
})
2. Observable.just
: 用于创建一个发射指定数据项的 Observable。它可以发射多个数据项,然后自动完成发射数据项。
val observable = Observable.just(1, 3, 5,7)//just方法参数为可变参数,参数类型为泛型
.subscribe(object :Consumer{
override fun accept(t: Int) {
Log.d("Rxjava","下游接受到上游的数据:$t")
}
})
3. Observable.fromArray
: 从一个数组、可迭代对象或可变参数列表中创建一个 Observable。然后自动完成发射数据项。
val numbers = arrayOf(1, 2, 3)
val observable = Observable.fromArray(numbers)
.subscribe(object :Consumer>{
override fun accept(t: Array?) {
}
})
4. Observable.fromIterable: 从一个 Iterable 对象(如 List 或 Set)中创建一个 Observable。
val list = listOf("A", "B", "C")
val observable = Observable.fromIterable(list)
.subscribe(object :Consumer{
override fun accept(t: String?) {
}
})
5. Observable.interval
: 创建一个 Observable,定期发射一个递增的长整型数值。
Observable.interval(1, TimeUnit.SECONDS)//重载方法很多
.subscribe(object :Consumer{
override fun accept(t: Long?) {
}
})
6. Observable.range: 创建一个 Observable,发射一个指定范围内的整数序列。
Observable.range(1,6)
.subscribe(object :Consumer{
override fun accept(t: Int?) {
}
})
7. Observable.timer
: 创建一个 Observable,在指定延迟后发射一个数据项。
Observable.timer(2, TimeUnit.SECONDS)
.subscribe(object :Consumer{
override fun accept(t: Long?) {
}
})
这些创建型操作符用于生成不同类型的 Observable,根据需求选择合适的操作符。它们是构建数据流的起点,后续可以使用各种操作符对数据流进行变换、过滤、合并等操作,以满足具体的需求。在实际应用中,你通常会根据场景选择适当的创建型操作符来构建 Observable。
RxJava 的转换操作符用于对 Observable 发射的数据流进行变换、映射和操作。它们允许你以不同的方式处理数据项,以满足特定需求。
map
操作符: map
用于将 Observable 发射的每个数据项转换为另一种数据类型。它的参数是一个函数,该函数将原始数据项转换为新的数据项。
Observable.just(1, 2, 3)
.map(object :Function{
override fun apply(t: Int?): String {
val str = t.toString()
return str
}
})
.subscribe(object:Consumer{
override fun accept(t: String?) {
Log.d(TAG, "Transformed: $t")
}
})
/*subscribe {
value ->
Log.d(TAG, "Transformed: $value")
}*/
flatMap
操作符: flatMap
用于将每个数据项映射为一个 Observable,然后将这些 Observables 合并成一个单一的数据流。这允许并发处理数据项。
Observable.just(1, 2, 3)
.flatMap(object:Function>{
override fun apply(t: Int?): Observable {
return Observable.just(t.toString())
}
})
// flatMap { number ->
// Observable.just(number, number * 2)
// }
.subscribe { value -> Log.d(TAG, "FlatMapped: $value") }
concatMap
操作符: concatMap
类似于 flatMap
,但它保持原始数据项的顺序。它等待前一个 Observable 完成后才处理下一个。
Observable.just(1, 2, 3)
.concatMap(object:Function>{
override fun apply(t: Int?): Observable {
return Observable.just(t.toString())
}
})
/*.concatMap { number ->
Observable.just(number, number * 2)
}*/
.subscribe { value ->
Log.d(TAG, "ConcatMapped: $value")
}
buffer
操作符: buffer
用于将数据项分组为列表,并以列表的形式发射。你可以指定每个列表中的数据项数量。
Observable.just(1, 2, 3, 4, 5, 6)
.buffer(2)
.subscribe(object:Consumer>{
override fun accept(t: List?) {
Log.d("Rxjava","Buffered: $t")
}
})
/*.subscribe { buffer -> println("Buffered: $buffer") }*/
groupBy
操作符: groupBy
允许你将 Observable 数据项按某个标准进行分组,然后发射多个子 Observable,每个子 Observable 包含一组具有相同标准的数据项。
Observable.just(1, 2, 3, 4, 5, 6)
.groupBy(object : Function {
override fun apply(t: Int): String {
if (t!! % 2 == 0) {
return "偶数"
}
return "奇数"
}
})
.subscribe(object : Consumer> { //注意GroupedObservable参数的类型和Function反过来
override fun accept(t: GroupedObservable) { //GroupedObservable被观察者
t.subscribe(object : Consumer {
override fun accept(value: Int?) {
Log.d("Rxjava", "Group ${t.key}: ${value}")
}
})
}
})
//lambda表达式
Observable.just(1, 2, 3, 4, 5, 6)
.groupBy { it % 2 == 0 }
.subscribe { group ->
group.subscribe { value ->
Log.d("Rxjava", "Group ${group.key}: $value")
}
}
scan
操作符: scan
用于将数据项累积成一个中间结果,然后发射这个中间结果。
Observable.just(1, 2, 3, 4, 5)
.scan(object :BiFunction{
override fun apply(t1: Int, t2: Int): Int {
return t1 + t2
}
})
.subscribe(object:Consumer{
override fun accept(t: Int?) {
Log.d("Rxjava","Scanned: $t")
}
})
// .scan { acc, value -> acc + value }
// .subscribe { result -> Log.d("Rxjava","Scanned: $result") }
打印结果:
Scanned: 1
Scanned: 3
Scanned: 6
Scanned: 10
Scanned: 15
这些转换操作符可以帮助你根据需要对数据流进行变换、映射和操作。你可以选择合适的操作符来满足具体的业务需求,以便更有效地处理异步数据流。 RxJava 提供了众多其他转换操作符,可以根据实际需求查阅文档来使用。
RxJava 的过滤操作符用于从 Observable 中过滤、筛选和筛除数据项,以便只保留满足特定条件的数据项。
filter
操作符: filter
用于过滤掉不满足条件的数据项,只保留满足条件的数据项。条件由一个函数决定。
Observable.just(1, 2, 3, 4, 5, 6)
.filter(object:Predicate{
override fun test(t: Int): Boolean {
return t%2 ==0;
}
})
.subscribe(object :Consumer{
override fun accept(value: Int?) {
Log.d("Rxjava","Filtered: $value")
}
})
/* .filter { it % 2 == 0 }
.subscribe { value -> Log.d("Rxjava","Filtered: $value") }*/
distinct
操作符: distinct
用于过滤掉重复的数据项,只保留第一次出现的数据项。
//lambda写法
Observable.just(1, 2, 2, 3, 4, 4, 5)
.distinct()
.subscribe { value -> println("Distinct: $value") }
distinctUntilChanged
操作符: distinctUntilChanged
用于过滤掉连续重复的数据项,只保留第一次出现的数据项。
//lambda写法
Observable.just(1, 1, 2, 2, 3, 4, 4, 5)
.distinctUntilChanged()
.subscribe { value -> println("DistinctUntilChanged: $value") }
take
操作符: take
用于仅发射前 N 个数据项,忽略其余的数据项。
Observable.just(1, 2, 3, 4, 5)
.take(3)
.subscribe { value -> println("Taken: $value") }
skip
操作符: skip
用于跳过前 N 个数据项,只发射后续的数据项。
Observable.just(1, 2, 3, 4, 5)
.skip(2)
.subscribe { value -> println("Skipped: $value") }
elementAt
操作符: elementAt
用于发射指定索引位置的数据项,忽略其他数据项。
Observable.just(1, 2, 3, 4, 5)
.elementAt(2)
.subscribe { value -> println("ElementAt: $value") }
takeLast
操作符: takeLast
用于仅发射最后 N 个数据项,忽略前面的数据项。
Observable.just(1, 2, 3, 4, 5)
.takeLast(3)
.subscribe { value -> println("TakeLast: $value") }
这些过滤操作符可以帮助你根据特定条件来过滤和筛选数据流,以满足具体的需求。你可以选择合适的操作符来处理数据流,从而仅保留需要的数据项,而忽略其他数据项。在实际应用中,过滤操作符常用于数据筛选、去重、限制数量等场景,以帮助你更有效地处理异步数据流。 RxJava 还提供了其他过滤操作符,可以根据实际需求查阅文档来使用。
RxJava 的组合操作符用于将多个 Observable 合并、组合或操作,以生成新的 Observable 或数据流。
merge
操作符: merge
用于合并多个 Observables 的数据流,以按照发射顺序合并它们的数据项。这意味着数据项将按照它们发射的顺序合并,不考虑来源 Observable。
kotlinCopy codeval observable1 = Observable.just(1, 2, 3)
val observable2 = Observable.just(4, 5, 6)
Observable.merge(observable1, observable2)
.subscribe { value -> println("Merged: $value") }
concat
操作符: concat
用于合并多个 Observables 的数据流,但它保持原始 Observables 的顺序,先合并第一个 Observable 的数据,再合并第二个 Observable 的数据,以此类推。
kotlinCopy codeval observable1 = Observable.just(1, 2, 3)
val observable2 = Observable.just(4, 5, 6)
Observable.concat(observable1, observable2)
.subscribe { value -> println("Concatenated: $value") }
zip
操作符: zip
用于将多个 Observables 的数据项一对一地合并,生成一个新的 Observable。它会按顺序将每个 Observable 的相同索引位置的数据项合并在一起。
kotlinCopy codeval observable1 = Observable.just("A", "B", "C")
val observable2 = Observable.just(1, 2, 3)
Observable.zip(observable1, observable2) { str, num -> "$str$num" }
.subscribe { value -> println("Zipped: $value") }
combineLatest
操作符: combineLatest
用于将多个 Observables 最近的数据项合并成一个新的 Observable。每当任何一个源 Observable 发射新数据,将使用最近发射的数据项来组合生成新的数据项。
kotlinCopy codeval observable1 = Observable.interval(300, TimeUnit.MILLISECONDS).map { "A$it" }
val observable2 = Observable.interval(200, TimeUnit.MILLISECONDS).map { "B$it" }
Observable.combineLatest(observable1, observable2) { a, b -> "$a-$b" }
.take(5)
.subscribe { value -> println("Combined: $value") }
switchOnNext
操作符: switchOnNext
用于在一个 Observable 发射多个 Observables 时,切换到最新的 Observable 并发射它的数据。
kotlinCopy codeval observables = listOf(
Observable.just(1, 2, 3),
Observable.just(4, 5, 6),
Observable.just(7, 8, 9)
)
Observable.fromIterable(observables)
.switchMap { it }
.subscribe { value -> println("Switched: $value") }
这些组合操作符允许你将多个 Observables 合并、组合或操作,以满足不同的数据处理需求。你可以根据具体的场景选择合适的操作符,以便有效地处理异步数据流。在实际应用中,组合操作符常用于合并多个数据源,进行数据计算和处理,以及管理多个数据流的交互。 RxJava 还提供了其他组合操作符,可以根据实际需求查阅文档来使用。
RxJava 的错误处理操作符用于处理 Observable 中可能出现的错误和异常情况,以确保应用程序能够更健壮地处理这些问题。
onErrorReturn
操作符: onErrorReturn
用于在 Observable 遇到错误时发射一个默认值,并继续正常的数据流。
kotlinCopy codeObservable.create { emitter ->
emitter.onNext(1)
emitter.onError(Exception("An error occurred"))
}
.onErrorReturn { error -> 0 }
.subscribe(
{ value -> println("Received: $value") },
{ error -> println("Error: ${error.message}") }
)
onErrorResumeNext
操作符: onErrorResumeNext
用于在 Observable 遇到错误时切换到另一个 Observable,并继续发射数据。
kotlinCopy codeval sourceObservable = Observable.create { emitter ->
emitter.onNext(1)
emitter.onError(Exception("An error occurred"))
}
val fallbackObservable = Observable.just(2, 3, 4)
sourceObservable
.onErrorResumeNext(fallbackObservable)
.subscribe(
{ value -> println("Received: $value") },
{ error -> println("Error: ${error.message}") }
)
retry
操作符: retry
用于在 Observable 遇到错误时重试操作,指定重试次数。如果重试次数用尽仍有错误,错误会传递给观察者。
kotlinCopy codevar attempts = 0
Observable.create { emitter ->
if (attempts < 3) {
attempts++
emitter.onError(Exception("An error occurred"))
} else {
emitter.onNext(1)
emitter.onComplete()
}
}
.retry(3)
.subscribe(
{ value -> println("Received: $value") },
{ error -> println("Error: ${error.message}") }
)
retryWhen
操作符: retryWhen
允许你自定义错误重试策略。你可以在 retryWhen
中返回一个 Observable,用于控制重试次数和时机。
kotlinCopy codevar attempts = 0
Observable.create { emitter ->
if (attempts < 3) {
attempts++
emitter.onError(Exception("An error occurred"))
} else {
emitter.onNext(1)
emitter.onComplete()
}
}
.retryWhen { errors ->
errors.flatMap { error ->
if (attempts < 3) {
Observable.timer(1, TimeUnit.SECONDS)
} else {
Observable.error(error)
}
}
}
.subscribe(
{ value -> println("Received: $value") },
{ error -> println("Error: ${error.message}") }
)
onErrorComplete
操作符: onErrorComplete
用于在 Observable 遇到错误时忽略错误,不传递给观察者,直接完成 Observable。
kotlinCopy codeObservable.create { emitter ->
emitter.onNext(1)
emitter.onError(Exception("An error occurred"))
}
.onErrorComplete()
.subscribe(
{ value -> println("Received: $value") },
{ error -> println("Error: ${error.message}") }
)
这些错误处理操作符允许你在 Observable 遇到错误和异常情况时采取不同的处理策略,以确保你的应用程序能够更好地应对异常情况。你可以根据具体的需求选择合适的操作符,以提高应用程序的可靠性和健壮性。 RxJava 还提供了其他错误处理操作符,可以根据实际需求查阅文档来使用。
总结:以上都是同步操作,即被观察者和观察者都是在默认线程中执行(Android的主线程)
subscribeOn
操作符: subscribeOn
用于指定 Observable 的创建和订阅操作运行在指定的线程。通常用于将耗时的任务移到后台线程执行,以避免阻塞主线程。
kotlinCopy codeObservable.create { emitter ->
// 在 IO 线程执行任务
emitter.onNext(1)
emitter.onComplete()
}
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe { value -> println("Received on main thread: $value") }
observeOn
操作符: observeOn
用于指定观察者(Observer)接收数据的线程。它允许你将数据切换到主线程,以更新 UI,或切换到其他线程执行特定操作。
kotlinCopy codeObservable.just(1, 2, 3)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe { value -> println("Received on main thread: $value") }
observeOn(io.reactivex.rxjava3.schedulers.Scheduler)
: 除了 Android 主线程调度器 (AndroidSchedulers.mainThread()
),RxJava 还提供了其他内置的调度器,如 Schedulers.io()
、Schedulers.computation()
等,用于切换到不同的线程池执行任务。
kotlinCopy codeObservable.just(1, 2, 3)
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.computation())
.subscribe { value -> println("Received on computation thread: $value") }
subscribeOn
和 observeOn
组合: 通常,你需要结合使用 subscribeOn
和 observeOn
来控制 Observable 的创建和订阅线程以及观察者接收数据的线程。
kotlinCopy codeObservable.create { emitter ->
// 在 IO 线程执行任务
emitter.onNext(1)
emitter.onComplete()
}
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe { value -> println("Received on main thread: $value") }
doOnSubscribe
和 doOnNext
操作符: 这些操作符用于在特定事件发生时执行操作,例如,使用 doOnSubscribe
可以在订阅发生时切换线程。
kotlinCopy codeObservable.just(1, 2, 3)
.doOnSubscribe { println("Subscribed on ${Thread.currentThread().name}") }
.subscribeOn(Schedulers.io())
.doOnNext { value -> println("Processing on ${Thread.currentThread().name}: $value") }
.observeOn(AndroidSchedulers.mainThread())
.subscribe { value -> println("Received on main thread: $value") }
这些线程切换操作符允许你在 RxJava 中灵活地控制数据流的线程调度,以满足不同场景下的性能和响应需求。你可以根据具体的需求和场景来选择合适的线程切换操作符,以优化应用程序的性能和用户体验。 RxJava 还提供其他线程切换操作符和自定义调度器的功能,可以根据实际需求查阅文档来使用。
Flowable是 RxJava 中用于处理背压的一种特殊类型的 Observable。它被设计用来处理生产者产生数据速度快于消费者处理数据速度的情况,从而防止数据积累和内存溢出。以下是关于Flowable` 的详细说明:
Flowable
是 RxJava 2 中引入的一种 Observable 类型,它支持背压。这意味着它内置了背压处理机制,可以确保在数据流速度不同步的情况下能够安全地传递数据。Flowable
提供了多种背压策略,可以通过 onBackpressureXXX()
操作符来指定。一些常见的背压策略包括:
onBackpressureBuffer()
: 缓冲数据并等待消费者处理。onBackpressureDrop()
: 丢弃过多的数据,只保留最新的数据。onBackpressureLatest()
: 保留最新的数据项,丢弃其余的数据。onBackpressureError()
: 当数据流速度太快时,抛出异常以通知消费者处理背压问题。Flowable
: 你可以使用 Flowable.create
或其他创建方法来创建 Flowable
。通常,在创建 Flowable
时需要使用 BackpressureStrategy
来指定背压策略。Flowable.create({ emitter ->
for (i in 1..10000) {
emitter.onNext(i)
}
}, BackpressureStrategy.BUFFER)
.observeOn(Schedulers.io())
.subscribe { value -> println("Received: $value") }
使用 Flowable
的消费者: 消费 Flowable
的观察者(Observer)通常需要使用 onBackpressureXXX
操作符来处理背压问题。消费者也可以通过调用 request
方法来请求数据。
Flowable.create({ emitter ->
for (i in 1..10000) {
emitter.onNext(i)
}
}, BackpressureStrategy.BUFFER)
.onBackpressureBuffer()
.observeOn(Schedulers.io())
.subscribe(object : FlowableSubscriber {
override fun onSubscribe(subscription: Subscription) {
subscription.request(10) // 请求处理的数据项数量
}
override fun onNext(value: Int) {
println("Received: $value")
}
override fun onError(throwable: Throwable) {
println("Error: ${throwable.message}")
}
override fun onComplete() {
println("Completed")
}
})
注意事项: 使用 Flowable
需要注意,消费者需要明确请求数据,否则数据不会被传递。你可以通过调用 request
方法来请求数据,或者在订阅时使用 onBackpressureXXX
操作符来自动请求数据。
Flowable
是处理背压问题的有效工具,适用于处理大量数据或高频率事件时,以确保数据流能够稳定和高效地传递。选择合适的背压策略和请求数据的数量是使用 Flowable
的关键因素,以满足应用程序的需求。
注意使用Flowable (被观察者)时观察者使用subscriber