RxJS,一个“神秘”的前端技术
写在前面
RxJS 是一个 JavaScript 库,它使用可观察对象来编写异步和基于事件的程序。它可以帮助开发人员更轻松地管理复杂的异步代码,并提供了许多操作符来处理可观察对象。RxJS 可以用于多种应用程序,包括 Web 应用程序、移动应用程序和桌面应用程序。但是,由于它有一些复杂的概念和操作符,因此有时也被认为是一种“神秘”的技术。
常用API
RxJS 提供了非常丰富的方法和 API,以下是 RxJS 中一些常用的方法和 API,以及详细示例和使用方法:
Observable 创建
-
Observable
:可观察对象,表示一个可观察的序列,可以发出零个或多个值,并在完成或出错时发出通知。 -
of
:创建一个可观察对象,该对象会依次发出指定的值。
import {
of } from 'rxjs';
const source = of(1, 2, 3);
source.subscribe(value => console.log(value)); // 输出 1, 2, 3
from
:将一个可迭代对象、类数组对象、Promise 或类似数组对象转换为可观察对象。
import {
from } from 'rxjs';
const source = from([1, 2, 3]);
source.subscribe(value => console.log(value)); // 输出 1, 2, 3
fromEvent
:创建一个可观察对象,该对象会在指定的 DOM 元素上发出指定类型的事件。
import {
fromEvent } from 'rxjs';
const button = document.getElementById('myButton');
const source = fromEvent(button, 'click');
source.subscribe(event => console.log(event)); // 输出点击事件对象
interval
:创建一个可观察对象,该对象会每隔指定的时间间隔发出一个递增的整数。
import {
interval } from 'rxjs';
const source = interval(1000);
source.subscribe(value => console.log(value)); // 每秒输出一个递增的整数
timer
:创建一个可观察对象,该对象会在指定的延迟时间后发出一个值。
import {
timer } from 'rxjs';
const source = timer(1000);
source.subscribe(() => console.log('Hello')); // 一秒后输出 'Hello'
range
:创建一个可观察对象,该对象会发出指定范围内的整数。
import {
range } from 'rxjs';
const source = range(1, 3);
source.subscribe(value => console.log(value)); // 输出 1, 2, 3
defer
:创建一个可观察对象,该对象会在订阅时调用一个工厂函数以生成一个新的可观察对象。
import {
defer } from 'rxjs';
const source = defer(() => {
const randomValue = Math.random();
return of(randomValue);
});
source.subscribe(value => console.log(value)); // 输出随机数
empty
:创建一个空的可观察对象,该对象会立即完成。
import {
empty } from 'rxjs';
const source = empty();
source.subscribe({
next: () => console.log('Next'),
complete: () => console.log('Complete')
}); // 立即输出 'Complete'
never
:创建一个永远不会发出任何值或完成的可观察对象。
import {
never } from 'rxjs';
const source = never();
source.subscribe(() => console.log('Next')); // 不会输出任何内容
throwError
:创建一个立即抛出错误的可观察对象。
import {
throwError } from 'rxjs';
const source = throwError('Something went wrong');
source.subscribe({
next: () => console.log('Next'),
error: error => console.error(error)
}); // 立即输出错误信息
Observable 转换
map
:对可观察对象发出的每个值应用一个函数,并将结果发出。
import {
of } from 'rxjs';
import {
map } from 'rxjs/operators';
const source = of(1, 2, 3);
const mappedSource = source.pipe(map(value => value * 2));
mappedSource.subscribe(value => console.log(value)); // 输出 2, 4, 6
filter
:对可观察对象发出的每个值应用一个谓词函数,如果返回 true,则发出该值。
import {
of } from 'rxjs';
import {
filter } from 'rxjs/operators';
const source = of(1, 2, 3);
const filteredSource = source.pipe(filter(value => value > 1));
filteredSource.subscribe(value => console.log(value)); // 输出 2, 3
scan
:对可观察对象发出的每个值应用一个累加器函数,并将累加器的当前状态作为下一个值发出。
import {
of } from 'rxjs';
import {
scan } from 'rxjs/operators';
const source = of(1, 2, 3);
const scannedSource = source.pipe(scan((acc, curr) => acc + curr, 0));
scannedSource.subscribe(value => console.log(value)); // 输出 1, 3, 6
buffer
:将可观察对象发出的值缓存到数组中,并在指定条件下将缓存的数组作为值发出。
import {
interval } from 'rxjs';
import {
buffer } from 'rxjs/operators';
const source = interval(1000);
const bufferedSource = source.pipe(buffer(interval(3000)));
bufferedSource.subscribe(value => console.log(value)); // 每三秒输出最近三秒内的值组成的数组
concatMap
:将可观察对象发出的每个值映射到一个新的可观察对象,并将这些可观察对象依次连接起来。
import {
of } from 'rxjs';
import {
concatMap } from 'rxjs/operators';
const source = of('Hello', 'World');
const mappedSource = source.pipe(concatMap(value =>
of(`${
value} RxJS`).pipe(delay(1000))
));
mappedSource.subscribe(value => console.log(value)); // 输出 "Hello RxJS" 和 "World RxJS",每个值间隔一秒
mergeMap
:将可观察对象发出的每个值映射到一个新的可观察对象,并将这些可观察对象合并成一个。
import {
of } from 'rxjs';
import {
mergeMap } from 'rxjs/operators';
const source = of('Hello', 'World');
const mappedSource = source.pipe(mergeMap(value =>
of(`${
value} RxJS`).pipe(delay(1000))
));
mappedSource.subscribe(value => console.log(value)); // 输出 "Hello RxJS" 和 "World RxJS",每个值间隔一秒
switchMap
:将可观察对象发出的每个值映射到一个新的可观察对象,并只订阅最新映射得到的可观察对象。
import {
of, interval } from 'rxjs';
import {
switchMap } from 'rxjs/operators';
const source = of('Hello', 'World');
const mappedSource = source.pipe(switchMap(() =>
interval(1000)
));
mappedSource.subscribe(value => console.log(value)); // 每秒输出递增的整数,但只有最新值会被输出
concatAll
:将多个可观察对象合并成一个,并按顺序发出它们发出的所有值。
import {
of } from 'rxjs';
import {
concatAll } from 'rxjs/operators';
const source1 = of(1, 2, 3);
const source2 = of(4, 5, 6);
const concatenatedSource = of(source1, source2).pipe(concatAll());
concatenatedSource.subscribe(value => console.log(value)); // 输出 1, 2, 3, 4, 5, 6
mergeAll
:将多个可观察对象合并成一个,并同时发出它们发出的所有值。
import {
of } from 'rxjs';
import {
mergeAll } from 'rxjs/operators';
const source1 = of(1, 2, 3);
const source2 = of(4, 5, 6);
const mergedSource = of(source1, source2).pipe(mergeAll());
mergedSource.subscribe(value => console.log(value)); // 输出所有值,顺序不确定
switchAll
:将多个可观察对象合并成一个,并只订阅最新发出的可观察对象。
import {
of, interval } from 'rxjs';
import {
switchAll } from 'rxjs/operators';
const source1 = interval(1000);
const source2 = interval(2000);
const switchedSource = of(source1, source2).pipe(switchAll());
switchedSource.subscribe(value => console.log(value)); // 只输出最新源发出的值,即每两秒输出一个递增整数
Observable 过滤
debounceTime
:在指定时间内忽略源发出的值,如果源在此期间没有发出新值,则发出最后一个值。
import {
fromEvent } from 'rxjs';
import {
debounceTime } from 'rxjs/operators';
const input = document.getElementById('myInput');
const source = fromEvent(input, 'input');
const debouncedSource = source.pipe(debounceTime(1000));
debouncedSource.subscribe(event => console.log(event.target.value)); // 在用户停止输入一秒后输出最终输入结果
distinctUntilChanged
:仅在源发出不同于前一个值时才发出该值。
import {
of } from 'rxjs';
import {
distinctUntilChanged } from 'rxjs/operators';
const source = of(1, 1, 2, 2, 3);
const distinctSource = source.pipe(distinctUntilChanged());
distinctSource.subscribe(value => console.log(value)); // 输出 1, 2, 3(仅在源中前后不同的值才被输出)
filter
:对可观察对象发出的每个值应用一个谓词函数,如果返回 true,则发出该值。
import {
of } from 'rxjs';
import {
filter } from 'rxjs/operators';
const source = of(1, 2, 3);
const filteredSource = source.pipe(filter(value => value > 1));
filteredSource.subscribe(value => console.log(value)); // 输出 2, 3(仅源中大于一的值被输出)
first
:仅在源发出第一个值时才发出该值,并立即完成。用于发出可观察对象的第一个值,然后完成。如果可观察对象为空,则会发出默认值或抛出错误。
import {
of } from 'rxjs';
import {
first } from 'rxjs/operators';
const source = of(1, 2, 3);
const firstValue = source.pipe(first());
firstValue.subscribe(value => console.log(value)); // 输出 1
在上面的示例中,我们创建了一个包含 1、2、3 的可观察对象,并使用 first 方法选择第一个值。因此,我们从 first 方法的输出中得到了值 1。
下面是 first 方法的另一个示例,其中我们提供了一个谓词函数来选择要发出的第一个值:
import {
of } from 'rxjs';
import {
first } from 'rxjs/operators';
const source = of(1, 2, 3);
const firstEvenValue = source.pipe(first(value => value % 2 === 0));
firstEvenValue.subscribe(value => console.log(value)); // 输出 2
在上面的示例中,我们创建了一个包含 1、2、3 的可观察对象,并使用 first 方法选择第一个偶数值。因此,我们从 first 方法的输出中得到了值 2。
如果可观察对象为空,则 first 方法会发出默认值或抛出错误。以下是 first 方法发出默认值的示例:
import {
EMPTY } from 'rxjs';
import {
first } from 'rxjs/operators';
const source = EMPTY;
const defaultValue = source.pipe(first(undefined, 'Default Value'));
defaultValue.subscribe(value => console.log(value)); // 输出 'Default Value'
在上面的示例中,我们创建了一个空的可观察对象,并使用 first 方法选择第一个值。由于可观察对象为空,因此 first 方法会发出默认值 ‘Default Value’。
如果不提供默认值,则 first 方法会抛出错误。以下是 first 方法抛出错误的示例:
import {
EMPTY } from 'rxjs';
import {
first } from 'rxjs/operators';
const source = EMPTY;
const noDefaultValue = source.pipe(first());
noDefaultValue.subscribe(value => console.log(value), error => console.log(error)); // 输出错误信息
在上面的示例中,我们创建了一个空的可观察对象,并使用 first 方法选择第一个值。由于可观察对象为空且没有提供默认值,因此 first 方法会抛出错误。
其他 API
-
Observable
:可观察对象,表示一个可观察的序列,可以发出零个或多个值,并在完成或出错时发出通知。 -
of
:创建一个可观察对象,该对象会依次发出指定的值。 -
from
:将一个可迭代对象、类数组对象、Promise 或类似数组对象转换为可观察对象。 -
fromEvent
:创建一个可观察对象,该对象会在指定的 DOM 元素上发出指定类型的事件。 -
interval
:创建一个可观察对象,该对象会每隔指定的时间间隔发出一个递增的整数。 -
timer
:创建一个可观察对象,该对象会在指定的延迟时间后发出一个值。 -
range
:创建一个可观察对象,该对象会发出指定范围内的整数。 -
defer
:创建一个可观察对象,该对象会在订阅时调用一个工厂函数以生成一个新的可观察对象。 -
empty
:创建一个空的可观察对象,该对象会立即完成。 -
never
:创建一个永远不会发出任何值或完成的可观察对象。 -
throwError
:创建一个立即抛出错误的可观察对象。 -
map
:对可观察对象发出的每个值应用一个函数,并将结果发出。 -
filter
:对可观察对象发出的每个值应用一个谓词函数,如果返回 true,则发出该值。 -
scan
:对可观察对象发出的每个值应用一个累加器函数,并将累加器的当前状态作为下一个值发出。 -
reduce
:对可观察对象发出的每个值应用一个累加器函数,并在完成时发出累加器的最终状态。 -
take
:从可观察对象中取出指定数量的值,然后完成。 -
takeUntil
:从可观察对象中取出值,直到另一个可观察对象发出值为止。 -
mergeMap
:将可观察对象发出的每个值映射到一个新的可观察对象,并将这些可观察对象合并成一个。 -
switchMap
:将可观察对象发出的每个值映射到一个新的可观察对象,并使用最新的可观察对象代替先前的可观察对象。 -
concatMap
:将可观察对象发出的每个值映射到一个新的可观察对象,并等待前一个可观察对象完成后再订阅下一个可观察对象。 -
debounceTime
:忽略在指定时间内连续发生的重复值,并只发出最后一个值。 -
distinctUntilChanged
:忽略连续重复的值,并只发出非重复的值。 -
tap
:对可观察对象发出的每个值执行一些副作用操作,而不改变它们。 -
catchError
:捕获可观察对象中的错误,并返回另一个可观察对象或抛出另一个错误。
以上是 RxJS 的常用方法和 API,它们可以帮助开发人员更轻松地管理复杂的异步代码。
进阶
Observable 组合
concat
:将多个可观察对象按顺序连接起来,依次发出每个可观察对象的值。
import {
of, concat } from 'rxjs';
const source1 = of(1, 2, 3);
const source2 = of(4, 5, 6);
const concatenatedSource = concat(source1, source2);
concatenatedSource.subscribe(value => console.log(value)); // 输出 1, 2, 3, 4, 5, 6
merge
:将多个可观察对象合并成一个,同时发出所有可观察对象的值。
import {
of, merge } from 'rxjs';
const source1 = of(1, 2, 3);
const source2 = of(4, 5, 6);
const mergedSource = merge(source1, source2);
mergedSource.subscribe(value => console.log(value)); // 输出所有值,顺序不确定
combineLatest
:将多个可观察对象合并成一个,只有当所有可观察对象都至少发出一个值时,才会发出一个值。
import {
combineLatest, interval } from 'rxjs';
const source1 = interval(1000);
const source2 = interval(2000);
const combinedSource = combineLatest(source1, source2);
combinedSource.subscribe(value => console.log(value)); // 每秒输出一个数组,包含两个可观察对象最新的值
zip
:将多个可观察对象合并成一个,只有当所有可观察对象都发出一个值时,才会发出一个值。
import {
zip, interval } from 'rxjs';
const source1 = interval(1000);
const source2 = interval(2000);
const zippedSource = zip(source1, source2);
zippedSource.subscribe(value => console.log(value)); // 每两秒输出一个数组,包含两个可观察对象最新的值
startWith
:在可观察对象发出值之前,发出指定的初始值。
import {
of } from 'rxjs';
import {
startWith } from 'rxjs/operators';
const source = of(1, 2, 3);
const startWithSource = source.pipe(startWith(0));
startWithSource.subscribe(value => console.log(value)); // 输出 0, 1, 2, 3
withLatestFrom
:将一个可观察对象和另一个或多个可观察对象组合成一个,只有当第一个可观察对象发出一个值时,才会发出一个值。
import {
interval } from 'rxjs';
import {
withLatestFrom } from 'rxjs/operators';
const source1 = interval(1000);
const source2 = interval(500);
const withLatestFromSource = source1.pipe(withLatestFrom(source2));
withLatestFromSource.subscribe(value => console.log(value)); // 每秒输出一个数组,包含 source1 最新的值和 source2 最新的值
Observable 过滤
debounceTime
:在指定的时间间隔内,忽略可观察对象发出的值,并在时间间隔结束时发出最后一个值。
import {
fromEvent } from 'rxjs';
import {
debounceTime } from 'rxjs/operators';
const input = document.getElementById('myInput');
const source = fromEvent(input, 'input');
const debouncedSource = source.pipe(debounceTime(1000));
debouncedSource.subscribe(event => console.log(event.target.value)); // 在用户停止输入一秒钟后输出最后一次输入的值
distinctUntilChanged
:忽略连续重复的值,并只发出非重复的值。
import {
of } from 'rxjs';
import {
distinctUntilChanged } from 'rxjs/operators';
const source = of(1, 1, 2, 2, 3, 3);
const distinctSource = source.pipe(distinctUntilChanged());
distinctSource.subscribe(value => console.log(value)); // 输出 1, 2, 3
filter
:对可观察对象发出的每个值应用一个谓词函数,如果返回 true,则发出该值。
import {
of } from 'rxjs';
import {
filter } from 'rxjs/operators';
const source = of(1, 2, 3);
const filteredSource = source.pipe(filter(value => value > 1));
filteredSource.subscribe(value => console.log(value)); // 输出 2, 3
take
:只发出指定数量的值,然后完成。
import {
interval } from 'rxjs';
import {
take } from 'rxjs/operators';
const source = interval(1000);
const takenSource = source.pipe(take(3));
takenSource.subscribe(value => console.log(value)); // 输出前三个递增的整数
takeUntil
:只发出在另一个可观察对象开始发出值之前的值,然后完成。
import {
interval, timer } from 'rxjs';
import {
takeUntil } from 'rxjs/operators';
const source = interval(1000);
const timer$ = timer(5000);
const takenSource = source.pipe(takeUntil(timer$));
takenSource.subscribe(value => console.log(value)); // 输出前五个递增的整数,然后完成
Observable 错误处理
catchError
:捕获错误,并返回另一个可观察对象或抛出错误。
import {
of } from 'rxjs';
import {
catchError } from 'rxjs/operators';
const source = of('Hello', 'World', new Error('Oops!'));
source.pipe(
map(value => value.toUpperCase()),
catchError(error => {
console.error(error.message);
return of(null);
})
).subscribe(value => console.log(value)); // 输出 HELLO, WORLD, null,并打印错误信息
retry
:在出现错误时重试可观察对象发出的值。用于在可观察对象发生错误时自动重试。它会尝试重新订阅可观察对象,并在每次发生错误时重试指定的次数。
import {
interval } from 'rxjs';
import {
retry } from 'rxjs/operators';
const source = interval(1000).pipe(
map(value => {
if (value === 3) {
throw new Error('Oops!');
}
return value;
})
);
const retriedSource = source.pipe(retry(2));
retriedSource.subscribe({
next: value => console.log(value),
error: error => console.error(error)
});
如果我们将 retry 方法的参数设为 -1,则可观察对象将一直重试,直到成功为止。
—————————— 【正文完】——————————
前端学习交流群,想进来面基的,可以加群: 832485817,685486827;
写在最后: 约定优于配置 —— 软件开发的简约原则
——————————【完】——————————
我的:
个人网站: https://neveryu.github.io/neveryu/
Github: https://github.com/Neveryu
微信: miracle421354532
更多学习资源请关注我的微信…好吗