RxJava 现在用的比较广泛了,尤其是用在网络中,我们来看看它到底是什么,怎么用。这里以 1.1 版本为例,比较早起的本,使用的时候,先注明引用
implementation 'io.reactivex:rxjava:1.1.7'
我们先来看个简单的例子,这是在Java中写的,
private static void initRxJava1(){
Observable
.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
System.out.println(" create " + Thread.currentThread().getName() );
// 重点一
subscriber.onNext("hi");
subscriber.onCompleted();
}
})
.subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {
System.out.println( " onCompleted " );
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(String s) {
int num = Runtime.getRuntime().availableProcessors();
System.out.println(s + " " + num + " " + Thread.currentThread().getName());
}
});
}
这个例子中,打印的值为
create main
hi 4 main
onCompleted
我们知道,RxJava 是响应链式编程,顾名思义就是从上往下相应的编程,上面知识一个最简单的例子,我们来分析一下代码,看看它是怎么从上往下执行的
Observable:
protected Observable(OnSubscribe<T> f) {
this.onSubscribe = f;
}
public static <T> Observable<T> create(OnSubscribe<T> f) {
return new Observable<T>(RxJavaHooks.onCreate(f));
}
RxJavaHooks:
public static <T> Observable.OnSubscribe<T> onCreate(Observable.OnSubscribe<T> onSubscribe) {
Func1<OnSubscribe, OnSubscribe> f = onObservableCreate;
if (f != null) {
return f.call(onSubscribe);
}
return onSubscribe;
}
在这里,我们可以简化点,认为 RxJavaHooks.onCreate(f) 中返回的就是 f 值本身,这样,通过第一步 Observable.create() 方法就创建一个 Observable 对象,暂时记为 Ob1,它里面的 onSubscribe 属性纪委 onS1;继续往下看,此时调用了 subscribe() 方法,传入了 Subscriber 对象, Subscriber 是个抽象类,它实现了 Observer 和 Subscription 接口,其实这里传入一个 Observer 对象也是可以的,它会被包装一层,转化为 ObserverSubscriber 对象继续调用 subscribe(Subscriber<? super T> subscriber)方法,ObserverSubscriber 是 Subscriber 的子类,是个装饰模式。继续看 subscribe() 方法,称传入的 Subscriber 对象为 Sub1,
public final Subscription subscribe(Subscriber<? super T> subscriber) {
return Observable.subscribe(subscriber, this);
}
static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
subscriber.onStart();
if (!(subscriber instanceof SafeSubscriber)) {
subscriber = new SafeSubscriber<T>(subscriber);
}
try {
RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber);
return RxJavaHooks.onObservableReturn(subscriber);
} catch (Throwable e) {
...
return Subscriptions.unsubscribed();
}
}
这是简化后的代码,其实 try...catch 中可以简化一行代码 observable.onSubscribe.call(subscriber); 也就是说可以是
static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
subscriber.onStart();
if (!(subscriber instanceof SafeSubscriber)) {
subscriber = new SafeSubscriber<T>(subscriber);
}
try {
observable.onSubscribe.call(subscriber);
return subscriber;
} catch (Throwable e) {
...
return Subscriptions.unsubscribed();
}
}
此时,在 Ob1 中,subscriber 对象传进来时是 Sub1,先执行它的 onStart() 方法;然后包裹一层,转为为 SafeSubscriber 对象,此时记做 SafeSub1, observable.onSubscribe 对应的是 onS1,也就是说它调用了 call() 方法,里面传入的对象是 SafeSubscriber 类型的 SafeSub1,看看它的代码
public class SafeSubscriber<T> extends Subscriber<T> {
private final Subscriber<? super T> actual;
boolean done;
public SafeSubscriber(Subscriber<? super T> actual) {
super(actual);
this.actual = actual;
}
@Override
public void onCompleted() {
try {
actual.onCompleted();
} catch (Throwable e) {
throw new OnCompletedFailedException(e.getMessage(), e);
}
}
@Override
public void onNext(T args) {
try {
if (!done) {
actual.onNext(args);
}
} catch (Throwable e) {
Exceptions.throwOrReport(e, this);
}
}
}
这是简化后的代码,说是安全,其实就是添加了个boolean属性,防止重复执行,例如 onNext() 方法中就是保留的例子。SafeSubscriber 是 Subscriber 的子类,看它构造方法中调用了 super(actual) 方法,把传进来的 Sub1 传给了父类,这里可以先不用看它的父类,对逻辑没影响。继续回到 observable.onSubscribe.call(subscriber) 这行代码,此时执行了 initRxJava1() 中的 new Observable.OnSubscribe<String>() 对象中的 call() 回调,打印了 create main 字符串,然后执行了 重点一 的代码,如果没有在这两行代码,整个流程到此结束,我们看看,此时 subscriber 就是 SafeSubscriber 类型,即 SafeSub1 , 执行的 onNext() 和 onCompleted() 都是它内部属性也就是 Sub1 对应的方法,Sub1 就是 initRxJava1() 方法中的 new Subscriber<String>() 对象,此时执行它里面的打印方法,这个调用流程就是这样。
看看方法 initRxJava2(),会有个不一样的体验效果
private static void initRxJava2(){
Observable
.just("hello")
.subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {
System.out.println( " onCompleted " );
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(String s) {
int num = Runtime.getRuntime().availableProcessors();
System.out.println(s + " " + num + " " + Thread.currentThread().getName());
}
});
}
打印的值为
hello 4 main
onCompleted
这个明显没有地方调用 onNext() 和 onCompleted() 方法,为什么会调用呢?看看它里面的代码
public static <T> Observable<T> just(final T value) {
return ScalarSynchronousObservable.create(value);
}
ScalarSynchronousObservable:
public static <T> ScalarSynchronousObservable<T> create(T t) {
return new ScalarSynchronousObservable<T>(t);
}
protected ScalarSynchronousObservable(final T t) {
super(RxJavaHooks.onCreate(new JustOnSubscribe<T>(t)));
this.t = t;
}
static final class JustOnSubscribe<T> implements OnSubscribe<T> {
final T value;
JustOnSubscribe(T value) {
this.value = value;
}
@Override
public void call(Subscriber<? super T> s) {
s.setProducer(createProducer(s, value));
}
}
这里是生成了一个 ScalarSynchronousObservable 对象,它是 Observable 的子类,ScalarSynchronousObservable 的构造方法中可以忽略 RxJavaHooks 的方法代码,直接理解为 super(new JustOnSubscribe<T>(t)) 即可,这里是创建了一个 JustOnSubscribe 对象传给了父类,即 onSubscribe 属性为 JustOnSubscribe 对象。继续看 initRxJava2() 中方法,这里也执行到了 subscribe() 方法,老样子,先把 Subscriber 对象包裹一层,转换为 SafeSubscriber 对象,然后执行 JustOnSubscribe 的 call() 方法,方法中的形参就是 SafeSubscriber 类型,先看看 createProducer(s, value) 方法
static <T> Producer createProducer(Subscriber<? super T> s, T v) {
if (STRONG_MODE) {
return new SingleProducer<T>(s, v);
}
return new WeakSingleProducer<T>(s, v);
}
这里创建的是 WeakSingleProducer 对象
static final class WeakSingleProducer<T> implements Producer {
final Subscriber<? super T> actual;
final T value;
boolean once;
public WeakSingleProducer(Subscriber<? super T> actual, T value) {
this.actual = actual;
this.value = value;
}
...
}
s.setProducer(createProducer(s, value)) 中的 s 是 SafeSubscriber 类型,它是 Subscriber 的子类,调用了 setProducer() 方法,看看它的代码
public abstract class Subscriber<T> implements Observer<T>, Subscription {
private static final long NOT_SET = Long.MIN_VALUE;
private final SubscriptionList subscriptions;
private final rx.Subscriber<?> subscriber;
private Producer producer;
private long requested = NOT_SET; // default to not set
protected Subscriber() {
this(null, false);
}
protected Subscriber(rx.Subscriber<?> subscriber) {
this(subscriber, true);
}
protected Subscriber(rx.Subscriber<?> subscriber, boolean shareSubscriptions) {
this.subscriber = subscriber;
this.subscriptions = shareSubscriptions && subscriber != null ? subscriber.subscriptions : new SubscriptionList();
}
public void onStart() {
// do nothing by default
}
public void setProducer(Producer p) {
long toRequest;
boolean passToSubscriber = false;
synchronized (this) {
toRequest = requested;
producer = p;
if (subscriber != null) {
// middle operator ... we pass through unless a request has been made
if (toRequest == NOT_SET) {
// we pass through to the next producer as nothing has been requested
passToSubscriber = true;
}
}
}
// do after releasing lock
if (passToSubscriber) {
subscriber.setProducer(producer);
} else {
// we execute the request with whatever has been requested (or Long.MAX_VALUE)
if (toRequest == NOT_SET) {
producer.request(Long.MAX_VALUE);
} else {
producer.request(toRequest);
}
}
}
}
里面比较重要的是构造方法和 setProducer() 这个方法,由于调用的一个参数的构造方法,所以 subscriber 和 subscriptions 都有值,此时 setProducer()中,形参 Producer 是 WeakSingleProducer 类型, s 是 SafeSubscriber 类型,setProducer() 中 synchronized 代码块中,会执行到 passToSubscriber = true这一步,然后就是 subscriber.setProducer(producer) 这行代码,此时 subscriber 就是 initRxJava2() 中的 new Subscriber<String>() 对象,producer 是 WeakSingleProducer,执行后,还是会执行
setProducer(Producer p) 方法,区别是此时方法所属的对象是 Subscriber 而非 SafeSubscriber,我们继续看,由于此时 subscriber 为null,所以 passToSubscriber 值是false,执行 else 中的操作 producer.request(Long.MAX_VALUE) 代码,因此回到了 WeakSingleProducer 中
static final class WeakSingleProducer<T> implements Producer {
final Subscriber<? super T> actual;
final T value;
boolean once;
public WeakSingleProducer(Subscriber<? super T> actual, T value) {
this.actual = actual;
this.value = value;
}
@Override
public void request(long n) {
Subscriber<? super T> a = actual;
T v = value;
try {
a.onNext(v);
} catch (Throwable e) {
return;
}
a.onCompleted();
}
}
简化后代码,此时 actual 就是 SafeSubscriber ,因此调用还是它内部的 onNext() 、onCompleted() 方法,它内部执行 initRxJava2() 中 new Subscriber<String>() 对象的方法。故此,我们没有明文像 initRxJava1() 中的 重点一 代码,却也执行了打印方法。
private static void initRxJava3(){
Observable
.just("hello","world")
.subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {
System.out.println( " onCompleted " );
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(String s) {
int num = Runtime.getRuntime().availableProcessors();
System.out.println(s + " " + num + " " + Thread.currentThread().getName());
}
});
}
注意,just() 方法中传入的是两个字符串,看看运行的结果
hello 4 main
world 4 main
onCompleted
onNext() 方法执行了两次,这是为什么呢?老规矩,看源码
public static <T> Observable<T> just(T t1, T t2) {
return from((T[])new Object[] { t1, t2 });
}
public static <T> Observable<T> empty() {
return EmptyObservableHolder.instance();
}
public static <T> Observable<T> from(T[] array) {
int n = array.length;
if (n == 0) {
return empty();
} else
if (n == 1) {
return just(array[0]);
}
return create(new OnSubscribeFromArray<T>(array));
}
由此可以看出,如果数组长度为0,返回的是 EmptyObservableHolder 中的单利对象,它只会执行 Subscriber 的 onCompleted() 方法;数组长度为1时,还是调用 just() 方法;数组长度超过1时,这里又创建了 OnSubscribeFromArray 对象。
public final class OnSubscribeFromArray<T> implements Observable.OnSubscribe<T> {
final T[] array;
public OnSubscribeFromArray(T[] array) {
this.array = array;
}
@Override
public void call(Subscriber<? super T> child) {
child.setProducer(new FromArrayProducer<T>(child, array));
}
static final class FromArrayProducer<T>
extends AtomicLong
implements Producer {
final Subscriber<? super T> child;
final T[] array;
public FromArrayProducer(Subscriber<? super T> child, T[] array) {
this.child = child;
this.array = array;
}
@Override
public void request(long n) {
...
fastPath();
...
}
void fastPath() {
final Subscriber<? super T> child = this.child;
for (T t : array) {
if (child.isUnsubscribed()) {
return;
}
child.onNext(t);
}
if (child.isUnsubscribed()) {
return;
}
child.onCompleted();
}
}
}
这个是简化后的代码,看过 initRxJava2() 方法中的代码分析,知道 OnSubscribeFromArray 中的 call() 方法中的参数 child 是 SafeSubscriber 类型,所以执行 SafeSubscriber 的 setProducer() 方法,最终执行到 FromArrayProducer 对象中的 request() 方法,经过判断,执行 fastPath() 方法,这个方法逻辑比较简单,先是遍历数组,执行 child 也就是 SafeSubscriber 的 onNext() 方法,遍历结束后,执行 onCompleted() 方法,SafeSubscriber 是个包装类,对应执行的是 initRxJava3() 中的 new Subscriber<String>() 对象中的 onNext() 和 onCompleted() 方法。
private static void initRxJava4(){
Observable
.just("a", "b", "c", "d")
.map(new Func1<String, String>() {
@Override
public String call(String s) {
return s + s;
}
})
.subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {
System.out.println( " onCompleted " );
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(String s) {
System.out.println(s + " " + Thread.currentThread().getName());
}
});
}
这个方法在上面的基础上添加了个 map 方法,它的作用是类型转换,我们这里仅仅是让传入的字符串由单个变为双个,来看它的原理,just() 方法创建了 Observable A,它里面属性 onSubscribe 是 OnSubscribeFromArray, OnSubscribeFromArray 中包含我们传入的字符串数组;map() 方法是在 A 中又创建了一个 Observable 对象,命名为B,属性onSubscribe是
OnSubscribeMap 对象,它把 A 和 map() 中的类型转换Func1作为参数传进构造方法中
public final <R> Observable<R> map(Func1<? super T, ? extends R> func) {
return create(new OnSubscribeMap<T, R>(this, func));
}
public static <T> Observable<T> create(OnSubscribe<T> f) {
return new Observable<T>(RxJavaHooks.onCreate(f));
}
此时是在 B 中,继续看 subscribe() 方法,我们知道,此时把 initRxJava4() 中的 new Subscriber<String>() 对象转换为 SafeSubscriber 类型,然后调用
OnSubscribeMap 的 call(subscriber) 方法,形参是 SafeSubscriber
public void call(final Subscriber<? super R> o) {
MapSubscriber<T, R> parent = new MapSubscriber<T, R>(o, transformer);
o.add(parent);
source.unsafeSubscribe(parent);
}
此时,source 是 Observable A,parent 构造方法中的两个对象是 SafeSubscriber 和 map()中的类型转换Func1,看看 unsafeSubscribe() 方法
public final Subscription unsafeSubscribe(Subscriber<? super T> subscriber) {
onSubscribe.call(subscriber);
...
}
简化后的代码就是这样,此时 onSubscribe 是 OnSubscribeFromArray ,subscriber 是 MapSubscriber, call() 方法为
public void call(Subscriber<? super T> child) {
child.setProducer(new FromArrayProducer<T>(child, array));
}
有没有很熟悉的感觉,这里是老套路了,看看 MapSubscriber 中的方法
@Override
public void setProducer(Producer p) {
actual.setProducer(p);
}
此时 actual 是它构造方法传进来的 SafeSubscriber,p 是 FromArrayProducer ,此时看父类中的 setProducer() 方法,上面分析过,先是 SafeSubscriber中的 setProducer() 方法,会走到 subscriber.setProducer(producer) 这一行代码,此时 subscriber 代表的是 initRxJava4() 中的 new Subscriber<String>() 对象,然后继续,这一次会执行到 producer.request(Long.MAX_VALUE) 这行代码,此刻 producer 仍旧是 FromArrayProducer,上面已经有了,再次看它中的代码
static final class FromArrayProducer<T>
extends AtomicLong
implements Producer {
final Subscriber<? super T> child;
final T[] array;
public FromArrayProducer(Subscriber<? super T> child, T[] array) {
this.child = child;
this.array = array;
}
@Override
public void request(long n) {
...
fastPath();
...
}
void fastPath() {
final Subscriber<? super T> child = this.child;
for (T t : array) {
if (child.isUnsubscribed()) {
return;
}
child.onNext(t);
}
if (child.isUnsubscribed()) {
return;
}
child.onCompleted();
}
}
child 是 MapSubscriber,也就是说会执行它的 onNext(t) 和 onCompleted() 方法,我们just()方法中数组数据,就是在这里通过遍历分发出去的,看看 MapSubscriber 的源码
static final class MapSubscriber<T, R> extends Subscriber<T> {
final Subscriber<? super R> actual;
final Func1<? super T, ? extends R> mapper;
public MapSubscriber(Subscriber<? super R> actual, Func1<? super T, ? extends R> mapper) {
this.actual = actual;
this.mapper = mapper;
}
@Override
public void onNext(T t) {
R result;
try {
result = mapper.call(t);
} catch (Throwable ex) {
return;
}
actual.onNext(result);
}
@Override
public void onCompleted() {
actual.onCompleted();
}
@Override
public void setProducer(Producer p) {
actual.setProducer(p);
}
}
简化后的代码,actual 是 SafeSubscriber ,mapper 是 map()中的类型转换Func1,都是通过构造方法传递进来的。此时 mapper.call(t) 执行了数据转换,得到新的数据 result,然后调用了 SafeSubscriber 的 onNext(result) 方法,进而调用 initRxJava4()中的 new Subscriber<String>() 对象中的方法,onCompleted() 也是同样的道理。
private static void initRxJava5(){
Observable
.just("a b c", "12 3")
.flatMap(new Func1<String, Observable<String>>() {
@Override
public Observable<String> call(String file) {
return Observable.from(file.split(" "));
}
})
.filter(new Func1<String, Boolean>() {
@Override
public Boolean call(String s) {
return "a".equals(s);
}
})
.map(new Func1<String, String>() {
@Override
public String call(String s) {
return s + s;
}
})
.subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {
System.out.println( " onCompleted " );
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(String s) {
System.out.println(s + " " + Thread.currentThread().getName());
}
});
}
运行的结果是
a main
onCompleted
这个方法中添加了转换、拦截等功能,flatMap() 方法把 Observable 对象中的一个数组中的两个对象,转换为了两个 Observable 对象,每个对象中的数组内容是 "a b c" 、 "12 3" 这两个字符串分别根据 " " 来拆分出来的新数组 {a,b,c} 、{12,3} ,通过 Observable.from() 生成对象;接着就是拦截功能,看看里面的判断,这里是只有返回 true 才会往下面执行,false的话就到此为止了,继续下一个数据;map() 中是个转换,把字符串叠加;最后一个方法是打印。这个方法的源码和前面一样,都是通过包装和回调,执行代码。
RxJava 的初级用法就是回调和包装类,单纯这样使用基本无用,本篇文章的介绍只是一个入门。