1、主要角色
对于异步事件流中涉及的各种角色对象,RxJS作了如下定义:
- Subject : 主体,即产生数据流的主体
- Observable :可观察对象,可视为数据流
- Observer :观察者,负责接收并处理响应数据流中的数据。
观察者是由 Observable 发送的值的消费者。体现在代码中,观察者是一组回调函数的集合,每个回调函数对应一种 Observable 发送的通知类型:next、error 和 complete。
let observer = {
next: x => console.log('Observer got a next value: ' + x),
error: err => console.error('Observer got an error: ' + err),
complete: () => console.log('Observer got a complete notification'),
};
- Subscription :订阅 方法observable.subscribe(observer)的返回值就是一个 Subscription。
使用 观察者(回调方法)订阅(动词)一个 可观察对象 就生成一个订阅(名词)。
2、完整的流程
2.1 创建 Observable
使用构造函数创建了一个 Observable,它每隔一秒会向观察者发送字符串 'hi' :
let observable = new Observable(function subscribe(observer) {
let id = setInterval(() => {
observer.next('hi')
}, 1000);
});
2.2 订阅 Observable
observable.subscribe(x => console.log(x));
2.3 执行 Observables
new Observable() 方法中, function subscribe(observer) {} 方法中的代码表示 “Observable 执行”, 它是惰性运算,只有在每个观察者订阅后才会执行。随着时间的推移,执行会以同步或异步的方式产生多个值。
Observable 执行可以传递三种类型的值:
- "Next" 通知: 发送一个值,比如数字、字符串、对象,等等。
- "Error" 通知: 发送一个 JavaScript 错误 或 异常。
- "Complete" 通知: 不再发送任何值。
"Next" 通知是最重要,也是最常见的类型:它们表示传递给观察者的实际数据。"Error" 和 "Complete" 通知可能只会在 Observable 执行期间发生一次,并且只会执行其中的一个。
let observable = new Observable((observer: any) => {
try {
observer.next(1);
observer.next(2);
observer.complete();
} catch (err) {
observer.error(err);
}
});
在 Observable 执行中, 可能会发送零个到无穷多个 "Next" 通知。如果发送的是 "Error" 或 "Complete" 通知的话,那么之后不会再发送任何通知了。
2.4 清理 Observable 执行
因为 Observable 执行可能会是无限的,并且观察者通常希望能在有限的时间内中止执行,所以我们需要一个 API 来取消执行。因为每个执行都是其对应观察者专属的,一旦观察者完成接收值,它必须要一种方法来停止执行,以避免浪费计算能力或内存资源。
当调用了 observable.subscribe ,观察者会被附加到新创建的 Observable 执行中。这个调用还返回一个对象,即 Subscription (订阅):
let subscription: Subscription = observable.subscribe(d => { console.log(d); });
调用 unsubscribe() 方法可以取消进行中的执行:
subscription.unsubscribe();
实际的流程并不一定严格按以上顺序,Observable 执行后再进行订阅也是可以的。