文章目录
简介
-
RxJava 是一个 基于事件流、实现异步操作的库
-
RxJava 和 RxAndroid 的关系:RxAndroid是RxJava的一个针对Android平台的扩展,主要用于 Android 开发
安装:
implementation 'io.reactivex.rxjava2:rxjava:2.2.2'
implementation 'io.reactivex.rxjava2:rxandroid:2.1.0'
- 初始化
Observable
- 初始化
Observer
- 建立订阅关系
subscribe
操作符
create
create
操作符应该是最常见的操作符了,主要产生一个Observable
发射器(被观察者对象)和一个Observer
接收器(观察者对象),通过subscribe
建立连接Disposable
可用于切断操作,让Observer
不在接收上游事件- 在发射事件中执行
emitter.onComplete()
,Observable
表示发射完成,Observer
不在接收后续事件
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
Log.e(TAG, "emitter: 1");
emitter.onNext(1);
Log.e(TAG, "emitter: 2");
emitter.onNext(2);
Log.e(TAG, "emitter: 3");
emitter.onNext(3);
Log.e(TAG, "emitter: onComplete");
emitter.onComplete();
Log.e(TAG, "emitter: 4");
emitter.onNext(4);
}
}).subscribe(new Observer<Integer>() {
private Disposable mDisposable;
@Override
public void onSubscribe(Disposable d) {
mDisposable = d;
}
@Override
public void onNext(Integer integer) {
Log.e(TAG, "onNext:" + integer);
}
@Override
public void onError(Throwable e) {
Log.e(TAG, "onError");
}
@Override
public void onComplete() {
Log.e(TAG, "onComplete");
}
});
输出信息:
emitter: 1
onNext:1
emitter: 2
onNext:2
emitter: 3
onNext:3
emitter: onComplete
onComplete
emitter: 4
map
Map
的作用是对Observable
发射的每一个事件按照指定的函数去变化
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
}
}).map(new Function<Integer, String>() {
@Override
public String apply(Integer integer) throws Exception {
return "convert " + integer;
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.e(TAG, "accept:" + s);
}
});
输出信息:
accept:convert 1
accept:convert 2
accept:convert 3
zip
zip
专用于合并事件,也就是是两两配对,也就意味着,最终配对出的 Observable
发射事件数目只和少的那个相同
Observable.zip(getStringObservable(), getIntegerObservable(), new BiFunction<String, Integer, String>() {
@Override
public String apply(String s, Integer integer) throws Exception {
return s + integer;
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.e(TAG, "accept:" + s);
}
});
private Observable<String> getStringObservable() {
return Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter emitter) throws Exception {
Log.e(TAG, "emitter String:A");
emitter.onNext("A");
Log.e(TAG, "emitter String:B");
emitter.onNext("B");
Log.e(TAG, "emitter String:C");
emitter.onNext("C");
}
});
}
private Observable<Integer> getIntegerObservable() {
return Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
Log.e(TAG, "emitter Integer:1");
emitter.onNext(1);
Log.e(TAG, "emitter Integer:2");
emitter.onNext(2);
Log.e(TAG, "emitter Integer:3");
emitter.onNext(3);
Log.e(TAG, "emitter Integer:4");
emitter.onNext(4);
}
});
}
输出信息:
emitter String:A
emitter String:B
emitter String:C
emitter Integer:1
accept:A1
emitter Integer:2
accept:B2
emitter Integer:3
accept:C3
emitter Integer:4
concat
concat
把两个发射器连接成一个发射器
Observable.concat(Observable.just(1, 2, 3), Observable.just(4, 5, 6))
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.e(TAG, "accept :" + integer);
}
});
输出信息:
accept :1
accept :2
accept :3
accept :4
accept :5
accept :6
flatMap
FlatMap
可以把一个发射器Observable
转换为多个Observables
,然后再把这些分散的Observables
装进一个单一的发射器Observable
flatMap
并不能保证事件的顺序
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onNext(3);
}
}).flatMap(new Function<Integer, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(Integer integer) throws Exception {
List<String> list = new ArrayList<>();
for (int i = 0; i < 3; i++) {
list.add("I am value " + integer);
}
int delayTime = (int) (1 + Math.random() * 10);
return Observable.fromIterable(list).delay(delayTime, TimeUnit.MILLISECONDS);
}
}).subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.e(TAG, "flatMap : accept : " + s + "\n");
}
});
输出信息:
flatMap : accept : I am value 1
flatMap : accept : I am value 1
flatMap : accept : I am value 1
flatMap : accept : I am value 2
flatMap : accept : I am value 2
flatMap : accept : I am value 2
flatMap : accept : I am value 3
flatMap : accept : I am value 3
flatMap : accept : I am value 3
concatMap
concatMap
与 FlatMap
的唯一区别就是 concatMap
保证了顺序
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
}
}).concatMap(new Function<Integer, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(Integer integer) throws Exception {
List<String> list = new ArrayList<>();
for (int i = 0; i < 3; i++) {
list.add("I am value " + integer);
}
int delayTime = (int) (1 + Math.random() * 10);
return Observable.fromIterable(list).delay(delayTime, TimeUnit.MILLISECONDS);
}
}).subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.e(TAG, "accept:" + s);
}
});
输出信息:
accept:I am value 1
accept:I am value 1
accept:I am value 1
accept:I am value 2
accept:I am value 2
accept:I am value 2
accept:I am value 3
accept:I am value 3
accept:I am value 3
distinct
作用是去重
Observable.just(1, 1, 2, 2, 3, 4)
.distinct()
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.e("TAG", "accept:" + integer);
}
});
输出信息:
accept:1
accept:2
accept:3
accept:4
filter
过滤掉不符合条件的数据
Observable.just(10, 20, 30, 40, 50)
.filter(new Predicate<Integer>() {
@Override
public boolean test(Integer integer) throws Exception {
return integer >= 30;
}
}).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.e(TAG, "accept:" + integer);
}
});
输出信息:
accept:30
accept:40
accept:50
buffer
buffer
接收两个参数:count
表示数量,skip
表示步长
我们依次发射1 2 3 4 5
数据,所以依次接收123,345,5
Observable.just(1, 2, 3, 4, 5)
.buffer(3, 2)
.subscribe(new Consumer<List<Integer>>() {
@Override
public void accept(List<Integer> integers) throws Exception {
Log.e(TAG, "buffer size :" + integers.size());
for (Integer i : integers) {
Log.e(TAG, "accept:" + i);
}
}
});
输出信息:
buffer size :3
accept:1
accept:2
accept:3
buffer size :3
accept:3
accept:4
accept:5
buffer size :1
accept:5
timer
设置定时任务
Log.e(TAG, "timer start:" + getNowStrTime());
Observable.timer(2, TimeUnit.SECONDS)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
Log.e(TAG, "timer end:" + getNowStrTime());
}
});
输出信息:
timer start:2018-09-23 18:14:01
timer end:2018-09-23 18:14:03
interval
interval
用于间隔时间执行,initialDelay
表示第一次执行延迟时间,period
表示间隔执行时间
Log.e(TAG, "timer start:" + getNowStrTime());
Observable.interval(3, 2, TimeUnit.SECONDS)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
Log.e(TAG, "accept:" + getNowStrTime());
}
});
输出信息:
timer start:2018-09-23 18:15:42
accept:2018-09-23 18:15:45
accept:2018-09-23 18:15:47
accept:2018-09-23 18:15:49
accept:2018-09-23 18:15:51
accept:2018-09-23 18:15:53
accept:2018-09-23 18:15:55
...
doOnNext
作用是让订阅者在接收数据之前做一些操作
Observable.just(1, 2, 3, 4)
.doOnNext(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.e(TAG, "doOnNext:" + integer);
}
})
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.e(TAG, "accept:" + integer);
}
});
输出信息:
doOnNext:1
accept:1
doOnNext:2
accept:2
doOnNext:3
accept:3
doOnNext:4
accept:4
skip
跳过 count 个数目开始接收
Observable.just(1, 2, 3, 4, 5)
.skip(2)
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.e(TAG, "accept:" + integer);
}
});
输出信息:
accept:3
accept:4
accept:5
take
表示至多接收 count 个数据
Observable.just(1, 2, 3, 4, 5)
.take(2)
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.e(TAG, "accept:" + integer);
}
});
输出信息:
accept:1
accept:2
Single
Single
只会接收一个参数,而 SingleObserver
只会调用 onError()
或者 onSuccess()
Single.just(new Random().nextInt())
.subscribe(new SingleObserver<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onSuccess(Integer integer) {
Log.e(TAG, "onSuccess:" + integer);
}
@Override
public void onError(Throwable e) {
Log.e(TAG, "onError:" + e.getMessage());
}
});
输出信息:
onSuccess:-788710247
debounce
去除发送频率过快的数据
// 去除发送间隔时间小于 500 毫秒的发射事件
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
Thread.sleep(400);
emitter.onNext(2);
Thread.sleep(500);
emitter.onNext(3);
Thread.sleep(100);
emitter.onNext(4);
Thread.sleep(700);
emitter.onNext(5);
emitter.onComplete();
}
}).debounce(500, TimeUnit.MILLISECONDS)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.e(TAG, "accept:" + integer);
}
});
输出信息:
accept:2
accept:4
accept:5
defer
每次订阅都会创建一个新的 Observable
,并且如果没有被订阅,就不会产生新的 Observable
Observable<Integer> observable = Observable.defer(new Callable<ObservableSource<Integer>>() {
@Override
public ObservableSource<Integer> call() throws Exception {
return Observable.just(1, 2, 3);
}
});
observable.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe( Disposable d) {
}
@Override
public void onNext( Integer integer) {
Log.e(TAG, "defer : " + integer );
}
@Override
public void onError( Throwable e) {
Log.e(TAG, " onError : " + e.getMessage());
}
@Override
public void onComplete() {
Log.e(TAG, "onComplete");
}
});
输出信息:
defer : 1
defer : 2
defer : 3
onComplete
last
last
操作符仅取出可观察到的最后一个值,或者是满足某些条件的最后一项
Observable.just(1, 2, 3)
.last(4)
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.e(TAG, "accept:" + integer);
}
});
输出信息:
accept:3
merge
merge
的作用是把多个 Observable
结合起来,接受可变参数,也支持迭代器集合。注意它和 concat
的区别在于,不用等到发射器 A 发送完所有的事件再进行发射器 B 的发送
Observable.merge(Observable.just(1, 2), Observable.just(3, 4, 5))
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.e(TAG, "accept:" + integer);
}
});
输出信息:
accept:1
accept:2
accept:3
accept:4
accept:5
reduce
reduce
操作符每次用一个方法处理一个值,可以有一个 seed
作为初始值
Observable.just(1, 2, 3)
.reduce(new BiFunction<Integer, Integer, Integer>() {
@Override
public Integer apply(Integer integer, Integer integer2) throws Exception {
return integer + integer2;
}
})
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.e(TAG, "accept:" + integer);
}
});
输出信息:
accept:6
scan
scan
和reduce
基本一致,区别在于scan
输出每一个步骤
Observable.just(1, 2, 3)
.scan(new BiFunction<Integer, Integer, Integer>() {
@Override
public Integer apply(Integer integer, Integer integer2) throws Exception {
return integer + integer2;
}
})
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.e(TAG, "accept:" + integer);
}
});
输出信息:
accept:1
accept:3
accept:6
window
按照实际划分窗口,将数据发送给不同的 Observable
// 每3秒划分一个Observable
Observable.interval(1, TimeUnit.SECONDS)
.take(15)
.window(3, TimeUnit.SECONDS)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Observable<Long>>() {
@Override
public void accept(Observable<Long> longObservable) throws Exception {
Log.e(TAG, "---");
longObservable.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
Log.e(TAG, "accept:" + aLong);
}
});
}
});
输出信息:
---
accept:0
accept:1
---
accept:2
accept:3
accept:4
---
accept:5
accept:6
accept:7
---
accept:8
accept:9
accept:10
---
accept:11
accept:12
accept:13
---
accept:14
线程调度
subScribeOn
subscribeOn
用于指定 subscribe()
时所发生的线程
observeOn
observeOn
方法用于指定下游 Observer
回调发生的线程
线程切换
- 简单的
subscribeOn
指定的是发射事件的线程,observeOn
指定订阅者接收事件的线程 - 多次指定发射事件的线程只有第一次指定的有效,也就是说多次调用
subscribeOn()
只有第一次的有效,其余的会被忽略 - 但多次指定订阅者接收线程是可以的,也就是说每调用一次
observerOn()
,下游的线程就会切换一次
Observable.just(1, 2, 3, 4) // IO 线程,由 subscribeOn() 指定
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.newThread())
.map(mapOperator) // 新线程,由 observeOn() 指定
.observeOn(Schedulers.io())
.map(mapOperator2) // IO 线程,由 observeOn() 指定
.observeOn(AndroidSchedulers.mainThread)
.subscribe(subscriber); // Android 主线程,由 observeOn() 指定
Demo:
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
Log.e(TAG, "Observable thread is : " + Thread.currentThread().getName());
e.onNext(1);
e.onComplete();
}
})
.subscribeOn(Schedulers.newThread())
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.doOnNext(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.e(TAG, "After observeOn(mainThread),Current thread is " + Thread.currentThread().getName());
}
})
.observeOn(Schedulers.io())
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.e(TAG, "After observeOn(io),Current thread is " + Thread.currentThread().getName());
}
});
输出信息:
Observable thread is : RxNewThreadScheduler-1
After observeOn(mainThread),Current thread is main
After observeOn(io),Current thread is RxCachedThreadScheduler-2
说明:
该代码中,subscribeOn
分别用 Schedulers.newThread()
和 Schedulers.io()
对发射线程进行切换,但实际只响应Schedulers.newThread()
,observeOn
每调度一次,线程便会切换一次
RxJava内置线程选项
-
Schedulers.io()
代表io操作的线程, 通常用于网络,读写文件等io密集型的操作 -
Schedulers.computation()
代表CPU计算密集型的操作, 例如需要大量计算的操作 -
Schedulers.newThread()
代表一个常规的新线程 -
AndroidSchedulers.mainThread()
代表Android的主线程