简介
RxJava其实就是提供一套异步编程的API,这套API是基于观察者模式的,而且是链式调用的,所以使用RxJava编写的代码的逻辑会非常简单。
RxJava有以下三个基本的元素:
1. 被观察者(Observable)
2. 观察者(Observer)
3. 订阅(subscribe)
它的gradle配置:
compile 'io.reactivex.rxjava2:rxjava:2.0.1'
compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
我们可以用两根水管替代观察者和被观察者:
上面的一根水管为事件产生的水管,叫它上游
,下面一根水管为事件接收的水管,叫它下游
。
两根水管通过一定的方式连接起来,使得上游每产生一个事件,下游就能收到该事件。
这里的上游
和下游
就分别对应RxJava中的Observable
和Observer
,它们之间的连接就是对应着subscribe()
,因此这个关系用RxJava来表示就是:
Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onNext(3);
e.onComplete();
}
});
Observer<Integer> observer = new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "onSubscribe: ");
}
@Override
public void onNext(Integer value) {
Log.d(TAG, "onNext: "+value);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError: ");
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete: ");
}
};
observable.subscribe(observer);
运行的结果:
08-27 18:01:18.505 9011-9011/com.example.asus1.learnrxjava D/Rxjava:
onSubscribe:
onNext: 1
onNext: 2
onNext: 3
onComplete:
把它改成链式就是:
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onNext(3);
e.onComplete();
}
}).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "onSubscribe: ");
}
@Override
public void onNext(Integer value) {
Log.d(TAG, "onNext: "+value);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError: ");
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete: ");
}
});
ObservableEmitter:Emitter是发射器的意思,那个ObservableEimtter就是来发出事件的,它可以发出三种类型的事件:通过调用emitter的onNext(T value)
、onComplete()
和onError(Throwable error)
就可以分别发出next事件、complete事件和error事件。
注意,不能随意的发射事件,要遵循一定的规律:
- 上游可以发送无限个oNext,下游也可以接收无限个onNext
- 当上游发送了一个onComplete后,上游onComplete之后的事件将会继续发送,而下游收到onComplete事件之后将不再继续接收事件
- 当上游按发送了一个onError后,上游onError之后的事件将继续发送, 而下游收到onError事件之后将不再继续接收事件.
- 上游可以不发送onComplete或onError.
- 最为关键的是onComplete和onError必须唯一并且互斥, 即不能发多个onComplete, 也不能发多个onError, 也不能先发一个onComplete, 然后再发一个onError, 反之亦然
Dispoable:这个单词的字面意思是一次性用品,用完即可丢弃,在RxJava中,我们可以把它理解为两根管道之间的一个机关,当调用它的dispose()
方法时,它就会将两根管道切断,而导致下游收不到事件。
注意: 调用dispose()并不会导致上游不再继续发送事件, 上游会继续发送剩余的事件.
例如:
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
Log.d(TAG, "emitter: 1");
e.onNext(1);
Log.d(TAG, "emitter: 2");
e.onNext(2);
Log.d(TAG, "emitter: 3");
e.onNext(3);
Log.d(TAG, "emitter: complete");
e.onComplete();
}
}).subscribe(new Observer<Integer>() {
private Disposable disposable;
@Override
public void onSubscribe(Disposable d) {
disposable = d;
}
@Override
public void onNext(Integer value) {
Log.d(TAG, "onNext: "+value);
if(value == 2){
Log.d(TAG, "onNext: dispose");
disposable.dispose();
Log.d(TAG, "onNext: "+disposable.isDisposed());
}
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError: ");
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete: ");
}
});
结果为:
08-27 18:17:37.256 12507-12507/? D/Rxjava:
emitter: 1
onNext: 1
emitter: 2
onNext: 2
onNext: dispose
onNext: true
emitter: 3
emitter: complete
我们可以看到当value == 2时,切断了联系,但是上游还在发,但是下游不再接收了
另外,subscribe()
有多个重载的方法:
public final Disposable subscribe() {}
public final Disposable subscribe(Consumer<? super T> onNext) {}
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError) {}
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete) {}
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete, Consumer<? super Disposable> onSubscribe) {}
public final void subscribe(Observer<? super T> observer) {}
最后一个带有Observer参数的我们已经使用过了,这里对其他几个方法进行说明.
- 不带任何参数的subscribe() 表示下游不关心任何事件,你上游尽管发你的数据去吧, 老子可不管你发什么.
- 带有一个Consumer参数的方法表示下游只关心onNext事件, 其他的事件我假装没看见, 因此我们如果只需要onNext事件可以这么写:
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
Log.d(TAG, "emit 1");
emitter.onNext(1);
Log.d(TAG, "emit 2");
emitter.onNext(2);
Log.d(TAG, "emit 3");
emitter.onNext(3);
Log.d(TAG, "emit complete");
emitter.onComplete();
Log.d(TAG, "emit 4");
emitter.onNext(4);
}
}).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG, "onNext: " + integer);
}
});
在上面的代码里面,不管是上游还是下游,都是默认在同一个线程中执行的,对于Android来说,如果没有特别启用一个线程,就是在主线程中。
那么我们想让改变上游发送事件的线程,让它在子线程中发射,然后改变下游的线程,让它在主线程中接收事件,怎么做呢?
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
Log.d(TAG, "Observable thread is : " + Thread.currentThread().getName());
Log.d(TAG, "emit 1");
e.onNext(1);
}
}).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG, "Observer thread is : " + Thread.currentThread().getName());
Log.d(TAG, "accept: "+integer);
}
});
结果:
D/Rxjava: Observable thread is : RxCachedThreadScheduler-1
emit 1
D/Rxjava: Observer thread is : main
accept: 1
比起之前,我们只是增加了两行代码:
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
subscribeOn()
指定的是上游发送事件的线程,
observeOn()
指定的是下游接收事件的线程.
多次指定上游的线程只有第一次指定的有效,也就是说多次调用subscribeOn()
只有第一次的有效,其余的会被忽略
多次指定下游的线程是可以的, 也就是说每调用一次observeOn()
, 下游的线程就会切换一次.
在RxJava中, 已经内置了很多线程选项供我们选择, 例如有
- Schedulers.io() 代表io操作的线程, 通常用于网络,读写文件等io密集型的操作
- Schedulers.computation() 代表CPU计算密集型的操作, 例如需要大量计算的操作
- Schedulers.newThread() 代表一个常规的新线程
- AndroidSchedulers.mainThread() 代表Android的主线程
操作符
create()
创建一个被观察者
public static <T> Observable<T> create(ObservableOnSubscribe<T> source)
用法上面都有了,就不演示了
just()
创建一个被观察者,并发送事件,发送的事件不可以超过10个以上。
public static <T> Observable<T> just(T item)
......
public static <T> Observable<T> just(T item1, T item2, T item3, T item4, T item5, T item6, T item7, T item8, T item9, T item10)
用法
Observable.just(1,2,3)
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "onSubscribe: ");
}
@Override
public void onNext(Integer value) {
Log.d(TAG, "onNext: "+value);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError: ");
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete: ");
}
});
fromArray
和just方法类似,只不过fromArray可以传入多于10个的变量,并且可以传入一个数组。
public static <T> Observable<T> fromArray(T... items)
用法
Integer[] arrays = {1,2,3,4};
Observable.fromArray(arrays)
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "onSubscribe: ");
}
@Override
public void onNext(Integer value) {
Log.d(TAG, "onNext: "+value);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError: ");
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete: ");
}
});
fromCallable
这里的Callable是java.util.concurrent中的Callable,Callable和Runnable的用法基本一致,只是它会返回一个结果值,这个结果值就是发给观察者的
public static <T> Observable<T> fromCallable(Callable<? extends T> supplier)
用法
Observable.fromCallable(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
return 1;
}
}).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
}
});
fromFuture
参数中的 Future 是 java.util.concurrent 中的 Future,Future 的作用是增加了 cancel() 等方法操作 Callable,它可以通过 get() 方法来获取 Callable 返回的值。
public static <T> Observable<T> fromFuture(Future<? extends T> future)
用法
FutureTask < String > futureTask = new FutureTask < > (new Callable < String > () {
@Override
public String call() throws Exception {
Log.d(TAG, "CallableDemo is Running");
return "返回结果";
}
});
Observable.fromFuture(futureTask)
.doOnSubscribe(new Consumer < Disposable > () {
@Override
public void accept(Disposable disposable) throws Exception {
futureTask.run();
}
})
.subscribe(new Consumer < String > () {
@Override
public void accept(String s) throws Exception {
Log.d(TAG, "================accept " + s);
}
});
doOnSubscribe() 的作用就是只有订阅时才会发送事件
fromIterable
直接发送一个List集合数据给观察者
public static <T> Observable<T> fromIterable(Iterable<? extends T> source)
用法
List<Integer> list = new ArrayList<>();
list.add(0);
list.add(1);
list.add(2);
list.add(3);
Observable.fromIterable(list)
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer value) {
Log.d(TAG, "onNext: "+value);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
defer
直到被观察者被订阅后才会创建被创查者
public static <T> Observable<T> defer(Callable<? extends ObservableSource<? extends T>> supplier)
用法
// i 要定义为成员变量
Integer i = 100;
Observable<Integer> observable = Observable.defer(new Callable<ObservableSource<? extends Integer>>() {
@Override
public ObservableSource<? extends Integer> call() throws Exception {
return Observable.just(i);
}
});
i = 200;
Observer observer = new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "================onNext " + integer);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
};
observable.subscribe(observer);
i = 300;
observable.subscribe(observer);
05-20 20:05:01.443 26622-26622/? D/chan: ================onNext 200
================onNext 300
因为 defer() 只有观察者订阅的时候才会创建新的被观察者,所以每订阅一次就会打印一次,并且都是打印 i 最新的值。
timer
当倒指定时间后就会发送一个0L的值给观察者
public static Observable<Long> timer(long delay, TimeUnit unit)
用法
Observable.timer(2, TimeUnit.SECONDS)
.subscribe(new Observer < Long > () {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Long aLong) {
Log.d(TAG, "===============onNext " + aLong);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
interval
每隔一段时间就会发送一个事件,这个事件时从0开始的,不断增加1的数字
public static Observable<Long> interval(long period, TimeUnit unit)
public static Observable<Long> interval(long initialDelay, long period, TimeUnit unit)
......
用法
Observable.interval(4, TimeUnit.SECONDS)
.subscribe(new Observer < Long > () {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "==============onSubscribe ");
}
@Override
public void onNext(Long aLong) {
Log.d(TAG, "==============onNext " + aLong);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
intervalRange
可以指定发送事件的开始值和数量,其他与interval()的功能一样
public static Observable<Long> intervalRange(long start, long count, long initialDelay, long period, TimeUnit unit)
public static Observable<Long> intervalRange(long start, long count, long initialDelay, long period, TimeUnit unit, Scheduler scheduler)
用法
Observable.intervalRange(2, 5, 2, 1, TimeUnit.SECONDS)
.subscribe(new Observer < Long > () {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "==============onSubscribe ");
}
@Override
public void onNext(Long aLong) {
Log.d(TAG, "==============onNext " + aLong);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
Range
同时发送一定范围的事件序列
public static Observable<Integer> range(final int start, final int count)
用法
Observable.range(2, 5)
.subscribe(new Observer < Integer > () {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "==============onSubscribe ");
}
@Override
public void onNext(Integer aLong) {
Log.d(TAG, "==============onNext " + aLong);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
rangeLong
作用与 range() 一样,只是数据类型为 Long
public static Observable<Long> rangeLong(long start, long count)
用法
用法与 range() 一样,这里就不再赘述了。
empty,never,error
empty() : 直接发送 onComplete() 事件
never():不发送任何事件
error():发送 onError() 事件
public static <T> Observable<T> empty()
public static <T> Observable<T> never()
public static <T> Observable<T> error(final Throwable exception)
用法
Observable.empty()
.subscribe(new Observer < Object > () {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "==================onSubscribe");
}
@Override
public void onNext(Object o) {
Log.d(TAG, "==================onNext");
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "==================onError " + e);
}
@Override
public void onComplete() {
Log.d(TAG, "==================onComplete");
}
});
打印结果:
D/chan: ==================onSubscribe
==================onComplete
换成 never() 的打印结果:
D/chan: ==================onSubscribe
换成 error() 的打印结果:
D/chan: ==================onSubscribe
==================onError java.lang.NullPointerException
转换符
map()
可以将被观察者发送的数据类型转变成其他的类型
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper)
用法
Observable.just(1,2,3)
.map(new Function<Integer, String>() {
@Override
public String apply(Integer integer) throws Exception {
return "I am "+integer;
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.d(TAG, "accept: "+s);
}
});
flatMap
可以将事件序列中的元素进行整合加工,返回一个新的被观察者。
public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper)
用法
flatMap() 其实与 map() 类似,但是 flatMap() 返回的是一个 Observerable。
上游每发送一个事件, flatMap都将创建一个新的水管, 然后发送转换之后的新的事件, 下游接收到的就是这些新的水管发送的数据. 这里需要注意的是, flatMap并不保证事件的顺序, 也就是图中所看到的, 并不是事件1就在事件2的前面. 如果需要保证顺序则需要使用concatMap.
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onNext(3);
}
}).flatMap(new Function<Integer, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(Integer integer) throws Exception {
final List<String> list = new ArrayList<>();
for (int i = 0; i < 3; i++) {
list.add("I am value " + integer);
}
return Observable.fromIterable(list).delay(10, TimeUnit.MILLISECONDS);
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.d(TAG, "accept: "+s);
}
});
结果:
D/TAG: I am value 1
D/TAG: I am value 1
D/TAG: I am value 1
D/TAG: I am value 3
D/TAG: I am value 3
D/TAG: I am value 3
D/TAG: I am value 2
D/TAG: I am value 2
D/TAG: I am value 2
buffer
从需要发送的事件的当中获取一定数量的事件,并将这些事件放到缓冲区当中一并发出
public final Observable<List<T>> buffer(int count, int skip)
用法
buffer 有两个参数,一个是 count,另一个 skip。count 缓冲区元素的数量,skip 就代表缓冲区满了之后,发送下一次事件序列的时候要跳过多少元素。这样说可能还是有点抽象,直接看代码:
Observable.just(1, 2, 3, 4, 5)
.buffer(2, 1)
.subscribe(new Observer < List < Integer >> () {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(List < Integer > integers) {
Log.d(TAG, "================缓冲区大小: " + integers.size());
for (Integer i: integers) {
Log.d(TAG, "================元素: " + i);
}
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
结果:
05-21 14:09:34.015 22421-22421/com.example.rxjavademo D/chan: ================缓冲区大小: 2
================元素: 1
================元素: 2
================缓冲区大小: 2
================元素: 2
================元素: 3
================缓冲区大小: 2
================元素: 3
================元素: 4
================缓冲区大小: 2
================元素: 4
================元素: 5
================缓冲区大小: 1
================元素: 5
从结果可以看出,每次发送事件,指针都会往后移动一个元素再取值,直到指针移动到没有元素的时候就会停止取值。
groupBy
将发送的数据进行分组,每个分组都会返回一个被观察者
public final <K> Observable<GroupedObservable<K, T>> groupBy(Function<? super T, ? extends K> keySelector)
用法
Observable.just(5,2,3,4,1,6,8,9,7,10)
.groupBy(new Function<Integer, Integer>() {
@Override
public Integer apply(Integer integer) throws Exception {
return integer%3;
}
})
.subscribe(new Consumer<GroupedObservable<Integer, Integer>>() {
@Override
public void accept(final GroupedObservable<Integer, Integer>
integerIntegerGroupedObservable) throws Exception {
integerIntegerGroupedObservable.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG, integerIntegerGroupedObservable.getKey()+
" accept: "+integer);
}
});
}
});
结果:
08-27 19:54:09.605 4811-4811/com.example.asus1.learnrxjava D/Rxjava: 2 accept: 5
2 accept: 2
0 accept: 3
1 accept: 4
1 accept: 1
08-27 19:54:09.605 4811-4811/com.example.asus1.learnrxjava D/Rxjava: 0 accept: 6
08-27 19:54:09.606 4811-4811/com.example.asus1.learnrxjava D/Rxjava: 2 accept: 8
0 accept: 9
1 accept: 7
1 accept: 10
在 groupBy() 方法返回的参数是分组的名字,每返回一个值,那就代表会创建一个组,以上的代码就是将1~10的数据分成3组,组名分别为0,1,2
scan
将数据以一定的逻辑聚合起来
用法
Observable.just(1,2,3,4,5)
.scan(new BiFunction<Integer, Integer, Integer>() {
@Override
public Integer apply(Integer integer, Integer integer2) throws Exception {
Log.d(TAG, "apply: "+integer+"----"+integer2);
return integer+integer2;
}
}).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG, "accept: "+integer);
}
});
结果:
08-27 19:59:53.358 6619-6619/? D/Rxjava: accept: 1
apply: 1----2
accept: 3
apply: 3----3
accept: 6
apply: 6----4
accept: 10
apply: 10----5
accept: 15
window
发送指定数量的事件时,就将这些事件分为一组,window 中的 count 的参数就是代表指定的数量,例如将 count 指定为2,那么每发2个数据就会将这2个数据分成一组。
public final Observable<Observable<T>> window(long count)
用法
Observable.just(1, 2, 3, 4, 5)
.window(2)
.subscribe(new Observer < Observable < Integer >> () {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "=====================onSubscribe ");
}
@Override
public void onNext(Observable < Integer > integerObservable) {
integerObservable.subscribe(new Observer < Integer > () {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "=====================integerObservable onSubscribe ");
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "=====================integerObservable onNext " + integer);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "=====================integerObservable onError ");
}
@Override
public void onComplete() {
Log.d(TAG, "=====================integerObservable onComplete ");
}
});
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "=====================onError ");
}
@Override
public void onComplete() {
Log.d(TAG, "=====================onComplete ");
}
});
结果
com.example.rxjavademo D/chan: =====================onSubscribe
com.example.rxjavademo D/chan: =====================integerObservable onSubscribe
com.example.rxjavademo D/chan: =====================integerObservable onNext 1
=====================integerObservable onNext 2
=====================integerObservable onComplete
=====================integerObservable onSubscribe
=====================integerObservable onNext 3
=====================integerObservable onNext 4
=====================integerObservable onComplete
=====================integerObservable onSubscribe
=====================integerObservable onNext 5
=====================integerObservable onComplete
=====================onComplete
从结果可以发现,window() 将 1~5 的事件分成了3组。
retry
如果出现错误事件,则会重新发送所有事件序列。times是代表重新发的次数。
public final Observable<T> retry(long times)
用法
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onNext(3);
e.onError(new Exception("404"));
}
})
.retry(2)
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "==================onSubscribe ");
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "==================onNext " + integer);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "==================onError ");
}
@Override
public void onComplete() {
Log.d(TAG, "==================onComplete ");
}
});
retryUntil
出现错误事件之后,可以通过此方法判断是否继续发送事件
public final Observable<T> retryUntil(final BooleanSupplier stop)
用法
Observable.create(new ObservableOnSubscribe < Integer > () {
@Override
public void subscribe(ObservableEmitter < Integer > e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onNext(3);
e.onError(new Exception("404"));
}
})
.retryUntil(new BooleanSupplier() {
@Override
public boolean getAsBoolean() throws Exception {
if (i == 6) {
return true;
}
return false;
}
})
.subscribe(new Observer < Integer > () {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "==================onSubscribe ");
}
@Override
public void onNext(Integer integer) {
i += integer;
Log.d(TAG, "==================onNext " + integer);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "==================onError ");
}
@Override
public void onComplete() {
Log.d(TAG, "==================onComplete ");
}
});
过滤操作符
filter
通过一定逻辑来过滤被观察者发送的事件,如果返回ture则会发送事情,否则不会发送。
用法
Observable.just(1, 2, 3)
.filter(new Predicate < Integer > () {
@Override
public boolean test(Integer integer) throws Exception {
return integer < 2;
}
})
.subscribe(new Observer < Integer > () {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "==================onSubscribe ");
}
@Override
public void onNext(Integer integer) {
i += integer;
Log.d(TAG, "==================onNext " + integer);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "==================onError ");
}
@Override
public void onComplete() {
Log.d(TAG, "==================onComplete ");
}
});
ofType
可以过滤不符合该类型的事件
用法
Observable.just(1, 2, 3, "chan", "zhide")
.ofType(Integer.class)
.subscribe(new Observer < Integer > () {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "==================onSubscribe ");
}
@Override
public void onNext(Integer integer) {
i += integer;
Log.d(TAG, "==================onNext " + integer);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "==================onError ");
}
@Override
public void onComplete() {
Log.d(TAG, "==================onComplete ");
}
});
sikp
跳过正序某些事件,count代表跳过事件的数量
用法
Observable.just(1, 2, 3)
.skip(2)
.subscribe(new Observer < Integer > () {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "==================onSubscribe ");
}
@Override
public void onNext(Integer integer) {
i += integer;
Log.d(TAG, "==================onNext " + integer);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "==================onError ");
}
@Override
public void onComplete() {
Log.d(TAG, "==================onComplete ");
}
});
结果:
D/chan: ==================onSubscribe
D/chan: ==================onNext 3
==================onComplete
skipLast() 作用也是跳过某些事件,不过它是用来跳过正序的后面的事件
distinct
过滤事件序列中的重复事件
用法
Observable.just(1, 2, 3, 3, 2, 1)
.distinct()
.subscribe(new Observer < Integer > () {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "==================onSubscribe ");
}
@Override
public void onNext(Integer integer) {
i += integer;
Log.d(TAG, "==================onNext " + integer);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "==================onError ");
}
@Override
public void onComplete() {
Log.d(TAG, "==================onComplete ");
}
});
distinctUntilChanged
过滤掉连续重复的事件
Observable.just(1, 2, 3, 3, 2, 1)
.distinctUntilChanged()
take
控制观察者接收的事件的数量
Observable.just(1, 2, 3, 4, 5)
.take(3)
.subscribe(new Observer < Integer > () {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "==================onSubscribe ");
}
@Override
public void onNext(Integer integer) {
i += integer;
Log.d(TAG, "==================onNext " + integer);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "==================onError ");
}
@Override
public void onComplete() {
Log.d(TAG, "==================onComplete ");
}
});
结果只会接收1,2,3
takeLast() 的作用就是控制观察者只能接受事件序列的后面几件事情