操作符是 RxJS 库和 Observables 的基础块。 它使我们能够通过使用一些关键字(函数)来执行复杂的操作。 运算符只不过是获取源 Observable、对其执行操作并返回新 Observable 的函数。
rxJS 的 管道运算符 pipe 允许我们链接运算符。每个运算符都接受一个 Observable 并返回一个 Observable,这种一致的行为使得链接成为了可能。 Operator 返回的 Observable 作为下一个运算符的输入。
下面是一个最简单的自定义 Operator 的例子。
import { interval, of } from 'rxjs';
function fancyOperator(source) {
return source;
}
interval(1000)
.pipe(fancyOperator)
.subscribe((value) => console.log(value));
pipe 调用并不会真正执行 Operator 函数体的逻辑,直至 Observable 被 subscribe 为止??是这样吗?
pipeFromArray 仅仅取出 Operation 数组里存储的第一个函数。
然后调用之,输入参数 this 就是源 Observable.
下面我们对 fancyOperator 添加少许功能。
import { interval, Observable } from 'rxjs';
function fancyOperator(source) {
return Observable.create((observer) => {
observer.next('Jerry');
observer.complete();
});
}
interval(1000)
.pipe(fancyOperator)
.subscribe((value) => console.log(value));
这里我们采取了移花接木的方法,在自定义 Operator 里返回了一个新的 Observable,这个 Observable 只发射一次数据,就进入终止状态。
看一个现实项目中自定义 Observable 的例子:
import { fromEvent, interval, Observable } from 'rxjs';
import { filter } from 'rxjs/operators';
function filterKey(key) {
return filter((event: KeyboardEvent) => event.key === key);
}
fromEvent(document, 'keyup')
.pipe(filterKey('Enter'))
.subscribe(
(data) => console.log(data) // KeyboardEvent
);
这个自定义 Operator 的输入是 Enter,输出是 filter 调用的返回值,一个新的函数。filter 调用接收一个 predicate,返回一个新的函数。
看一下 filter 返回函数运行时打印的值:
把原始的 Observable,传送给 filter 返回的函数并执行:
filter 返回的函数对原始 Observable 进行加工,返回一个新的 Observable:
触发 filterOperator 的构造函数:
把新建的 filterOperator 实例传入原始 Observable 的 lift 方法。这个 lift 操作非常简单,新建一个 Observable,把原始的 Observable挂接到新的 Observable 的 source 字段,同时把 Operator 实例赋给新的 Observable 实例的 operator 方法。
因此最后订阅时,订阅的是 lift 操作之后的新的 Observable 实例。
subscribe 会触发 filterOperator 的 call 方法: