一、简介
RxJava是Reactive Extensions(Rx)的Java VM实现:该库用于通过使用可观察的序列来组成异步和基于事件的程序。
它扩展了观察者模式以支持数据/事件序列,并添加了运算符,使您可以声明性地将序列组合在一起,同时消除了对低级线程,同步,线程安全和并发数据结构等问题的担忧。
RxJava是轻量级的。它作为单个JAR实施,仅关注可观察的抽象和相关的高阶函数。RxJava支持Java 6或更高版本以及基于JVM的语言,例如Groovy,Clojure,JRuby,Kotlin和Scala。
RxJava有以下五个基类,我们可以在之基础上发现RxJava里的运算符。这五个基类分别是:
- Flowable:用于实现Reactive-Streams模式,并提供了工厂方法,中间运算符以及使用反应式数据流的能力。支持背压(Backpressure)。
- Observable:Observable类是不支持背压的,它是Reactive的一个抽象类,它提供了工厂方法,中间运算符以及消费同步和/或异步数据流的功能。
- Single:Single类为单个值响应实现Reactive Pattern。Single和Observable类似,所不同的是Single只能发出一个值,要么发射成功要么发射失败,也没有“onComplete”作为完成时的回调。
- Comletable:Completable类表示延迟计算,没有任何值,只表示完成或异常。Completable的行为类似于Observable,在计算完成后只能发出完成或错误信号,由onComplete或onError接口来处理,没有onNext或onSuccess等回调接口。
- Maybe:Maybe类表示延迟计算和单个值的发射,这个值可能根本没有或异常。
二、使用
基本使用
Rxjava迄今为止已经更新到3代版本了,不过常用的还是2代版本,也就是我们常说的Rxjava2,因此本文主要介绍Rxjava2的相关内功和原理分析。
首先第一步肯定是将Rxjava2导入到项目中去,通常是通过作为Gradle编译依赖项导入的。
compile 'io.reactivex.rxjava2:rxjava:2.1.0'
compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
Rxjava的使用需要满足三个基本要素,其他的都是通过运算符在三要素的基础上添加功能,这三个基本要素分别是:
1.创建Observable(被观察者)
Observable observable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception {
//TODO 通过调用onNext() onComlete()onError进行发送事件
}
});
2.创建Observer(观察者)
Observer observer = new Observer<String>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
//TODO 准备工作
}
@Override
public void onNext(@NonNull String s) {
//TODO 接收onNext事件并处理
}
@Override
public void onError(@NonNull Throwable e) {
//TODO 接收onError事件并处理
}
@Override
public void onComplete() {
//TODO 接收onComplete事件并处理
}
};
3.subscribe(订阅)
observable.subscribe(observer);
任何形式的Rxjava使用都可以拆分成以上三个部分,当然在实际的使用中为了方便可读大多会被写成链式调用。
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
......
}
}).subscribe(new Observer<Integer>() {
......
@Override
public void onSubscribe(@NonNull Disposable d) {
......
}
@Override
public void onNext(@NonNull Integer integer) {
......
}
@Override
public void onError(@NonNull Throwable e) {
......
}
@Override
public void onComplete() {
......
}
});
线程调度Scheduler
在RxJava的默认规则中,事件的发送和消费都是在同一个线程的,而Rxjava的目的就是为了更好地解决异步问题,因此Rxjava引入了一个概念Scheduler来实现线程调度。
那么如何去进行线程调度呢?简单一句话就是:subscribeOn()指定发送事件的线程,而observeOn()指定接收事件的线程。
需要注意的是:
- 多次指定上游的线程只有第一次指定的有效, 也就是说多次调用subscribeOn()只有第一次的有效。
- 多次指定下游的线程是可以的, 也就是说每调用一次observeOn() , 下游的线程就会切换一次。
observable.subscribeOn(Schedulers.newThread())
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.observeOn(Schedulers.io())
.subscribe(consumer);
那么Rxjava中的线程又有哪些呢?
- Schedulers.io() :代表io操作的线程, 通常用于网络,读写文件等io密集型的操作
- Schedulers.computation() :代表CPU计算密集型的操作, 例如需要大量计算的操作
- Schedulers.newThread() :代表一个常规的新线程
- AndroidSchedulers.mainThread() :代表Android的主线程
三、操作符
Rxjava中的操作符有很多,下面就罗列出一些常用的,不需要记忆,只需要理解运用。
Create
create操作符应该是最常见的操作符了,主要用于产生一个Obserable被观察者对象。
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception {
...
}
});
Map
map基本上算是Rxjava中最简单的操作符之一了,它的作用是对发送的每一个事件应用一个函数,使得每一个事件都按照指定的函数去变化。通俗来讲就是将一个Observable通过某种函数关系,转换成另一个Observable。下面的例子就是将一个发送事件类型为Interger的Observable通过Function函数转换成了发送事件类型为String的Observable。
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
......
}
}).map(new Function<Integer, String>() {
@Override
public String apply(@NonNull Integer integer) throws Exception {
return "This is result " + integer;
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(@NonNull String s) throws Exception {
......
}
});
Zip
zip顾名思义专用于压缩合并事件,该合并不是连接,而是两两配对,也就意味着最终配对出的Observable发射事件数目只和少的那个相同。
需要注意的是:
- zip 组合事件的过程就是分别从发射器 A 和发射器 B 各取出一个事件来组合,并且一个事件只能被使用一次,组合的顺序是严格按照事件发送的顺序来进行的。
- 最终接收器收到的事件数量是和发送器发送事件最少的那个发送器的发送事件数目相同。
Observable.zip(Observable1, Observable2, new BiFunction<String, Integer, String>() {
@Override
public String apply(@NonNull String s, @NonNull Integer integer) throws Exception {
//通过函数生成zip后的结果
return s + integer;
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(@NonNull String s) throws Exception {
......
}
});
Concat
concat操作符作用是将两个Observable连接成一个Observable。注意它是有顺序的,必须等到第一个Observable中的所有事件发送完成后才会开始发送第二个Observable中的事件。如下图发送的顺序是1、2、3、4、5、6。
Observable.concat(Observable.just(1,2,3), Observable.just(4,5,6))
.subscribe(new Consumer<Integer>() {
@Override
public void accept(@NonNull Integer integer) throws Exception {
......
}
});
Merge
merge的作用是把多个Observable结合起来,它和concat的区别在于,它不具有顺序性,不用等到第一个Observable发送完所有事件才去发送其他的Observable中的事件。 如下图发送的顺序是未知的。
Observable.merge(Observable.just(1, 2), Observable.just(3, 4, 5))
.subscribe(new Consumer<Integer>() {
@Override
public void accept(@NonNull Integer integer) throws Exception {
......
}
});
FlatMap
FlatMap可以把一个Observable通过某种方法转换为多个Observables,然后再把这些分散的Observables装进一个单一的Observable。需要注意的是FlatMap并不能保证事件的顺序,如果需要保证,需要我们用到concatMap。
concatMap
concatMap与FlatMap的唯一区别是concatMap保证了顺序。
distinct
distinct这个操作符非常简单、通俗、易懂,就是简单的去重。下面例子的事件只会发送5次,1、2、3、4、5。
Observable.just(1, 1, 1, 2, 2, 3, 4, 5)
.distinct()
.subscribe(new Consumer<Integer>() {
@Override
public void accept(@NonNull Integer integer) throws Exception {
......
}
});
Filter
Filter,顾名思义,过滤器,可以接受一个参数,让其过滤掉不符合我们条件的事件。
Observable.just(7, 13, 45, -51, 9, 10,-7)
.filter(new Predicate<Integer>() {
@Override
public boolean test(@NonNull Integer integer) throws Exception {
return integer >= 10;
}
}).subscribe(new Consumer<Integer>() {
@Override
public void accept(@NonNull Integer integer) throws Exception {
......
}
});
timer
timer相当于一个定时任务,其接受两个参数,定时时间和时间单位。默认在新线程中。
Observable.timer(2, TimeUnit.SECONDS)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread()) // timer 默认在新线程,所以需要切换回主线程
.subscribe(new Consumer<Long>() {
@Override
public void accept(@NonNull Long aLong) throws Exception {
......
}
});
interval
interval操作符用于间隔时间执行某个操作,其接受三个参数,分别是第一次发送延迟,间隔时间,时间单位。默认在新线程中。
Observable.interval(3,2, TimeUnit.SECONDS)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread()) // 由于interval默认在新线程,所以我们应该切回主线程
.subscribe(new Consumer<Long>() {
@Override
public void accept(@NonNull Long aLong) throws Exception {
......
}
});
Just
Just就是一个简单的Observable依次调用onNext()方法。
Observable.just("1", "2", "3")
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<String>() {
@Override
public void accept(@NonNull String s) throws Exception {
......
}
});
doOnNext
它的作用是让订阅者在接收到数据之前干点有意思的事情。比如保存个数据什么的。
Observable.just(1, 2, 3, 4)
.doOnNext(new Consumer<Integer>() {
@Override
public void accept(@NonNull Integer integer) throws Exception {
//TODO 保存数据
}
}).subscribe(new Consumer<Integer>() {
@Override
public void accept(@NonNull Integer integer) throws Exception {
......
}
});
四、参考链接
五、补充
什么是观察者模式?
定义对象间的一种一对多的依赖关系,一个对象(目标对象)的状态发生改变,所有的依赖对象(观察者对象)都将得到通知,进行广播通知。
什么是背压?
背压是指在异步场景中,被观察者发送事件速度远快于观察者的处理速度的情况下,一种告诉上游的被观察者降低发送速度的策略。