目录
附:RxJava源码地址: https://github.com/ReactiveX/RxJava
1 概念和说明
1.1 响应式编程
即rective programming, 是一种通过异步和数据流来构建事物关系的编程模型,用事件来驱动事务。
1.2 RxJava
一个通过观察者模式实现响应式编程的库。
优点:a. 代码体现为为链式调用,而非嵌套式。
b. 方便的实现了线程调度和数据转换(即中间事务的转换)
c. android中常用来和retrofit配合
1.3 关于RxJava和RxAndroid
两者差不多,RxAndroid是RxJava针对Android平台做了一些调整。可以理解为RxAndroid是专供 android的RxJava。
1.4 关于响应式编程和普通编程
场景:有三个任务A、B、C,B的执行依赖于A的执行结果,同样,C的执行结果依赖于B,则两种编程的作废分别为:
普通编程:先执行A,A执行完后执行B,B执行完执行C,即除了3个任务本身的执行过程外,3个任务之间的依赖执行逻辑也要由开发者编写。
响应式编程:会将3个任务做依赖关系绑定:C 依赖 B 依赖 A,二这个依赖绑定的先后执行逻辑由语言或调用库来支持,开发者只需专注于3个任务本身的执行逻辑,而不必处理3个任务间的执行逻辑,只需要把它委托给语言或调用库即可。之后,B会自动响应A的执行结果,C会自动响应B的执行结果。
总结:响应式编程优雅的处理了任务(业务或事务)间的依赖关系。
2. 基本使用
2.1 基本元素关系图
----> observables 是被观察者、事件源,用来处理事件的派发。
----> observer/subscriber 是观察者、订阅者,观察/订阅目标是observables,observables派发出来的事件由他处理。
observer 和 subscriber 都是观察者/订阅者,后者是前者的扩展,内部增加了 onStart()方法:在时间发送之前订阅,用于做一些准备工作;还增加了 unSubscribe(),用于取消订阅。
----> subscribe 是一个动作,将两者建立“订阅”关系,一旦订阅关系建立,observer/subscriber就可以立即接受、响应observables的变化。
2.2 代码示例:
Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> observableEmitter) throws Exception {
observableEmitter.onNext("emit A");
observableEmitter.onNext("emit B");
observableEmitter.onNext("emit C");
observableEmitter.onComplete();
}
});
Observer<String> observer = new Observer<String>() {
@Override
public void onSubscribe(Disposable disposable) {
System.out.println("onSubscribe");
}
@Override
public void onNext(String s) {
System.out.println("onNext");
System.out.println(s);
}
@Override
public void onError(Throwable throwable) {
System.out.println("onError");
}
@Override
public void onComplete() {
System.out.println("onComplete");
}
};
observable.subscribe(observer);
执行结果:
onSubscribe
onNext
emit A
onNext
emit B
onNext
emit C
onComplete
----> observer 的 onSubscribe(Disposable disposable)方法在执行订阅动作后立即执行回调,在这里可以用 disposable 取消订阅或者将该对象保存起来以后再取消订阅。
----> oberver 的 onNext()在emitter.onNext()执行后回调执行,表示响应被观察者事件,onNext()方法会执行多次,表示有多个事件,具体执行次数由被观察者决定。
----> observer 的 onComplete()在被观察者的 onComplete()执行后回调执行,表示事件全部完成,通常在这里做一些收尾工作。
----> observer 的 onError()在被观察者执行过程中的任何位置出现异常时回调执行,表示事件执行异常。同 onComplete()互斥。
通过以上说明可以发现:在 observer 中,除了 onSubscribe(Disposable d)以外,其他方法都和创建 observable 时发射器 observableEmitter 的方法同名,且都在同名方法执行后回调。比如,observableEmitter 执行一次 onNext(),则 observer 中的 onNext()方法执行一次。
onComplete()和onError()互斥,两者智能触发其一(且其中一个触发后,onNext()变不会再执行):
a. onComplete()触发后,后续 observableEmitter 调用任何方法都不在生效;
b. onError()触发后,如果 observableEmitter 再调用 onComplete()或 onError(),RxJava会抛出异常,开发者需要自行保证唯一性。
特别需要注意的是:ObservableOnSubscribe 的 subscribe()方法在每次有新的observer加入时,都会在 observer 的 onSubscribe()回调后触发,这就保证了所有的观察者都可以接收到事件。
2.3 关于subscribe()
即订阅动作,他将观察者和被观察者建立了订阅关系,通过该方法,observable 回调 observer 的对应方法,从而达到 观察者相应被观察者发出的事件(实际上被观察者只负责产生事件,他是事件源,真正发送事件的是他在订阅时,即subscribe()被调用时)。
有多个重载的方法:
// 1.观察者对被观察者发送的任何事件都响应
@SchedulerSupport("none")
public final void subscribe(Observer<? super T> observer) {
ObjectHelper.requireNonNull(observer, "observer is null");
try {
observer = RxJavaPlugins.onSubscribe(this, observer);
ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
this.subscribeActual(observer);
} catch (NullPointerException var4) {
throw var4;
} catch (Throwable var5) {
Exceptions.throwIfFatal(var5);
RxJavaPlugins.onError(var5);
NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
npe.initCause(var5);
throw npe;
}
}
// 2.观察者对被观察者发出的事件作出响应(被观察者还可以继续发送事件)
@SchedulerSupport("none")
public final Disposable subscribe() {
return this.subscribe(Functions.emptyConsumer(), Functions.ERROR_CONSUMER, Functions.EMPTY_ACTION, Functions.emptyConsumer());
}
// 3.观察者只对被观察者发出的onNext()事件作出响应
@SchedulerSupport("none")
public final Disposable subscribe(Consumer<? super T> onNext) {
return this.subscribe(onNext, Functions.ERROR_CONSUMER, Functions.EMPTY_ACTION, Functions.emptyConsumer());
}
// 4.观察者只对被观察者发出的onNext()、onError()事件作出响应
@SchedulerSupport("none")
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError) {
return this.subscribe(onNext, onError, Functions.EMPTY_ACTION, Functions.emptyConsumer());
}
// 5.观察者只对被观察者发出的onNext()、onError()、onComplete()事件作出响应
@SchedulerSupport("none")
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete) {
return this.subscribe(onNext, onError, onComplete, Functions.emptyConsumer());
}
// 6.观察者只对被观察者发出的onNext()、onError()、onComplete()、onSubscribe()事件作出响应
@SchedulerSupport("none")
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete, Consumer<? super Disposable> onSubscribe) {
ObjectHelper.requireNonNull(onNext, "onNext is null");
ObjectHelper.requireNonNull(onError, "onError is null");
ObjectHelper.requireNonNull(onComplete, "onComplete is null");
ObjectHelper.requireNonNull(onSubscribe, "onSubscribe is null");
LambdaObserver<T> ls = new LambdaObserver(onNext, onError, onComplete, onSubscribe);
this.subscribe((Observer)ls);
return ls;
}
其中,2、3、4、5都this到6,而6最后又调用了1。另外,上面增加了两个新类:Consumer 和 Action :observer 中的 onNext()、onComplete()、onError()被 Consumer.accept()代替;observer中的 onComplete()方法被 Action.run()方法代替。因此,如果只关心observer中的一个或几个回调方法,则可以通过使用 Consumer 或 Action 来替换observer注册到被观察者中。 比如:上面的示例代码,如果只关心onNext(),则可以写为:
Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> observableEmitter) throws Exception {
observableEmitter.onNext("emit A");
observableEmitter.onNext("emit B");
observableEmitter.onNext("emit C");
observableEmitter.onComplete();
}
});
Consumer<String> consumer = new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
System.out.println(s);
}
};
observable.subscribe(consumer);
2.4 线程调度
2.4.1 线程调度
默认情况下,被观察者和观察者处于同一线程中。而异步才是RxJava的核心,所有需要用到线程调用,其实现通过 Scheduler 来实现:
a. subscribeOn():指定被被观察者在subscribe所发生的线程(即指定事件源的线程)。
多次指定发射事件的线程只有第一次指定有效,也就是多次调用只有第一次有效,之后的调用全被忽略。
b. observerOn():指定观察者/订阅者接受/响应事件的线程(即指定接受事件的线程)。
多次指定接受事件的线程是可以的,每指定一次,线程就切换一次,所以以最终指定的为准,即多次指定,以最后一次为准。
2.4.2 RxJava内置的常用的线程项:
a. Schedulers.io(): 代表执行io操作的线程,常用于网络请求、文件读写等io密集型操作的地方。行为模式和 new Thread 类似,只是其内部维护了一个无上限的线程池,更高效。
b. Schedulers.newThread():总是启用新线程,在新线程中执行当前事务。
c. Schedulers.complutation():代表cpu计算密集型的操作,既不会被io操作限制性能的操作,如图形计算等。内部维护了一个固定大小(大小为cpu的核数)的线程池。不要把io操作放到该线程项中,浪费cpu的资源。
d. AndroidSchedulers.mainThread():android的主线程,用于更新UI,为android独有。