RxRelay源码解析

版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接: https://blog.csdn.net/qq_39507260/article/details/84996735

RxRelay源码解析:
RxRelay源码地址
自定义RxBus + RxRelay
Relays 是既是Observable也是Consumer的RxJava 类型,由于没有onComplete 或 onError,所以发生异常时不会触发下游的终止状态,不会抛出异常。

	 //接收订阅之后数据
     Relay relay = PublishRelay.create().toSerialized();
     //接受订阅之前一个和之后数据
     Relay relay = BehaviorRelay.createDefault("Hello World");
     //不管何时订阅,发送所有数据
     Relay relay = ReplayRelay.create(5).toSerialized();
/**
 * 队列,用于存储带发射的值
 */
class AppendOnlyLinkedArrayList<T> {

    private final int capacity;     //容量
    private final Object[] head;    //用于存放值
    private Object[] tail;
    private int offset;     //数组指针

    /**
     * 指定容量,初始化数组为容量大小
     */
    AppendOnlyLinkedArrayList(int capacity) {
        this.capacity = capacity;
        this.head = new Object[capacity + 1];
        this.tail = head;
    }

    /**
     * 向队列插入值
     */
    void add(T value) {
        final int c = capacity;
        int o = offset;
        //若容量满了,创建新数组tail,新数组开始位置指向head末尾,向新数组tail[0]插入值
        if (o == c) {
            Object[] next = new Object[c + 1];
            tail[c] = next;
            tail = next;
            o = 0;
        }
        //容量未满,按偏移量插入值
        tail[o] = value;
        offset = o + 1;
    }

    /**
     * 自定义条件
     */
    public interface NonThrowingPredicate<T> extends Predicate<T> {
        @Override
        boolean test(T t);
    }

    /**
     * 便利队列中所有数组,遇到空或返回条件为真退出
     */
    @SuppressWarnings("unchecked")
    void forEachWhile(NonThrowingPredicate<? super T> consumer) {
        Object[] a = head;
        final int c = capacity;
        while (a != null) {
            for (int i = 0; i < c; i++) {
                Object o = a[i];
                if (o == null) {
                    break;
                }
                if (consumer.test((T)o)) {
                    break;
                }
            }
            //若数组便利完成,便利下一个数组,a[c]指向下一个数组开头
            a = (Object[])a[c];
        }
    }

    /**
     * 遍历队列中所有数组,回掉值给Relay的accept方法,遇到空值则退出
     * @param observer
     * @return
     */
    @SuppressWarnings("unchecked")
    boolean accept(Relay<? super T> observer) {
        Object[] a = head;
        final int c = capacity;
        while (a != null) {
            for (int i = 0; i < c; i++) {
                Object o = a[i];
                if (o == null) {
                    break;
                }
                //回掉值给Relay的accept方法
                observer.accept((T) o);
            }
            a = (Object[])a[c];
        }
        return false;
    }
}
public abstract class Relay<T> extends Observable<T> implements Consumer<T> {

    /** 接受数据 */
    @Override public abstract void accept(T value); // Redeclare without checked exception.

    public abstract boolean hasObservers();

    /** 封装Relay并序列化调用 */
    @NonNull
    @CheckReturnValue
    public final Relay<T> toSerialized() {
        if (this instanceof SerializedRelay) {
            return this;
        }
        return new SerializedRelay<T>(this);
    }
}
/**
 * 序列化Relay,封装一次Relay
 */
public class SerializedRelay<T> extends Relay<T> {

    private final Relay<T> actual;

    //表示发射正在进行,由这个控制
    private boolean emitting;

    //队列
    private AppendOnlyLinkedArrayList<T> queue;


    SerializedRelay(final Relay<T> actual) {
        this.actual = actual;
    }

    /**
     * 形成订阅
     */
    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        actual.subscribe(observer);
    }


    /**
     * 接受来自队列的值
     */
    @Override
    public void accept(T value) {
        synchronized (this) {
            //正在发射状态,就先添加值到队列
            if (emitting) {
                AppendOnlyLinkedArrayList<T> q = queue;
                if (q == null) {
                    q = new AppendOnlyLinkedArrayList<T>(4);
                    queue = q;
                }
                q.add(value);   //添加到队列
                return;
            }
            emitting = true;
        }
        //未在发射状态,直接回掉值给Relay的accept方法
        actual.accept(value);
        emitLoop();
    }

    /** 遍历队列,回掉值给Relay的accept方法 */
    private void emitLoop() {
        for (;;) {
            AppendOnlyLinkedArrayList<T> q;
            synchronized (this) {
                q = queue;
                //为空说明已发送完成
                if (q == null) {
                    emitting = false;
                    return;
                }
                queue = null;
            }
            //遍历队列q中所有数组,回掉值给Relay的accept方法,遇到空值则退出
            q.accept(actual);
        }
    }

    @Override
    public boolean hasObservers() {
        return actual.hasObservers();
    }
}
/**
 * 发送订阅之后数据
 * @param <T>
 */
public class PublishRelay<T> extends Relay<T> {

    //一个空的订阅者数组,以避免一直分配它
    @SuppressWarnings("rawtypes")
    static final PublishDisposable[] EMPTY = new PublishDisposable[0];

    //原子引用,使得多线程操作的是同一个对象
    final AtomicReference<PublishDisposable<T>[]> subscribers;

    /**
     * 生成一个原子引用的订阅者数组EMPTY
     */
    @CheckReturnValue
    @NonNull
    public static <T> PublishRelay<T> create() {
        return new PublishRelay<T>();
    }

    /**
     * 原子引用订阅者数组EMPTY
     */
    @SuppressWarnings("unchecked")
    PublishRelay() {
        subscribers = new AtomicReference<PublishDisposable<T>[]>(EMPTY);
    }

    @Override
    protected void subscribeActual(Observer<? super T> t) {
        PublishDisposable<T> ps = new PublishDisposable<T>(t, this);
        t.onSubscribe(ps);
        add(ps);
        // if cancellation happened while a successful add, the remove() didn't work
        // so we need to do it again
        if (ps.isDisposed()) {
            remove(ps);
        }
    }

    /**
     * 创建一个比原数组大1的数组,复制原数组到新数组,添加订阅者ps到末尾
     */
    void add(PublishDisposable<T> ps) {
        for (;;) {
            //得到原子引用对象订阅者数组EMPTY
            PublishDisposable<T>[] a = subscribers.get();
            int n = a.length;
            //创建比EMPTY大1的数组,刚开始EMPTY为0,大小从1开始,2,3...
            @SuppressWarnings("unchecked")
            PublishDisposable<T>[] b = new PublishDisposable[n + 1];
            //从索引0复制a数组到b数组,放置起始索引0,复制n项
            System.arraycopy(a, 0, b, 0, n);
            //新添加的订阅者ps放置在最末尾
            b[n] = ps;

            if (subscribers.compareAndSet(a, b)) {
                return;
            }
        }
    }

    /**
     *删除数组中指定订阅者
     */
    @SuppressWarnings("unchecked")
    void remove(PublishDisposable<T> ps) {
        for (;;) {
            PublishDisposable<T>[] a = subscribers.get();
            if (a == EMPTY) {
                return;
            }

            int n = a.length;
            int j = -1;
            //找到待删除订阅者索引,赋值给j,退出循环
            for (int i = 0; i < n; i++) {
                if (a[i] == ps) {
                    j = i;
                    break;
                }
            }

            if (j < 0) {
                return;
            }

            PublishDisposable<T>[] b;

            if (n == 1) {
                b = EMPTY;
            } else {
                //创建一个大小减1的数组
                b = new PublishDisposable[n - 1];
                //复制a中前j项到b,要删除的索引为j(第j+1个)
                System.arraycopy(a, 0, b, 0, j);
                //再复制a中j+2开始项之后到b
                System.arraycopy(a, j + 1, b, j, n - j - 1);
            }
            //若原子引用对象和a是同一个对象,用b替换a
            if (subscribers.compareAndSet(a, b)) {
                return;
            }
        }
    }

    @Override
    public void accept(T value) {
        if (value == null) throw new NullPointerException("value == null");
        for (PublishDisposable<T> s : subscribers.get()) {
            //发射value给订阅者
            s.onNext(value);
        }
    }

    @Override
    public boolean hasObservers() {
        return subscribers.get().length != 0;
    }

    /**
     * 封装订阅者, 跟踪其请求并进行取消操作,以便从当前订阅者数组中删除自己.
     */
    static final class PublishDisposable<T> extends AtomicBoolean implements Disposable {

        private static final long serialVersionUID = 3562861878281475070L;

        final Observer<? super T> downstream;

        final PublishRelay<T> parent;

        PublishDisposable(Observer<? super T> actual, PublishRelay<T> parent) {
            this.downstream = actual;
            this.parent = parent;
        }

        public void onNext(T t) {
            //当前引用对象为fals,即订阅状态,发射这个t
            if (!get()) {
                downstream.onNext(t);
            }
        }

        @Override
        public void dispose() {
            //设置引用对象为true
            if (compareAndSet(false, true)) {
                parent.remove(this);
            }
        }

        @Override
        public boolean isDisposed() {
            return get();   //当前引用对象的boolean值
        }
    }
}

其他两个大同小异。BehaviorRelay原子应用value中存储了最新的一项数据,当有订阅时会发送给订阅者。
ReplayRelay默认缓存为16,也可指定大小。

猜你喜欢

转载自blog.csdn.net/qq_39507260/article/details/84996735