版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
RxRelay源码解析:
RxRelay源码地址
自定义RxBus + RxRelay
Relays 是既是Observable也是Consumer的RxJava 类型,由于没有onComplete 或 onError,所以发生异常时不会触发下游的终止状态,不会抛出异常。
//接收订阅之后数据
Relay relay = PublishRelay.create().toSerialized();
//接受订阅之前一个和之后数据
Relay relay = BehaviorRelay.createDefault("Hello World");
//不管何时订阅,发送所有数据
Relay relay = ReplayRelay.create(5).toSerialized();
/**
* 队列,用于存储带发射的值
*/
class AppendOnlyLinkedArrayList<T> {
private final int capacity; //容量
private final Object[] head; //用于存放值
private Object[] tail;
private int offset; //数组指针
/**
* 指定容量,初始化数组为容量大小
*/
AppendOnlyLinkedArrayList(int capacity) {
this.capacity = capacity;
this.head = new Object[capacity + 1];
this.tail = head;
}
/**
* 向队列插入值
*/
void add(T value) {
final int c = capacity;
int o = offset;
//若容量满了,创建新数组tail,新数组开始位置指向head末尾,向新数组tail[0]插入值
if (o == c) {
Object[] next = new Object[c + 1];
tail[c] = next;
tail = next;
o = 0;
}
//容量未满,按偏移量插入值
tail[o] = value;
offset = o + 1;
}
/**
* 自定义条件
*/
public interface NonThrowingPredicate<T> extends Predicate<T> {
@Override
boolean test(T t);
}
/**
* 便利队列中所有数组,遇到空或返回条件为真退出
*/
@SuppressWarnings("unchecked")
void forEachWhile(NonThrowingPredicate<? super T> consumer) {
Object[] a = head;
final int c = capacity;
while (a != null) {
for (int i = 0; i < c; i++) {
Object o = a[i];
if (o == null) {
break;
}
if (consumer.test((T)o)) {
break;
}
}
//若数组便利完成,便利下一个数组,a[c]指向下一个数组开头
a = (Object[])a[c];
}
}
/**
* 遍历队列中所有数组,回掉值给Relay的accept方法,遇到空值则退出
* @param observer
* @return
*/
@SuppressWarnings("unchecked")
boolean accept(Relay<? super T> observer) {
Object[] a = head;
final int c = capacity;
while (a != null) {
for (int i = 0; i < c; i++) {
Object o = a[i];
if (o == null) {
break;
}
//回掉值给Relay的accept方法
observer.accept((T) o);
}
a = (Object[])a[c];
}
return false;
}
}
public abstract class Relay<T> extends Observable<T> implements Consumer<T> {
/** 接受数据 */
@Override public abstract void accept(T value); // Redeclare without checked exception.
public abstract boolean hasObservers();
/** 封装Relay并序列化调用 */
@NonNull
@CheckReturnValue
public final Relay<T> toSerialized() {
if (this instanceof SerializedRelay) {
return this;
}
return new SerializedRelay<T>(this);
}
}
/**
* 序列化Relay,封装一次Relay
*/
public class SerializedRelay<T> extends Relay<T> {
private final Relay<T> actual;
//表示发射正在进行,由这个控制
private boolean emitting;
//队列
private AppendOnlyLinkedArrayList<T> queue;
SerializedRelay(final Relay<T> actual) {
this.actual = actual;
}
/**
* 形成订阅
*/
@Override
protected void subscribeActual(Observer<? super T> observer) {
actual.subscribe(observer);
}
/**
* 接受来自队列的值
*/
@Override
public void accept(T value) {
synchronized (this) {
//正在发射状态,就先添加值到队列
if (emitting) {
AppendOnlyLinkedArrayList<T> q = queue;
if (q == null) {
q = new AppendOnlyLinkedArrayList<T>(4);
queue = q;
}
q.add(value); //添加到队列
return;
}
emitting = true;
}
//未在发射状态,直接回掉值给Relay的accept方法
actual.accept(value);
emitLoop();
}
/** 遍历队列,回掉值给Relay的accept方法 */
private void emitLoop() {
for (;;) {
AppendOnlyLinkedArrayList<T> q;
synchronized (this) {
q = queue;
//为空说明已发送完成
if (q == null) {
emitting = false;
return;
}
queue = null;
}
//遍历队列q中所有数组,回掉值给Relay的accept方法,遇到空值则退出
q.accept(actual);
}
}
@Override
public boolean hasObservers() {
return actual.hasObservers();
}
}
/**
* 发送订阅之后数据
* @param <T>
*/
public class PublishRelay<T> extends Relay<T> {
//一个空的订阅者数组,以避免一直分配它
@SuppressWarnings("rawtypes")
static final PublishDisposable[] EMPTY = new PublishDisposable[0];
//原子引用,使得多线程操作的是同一个对象
final AtomicReference<PublishDisposable<T>[]> subscribers;
/**
* 生成一个原子引用的订阅者数组EMPTY
*/
@CheckReturnValue
@NonNull
public static <T> PublishRelay<T> create() {
return new PublishRelay<T>();
}
/**
* 原子引用订阅者数组EMPTY
*/
@SuppressWarnings("unchecked")
PublishRelay() {
subscribers = new AtomicReference<PublishDisposable<T>[]>(EMPTY);
}
@Override
protected void subscribeActual(Observer<? super T> t) {
PublishDisposable<T> ps = new PublishDisposable<T>(t, this);
t.onSubscribe(ps);
add(ps);
// if cancellation happened while a successful add, the remove() didn't work
// so we need to do it again
if (ps.isDisposed()) {
remove(ps);
}
}
/**
* 创建一个比原数组大1的数组,复制原数组到新数组,添加订阅者ps到末尾
*/
void add(PublishDisposable<T> ps) {
for (;;) {
//得到原子引用对象订阅者数组EMPTY
PublishDisposable<T>[] a = subscribers.get();
int n = a.length;
//创建比EMPTY大1的数组,刚开始EMPTY为0,大小从1开始,2,3...
@SuppressWarnings("unchecked")
PublishDisposable<T>[] b = new PublishDisposable[n + 1];
//从索引0复制a数组到b数组,放置起始索引0,复制n项
System.arraycopy(a, 0, b, 0, n);
//新添加的订阅者ps放置在最末尾
b[n] = ps;
if (subscribers.compareAndSet(a, b)) {
return;
}
}
}
/**
*删除数组中指定订阅者
*/
@SuppressWarnings("unchecked")
void remove(PublishDisposable<T> ps) {
for (;;) {
PublishDisposable<T>[] a = subscribers.get();
if (a == EMPTY) {
return;
}
int n = a.length;
int j = -1;
//找到待删除订阅者索引,赋值给j,退出循环
for (int i = 0; i < n; i++) {
if (a[i] == ps) {
j = i;
break;
}
}
if (j < 0) {
return;
}
PublishDisposable<T>[] b;
if (n == 1) {
b = EMPTY;
} else {
//创建一个大小减1的数组
b = new PublishDisposable[n - 1];
//复制a中前j项到b,要删除的索引为j(第j+1个)
System.arraycopy(a, 0, b, 0, j);
//再复制a中j+2开始项之后到b
System.arraycopy(a, j + 1, b, j, n - j - 1);
}
//若原子引用对象和a是同一个对象,用b替换a
if (subscribers.compareAndSet(a, b)) {
return;
}
}
}
@Override
public void accept(T value) {
if (value == null) throw new NullPointerException("value == null");
for (PublishDisposable<T> s : subscribers.get()) {
//发射value给订阅者
s.onNext(value);
}
}
@Override
public boolean hasObservers() {
return subscribers.get().length != 0;
}
/**
* 封装订阅者, 跟踪其请求并进行取消操作,以便从当前订阅者数组中删除自己.
*/
static final class PublishDisposable<T> extends AtomicBoolean implements Disposable {
private static final long serialVersionUID = 3562861878281475070L;
final Observer<? super T> downstream;
final PublishRelay<T> parent;
PublishDisposable(Observer<? super T> actual, PublishRelay<T> parent) {
this.downstream = actual;
this.parent = parent;
}
public void onNext(T t) {
//当前引用对象为fals,即订阅状态,发射这个t
if (!get()) {
downstream.onNext(t);
}
}
@Override
public void dispose() {
//设置引用对象为true
if (compareAndSet(false, true)) {
parent.remove(this);
}
}
@Override
public boolean isDisposed() {
return get(); //当前引用对象的boolean值
}
}
}
其他两个大同小异。BehaviorRelay原子应用value中存储了最新的一项数据,当有订阅时会发送给订阅者。
ReplayRelay默认缓存为16,也可指定大小。