RxJava 之二—— Lift()详解

转载请标明出处:http://blog.csdn.net/xx326664162/article/details/53611268 文章出自:薛瑄的博客

RxJava最让人兴奋的就是它有各种各样的操作符,什么map呀,flatMap呀各种,我们今天要知其然知其所以然,那么他们是如何实现功能的呢?

下面通过一个例子,逐步深入分析。最后面还会再进行一次总结

例子:

代码块一:

Observable.create(new Observable.OnSubscribe<String>() {
    @Override
    public void call(Subscriber<? super String> subscriber) {
        subscriber.onNext("hello");
    }
})
.map(new Func1<String, String>() {
    @Override
    public String call(String s) {
        return s + "word";
    }
})
.subscribe(new Subscriber<String>() {
    @Override
    public void onCompleted() {

    }

    @Override
    public void onError(Throwable e) {

    }

    @Override
    public void onNext(String s) {
        Log.d("rx", s);
    }
});
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28

变换的原理:lift()

这些变换map(),flatMap()虽然功能各有不同,但实质上都是针对事件序列的处理和再发送。而在 RxJava 的内部,它们是基于同一个基础的变换方法: lift(Operator)。

我们先看下进行链式调用map之后,发生了什么。

代码块二:

public final <R> Observable<R> map(Func1<? super T, ? extends R> func) {
    return lift(new OperatorMap<T, R>(func));
}
  • 1
  • 2
  • 3
  • 1
  • 2
  • 3

对,就是调用了lift()!,先来看一下OperatorMap这个类是个什么东西,看似有点多,其实很简单:

代码块三:

public final class OperatorMap<T, R> implements Operator<R, T> {

    final Func1<? super T, ? extends R> transformer;

    public OperatorMap(Func1<? super T, ? extends R> transformer) {
        this.transformer = transformer;
    }

    @Override
    public Subscriber<? super T> call(final Subscriber<? super R> o) {
        MapSubscriber<T, R> parent = new MapSubscriber<T, R>(o, transformer);
        o.add(parent);
        return parent;
    }

    static final class MapSubscriber<T, R> extends Subscriber<T> {

        final Subscriber<? super R> actual;

        final Func1<? super T, ? extends R> mapper;

        boolean done;

        public MapSubscriber(Subscriber<? super R> actual, Func1<? super T, ? extends R> mapper) {
            this.actual = actual;
            this.mapper = mapper;
        }

        @Override
        public void onNext(T t) {
            R result;

            try {
                result = mapper.call(t);
            } catch (Throwable ex) {
                Exceptions.throwIfFatal(ex);
                unsubscribe();
                onError(OnErrorThrowable.addValueAsLastCause(ex, t));
                return;
            }

            actual.onNext(result);
        }

        @Override
        public void onError(Throwable e) {
            if (done) {
                RxJavaPluginUtils.handleException(e);
                return;
            }
            done = true;

            actual.onError(e);
        }


        @Override
        public void onCompleted() {
            if (done) {
                return;
            }
            actual.onCompleted();
        }

        @Override
        public void setProducer(Producer p) {
            actual.setProducer(p);
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70

使用传递进来的Func1参数,生成一个Subscriber类型的类,调用OperatorMap的call()函数,将返回这个类

看一下 lift() 的内部实现(仅核心代码):

代码块四:

lift()函数主要就是使用代码块一中,map()函数中的Func1()来生成一个新的Observable,后续的操作使用这个新的Observable。这里知道一下,会有两个Observable对象

public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
        return new Observable<R>(new OnSubscribeLift<T, R>(onSubscribe, operator));
}
  • 1
  • 2
  • 3
  • 1
  • 2
  • 3

来看一下这个新的Observable使用的OnSubscribe

代码块五:

public final class OnSubscribeLift<T, R> implements OnSubscribe<R> {

    static final RxJavaObservableExecutionHook hook = RxJavaPlugins.getInstance().getObservableExecutionHook();

    final OnSubscribe<T> parent;

    final Operator<? extends R, ? super T> operator;

    public OnSubscribeLift(OnSubscribe<T> parent, Operator<? extends R, ? super T> operator) {
        this.parent = parent;
        this.operator = operator;
    }

    @Override
    public void call(Subscriber<? super R> o) {
        try {
            Subscriber<? super T> st = hook.onLift(operator).call(o);
            try {
                // new Subscriber created and being subscribed with so 'onStart' it
                st.onStart();
                parent.call(st);
            } catch (Throwable e) {
                // localized capture of errors rather than it skipping all operators 
                // and ending up in the try/catch of the subscribe method which then
                // prevents onErrorResumeNext and other similar approaches to error handling
                Exceptions.throwIfFatal(e);
                st.onError(e);
            }
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
            // if the lift function failed all we can do is pass the error to the final Subscriber
            // as we don't have the operator available to us
            o.onError(e);
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36

分析一下这段代码:

  1. 第 行,Subscriber<? super T> st = hook.onLift(operator).call(o);这里的call(),其实就是代码块三中的call()函数,返回的是一个Subscriber的父类
  2. 第 行,parent.call(st);这里的parent是第一个Observable的onSubScribe

先记住这里会有两个Observable,下面会详细分析。

subscribe()源码

Observable.subscribe(Subscriber) 的内部实现是这样的(仅核心代码):

代码块六

// 注意:这不是 subscribe() 的源码,而是将源码中与性能、兼容性、扩展性有关的代码剔除后的核心代码。
// 如果需要看源码,可以去 RxJava 的 GitHub 仓库下载。
public Subscription subscribe(Subscriber subscriber) {
    subscriber.onStart();
    onSubscribe.call(subscriber);
    return subscriber;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

对整个过程进行详解:

  • Observable.subscribe() 源码(代码块六)第三行的 onSubscribe 指的是 Observable 中的 onSubscribe 对象(在Observable.create()时传入的),但是 lift() 之后的情况就复杂了点。

  • 当含有 lift() 时:

    1. lift() 创建了一个 Observable ,我们把它取名为后Observable$2,加上之前的原始 Observable,同样把它取名为Observable$ 1,已经有两个 Observable 了;
    2. 同样地,Observable$2 里的新 OnSubscribe$2 加上之前的 Observable$1 中的原始 OnSubscribe$1,也就有了两个 OnSubscribe;
    3. 当用户调用经过 lift() ,再调用subscribe(), 使用的是Observable$2.subscribe() 。因此 subscribe()源码(代码块六)中的onSubscribe.call(subscriber),也是调用Observable$2 .OnSubscribe$2对象(即在 lift() 中创建Observable时新创建的OnSubscribe(代码块五的这个对象));
      • onSubscribe$2.call() 方法中的parent (onSubscribe的父类 ),是Observable$ 1 .onSubscribe$1 。

      • parent.call(st);(代码块五第 行)使用的是Observable$ 1 .onSubscribe$1(任务计划列表),执行新的任务newSubscriber。

这样就实现了 lift() 过程,有点像一种代理机制,通过事件拦截和处理实现事件序列的变换。

这里写图片描述

网上关于这部分的理解,很多博客都使用了这篇博客给 Android 开发者的 RxJava 详解的图,虽然其他部分讲的不错,但是画出的图并不是很贴切代码,理解起来很费劲。所以这类没有引用

参考:谜之RxJava (二) —— Magic Lift

猜你喜欢

转载自blog.csdn.net/feirose/article/details/56482242