本篇学习RxAndroid的使用。
通过前面的学习,已经掌握了Thread + Runnable和Handler、AsyncTask的两种方式,不了解的可以看以前的文章。
Android异步学习(一):Thread和Runnable
Android异步学习(二):Handler和AsyncTask
Android异步学习(三):初探 RxAndroid 3.0
RxJava
那么RxJava是什么?
在GitHub上的说明是这么描述的:
RxJava – Reactive Extensions for the JVM – a library for composing asynchronous and event-based programs using observable sequences for the Java VM.
一个在 Java VM 上使用可观测的序列来组成异步的、基于事件的程序的库
太难了,翻译过来就是
一个XXX的异步的库。
RxJava和RxAndroid的关系:RxAndroid是RxJava在Android的一个扩展库。
那么RxAndroid对于其他异步工具的优势是什么呢?
简洁!因为他相对于其他的足够简洁,尤其是在复杂的逻辑中。
观察者模式
RxAndroid采用的是观察者模式。下面引用扔物线大神的原话解释RxAndroid中的观察者模式:
RxJava 有四个基本概念:Observable (可观察者,即被观察者)、 Observer (观察者)、 subscribe (订阅)、事件。Observable 和 Observer 通过 subscribe() 方法实现订阅关系,从而 Observable 可以在需要的时候发出事件来通知 Observer。
与传统观察者模式不同, RxJava 的事件回调方法除了普通事件 onNext() (相当于 onClick() / onEvent())之外,还定义了两个特殊的事件:onCompleted() 和 onError()。
onCompleted(): 事件队列完结。RxJava 不仅把每个事件单独处理,还会把它们看做一个队列。RxJava 规定,当不会再有新的 onNext() 发出时,需要触发 onCompleted() 方法作为标志。
onError(): 事件队列异常。在事件处理过程中出异常时,onError() 会被触发,同时队列自动终止,不允许再有事件发出。
在一个正确运行的事件序列中, onCompleted() 和 onError() 有且只有一个,并且是事件序列中的最后一个。需要注意的是,onCompleted() 和 onError() 二者也是互斥的,即在队列中调用了其中一个,就不应该再调用另一个。
RxAndroid的使用
- 依赖
implementation ‘io.reactivex.rxjava3:rxandroid:3.0.0’
-
创建Observable
即被观察者,决定什么时候触发事件以及触发怎样的事件。创建Observable有多种方式,下面是基本的create、just和from。
// 常用的创建方式,手动调用发送
Observable mObservable = Observable.create(new ObservableOnSubscribe() {
@Override
public void subscribe(@NonNull ObservableEmitter emitter) throws Throwable {
emitter.onNext("琴伤 - Jay");
emitter.onComplete();
}
});
// 可变参数创建,使用just创建,最多9个参数,自动调用onNext发送
Observable mObservable = Observable.just("琴伤", " - ", "Jay");
// 使用form创建,进行遍历依次发送
String [] arrString = {
"琴伤", " - ", "Jay"};
Observable mObservable = Observable.fromArray(arrString);
-
创建Observer / Subscriber
即观察者,它决定事件触发的时候将有怎样的行为。
Subscriber是对Observer接口的实现,Observer也会被转换为Subscriber再使用。
基本上没有区别,区别就在于Subscriber增加了onStart和unsubscribe,这两个方法分别用来做事件发送之前的准备工作和取消订阅。
Observer mObserver = new Observer<String>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
}
@Override
public void onNext(String s) {
Toast.makeText(mContext, s, Toast.LENGTH_SHORT).show();
}
@Override
public void onError(@NonNull Throwable e) {
}
@Override
public void onComplete() {
}
};
-
订阅Subscribe
被观察者订阅观察者。
mObservable.subscribe(mObserver);
另外也有匿名的创建方式
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Throwable {
// 处理业务逻辑,然后onNext()传递处理结果给观察者
emitter.onNext("自导自演 - Jay");
emitter.onComplete();
}
}).subscribe(new Observer<String>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
}
@Override
public void onNext(@NonNull String s) {
// 得到处理结果后,执行需要的操作
Toast.makeText(mContext, s, Toast.LENGTH_SHORT).show();
}
@Override
public void onError(@NonNull Throwable e) {
}
@Override
public void onComplete() {
}
});
-
指定线程
创建完成之后,我们可以通过指定线程的方式让耗时操作和更新UI分别在不同的线程中。
mObservable.subscribeOn(Schedulers.newThread()).observeOn(AndroidSchedulers.mainThread());
subscribeOn表示订阅所在的线程即我们处理耗时操作的逻辑所在的线程。
observeOn表示通知的线程,处理完成后会在指定的线程进行通知。
- 取消订阅
// 通过Disposable的dispose进行取消订阅
Disposable subscribe = Observable.just("琴伤", " - ", "Jay")
.subscribeOn(Schedulers.newThread()).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Throwable {
Toast.makeText(mContext, s, Toast.LENGTH_SHORT).show();
}
});
subscribe.dispose();
- 延时定时
// 延时1s
mObservable.delay(1, TimeUnit.SECONDS);
// 定时执行,每24小时执行一次
Observable.interval(24, TimeUnit.HOURS);
参考文档
扔物线大神的RxAndroid详解
Observable创建方式
RxJava的基础使用
如若有错误之处,烦请指出。