这一篇主要就是讲 flattening operators,像其他的 pipeable 一样, flattening operators 内部会 subscribe 每一个传进来的 Observable,并且将其返回一个新的 Observable。不过它可以将 higher order observable 扁平化,就像 lodash 的 flattening 一样(lodash 的 flattening 可以将嵌套的数组转化为扁平数组)。
flattening operators 包含 concatMap,switchMap,mergeMap 等,具体的区别后面再说。
concatMap 的基础使用:
import { fromEvent } from 'rxjs';
import { ajax } from 'rxjs/ajax';
import { concatMap, map } from 'rxjs/operators';
const endpointInput: HTMLInputElement =
document.querySelector('input#endpoint');
const fetchButton = document.querySelector('button#fetch');
fromEvent(fetchButton, 'click')
.pipe(
map((event) => endpointInput.value),
concatMap((value) =>
ajax(`https://random-data-api.com/api/${value}/random_${value}`)
)
)
.subscribe((value) => console.log(value));
这样的处理就不需要一些回调函数在 concatMap 内部中处理结果,而是将多个/多重 Observable 传到下游,因此可以直接在最后的 subscribe 中进行处理。
也如同之前提到的一样,如果这时候,pipe 中的 Observable 报错,那么就会终止操作。
这是第一个处理错误的方法,使用之前讲过的 catchError 进行处理。
import { fromEvent, of } from 'rxjs';
import { ajax } from 'rxjs/ajax';
import { catchError, concatMap, map } from 'rxjs/operators';
const endpointInput: HTMLInputElement =
document.querySelector('input#endpoint');
const fetchButton = document.querySelector('button#fetch');
fromEvent(fetchButton, 'click')
.pipe(
map((event) => endpointInput.value),
concatMap((value) =>
ajax(`https://random-data-api.com/api/${value}/random_${value}`)
),
catchError((err) => of(err))
)
.subscribe({
next: (value) => console.log(value),
complete: () => console.log('completed'),
});
使用这种方法操作 Observable,一旦遇到错误,那么该 stream 就会进入完成状态,从而终结 subscription。
稍微调整一下 catchError 的过程就会让 concatMap 传出成功而非报错的信号,从而继续处理下一个流。
import { fromEvent, of } from 'rxjs';
import { ajax } from 'rxjs/ajax';
import { catchError, concatMap, map } from 'rxjs/operators';
const endpointInput: HTMLInputElement =
document.querySelector('input#endpoint');
const fetchButton = document.querySelector('button#fetch');
fromEvent(fetchButton, 'click')
.pipe(
map((event) => endpointInput.value),
concatMap((value) =>
ajax(`https://random-data-api.com/api/${value}/random_${value}`).pipe(
catchError((err) => of(err))
)
)
)
.subscribe((value) => console.log(value));
主要就是几个 operator 对于并发的处理:
concatmap 会将所有的 Observable 推到 queue 中,等到上游的 Observable 完成之后,再按顺序的继续执行下一个 Observable。
在不知道用什么的时候,可以优先选择 concatMap
switchMap 在遇到下游的 Observable 之后,会取消(unsubscribing)上一个 Observable。
如果是 HTTP 请求的话,就会将上一个 HTTP 请求取消。对于 GET 请求来说这不会有什么问题,不过如果是 POST 的话,有可能在取消该请求之前,该请求就已经到了后端,并且做了相应的变化。
mergemap 会在每次接受到一个 observable 的时候就发送一个请求
相较于其他两个 flattening operators,mergeMap 是最容易出现问题的一个,所以用的时候需要更加的谨慎