前言
- 刚开始学前端时就知道有这玩意,差点因为这玩意把我劝退,这次好好学学。
- rxjs源码写的很棒,还有漂亮的注释让人了解如何去使用。
主要api
- 本文列举主要的api,其余自行点击文档查看。
- 官网:https://rxjs.dev/
- 弹珠图:https://rxmarbles.com/#from
Operators
- 由于操作符相当多,当你不知道用啥操作符时,可以试着使用操作符决策树这个功能:https://rxjs.dev/operator-decision-tree。它会告诉你你需要啥操作符。
Creation Operators
- https://rxjs.dev/guide/operators#creation-operators-list
from ——转为observerable
- from 是个非常常用的api,用来把对象、map、数组、promise、可迭代对象转换为oberservable。
- 可以转换的东西非常多。
- from的对象subscribe后立刻会推出。
import {
from } from 'rxjs';
const array = [10, 20, 30];
const result = from(array);
result.subscribe(x => console.log(x));
// Logs:
// 10
// 20
// 30
of ——转为observerable
- of和from有点类似,不同之处在于of会把其参数转为observerable,不会像from一样将数组展平。
const array = [10, 20, 30]
const array2 = [3, 41, 1, 2]
const result = of(array, array2)
result.subscribe({
next: next => console.log(next),
error: err => console.log(err),
complete: () => console.log('complete'),
})
[10, 20, 30]
[3, 41, 1, 2]
complete
Pipe
- pipe是为了更优雅的写操作符而制作的,它实际上就等于op()(obs)写法。
- 作为一种风格,op()(obs)从不使用,即使只有一个操作符;obs.pipe(op())是普遍首选。
自定义操作符
- 如果操作符满足不了,可以进行自定义:
import {
Observable, of } from 'rxjs';
function delay<T>(delayInMillis: number) {
return (observable: Observable<T>) =>
new Observable<T>((subscriber) => {
// this function will be called each time this
// Observable is subscribed to.
const allTimerIDs = new Set();
let hasCompleted = false;
const subscription = observable.subscribe({
next(value) {
// Start a timer to delay the next value
// from being pushed.
const timerID = setTimeout(() => {
subscriber.next(value);
// after we push the value, we need to clean up the timer timerID
allTimerIDs.delete(timerID);
// If the source has completed, and there are no more timers running,
// we can complete the resulting observable.
if (hasCompleted && allTimerIDs.size === 0) {
subscriber.complete();
}
}, delayInMillis);
allTimerIDs.add(timerID);
},
error(err) {
// We need to make sure we're propagating our errors through.
subscriber.error(err);
},
complete() {
hasCompleted = true;
// If we still have timers running, we don't want to yet.
if (allTimerIDs.size === 0) {
subscriber.complete();
}
},
});
// Return the teardown logic. This will be invoked when
// the result errors, completes, or is unsubscribed.
return () => {
subscription.unsubscribe();
// Clean up our timers.
for (const timerID of allTimerIDs) {
clearTimeout(timerID);
}
};
});
}
// Try it out!
of(1, 2, 3).pipe(delay(1000)).subscribe(console.log);
- 或者用现有操作符进行组合:
import {
pipe } from 'rxjs';
import {
filter, map } from 'rxjs/operators';
function discardOddDoubleEven() {
return pipe(
filter((v) => !(v % 2)),
map((v) => v + v)
);
}
Observerable
new
- observable在new的时候可以申明其内容:
import {
Observable } from 'rxjs';
const foo = new Observable(subscriber => {
console.log('Hello');
subscriber.next(42);
subscriber.next(100); // "return" another value
subscriber.next(200); // "return" yet another
});
console.log('before');
foo.subscribe(x => {
console.log(x);
});
console.log('after');
- 在声明时的写法很类似于promise,可以传递error ,error后的next也将无法执行:
import {
Observable } from 'rxjs';
const observable = new Observable(function subscribe(subscriber) {
try {
subscriber.next(1);
subscriber.next(2);
subscriber.next(3);
subscriber.complete();
} catch (err) {
subscriber.error(err); // delivers an error if it caught one
}
});
- 他会有特殊的complete,complete后的next将不会执行:
import {
Observable } from 'rxjs';
const observable = new Observable(function subscribe(subscriber) {
subscriber.next(1);
subscriber.next(2);
subscriber.next(3);
subscriber.complete();
subscriber.next(4); // Is not delivered because it would violate the contract
});
subscribe
- subscribe可以传递参数或者对象,由于可读性问题,传参数的行为已经被废弃,rxjs推荐传递对象形式:
result.subscribe({
next: next => console.log(next),
error: err => console.log(err),
complete: () => console.log('complete'),
})
- 这样容易看出每个参数的意义。
- 其实subscribe里面跟着的对象就被称为observer(next,error,complete)。
- subscribe用通俗的理解是对observable进行调用:
const foo = new Observable(subscriber => {
console.log('Hello')
subscriber.next(42)
subscriber.next(100) // "return" another value
subscriber.next(200) // "return" yet another
})
foo.subscribe(v => console.log(v))
setTimeout(() => {
foo.subscribe(v => console.log(v))
}, 2000)
- subscribe的返回值为subscription,可以使用它来解除订阅。
import {
from } from 'rxjs';
const observable = from([10, 20, 30]);
const subscription = observable.subscribe(x => console.log(x));
// Later:
subscription.unsubscribe();
Subject
- subject是特殊的观察物,每一个subject都是一个observerable和Observer。
- subject被subscribe后是多播状态,而非像observable一样执行一次。
import {
Subject } from 'rxjs';
const subject = new Subject<number>();
subject.subscribe({
next: (v) => console.log(`observerA: ${
v}`)
});
subject.subscribe({
next: (v) => console.log(`observerB: ${
v}`)
});
subject.next(1);
subject.next(2);
// Logs:
// observerA: 1
// observerB: 1
// observerA: 2
// observerB: 2
- 由于subject也是观察者,所以它也可以倒给observable这样:
import {
Subject, from } from 'rxjs';
const subject = new Subject<number>();
subject.subscribe({
next: (v) => console.log(`observerA: ${
v}`)
});
subject.subscribe({
next: (v) => console.log(`observerB: ${
v}`)
});
const observable = from([1, 2, 3]);
observable.subscribe(subject); // You can subscribe providing a Subject
// Logs:
// observerA: 1
// observerB: 1
// observerA: 2
// observerB: 2
// observerA: 3
// observerB: 3
multicast
- multicast主要用来控制开始播的时间。当connect时候即推送。
- 可以看一下multicast的例子(这个操作符目前被弃用),:
import {
interval, Subject } from 'rxjs';
import {
multicast } from 'rxjs/operators';
const source = interval(500);
const subject = new Subject();
const multicasted = source.pipe(multicast(subject));
let subscription1, subscription2, subscriptionConnect;
subscription1 = multicasted.subscribe({
next: (v) => console.log(`observerA: ${
v}`)
});
// We should call `connect()` here, because the first
// subscriber to `multicasted` is interested in consuming values
subscriptionConnect = multicasted.connect();
setTimeout(() => {
subscription2 = multicasted.subscribe({
next: (v) => console.log(`observerB: ${
v}`)
});
}, 600);
setTimeout(() => {
subscription1.unsubscribe();
}, 1200);
// We should unsubscribe the shared Observable execution here,
// because `multicasted` would have no more subscribers after this
setTimeout(() => {
subscription2.unsubscribe();
subscriptionConnect.unsubscribe(); // for the shared Observable execution
}, 2000);
- 上述结果:
observerA: 0
observerA: 1
observerB: 1
observerB: 2
observerB: 3
- 上述例子的手动connect比较麻烦,通常我们会等第一个订阅者到来时开始连接,最后一个订阅者离开时结束连接。
- 这时会有refcount这个操作符使用(该操作符已弃用):
import {
interval, Subject } from 'rxjs';
import {
multicast, refCount } from 'rxjs/operators';
const source = interval(500);
const subject = new Subject();
const refCounted = source.pipe(multicast(subject), refCount());
let subscription1, subscription2;
// This calls `connect()`, because
// it is the first subscriber to `refCounted`
console.log('observerA subscribed');
subscription1 = refCounted.subscribe({
next: (v) => console.log(`observerA: ${
v}`)
});
setTimeout(() => {
console.log('observerB subscribed');
subscription2 = refCounted.subscribe({
next: (v) => console.log(`observerB: ${
v}`)
});
}, 600);
setTimeout(() => {
console.log('observerA unsubscribed');
subscription1.unsubscribe();
}, 1200);
// This is when the shared Observable execution will stop, because
// `refCounted` would have no more subscribers after this
setTimeout(() => {
console.log('observerB unsubscribed');
subscription2.unsubscribe();
}, 2000);
// Logs
// observerA subscribed
// observerA: 0
// observerB subscribed
// observerA: 1
// observerB: 1
// observerA unsubscribed
// observerB: 2
// observerB unsubscribed
- 只需要加个操作符,连接和取消连接不写就行了。
行为主题
- subject是多播的,这就会导致在另个观察者subscribe的时候,此时正好是个空档期,行为主体可以让观察者连接时立马收到目前的最新值:
import {
BehaviorSubject } from 'rxjs';
const subject = new BehaviorSubject(0); // 0 is the initial value
subject.subscribe({
next: (v) => console.log(`observerA: ${
v}`)
});
subject.next(1);
subject.next(2);
subject.subscribe({
next: (v) => console.log(`observerB: ${
v}`)
});
subject.next(3);
// Logs
// observerA: 0
// observerA: 1
// observerA: 2
// observerB: 2
// observerA: 3
// observerB: 3
重播主题
- 可以传递参数,为缓冲区大小,用于存放过时的数据。
import {
ReplaySubject } from 'rxjs'
const subject = new ReplaySubject(3) // buffer 3 values for new subscribers
subject.subscribe({
next: v => console.log(`observerA: ${
v}`),
})
subject.next(1)
subject.next(2)
subject.next(3)
subject.next(4)
subject.subscribe({
next: v => console.log(`observerB: ${
v}`),
})
subject.next(5)
// Logs:
// observerA: 1
// observerA: 2
// observerA: 3
// observerA: 4
// observerB: 2
// observerB: 3
// observerB: 4
// observerA: 5
// observerB: 5
异步主题
- 等待完成后传递最后一个值:
import {
AsyncSubject } from 'rxjs';
const subject = new AsyncSubject();
subject.subscribe({
next: (v) => console.log(`observerA: ${
v}`)
});
subject.next(1);
subject.next(2);
subject.next(3);
subject.next(4);
subject.subscribe({
next: (v) => console.log(`observerB: ${
v}`)
});
subject.next(5);
subject.complete();
// Logs:
// observerA: 5
// observerB: 5
Scheduler
-
Schduler按照执行逻辑分成以下几类:
-
null:也就是不指定scheduler,同步执行
-
queueScheduler:也是同步执行,但在执行Rxjs会将所有同步的Observable放到queue内,再依次执行,等下我们说明这和null有什么区别
-
asapScheduler:异步执行,与Promise一样的异步处理层级,也就是microtask 宏任务
-
asyncScheduler:异步执行,处理方式同setIntervael,属于macrotask层级 微任务
-
animationFrameScheduler异步执行,处理方式同requestAnimationFrame,也是属于macrotask层级,常用来做动画