RxJS提供了很多操作符对数据流(Observable)进行操作控制。在 【Angular中的RxJS】- 创建数据流 一文中介绍了创建数据流的方法,相关操作符这里就不赘述了。本文只介绍常用的针对数据流中的数据和流整体的处理的操作符。
1、针对流数据的操作符
map 操作符
使用 map 操作符 对数据流中每次产生的数据进行处理,map(val: funcion(val: R):R) ,代码示例:
import { from, Observable } from 'rxjs';
import { map } from 'rxjs/operators';
let ob: Observable<number> = from([80, 90, 100]);
let fn = (num: number) => num * 2;
ob.pipe(map(fn)).subscribe(d => { console.log(d); });
// 输出:
// 160
// 180
// 200
tap 操作符
使用 tap 操作符 对源 Observable 上的每个发出值进行监听,做额外处理,但返回源相同的 Observable。代码示例:
import { from, Observable } from 'rxjs';
import { tap } from 'rxjs/operators';
let ob: Observable<number> = from([1, 2, 3]);
ob.pipe(
tap(d => { console.log(`tap log ${d}`); })
).subscribe(d => { console.log(d); });
// 输出:
// tap log 1
// 1
// tap log 2
// 2
// tap log 3
// 3
scan 操作符
使用 scan 操作符 针对数据的产生次数进行累加计数,scan(accumulator: funcion(acc: R):R, seed: R) ,代码示例:
import { from, Observable } from 'rxjs';
import { scan } from 'rxjs/operators';
let ob: Observable<string> = from(["a", "b", "c"]);
ob.pipe(scan(d => d + 10, 15)).subscribe(d => {
console.log(d);
});
// 输出:
// 25
// 35
// 45
throttleTime 操作符
使用 throttleTime 操作符 从源数据流中发出一个值,然后在设置的时间内忽略随后发出的源值, 然后重复此过程。
throttleTime(duration: number) 对应的流程图示:
【Angular中的RxJS】- 概要介绍中控制按钮一秒钟内最多点击一次的示例就使用了 throttleTime 操作符:
import { fromEvent } from 'rxjs';
import { scan, throttleTime } from 'rxjs/operators';
/**
* 绑定按钮点击事件
*/
bindBtn2CLick() {
const button = document.querySelector('#btn2') as HTMLElement;
fromEvent(button, 'click').pipe(
throttleTime(1000),
scan(count => count + 1, 0)
).subscribe(count => {
console.log(`按钮2点击了 ${count} 次`);
});
}
auditTime 操作符
使用 auditTime 操作符 在设置的时间内忽略源值,然后发出源 Observable 的最新值, 并且重复此过程。
auditTime(duration: number) 对应的流程图示:
auditTime 和 throttleTime 很像, 但是发送沉默时间窗口的最后一个值, 而不是第一个。只要 audit 的内部时间器被禁用,它就会在 输出 Observable 上发出源 Observable 的最新值,并且当定时器启用时忽略源值。初始时,时间器是禁用的。只要第一个值到达, 时间器被启用。度过 持续时间后,时间间隔被禁用, 输出 Observable 发出最新的值, 不断的重复这个过程。
debounceTime 操作符
使用 debounceTime 操作符只有在特定的一段时间经过后并且没有发出另一个源值,才从源 Observable 中发出一个值。对应的流程图示:
debounceTime 操作符一个最常用的使用场景就是输入联想,一下示例演示停止输入50毫秒后进行输入联想查询:
const input = fromEvent(document.querySelector('input'), 'input');
input.pipe(
debounceTime(200),
map(event => event.target.value),
).subscribe(value => {
console.log(value);
});
skip 操作符
使用 skip 操作符 跳过源 Observable 发出的前N个值(N = count),skip(count: number) 对应的流程图示:
skipUntil 操作符
使用 skipUntil 操作符 跳过源 Observable 发出的值,直到另一个Observable有值发出
skipUntil(notifier: Observable) 对应的流程图示:
take 操作符
使用 take 操作符 只发出源 Observable 最初发出的的N个值 (N = count)。
take(count: number) 对应的流程图示:
takeUntil 操作符
使用 takeUntil 操作符 发出源 Observable 发出的值直到另一个Observable有值发出为止。
takeUntil(notifier: Observable) 对应的流程图示:
更多常用的针对流数据操作符的还有: first() last() 等,这里就不再一一列举了,可以参考官方文档学习:RxJS 官方文档,需要注意的是,在Angular框架中针对流数据的操作符都是放在 pipe()方法中书写,这一点与官网的语法稍有不同。
2、针对整体流的处理
zip 操作符
使用 zip 操作符将多个 Observable 组合以创建一个 Observable,该 Observable 的值是由所有输入 Observables 的值按顺序计算而来。如果最后一个参数是函数, 这个函数被用来计算最终发出的值.否则, 返回一个顺序包含所有输入值的数组。
import { Observable, from, zip } from 'rxjs';
let obName: Observable<string> = from(["Lucy", "LiLei", "HanMeiMei"]);
let obScore: Observable<number> = from([80, 90, 100]);
zip(obName, obScore).subscribe(([name, score]) => {
console.log(`name: ${name}, score: ${score}.`)
});
// 输出:
// name: Lucy, score: 80.
// name: LiLei, score: 90.
// name: HanMeiMei, score: 100.
zip 最后一个参数是函数的情况:
import { Observable, from, zip } from 'rxjs';
let obName: Observable<string> = from(["Lucy", "LiLei", "HanMeiMei"]);
let obScore: Observable<number> = from([80, 90, 100]);
let fn = (name: any, score: any) => {
return `name: ${name}, score: ${score}.`;
};
zip(obName, obScore, fn).subscribe((d) => { console.log(d); });
// 输出:
// name: Lucy, score: 80.
// name: LiLei, score: 90.
// name: HanMeiMei, score: 100.
使用 zip 操作符时,只有在每个数据流都有数据产生时才会触发总的数据流产生数据,如以上示例中 obScore = from([80, 90]); ,则只会有两次数据输出,只会打印前两行。
merge 操作符
使用 merge 操作符可以将多个数据流发出数据进行合并为一个数据流,再依次发出:
import { timer, Observable, merge } from 'rxjs';
import { map } from 'rxjs/operators';
let obA: Observable<any> = timer(10000, 2000).pipe(map(d => `Observable A : ${d}`));
let obB: Observable<any> = timer(2000, 2000).pipe(map(d => `Observable B : ${d}`));
merge(obA, obB).subscribe(d => { console.log(d); });
// 延迟1秒后每间隔2秒输出
// Observable A : 0
// Observable B : 0
// Observable A : 1
// Observable B : 1
// ...
concat 操作符
使用 concat 操作符 可以将两个数据流的数据连接在一起作为一个数据流的数据按连接顺序发出:
import { timer, Observable, concat } from 'rxjs';
let obA: Observable<number> = from([1, 2, 3]);
let obB: Observable<number> = from([4, 5, 6]);
concat(obA, obB).subscribe(d => { console.log(d); });
// 输出
// 1
// 2
// 3
// 4
// 5
// 6
使用concat操作符时,后面的数据流中的数据必须等到前面的数据流的数据发送完成之后才能发出:
import { timer, Observable, concat } from 'rxjs';
import { map } from 'rxjs/operators';
let obA: Observable<any> = timer(1000, 2000).pipe(map(d => `Observable A : ${d}`));
let obB: Observable<any> = timer(2000, 2000).pipe(map(d => `Observable B : ${d}`));
concat(obA, obB).subscribe(d => { console.log(d); });
以上代码将只会打印 Observable A 的信息,因为 obA 一直不能完成,就不能发出obB中的数据。