RxJava之转换操作符源码介绍

转载请以链接形式标明出处:
本文出自:103style的博客

转换相关的操作符 以及 官方介绍

RxJava转换操作符 官方介绍 :Transforming Observables

以下介绍我们就直接具体实现,中间流程请参考 RxJava之create操作符源码解析

buffer

  • 官方示例:
    Observable.range(0, 10)
        .buffer(4)
        .subscribe((List<Integer> buffer) -> System.out.println(buffer));
    
    输出:
    [0, 1, 2, 3]
    [4, 5, 6, 7]
    [8, 9]
    
  • 返回对象的 ObservableBuffersubscribeActual 方法:
    单参数bufferskipcount 是相等的。
    protected void subscribeActual(Observer<? super U> t) {
        if (skip == count) {
            BufferExactObserver<T, U> bes = new BufferExactObserver<T, U>(t, count, bufferSupplier);
            if (bes.createBuffer()) {//1.0
                source.subscribe(bes);
            }
        } else {
            source.subscribe(new BufferSkipObserver<T, U>(t, count, skip, bufferSupplier));
        }
    }
    
    • (1.0) createBuffer即新创建了一个ArrayList对象 buffer
  • onNext(T t)onComplete()方法:
    public void onNext(T t) {
        U b = buffer;
        if (b != null) {
            b.add(t);
            if (++size >= count) {//1.0
                downstream.onNext(b);
                size = 0;
                createBuffer();
            }
        }
    }
    
    public void onComplete() {
        U b = buffer;
        if (b != null) {//2.0
            buffer = null;
            if (!b.isEmpty()) {
                downstream.onNext(b);
            }
            downstream.onComplete();
        }
    }
    
    • (1.0) 每次调用onNext 就检查缓存的事件数是否 不小于 buffer操作符设置的 值,成立则将缓存的 buffer 数组 传给观察者的 onNext
    • (2.0) onComplete 是检查缓存的事件数是否不为空,成立则将缓存的 buffer 数组 传给观察者的 onNext,再调用观察者的 onComplete

cast

  • 官方示例:
    Observable<Number> numbers = Observable.just(1, 4.0, 3f, 7, 12, 4.6, 5);
    numbers.filter((Number x) -> x instanceof Integer)
            .cast(Integer.class)
            .subscribe((Integer x) -> System.out.println(x));
    
    输出:
    1
    7
    12
    5
    
  • cast 是通过map操作符来实现的,我们直接看map
    public final <U> Observable<U> cast(final Class<U> clazz) {
        ObjectHelper.requireNonNull(clazz, "clazz is null");
        return map(Functions.castFunction(clazz));
    }
    
    • apply方法:
      public U apply(T t) throws Exception {
          return clazz.cast(t);
      } 
      

map

  • 官方示例:
    Observable.just(1, 2, 3)
            .map(x -> x * x)
            .subscribe(System.out::println);
    
    输出:
    1
    4
    9
    
  • 返回对象的 ObservableMapsubscribeActual 方法:
    public void subscribeActual(Observer<? super U> t) {
        source.subscribe(new MapObserver<T, U>(t, function));
    }
    
  • 继续看 MapObserveronNext(T t)
    public void onNext(T t) {
        if (done) {
            return;
        }
        if (sourceMode != NONE) {
            downstream.onNext(null);
            return;
        }
        U v;
        try {
            v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
        } catch (Throwable ex) {
            fail(ex);
            return;
        }
        downstream.onNext(v);
    }
    
    • 即将 map 操作符 传入Function对象的 返回值 传递给 链式调用上一步的返回对象的 onNext(T t)

concatMap

RxJava之concatMap系列转换操作符源码介绍


flatMap

RxJava之flatMap系列转换操作符源码介绍


flattenAsFlowable

  • 官方示例:
    Single<Double> source = Single.just(2.0);
    Flowable<Double> flowable = source.flattenAsFlowable(x -> {
        return Arrays.asList(x, Math.pow(x, 2), Math.pow(x, 3));
    });
    flowable.subscribe(x -> System.out.println("onNext: " + x));
    
    输出:
    onNext: 2.0
    onNext: 4.0
    onNext: 8.0
    
  • 我们先看Single.just(2.0)
    public static <T> Single<T> just(final T item) {
        ObjectHelper.requireNonNull(item, "value is null");
        return RxJavaPlugins.onAssembly(new SingleJust<T>(item));
    }
    
  • SingleJustsubscribeActual
    protected void subscribeActual(SingleObserver<? super T> observer) {
        observer.onSubscribe(Disposables.disposed());
        observer.onSuccess(value);
    }
    
  • flattenAsFlowable返回对象的 SingleFlatMapIterableFlowablesubscribeActual 方法:
    protected void subscribeActual(Subscriber<? super R> s) {
        source.subscribe(new FlatMapIterableObserver<T, R>(s, mapper));
    }
    
  • 继续看 FlatMapIterableObserveronSubscribe(Disposable d)onSuccess(T value)
    public void onSubscribe(Disposable d) {
        if (DisposableHelper.validate(this.upstream, d)) {
            this.upstream = d;
            downstream.onSubscribe(this);//1.0
        }
    }
    public void onSuccess(T value) {
        Iterator<? extends R> iterator;
        boolean has;
        try {
            iterator = mapper.apply(value).iterator();//2.0 调用apply返回的Iterable对象的 iterator()方法。
            has = iterator.hasNext();//3.0
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            downstream.onError(ex);
            return;
        }
        if (!has) {
            downstream.onComplete();//3.1
            return;
        }
        this.it = iterator;//3.2
        drain();
    }
    
    • (1.0) 通过Flowable subscribe流程介绍 我们知道downstream.onSubscribe(this)即调用 FlowableInternalHelper.RequestMax.INSTANCEaccept方法:
      public enum RequestMax implements Consumer<Subscription> {
          INSTANCE;
          @Override
          public void accept(Subscription t) throws Exception {
              t.request(Long.MAX_VALUE);
          }
      }
      
      即:FlatMapIterableObserver.request(Long.MAX_VALUE): 即为设置变量requestedvalueLong.MAX_VALUEdrain()因为it 变量还是null,所以没做什么操作。
      public void request(long n) {
          if (SubscriptionHelper.validate(n)) {
              BackpressureHelper.add(requested, n);
              drain();
          }
      }
      
    • (2.0) 调用flattenAsFlowable传入的Functionapply返回的Iterable对象的 iterator()方法。
    • (3.0) 检查Iterable时候为空,(3.1) 为空直接onComplete()(3.2) 不为空则将 iterator()返回值赋值给当前的 it 变量,继续执行drain()
  • drain():
    void drain() {
        ...
        Subscriber<? super R> a = downstream;
        Iterator<? extends R> iterator = this.it;
        ...
        int missed = 1;
        for (; ; ) {
            if (iterator != null) {
                long r = requested.get();
                long e = 0L;
                if (r == Long.MAX_VALUE) {//1.0
                    slowPath(a, iterator);
                    return;
                }
                ...
            }
            ...
        }
    }
    
    • 因为上一步downstream.onSubscribe(this)调用了request(Long.MAX_VALUE), 所以 (1.0) 这里条件成立,执行slowPath(downstream iterator)
  • slowPath(downstream iterator)
    void slowPath(Subscriber<? super R> a, Iterator<? extends R> iterator) {
        for (;;) {
            if (cancelled) {
                return;
            }
            R v;
            try {
                v = iterator.next();//1.0
            } catch (Throwable ex) {
                Exceptions.throwIfFatal(ex);
                a.onError(ex);
                return;
            }
            a.onNext(v);//1.1
            if (cancelled) {
                return;
            }
            boolean b;
            try {
                b = iterator.hasNext();//1.2
            } catch (Throwable ex) {
                Exceptions.throwIfFatal(ex);
                a.onError(ex);
                return;
            }
            if (!b) {
                a.onComplete();//1.3
                return;
            }
        }
    }
    
    • (1.0) 获取到的元素,(1.1)传递给downstreamonNext(1.2)然后判断是否还有其他元素,如果有则循环继续,没有的话即调用 downstreamonComplete 结束。

flattenAsObservable

  • 官方示例:
    Single<Double> source = Single.just(2.0);
    Observable<Double> observable = source.flattenAsObservable(x -> {
        return Arrays.asList(x, Math.pow(x, 2), Math.pow(x, 3));
    });
    observable.subscribe(x -> System.out.println("onNext: " + x));
    
    输出:
    onNext: 2.0
    onNext: 4.0
    onNext: 8.0
    

subscribeActual实现的逻辑和 flattenAsFlowable 类似,只是返回的对象为 SingleFlatMapIterableObservable,就不再赘述了。


groupBy

  • 官方示例:

    Observable<String> animals = Observable.just(
        "Tiger", "Elephant", "Cat", "Chameleon", "Frog", "Fish", "Turtle", "Flamingo");
    animals.groupBy(animal -> animal.charAt(0), String::toUpperCase)
        .concatMapSingle(Observable::toList)
        .subscribe(System.out::println);
    

    输出:

    [TIGER, TURTLE]
    [ELEPHANT]
    [CAT, CHAMELEON]
    [FROG, FISH, FLAMINGO]
    
  • 我们来看看返回对象的ObservableGroupBy:

    public GroupByObserver(Observer<? super GroupedObservable<K, V>> actual, 
                           Function<? super T, ? extends K> keySelector, 
                           Function<? super T, ? extends V> valueSelector, 
                           int bufferSize, boolean delayError) {
        this.downstream = actual;
        this.keySelector = keySelector;
        this.valueSelector = valueSelector;
        this.bufferSize = bufferSize;
        this.delayError = delayError;
        this.groups = new ConcurrentHashMap<Object, GroupedUnicast<K, V>>();
        this.lazySet(1);
    }
    
    public void subscribeActual(Observer<? super GroupedObservable<K, V>> t) {
        source.subscribe(new GroupByObserver<T, K, V>(t, keySelector, valueSelector, bufferSize, delayError));
    } 
    
  • 继续看 GroupByObserveronNext(T t)

    public void onNext(T t) {
        K key;
        try {
            key = keySelector.apply(t);//1.0
        } catch (Throwable e) {
            ...
            return;
        }
        Object mapKey = key != null ? key : NULL_KEY;
        GroupedUnicast<K, V> group = groups.get(mapKey);
        if (group == null) {
            if (cancelled.get()) {
                return;
            }
            group = GroupedUnicast.createWith(key, bufferSize, this, delayError);
            groups.put(mapKey, group);//2.0
            getAndIncrement();
            downstream.onNext(group);//3.0
        }
    
        V v;
        try {
            v = ObjectHelper.requireNonNull(valueSelector.apply(t), "The value supplied is null");//4.0
        } catch (Throwable e) {
            ...
            return;
        }
        group.onNext(v);//4.1
    }
    
    • (1.0) 通过keySelector.apply(t)即官方示例中的 animal.charAt(0)获取分组的 key
    • (2.0) 如果GroupedUnicast不存再这个key,则保存进去。
    • (3.0) 然后继续调用上一步操作符的 onNext方法,即官方示例中的just
    • (4.0) 通过valueSelector.apply(t)即官方示例中的 String::toUpperCase)获取值,(4.1)添加到ToListObservercollection中。
  • 最后通过 onComplete()输出:

    public void onComplete() {
        List<ObservableGroupBy.GroupedUnicast<K, V>> list = new ArrayList<ObservableGroupBy.GroupedUnicast<K, V>>(groups.values());
        groups.clear();
        for (ObservableGroupBy.GroupedUnicast<K, V> e : list) {
            e.onComplete();
        }
        downstream.onComplete();
    }
    

    ToListObserveronComplete():

    public void onComplete() {
        U c = collection;
        collection = null;
        downstream.onNext(c);
        downstream.onComplete();
    }
    

scan

  • 官方示例:
    Observable.just(5, 3, 8, 1, 7)
            .scan(0, (partialSum, x) -> partialSum + x)
            .subscribe(System.out::println);
    
    输出:
    0
    5
    8
    16
    17
    24
    
  • 我们来看看返回对象的ObservableScanSeed:
    public ObservableScanSeed(ObservableSource<T> source, Callable<R> seedSupplier, BiFunction<R, ? super T, R> accumulator) {
        super(source);
        this.accumulator = accumulator;
        this.seedSupplier = seedSupplier;
    }
    
    @Override
    public void subscribeActual(Observer<? super R> t) {
        R r;
        try {
            r = ObjectHelper.requireNonNull(seedSupplier.call(), "The seed supplied is null");//1.0
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
            EmptyDisposable.error(e, t);
            return;
        }
        source.subscribe(new ScanSeedObserver<T, R>(t, accumulator, r));
    }
    
    • (1.0) seedSupplier.call()即官方示例中的 0,即设置 r 的值为0.
  • 继续看ScanSeedObserveronNext(T t):
    public void onNext(T t) {
        if (done) {
            return;
        }
        R v = value;//1.0
        R u;
        try {
            u = ObjectHelper.requireNonNull(accumulator.apply(v, t), "The accumulator returned a null value");//2.0
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
            upstream.dispose();
            onError(e);
            return;
        }
        value = u;//3.0
        downstream.onNext(u);//4.0
    }
    
    • (1.0) value 即为上一步设置的 r0.
    • (2.0) accumulator.apply(v, t) 即为官方示例中的 partialSum + x
    • (3.0) 更新value 的值
    • (4.0)accumulator.apply(v, t)传递给观察者的onNext

switchMap

  • 官方示例:
    Observable.interval(0, 1, TimeUnit.SECONDS)
            .switchMap(x -> {
                return Observable.interval(0, 750, TimeUnit.MILLISECONDS)
                        .map(y -> x);
            })
            .takeWhile(x -> x < 3)
            .blockingSubscribe(System.out::print);
    
    
    输出:
    001122
    
  • 我们来看看返回对象的ObservableSwitchMap:
    public ObservableSwitchMap(ObservableSource<T> source,
                               Function<? super T, ? extends ObservableSource<? extends R>> mapper, int bufferSize,
                                       boolean delayErrors) {
        super(source);
        this.mapper = mapper;
        this.bufferSize = bufferSize;
        this.delayErrors = delayErrors;
    }
    
    @Override
    public void subscribeActual(Observer<? super R> t) {
        if (ObservableScalarXMap.tryScalarXMapSubscribe(source, t, mapper)) {
            return;
        }
        source.subscribe(new SwitchMapObserver<T, R>(t, mapper, bufferSize, delayErrors));
    }
    
  • 继续看SwitchMapObserveronNext:
    public void onNext(T t) {
        long c = unique + 1;
        unique = c;
        SwitchMapInnerObserver<T, R> inner = active.get();
        if (inner != null) {
            inner.cancel();
        }
        ObservableSource<? extends R> p;
        try {
            p = ObjectHelper.requireNonNull(mapper.apply(t), "The ObservableSource returned is null");//1.0
        } catch (Throwable e) {
            ...
            return;
        }
        SwitchMapInnerObserver<T, R> nextInner = new SwitchMapInnerObserver<T, R>(this, c, bufferSize);//2.0
        for (;;) {
            inner = active.get();
            if (inner == CANCELLED) {
                break;
            }
            if (active.compareAndSet(inner, nextInner)) {
                p.subscribe(nextInner);//3.0
                break;
            }
        }
    }
    
    • (1.0) 通过mapper.apply(t)即官方示例中的 Observable.interval(0, 750, TimeUnit.MILLISECONDS).map(y -> x)返回的ObservableMap对象。
    • (2.0) 构建SwitchMapInnerObserver对象
    • (3.0) 用返回的ObservableMap订阅SwitchMapInnerObserver对象

window

  • 官方示例:
    Observable.range(1, 10)
            // Create windows containing at most 2 items, and skip 3 items before starting a new window.
            .window(2, 3)
            .flatMapSingle(window -> {
                return window.map(String::valueOf)
                        .reduce(new StringJoiner(", ", "[", "]"), StringJoiner::add);
            })
            .subscribe(System.out::println);
    
    输出:
    [1, 2]
    [4, 5]
    [7, 8]
    [10]
    
  • 我们来看看返回对象的ObservableWindow:
    public ObservableWindow(ObservableSource<T> source, long count, long skip, int capacityHint) {
        super(source);
        this.count = count;
        this.skip = skip;
        this.capacityHint = capacityHint;
    }
    
    @Override
    public void subscribeActual(Observer<? super Observable<T>> t) {
        if (count == skip) {
            source.subscribe(new WindowExactObserver<T>(t, count, capacityHint));
        } else {
            source.subscribe(new WindowSkipObserver<T>(t, count, skip, capacityHint));
        }
    }
    
  • 继续看WindowSkipObserveronNext:
    public void onNext(T t) {
        final ArrayDeque<UnicastSubject<T>> ws = windows;
        long i = index;
        long s = skip;
        if (i % s == 0 && !cancelled) {//3.0
            wip.getAndIncrement();
            UnicastSubject<T> w = UnicastSubject.create(capacityHint, this);
            ws.offer(w);
            downstream.onNext(w);
        }
        long c = firstEmission + 1;
        for (UnicastSubject<T> w : ws) {
            w.onNext(t);//1.0
        }
        if (c >= count) {
            ws.poll().onComplete();//2.0
            if (ws.isEmpty() && cancelled) {
                this.upstream.dispose();
                return;
            }
            firstEmission = c - s;
        } else {
            firstEmission = c;
        }
        index = i + 1;
    }
    
    • (1.0) 将元素存入 queue
    • (2.0) 当元素个数到达count时,就之前的元素全部输出
    • (3.0) 当元素个数到达skip时,就重新创建一个UnicastSubject来存储元素

以上

转载于:https://www.jianshu.com/p/d693017a706a

猜你喜欢

转载自blog.csdn.net/weixin_34060299/article/details/91052817