RACSubject
RACSubject 是 RACSignal 的子类,其作为信号流的同时又遵循了 <RACSubscriber>
协议,这就使其能够主动的发送信号量给订阅者。
RACSubject 同样重写了订阅方法 - (RACDisposable *)subscribe:(id<RACSubscriber>)subscriber
,在该方法中,订阅者将会被保存,并且不同于 RACSignal 的订阅,这里并不会,当然也没有什么 didSubscribe 代码会被触发执行。
创建方法:+ (instancetype)subject;
在这个类中,关键的方法如下:
- (void)enumerateSubscribersUsingBlock:(void (^)(id<RACSubscriber> subscriber))block {
NSArray *subscribers;
@synchronized (self.subscribers) {
subscribers = [self.subscribers copy];
}
for (id<RACSubscriber> subscriber in subscribers) {
block(subscriber);
}
}
#pragma mark RACSubscriber
- (void)sendNext:(id)value {
[self enumerateSubscribersUsingBlock:^(id<RACSubscriber> subscriber) {
[subscriber sendNext:value];
}];
}
sendNext: 是协议 <RACSubscriber>
中的方法,可见其只是调用了 enumerateSubscribersUsingBlock: 方法,在该方法中,会遍历保存的所有的订阅者,然后将指定的信号量传递出去。类似的,在发送错误或结束信号量时,同样会转发给每个订阅者。
RACBehaviorSubject
RACBehaviorSubject 是 RACSubject 的子类,同父类一样,其也会保存所有的订阅者,每一次传递的信号量都会传递给所有的订阅者,不同的是,当订阅者订阅该信号流时,其会将最近一次发送的信号量发送给订阅者,如果还没发送过信号量,就发送默认值,该默认值是在创建该信号流时指定的。
+ (instancetype)behaviorSubjectWithDefaultValue:(id)value {
RACBehaviorSubject *subject = [self subject];
subject.currentValue = value;
return subject;
}
RACGroupedSignal
RACGroupedSignal 是 RACSubject 的子类,其拥有一个 key 属性用来标记相同组的信号流。
+ (instancetype)signalWithKey:(id<NSCopying>)key {
RACGroupedSignal *subject = [self subject];
subject.key = key;
return subject;
}
RACReplaySubject
RACReplaySubject 是 RACSubject 的子类,这个信号流同父类一样保存了所有的订阅者,并且还保存了接收到的所有信号量,至于保存的数量,可以在初始化时指定,如果使用 init 方法,那么默认为 RACReplaySubjectUnlimitedCapacity 即不限定数量。
+ (instancetype)replaySubjectWithCapacity:(NSUInteger)capacity;
每当订阅该信号流时,该信号流都会将保存的信号量发送给订阅者,即使该信号流接收到的错误或结束信号量。如果该信号流已经结束,那么每一次订阅者订阅,都只是接收固定的信号量,而后结束,所以该信号流也称作重播信号流。
- (RACDisposable *)subscribe:(id<RACSubscriber>)subscriber {
RACCompoundDisposable *compoundDisposable = [RACCompoundDisposable compoundDisposable];
RACDisposable *schedulingDisposable = [RACScheduler.subscriptionScheduler schedule:^{
@synchronized (self) {
for (id value in self.valuesReceived) {
if (compoundDisposable.disposed) return;
[subscriber sendNext:(value == RACTupleNil.tupleNil ? nil : value)];
}
if (compoundDisposable.disposed) return;
if (self.hasCompleted) {
[subscriber sendCompleted];
} else if (self.hasError) {
[subscriber sendError:self.error];
} else {
RACDisposable *subscriptionDisposable = [super subscribe:subscriber];
[compoundDisposable addDisposable:subscriptionDisposable];
}
}
}];
[compoundDisposable addDisposable:schedulingDisposable];
return compoundDisposable;
}
该类重写了发送方法,如下,当存储的信号量有限制时,会不断删除最早接收到的信号量。
- (void)sendNext:(id)value {
@synchronized (self) {
[self.valuesReceived addObject:value ?: RACTupleNil.tupleNil];
[super sendNext:value];
if (self.capacity != RACReplaySubjectUnlimitedCapacity && self.valuesReceived.count > self.capacity) {
[self.valuesReceived removeObjectsInRange:NSMakeRange(0, self.valuesReceived.count - self.capacity)];
}
}
}