compile 'io.reactivex.rxjava2:rxjava:2.0.1'
compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
最简单的使用,类似于AsyncTask
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
e.onNext("1");
e.onNext("2");
e.onNext("3");
e.onComplete();
}
}).subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(String value) {
Log.e(TAG, "onNext: " + value );
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
Log.e(TAG, "onComplete" );
}
});
Disposable :一次性的意思,dispose()方法被调用后,类似于断开连接,接收方不再接收事件,但是发送方可以继续
ObservableEmitter: Emitter是发射器的意思,那就很好猜了,这个就是用来发出事件的,它可以发出三种类型的事件,通过调用emitter的onNext(T value)、onComplete()和onError(Throwable error)就可以分别发出next事件、complete事件和error事件。
注意事项:
1.可以发送无限个onNext,就可接收无限个onNext。
2.发送onComplete后,可以继续发送onNext,但不会接收到事件了。
3.onError同上
4.可以不发送onComplete、onError
5.onComplete、onError唯一且互斥,不能多次发送onComplete、onError,也不可发送onComplete后再发送onError,也不可发送onError后再发送onComplete
注意事项:
1.可以发送无限个onNext,就可接收无限个onNext。
2.发送onComplete后,可以继续发送onNext,但不会接收到事件了。
3.onError同上
4.可以不发送onComplete、onError
5.onComplete、onError唯一且互斥,不能多次发送onComplete、onError,也不可发送onComplete后再发送onError,也不可发送onError后再发送onComplete
另一种常用写法:
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
e.onNext("1");
e.onNext("2");
e.onNext("3");
e.onComplete();
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
}
});
Consumer只关注onNext事件,其他的事件都会略过。
发送事件和接收事件默认在同一线程内,android中若都在主线程中,有耗时操作可能导致ANR,在非UI线程中更新UI会奔溃,所以引入subscribeOn、observeOn方法。
subscribeOn:指定发送事件所在线程
observeOn:指定接收事件所在线程
常用参数:
Schedulers.io() 代表io操作的线程, 通常用于网络,读写文件等io密集型的操作
Schedulers.computation() 代表CPU计算密集型的操作, 例如需要大量计算的操作
Schedulers.newThread() 代表一个常规的新线程
AndroidSchedulers.mainThread() 代表Android的主线程
例子:
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
e.onNext("1");
e.onNext("2");
e.onComplete();
}
}).subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
}
});