为了更好地使用 Subject
,rxjs
在 Subject
的基础上封装了一些更贴近实际使用场景的 variant Subject
AsyncSubject
// /src/internal/AsyncSubject.ts
/**
* A variant of Subject that only emits a value when it completes. It will emit
* its latest value to all its observers on completion.
*/
复制代码
直接看官方注释,只有当发生结束事件之后(即调用complete
),AsyncSubject
才会立即向所有的 observers
广播最后一次发生事件的值
const subject = new AsyncSubject()
subject.subscribe(data => console.log(`订阅A:${data}`))
subject.next(1)
subject.next(2)
subject.subscribe(data => console.log(`订阅B:${data}`))
subject.next(3)
subject.complete()
// 订阅A:3
// 订阅B:3
复制代码
既然 AsyncSubject
在调用 next
、complete
的时候,存在与 Subject
不一样的行为,那么 AsyncSubject
肯定重写了这两个方法,所以看一下
// /src/internal/AsyncSubject.ts
next(value: T): void {
if (!this.isStopped) {
this._value = value;
this._hasValue = true;
}
}
复制代码
使用 this._value
保存每次 next
时的值,保证 this._value
存的永远是最新值,并且在调用 next
之后,会将 this._hasValue
置为 true
,这个变量是一个标志位,用于表示是否调用过 next
方法,如果从来没有调用过 next
,那么 this._hasValue
就是 false
,那么就算后续调用了 complete
也不会广播任何值(符合常理,因为并没有值可以广播,所以也就不广播了)
// /src/internal/AsyncSubject.ts
complete(): void {
const { _hasValue, _value, _isComplete } = this;
if (!_isComplete) {
this._isComplete = true;
_hasValue && super.next(_value!);
super.complete();
}
}
复制代码
只有当调用 complete
的时候,才执行 Subject
的 next
方法逻辑,也就是对所有的 observers
广播值
BehaviorSubject
// /src/internal/BehaviorSubject.ts
/**
* A variant of Subject that requires an initial value and emits its current
* value whenever it is subscribed to.
*/
复制代码
BehaviorSubject
在初始化的时候需要传入一个初始值(initial value
),并且每当发生订阅事件(subscribe
)的时候,都会自动发射(emits
)最新值
Subject
在订阅(subscribe
) 的时候,只是将 subscriber
的方法暂存到 observers
中,只有当调用 next
的时候,才会广播事件。如果希望在第一次订阅的时候就能立即收到预设值,且在之后订阅时也可以收到最近一次发生过的最新值,则可以使用 BehaviorSubject
const subject = new BehaviorSubject(1)
subject.subscribe(data => console.log(`A:${data}`))
subject.next(2)
subject.subscribe(data => console.log(`B:${data}`))
// A:1
// A:2
// B:2
复制代码
// /src/internal/BehaviorSubject.ts
export class BehaviorSubject<T> extends Subject<T> {
constructor(private _value: T) {
super();
}
}
复制代码
BehaviorSubject
继承了 Subject
,并且接收一个私有初始值 _value
,这个值就是第一次订阅的时候会发出的值,BehaviorSubject
的 subscribe
存在 Subject
不一样的行为,那么其肯定重写了 Subject
的 _subscribe
方法,所以看一下
// /src/internal/BehaviorSubject.ts
protected _subscribe(subscriber: Subscriber<T>): Subscription {
const subscription = super._subscribe(subscriber);
!subscription.closed && subscriber.next(this._value);
return subscription;
}
复制代码
其仍旧调用了 Subject
的 _subscribe
,并且在此基础上,还调用了 subscriber.next
,这就是 BehaviorSubject
能在订阅时发出值的原因
this._value
保存的就是最新值,这个值的初始值是初始化 BehaviorSubject
的时候传入的值,但它是会更新的,在每次调用next
的时候,都会将 this._value
的值更新为 next
的参数值,保证了其值永远保持最新
// /src/internal/BehaviorSubject.ts
next(value: T): void {
super.next((this._value = value));
}
复制代码
此外,BehaviorSubject
还有个 value
属性,允许外界直接查询到 this._value
的值
// /src/internal/BehaviorSubject.ts
get value(): T {
return this.getValue();
}
getValue(): T {
const { hasError, thrownError, _value } = this;
// ...
return _value;
}
复制代码
ReplaySubject
ReplaySubject
的官方解释比较复杂,其实跟 BehaviorSubject
类似,都会在订阅事件发生的时候 emit
,只不过 BehaviorSubject
只能emit
最后一个值,而 ReplaySubject
可以 emit
最后 N
个值,N
的大小由开发者决定,另外,ReplaySubject
没有初始值的概念,所以必须调用了 next
之后,才有可以 emit
的值
const subject = new ReplaySubject(3);
subject.subscribe(data => console.log(`第一次订阅: ${data}`));
subject.next(1);
subject.next(2);
subject.subscribe(data => console.log(`第二次订阅: ${data}`));
// 第一次订阅: 1
// 第一次订阅: 2
// 第二次订阅: 1
// 第二次订阅: 2
复制代码
// /src/internal/ReplaySubject.ts
export class ReplaySubject<T> extends Subject<T> {
private _buffer: (T | number)[] = [];
private _infiniteTimeWindow = true;
/**
* @param bufferSize The size of the buffer to replay on subscription
* @param windowTime The amount of time the buffered items will say buffered
* @param timestampProvider An object with a `now()` method that provides the current timestamp. This is used to
* calculate the amount of time something has been buffered.
*/
constructor(
private _bufferSize = Infinity,
private _windowTime = Infinity,
private _timestampProvider: TimestampProvider = dateTimestampProvider
) {
super();
this._infiniteTimeWindow = _windowTime === Infinity;
this._bufferSize = Math.max(1, _bufferSize);
this._windowTime = Math.max(1, _windowTime);
}
}
复制代码
ReplaySubject
接受三个可选的初始化参数,第一个就是我们开头说的那个 N
,表示截取的事件数量,第二个参数 _windowTime
表示截取事件所在的事件窗口
在 ReplaySubject
中存在一个数组 _buffer
,如果在初始化 ReplaySubject
的时候,传入了第二个参数 _windowTime
,那么每次调用 next
方法的时候,会将两个值按照顺序依次存入 _buffer
中:next
的值,以及这个值的过期时间
next
的值我们知道,就是 next
的参数,过期时间是什么呢?其实就是 当前时间戳 + _windowTime
// /src/internal/ReplaySubject.ts
next(value: T): void {
const { isStopped, _buffer, _infiniteTimeWindow, _timestampProvider, _windowTime } = this;
if (!isStopped) {
_buffer.push(value);
!_infiniteTimeWindow && _buffer.push(_timestampProvider.now() + _windowTime);
}
this._trimBuffer();
super.next(value);
}
复制代码
_timestampProvider.now()
一般情况下可以看做是 Date.now()
然后在 subscribe
的时候,遍历这个 _buffer
,取出
// /src/internal/ReplaySubject.ts
protected _subscribe(subscriber: Subscriber<T>): Subscription {
// ...
this._trimBuffer();
// ...
const copy = _buffer.slice();
for (let i = 0; i < copy.length && !subscriber.closed; i += _infiniteTimeWindow ? 1 : 2) {
subscriber.next(copy[i] as T);
}
// ...
}
复制代码
上面说了,如果在初始化 ReplaySubject
的时候,传入了第二个参数 _windowTime
,那么 _buffer
的结构就类似于 [value, expiredTime, value, expiredTime...]
,数组中下标为 2n
(n >= 0
)的数值才是真正的 value
,所以 for
循环的步进值是 2
// /src/internal/ReplaySubject.ts
private _trimBuffer() {
// 从 _buffer 数组的开头向后删除数组,保持 _buffer 的长度和 _bufferSize * 2 一致
const adjustedBufferSize = (_infiniteTimeWindow ? 1 : 2) * _bufferSize;
_bufferSize < Infinity && adjustedBufferSize < _buffer.length && _buffer.splice(0, _buffer.length - adjustedBufferSize);
if (!_infiniteTimeWindow) {
const now = _timestampProvider.now();
let last = 0;
// 过期时间比当前时间戳要小,说明已经过期了
for (let i = 1; i < _buffer.length && (_buffer[i] as number) <= now; i += 2) {
last = i;
}
// 删除所有过期的 value
last && _buffer.splice(0, last + 1);
}
}
复制代码
在 next
和 _subscribe
中,还有一个方法都被调用到了,即 _trimBuffer
,这个方法用于删减 _buffer
中的数组,例如在初始化的时候传入的 _bufferSize = 2
,如果在调用这个方法的时候,发现 _buffer
有超过 2
个数据,则会把除了最后 2
之外的其他所有值全部删掉(splice
);如果传入了第二个参数 windowTime
,则在调用这个方法的时候,会将 _buffer
中所有过期的数据删掉
const subject = new ReplaySubject(100, 200);
subject.next(1);
subject.next(2);
subject.next(3);
setTimeout(() => {
subject.next(4);
}, 400)
setTimeout(() => {
subject.subscribe(data => console.log(`订阅: ${data}`));
}, 500)
// 订阅: 4
复制代码
ReplaySubject
的第一个参数是 100
,表示可以向新订阅的观察者重播最后 100
个值,但又因为指定了第二个参数为 200
,在 subscribe
事件发生的时候,1
、2
、3
这三个值都已经过期 500 - 200 = 300ms
了,只有 4
还没过期,所以只重播了 4
这个值
ReplaySubject
还有第三个初始化参数 timestampProvider
// /src/internal/types.ts
export interface TimestampProvider {
now(): number;
}
复制代码
如果没传入这个参数的话,则此参数默认等于 Date
,如果要传入,一般会传入一个调度器 Scheduler
,本文就不展开了
小结
本文浅析了 rxjs
中内置 Subject
的实现原理,这些变种 Subject
在原始 Subject
的基础上实现了一些方便开发者在实际场景中使用的功能,如果内置的 Subject
无法满足我们的需求,我们完全可以在 Subject
的基础上继续扩充出我们需要的 Subject