上一篇文章介绍了,创建、转化类、过滤类、合并类的操作符RxJava2.x学习教程(二)常用操作符,本文接着学习其余操作符,如下:
错误处理类
Retry
当原始Observable在遇到错误时进行重试,目的是希望本次订阅不以失败事件结束!
Observable.just(1, "2")
.cast(Integer.class)//将被观察者发送的事件数据类型强转为Integer
.retry(3)//如果出错 重试三次, 如果不写参数,表示一直重复尝试
.subscribe(integer -> Log.e("TAG", "" + integer),
throwable -> Log.e("TAG", throwable.getMessage()));
//运行结果:1 1 1 1 java.lang.String cannot be cast to java.lang.Integer
Catch类
捕获错误,并进行处理!区别于retry的重试!
//onErrorReturn 当原先的被观察者遇到错误时,再去发射一个事件
Observable.just(1,"2")
.cast(Integer.class)
.onErrorReturn(throwable -> {
Log.e("TAG", throwable.getMessage());
return 0;
}).subscribe(integer -> Log.e("TAG", "" + integer));
//结果:1
// java.lang.String cannot be cast to java.lang.Integer
// 0
//onErrorReturnItem底层调用就是onErrorReturn
Observable.just(1,"2")
.cast(Integer.class)
.onErrorReturnItem(0)
.subscribe(integer -> Log.e("TAG", "" + integer));
//结果:1 0
//onErrorResumeNext: 当原始Observable在遇到错误时,使用其他Observable的数据序列
Observable.just(1, "2")
.cast(Integer.class)
.onErrorResumeNext(Observable.just(3,4))
.subscribe(integer -> Log.e("TAG", "" + integer));
//结果:1 3 4
实用工具类( 辅助类)
Delay
延迟发射
Observable.fromArray(1,2,3)
.delay(3, TimeUnit.SECONDS)//3s以后再去发射事件 整体延迟 区别于interval对每个事件的延迟效果
.subscribeOn(Schedulers.io())//io线程发射事件
.observeOn(AndroidSchedulers.mainThread())//主线程获取通知
.subscribe(integer -> Log.e("TAG", "" + integer));
// 1 2 3
Do系列
主要是通过注册回调的方式,监听被观察者的生命周期(链式操作的各个过程)。
doAfterTerminate doOnComplete doOnDispose doOnEach doOnError doOnLifecycle
doOnNext doOnSubscribe doOnTerminate onTerminateDetach
doOnNext 监听emitter.onNext()事件, 可以做一些想做的事情,比如数据保存,不会原来改变发射的数据,发生在emitter.onNext()之前
Observable.just(1,2,3)
.doOnNext(integer -> Log.e("TAG",++integer + "监听emitter.onNext() 不会原来改变发射的数据"))
.subscribe(integer -> Log.e("TAG", "" + integer));
05-23 10:23:14.095 3702-3702/com.example.rxjava.rxjavademo E/TAG: 2监听emitter.onNext() 不会原来改变发射的数据
05-23 10:23:14.095 3702-3702/com.example.rxjava.rxjavademo E/TAG: 1
05-23 10:23:14.095 3702-3702/com.example.rxjava.rxjavademo E/TAG: 3监听emitter.onNext() 不会原来改变发射的数据
05-23 10:23:14.095 3702-3702/com.example.rxjava.rxjavademo E/TAG: 2
05-23 10:23:14.095 3702-3702/com.example.rxjava.rxjavademo E/TAG: 4监听emitter.onNext() 不会原来改变发射的数据
05-23 10:23:14.095 3702-3702/com.example.rxjava.rxjavademo E/TAG: 3
doAfterNext 发生在onNext()之后,因为它在订阅过程中是共享的,所以应当注意线程安全的问题!
Observable.just(1,2,3)
.doAfterNext(integer -> Log.e("TAG", "监听: " + ++integer))
.subscribe(integer -> Log.e("TAG", "" + integer));
doOnComplete:监听OnComplete
Observable.just(1,2,3)
.doOnComplete(() -> Log.e("TAG", "onComplete over"))
.subscribe(integer -> Log.e("TAG", "" + integer));
//1 2 3 onComplete over
还有一些,用法类似:
Observable.just("1","2")
.doOnNext(s -> Log.e("TAG", "doOnNext: " + s))
.doAfterNext(s -> Log.e("TAG", "doAfterNext: " + s))
.doOnComplete(() -> Log.e("TAG", "doOnComplete: "))
//订阅之后回调的方法
.doOnSubscribe(disposable -> Log.e("TAG", "doOnSubscribe: "))
.doAfterTerminate(() -> Log.e("TAG", "doAfterTerminate: "))
.doFinally(() -> Log.e("TAG", "doFinally: "))
//Observable每发射一个数据的时候就会触发这个回调,不仅包括onNext还包括onError和onCompleted
.doOnEach(stringNotification -> Log.e("TAG", "doOnEach: "+(stringNotification.isOnNext()?"onNext":stringNotification.isOnComplete()?"onComplete":"onError")))
//订阅后可以进行取消订阅
.doOnLifecycle(
disposable -> {
Log.e("TAG", "doOnLifecycle: "+disposable.isDisposed());
//disposable.dispose();
}, () -> Log.e("TAG", "doOnLifecycle run: "))
.subscribe(s -> Log.e("TAG", "收到消息: " + s));
05-23 11:04:25.805 11261-11261/com.example.rxjava.rxjavademo E/TAG: doOnSubscribe:
05-23 11:04:25.805 11261-11261/com.example.rxjava.rxjavademo E/TAG: doOnLifecycle: false
05-23 11:04:25.805 11261-11261/com.example.rxjava.rxjavademo E/TAG: doOnNext: 1
05-23 11:04:25.805 11261-11261/com.example.rxjava.rxjavademo E/TAG: doOnEach: onNext
05-23 11:04:25.805 11261-11261/com.example.rxjava.rxjavademo E/TAG: 收到消息: 1
05-23 11:04:25.805 11261-11261/com.example.rxjava.rxjavademo E/TAG: doAfterNext: 1
05-23 11:04:25.805 11261-11261/com.example.rxjava.rxjavademo E/TAG: doOnNext: 2
05-23 11:04:25.805 11261-11261/com.example.rxjava.rxjavademo E/TAG: doOnEach: onNext
05-23 11:04:25.805 11261-11261/com.example.rxjava.rxjavademo E/TAG: 收到消息: 2
05-23 11:04:25.805 11261-11261/com.example.rxjava.rxjavademo E/TAG: doAfterNext: 2
05-23 11:04:25.805 11261-11261/com.example.rxjava.rxjavademo E/TAG: doOnComplete:
05-23 11:04:25.805 11261-11261/com.example.rxjava.rxjavademo E/TAG: doOnEach: onComplete
05-23 11:04:25.805 11261-11261/com.example.rxjava.rxjavademo E/TAG: doFinally:
05-23 11:04:25.805 11261-11261/com.example.rxjava.rxjavademo E/TAG: doAfterTerminate:
materialize/dematerialize
materialize将Observable转换成一个通知列表,dematerialize作用正好相反
Observable.just(1,2)
.materialize()
.subscribe(notification -> Log.e("TAG", (notification.isOnNext()?"onNext":notification.isOnComplete()?"onComplete":"onError") + " " + notification.getValue()));
//onNext 1
//onNext 2
//onComplete null
subscribeOn/observeOn(线程调度)
subscribeOn,指定被观察者(上游)执行任务的调度器,observeOn指定观察者(下游)的调度器。
- 简单地说,subscribeOn() 指定的就是发射事件的线程,observerOn 指定的就是订阅者接收事件的线程
- 多次指定发射事件的线程只有第一次指定的有效,也就是说多次调用 subscribeOn() 只有第一次的有效,其余的会被忽略
- 但多次指定订阅者接收线程是可以的,也就是说每调用一次 observerOn(),下游的线程就会切换一次
Observable.create((ObservableOnSubscribe<Integer>) emitter -> {
Log.e("TAG", "Observable thread is : " + Thread.currentThread().getName());
emitter.onNext(1);
emitter.onComplete();
}).subscribeOn(Schedulers.newThread())
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.doOnNext(integer -> Log.e("TAG", "After observeOn(mainThread) thread is : " + Thread.currentThread().getName())).observeOn(Schedulers.io())
.subscribe(integer -> Log.e("TAG", "After observeOn(io) thread is : " + Thread.currentThread().getName()));
05-23 11:38:05.725 16861-16885/com.example.rxjava.rxjavademo E/TAG: Observable thread is : RxNewThreadScheduler-1
05-23 11:38:05.745 16861-16861/com.example.rxjava.rxjavademo E/TAG: After observeOn(mainThread) thread is : main
05-23 11:38:05.745 16861-16887/com.example.rxjava.rxjavademo E/TAG: After observeOn(io) thread is : RxCachedThreadScheduler-2
RxJava 中,已经内置了很多线程选项供我们选择,如:
- Schedulers.io() 代表io操作的线程, 通常用于网络,读写文件等io密集型的操作;
- Schedulers.computation() 代表CPU计算密集型的操作, 例如需要大量计算的操作;
- Schedulers.newThread() 代表一个常规的新线程;
- AndroidSchedulers.mainThread() 代表Android的主线程
条件和布尔操作符
All
判定被观察者发射的数据是否都满足某个条件
Observable.just(0,2,24)
.all(integer -> integer%2 == 0)//所有发射的数据都是偶数 下游结果才是true
.subscribe(aBoolean -> Log.e("TAG", "" +aBoolean));
//true
Amb
给定多个Observable,只让第一个发射数据的Observable发射全部数据
List list = new ArrayList<ObservableSource>();
Observable<Integer> observable1 = Observable.just(1,3).delay(1, TimeUnit.SECONDS);//延迟1s发射
Observable<Integer> observable2 = Observable.just(2,4);
list.add(observable1);
list.add(observable2);
Observable.amb(list)
.subscribe(integer -> Log.e("TAG", "" +integer));
//2 4
contains
判定一个Observable是否发射一个特定的值
Observable.just(1,22,3)
.contains(2)//发射的数据是否有2
.subscribe(aBoolean -> Log.e("TAG", "" + aBoolean));
//false
SequenceEqual
判定两个Observables是否发射相同的数据序列(相同的数据,相同的顺序,相同的终止状态),和是否延迟无关!
Observable<Integer> observable1 = Observable.just(1,3).delay(1, TimeUnit.SECONDS);//延迟1s发射 1 3
Observable<Integer> observable2 = Observable.just(3).startWith(1);//发射3之前 先发射1
Observable.sequenceEqual(observable1, observable2)
.subscribe(aBoolean -> Log.e("TAG", "" + aBoolean));
//true
SkipUntil:丢弃原被观察者的发射物,直到第二个被观察者发射了一项数据那一刻!
SkipWhile:当你设定的条件为false的时候, 才发射原来的发射物
TakeUntil:第二个被观察者发射了一项数据那一刻,停止原始被观察者的发射物,和SkipUntil相反
TakeWhile:一直发射原始发射物,直到你设定的条件为fasle,和SkipWhile相反
数学和聚合类
concat
合并多个被观察者,前面的被观察者的数据发射完毕,才会发送后面的,区别于merge
Observable<Integer> observable1 = Observable.just(1,3).delay(1, TimeUnit.SECONDS);
Observable<Integer> observable2 = Observable.just(2,4);
Observable.concat(observable1, observable2)
.subscribe(integer -> Log.e("TAG", "" + integer));
//1 3 2 4
count
拦截原始发射事件并返回其数量,将其发射到下游!
Observable.create((ObservableOnSubscribe<Integer>) emitter -> {
emitter.onNext(1);
emitter.onNext(2);
emitter.onComplete();
}).count()
.subscribe(integer -> Log.e("TAG", "" + integer));
//2
Reduce
Reduce操作符对原始Observable发射数据的第一项应用一个函数,然后再将这个函数的返回值与第二项数据一起传递给函数,以此类推,持续这个过程知道原始Observable发射它的最后一项数据并终止,此时Reduce返回的Observable发射这个函数返回的最终值。
Observable.just(1,2,5)
.reduce((integer, integer2) -> {
Log.e("TAG", integer + " --- " + integer2);
return integer+integer2;
}).subscribe(integer -> Log.e("TAG", "" + integer));
05-23 20:09:58.894 32133-32133/com.example.rxjava.rxjavademo E/TAG: 1 --- 2
05-23 20:09:58.894 32133-32133/com.example.rxjava.rxjavademo E/TAG: 5 --- 3
05-23 20:09:58.894 32133-32133/com.example.rxjava.rxjavademo E/TAG: 结果: 8
链接操作符
可连接的被观察者在被订阅时并不开始发射数据,只有在它的connect()被调用时才开始!
Publish
Publish可将普通的被观察者转为可连接的被观察者。如果一个连接的被观察者已经开始发射数据,再对其进行订阅只能接受之后发射的数据,订阅之前已经发射过的数据就丢失了。
ConnectableObservable<Long> publish = Observable.interval(1000, TimeUnit.MILLISECONDS).take(4).publish();
publish.subscribe(aLong -> Log.e("TAG", "beforeConnect: " + aLong));
publish.connect();//如果不connect 都收不到数据
publish.delaySubscription(2, TimeUnit.SECONDS)//延迟2两秒订阅 前两秒的数据丢失
.subscribe(aLong -> Log.e("TAG", "**afterConnect: " + aLong));
05-24 23:36:05.294 17077-17123/com.example.rxjava.rxjavademo E/TAG: beforeConnect: 0
05-24 23:36:06.294 17077-17123/com.example.rxjava.rxjavademo E/TAG: beforeConnect: 1
05-24 23:36:07.304 17077-17123/com.example.rxjava.rxjavademo E/TAG: beforeConnect: 2
05-24 23:36:07.304 17077-17123/com.example.rxjava.rxjavademo E/TAG: **afterConnect: 2
05-24 23:36:08.304 17077-17123/com.example.rxjava.rxjavademo E/TAG: beforeConnect: 3
05-24 23:36:08.304 17077-17123/com.example.rxjava.rxjavademo E/TAG: **afterConnect: 3
Replay
通过上面的介绍我们了解到,ConnectableObservable和普通的被观察者最大的区别就是,调用Connect操作符开始发射数据,后面的订阅者会丢失之前发射过的数据。
Replay返回的ConnectableObservable 会缓存之前已经发射的数据,这样即使有订阅者在其发射数据开始之后进行订阅也能收到之前发射过的数据。Replay操作符能指定缓存的大小或者时间,这样能避免耗费太多内存。
//没有缓存的情况
ConnectableObservable<Long> publish = Observable.interval(1000, TimeUnit.MILLISECONDS).take(4).publish();
publish.connect();
publish.delaySubscription(2, TimeUnit.SECONDS)
.subscribe(aLong -> Log.e("TAG", "" + aLong));
// 2 3 丢失前两秒的数据
//缓存所有数据
ConnectableObservable<Long> replay1 = Observable.interval(1000, TimeUnit.MILLISECONDS)
.take(4).replay();
replay1.connect();
replay1.delaySubscription(2, TimeUnit.SECONDS)
.subscribe(aLong -> Log.e("TAG", "" + aLong));
//0 1 2 3 没有数据丢失
//缓存一个数据
ConnectableObservable<Long> replay2 = Observable.interval(1000, TimeUnit.MILLISECONDS)
.take(4).replay(1);
replay2.connect();
replay2.delaySubscription(2, TimeUnit.SECONDS)
.subscribe(aLong -> Log.e("TAG", "" + aLong));
//1 2 3 缓存了订阅前的一个数据
//缓存一个数据
ConnectableObservable<Long> replay3 = Observable.interval(1000, TimeUnit.MILLISECONDS)
.take(4).replay(1, TimeUnit.SECONDS);
replay3.connect();
replay3.delaySubscription(2, TimeUnit.SECONDS)
.subscribe(aLong -> Log.e("TAG", "" + aLong));
//1 2 3 缓存了订阅前1s的数据
转换类To系列
toList
收集原始被观察者发射的所有数据到一个列表,然后返回这个列表.!
Observable.just(1, 4, 3)
.toList().subscribe(list -> {
Log.e("TAG", list.toString());
});//[1, 4, 3]
toSortedList
收集原始被观察者发射的所有数据到一个有序列表,然后返回这个列表!
Observable.just(1,4,3)
.toSortedList()//默认升序
.subscribe(integers -> Log.e("TAG", integers.toString()));
// [1, 3, 4]
Observable.just(1,4,3, 4)
.toSortedList(new Comparator<Integer>() {
@Override
public int compare(Integer integer, Integer t1) {
//return 0或正整数 按原有顺序返回 [1, 4, 3, 4]
//return 负数 倒序(不是降序) [4, 3, 4, 1]
//return t1- integer 降序 [4, 4, 3, 1]
return integer - t1;//升序 [1, 3, 4, 4]
}
})
.subscribe(integers -> Log.e("TAG", integers.toString()));
toMap
将序列数据转换为一个Map。我们可以根据数据项生成key和生成value!
Observable.just("b","a","c")
//一个参数的用于生成key,value默认是事件数据
//第一个泛型是事件数据类型,第二个泛型是key的类型
.toMap(new Function<String, String>() {
@Override
public String apply(String s) throws Exception {
return s + "-key: ";
}
})//这部分用箭头函数改写:.toMap(s -> s + "-key: ")
.subscribe(new Consumer<Map<String, String>>() {
@Override
public void accept(Map<String, String> stringStringMap) throws Exception {
Log.e("TAG", stringStringMap.toString());
}
});
//{c-key: =c, a-key: =a, b-key: =b}
Observable.just(1,2,3)
//两个参数的 第一个生成key 第二个生成vlaue
.toMap(i -> i + 10, i -> i + "*")
.subscribe(integerIntegerMap -> Log.e("TAG", integerIntegerMap.toString()));
//{12=2*, 11=1*, 13=3*}
还有一些比如blockingIterable(),将原始被观察者发射的数据装进一个迭代器Iterable,可以使用forEach等操作!