今天要讲的是lift()操作符,这个操作符可能大家比较陌生,但如果说到map、flatMap、filter这些操作符大家可能比较常用,而lift是这些操作符的基础,map()等操作符内部都用到了lift,所以在讲解map、flatMap等操作符前,先了解下lift操作符的使用方法及源码实现,对后面理解其他操作符很有帮助。
使用
如下示例,将每条Integer事件的前面拼接上“野猿新一”再发出,也就是将Integer类型的事件转化成了String类型的事件,是不是很神奇,其内部到底是如何实现的呢,让我们深入源码去瞧一瞧。
Observable.from(new Integer[]{1, 2, 3, 4, 5})
.lift(new Observable.Operator<String, Integer>() {
@Override
public Subscriber<? super Integer> call(final Subscriber<? super String> child) {
return new Subscriber<Integer>() {
@Override
public void onCompleted() {
child.onCompleted();
}
@Override
public void onError(Throwable e) {
child.onError(e);
}
@Override
public void onNext(Integer integer) {
String s = "野猿新一:" + integer;
child.onNext(s);
}
};
}
})
.subscribe(new Action1<String>() {
@Override
public void call(String s) {
Log.d("Himmy", s);
}
});
}
结果输出
02-18 10:31:06.214 19566-19566/com.him.hisapp D/Himmy: 野猿新一:1
02-18 10:31:06.214 19566-19566/com.him.hisapp D/Himmy: 野猿新一:2
02-18 10:31:06.214 19566-19566/com.him.hisapp D/Himmy: 野猿新一:3
02-18 10:31:06.214 19566-19566/com.him.hisapp D/Himmy: 野猿新一:4
02-18 10:31:06.214 19566-19566/com.him.hisapp D/Himmy: 野猿新一:5
源码解析
为了降低理解难度,本文解析基于1.0.0版本源码
如下所示lift()方法的源码
由于使用了泛型,一开始看可能很乱,我先说明下源码中R和T所表示的类型
- T是原始事件的类型,上面例子中就是Integer类型
- R是事件转化后的类型,上面例子中就是转化后的String类型
lift中是重新new了一个Observable返回,这也就是为什么一开始的例子中我们创建的是一个Observable<Integer>,事件监听却是一个subscribe<String>,因为lift()新创建了一个Observable<String>
源码中的onSubscribe是一个Observable.OnSubscribe<T>,我习惯上叫他事件源,也就是一开始的事件是由该对象生产后发出的,关于OnSubscribe可以看之前的这篇文章RxJava操作符——Observable.create
我先简要说明下lift()源码实现的流程
- 新创建一个Observable
- 新创建的Observable内部运行了原始Observable
- 新创建一个Subscriber来监听原始的Observable,暂且叫他原始Subscriber
- 原始Subscriber中做一些业务装换,重新将消息转发给新建的Observable的Subscriber
是不是看起来很绕,简单说就是新建一个Observable其内部运行了原始Observable并监听其事件,对事件做相关处理后转发给新建Observable的Subscriber
public final <R> Observable<R> lift(final Operator<? extends R, ? super T> lift) {
return new Observable<R>(new OnSubscribe<R>() {
@Override
public void call(Subscriber<? super R> o) {
try {
Subscriber<? super T> st = hook.onLift(lift).call(o);
try {
// new Subscriber created and being subscribed with so 'onStart' it
st.onStart();
onSubscribe.call(st);
} catch (Throwable e) {
// localized capture of errors rather than it skipping all operators
// and ending up in the try/catch of the subscribe method which then
// prevents onErrorResumeNext and other similar approaches to error handling
if (e instanceof OnErrorNotImplementedException) {
throw (OnErrorNotImplementedException) e;
}
st.onError(e);
}
} catch (Throwable e) {
if (e instanceof OnErrorNotImplementedException) {
throw (OnErrorNotImplementedException) e;
}
// if the lift function failed all we can do is pass the error to the final Subscriber
// as we don't have the operator available to us
o.onError(e);
}
}
});
}
hook.onLift(lift)内部目前未做任何处理,直接返回lift,所以Subscriber<? super T> st = hook.onLift(lift).call(o)可以看成Subscriber<? super T> st = lift.call(o),就是通过o也就是新建Observable的Subscriber来创建源Observable的Subscriber。
这个list对象就是我们自定义的Operator对象,Operator的call()方法的作用就是通过传进来的参数Subscriber来创建一个新的Subscriber,说白了其内部就是将新建Subscriber接收到的消息做一些处理后再转发给传进来的参数Subscriber。
还是有点绕,看下我们一开始示例中自定义的Operator就明白了,onCompleted()和onError()方法原原本本的转发,onNext做一些自定义的处理后再转发
new Observable.Operator<String, Integer>() {
@Override
public Subscriber<? super Integer> call(final Subscriber<? super String> child) {
return new Subscriber<Integer>() {
@Override
public void onCompleted() {
child.onCompleted();
}
@Override
public void onError(Throwable e) {
child.onError(e);
}
@Override
public void onNext(Integer integer) {
String s = "野猿新一:" + integer;
child.onNext(s);
}
};
}
};