上一篇是对RxJava基本概念的介绍RxJava2.x学习教程(一)基本概念,本文主要介绍常用操作符!操作符官方doc,里面有对各类操作符的讲解!
创建类的操作符
Create
最常用的操作符,用于创建一个具有发射事件能力的被观察者
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("Hello");
emitter.onNext("World");
emitter.onComplete();
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.e("TAG", s);
}
});
java8箭头函数版:
Observable.create((ObservableOnSubscribe<String>) emitter -> {
emitter.onNext("Hello");
emitter.onNext("World");
emitter.onComplete();
}).subscribe(s ->
Log.e("TAG", s)
);
05-19 15:25:11.290 7170-7170/com.example.rxjava.rxjavademo E/TAG: Hello
05-19 15:25:11.290 7170-7170/com.example.rxjava.rxjavademo E/TAG: World
just
just:只是简单的原样发射,可将数组或Iterable当做单个数据。它接受一至九个参数:
String[] arr = new String[]{"ss1", "ss2", "ss3"};
List<String> stringList = Arrays.asList(arr);
Observable.just(arr).subscribe(new Consumer<String[]>() {
@Override
public void accept(String[] s) throws Exception {
Log.e("TAG", "" + s.length);
}
});
Observable.just("ss1", "ss2", "ss3").subscribe(s -> Log.e("TAG", s));
Observable.just(stringList).subscribe(s -> Log.e("TAG", "" + s.size()));
05-19 15:40:31.740 8273-8273/com.example.rxjava.rxjavademo E/TAG: 3
05-19 15:40:31.740 8273-8273/com.example.rxjava.rxjavademo E/TAG: ss1
05-19 15:40:31.740 8273-8273/com.example.rxjava.rxjavademo E/TAG: ss2
05-19 15:40:31.740 8273-8273/com.example.rxjava.rxjavademo E/TAG: ss3
05-19 15:40:31.740 8273-8273/com.example.rxjava.rxjavademo E/TAG: 3
from系列
from系列可以将数组或Iterable的元素拿出来,做成多个事件进行发射
String[] arr = new String[]{"ss1", "ss2", "ss3"};
List<String> stringList = Arrays.asList(arr);
Observable.fromArray(arr).subscribe(s -> Log.e("TAG", "array: " + s));
Observable.fromIterable(stringList).subscribe(s -> Log.e("TAG", "list: " + s));
// fromCallable接收一个函数,并将该函的返回值装入事件进行发射
Observable.fromCallable(() -> Arrays.asList(arr)).subscribe(s -> {
Log.e("TAG", "" + s.size());
});
05-19 15:47:47.870 8647-8647/com.example.rxjava.rxjavademo E/TAG: array: ss1
05-19 15:47:47.870 8647-8647/com.example.rxjava.rxjavademo E/TAG: array: ss2
05-19 15:47:47.870 8647-8647/com.example.rxjava.rxjavademo E/TAG: array: ss3
05-19 15:47:47.880 8647-8647/com.example.rxjava.rxjavademo E/TAG: list: ss1
05-19 15:47:47.880 8647-8647/com.example.rxjava.rxjavademo E/TAG: list: ss2
05-19 15:47:47.880 8647-8647/com.example.rxjava.rxjavademo E/TAG: list: ss3
05-19 15:47:47.880 8647-8647/com.example.rxjava.rxjavademo E/TAG: 3
查看源码可以知道Observable.just("ss1", "ss2", "ss3")...
底层调用的就是Observable.fromArray...
Interval
//每隔500ms 在io线程发射一次事件
Observable.interval(500, TimeUnit.MILLISECONDS, Schedulers.io())
.subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
Log.e("TAG", "" + aLong);
}
});
05-22 11:26:57.209 1174-1227/com.example.rxjava.rxjavademo E/TAG: 0
05-22 11:26:57.719 1174-1227/com.example.rxjava.rxjavademo E/TAG: 1
05-22 11:26:58.209 1174-1227/com.example.rxjava.rxjavademo E/TAG: 2
...
除了以上,还有一些别的可以创建被观察者的操作符,如:
empty:创建一个不发射任何数据但是正常终止的被观察者
never:创建一个不发射数据也不终止的被观察者
error:创建一个不发射数据以一个错误终止的被观察者
timer 在延迟一段给定的时间后发射一个数字0
Observable.timer(3000, TimeUnit.MILLISECONDS).subscribe(aLong -> Log.e("TAG", "" + aLong));//0
range 其创建的被观察者 可以发射一个指定范围的整数序列
Observable.range(10, 3).subscribe(s -> Log.e("TAG", "" + s));//10 11 12
...
转化类操作符
转化类操作符,可以将一个被观察者通过某种方法,转化为另外的被观察者!
Map
map的作用是将每一个发射的事件,应用于一个函数,从而可以对事件做相应处理,事件数量不会变化:
Observable.create((ObservableOnSubscribe<Integer>) emitter -> {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
}).map(integer -> {
//将数字减一 ,并转化为字符串
return "this integer = " + --integer;
}).subscribe(s -> {
//接收map处理后的事件
Log.e("TAG", "" + s);
});
05-19 16:25:03.220 10865-10865/com.example.rxjava.rxjavademo E/TAG: this integer = 0
05-19 16:25:03.220 10865-10865/com.example.rxjava.rxjavademo E/TAG: this integer = 1
05-19 16:25:03.220 10865-10865/com.example.rxjava.rxjavademo E/TAG: this integer = 2
实质上,map就是将一个被观察者,通过某种函数关系,转化为另一个被观察者!
FlatMap
flatMap将一个被观察者转化为多个被观察者,然后再将他们合并到一个被观察者中,比较拗口,看例子:
Observable.just(1,2,3)//这里创建一个被观察者 发射多个事件
.flatMap(new Function<Integer, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(Integer integer) throws Exception {
这里,每进来一个事件,就创建返回一个新的被观察者,并发射自己的事件
Log.e("TAG", "apply: " + integer);
return Observable.just(integer + "---", "item2---");
}
})//最后上面产生的多个被观察者都被合并成一个,将他们的事件发送到下游观察者
.subscribe(s -> Log.e("TAG","event: " + s));
05-19 17:54:39.950 23528-23528/com.example.rxjava.rxjavademo E/TAG: apply: 1
05-19 17:54:39.950 23528-23528/com.example.rxjava.rxjavademo E/TAG: event: 1---
05-19 17:54:39.950 23528-23528/com.example.rxjava.rxjavademo E/TAG: event: item2---
05-19 17:54:39.950 23528-23528/com.example.rxjava.rxjavademo E/TAG: apply: 2
05-19 17:54:39.950 23528-23528/com.example.rxjava.rxjavademo E/TAG: event: 2---
05-19 17:54:39.950 23528-23528/com.example.rxjava.rxjavademo E/TAG: event: item2---
05-19 17:54:39.950 23528-23528/com.example.rxjava.rxjavademo E/TAG: apply: 3
05-19 17:54:39.950 23528-23528/com.example.rxjava.rxjavademo E/TAG: event: 3---
05-19 17:54:39.950 23528-23528/com.example.rxjava.rxjavademo E/TAG: event: item2---
flatMap转化后 并不能保证事件的顺序。如果需要保证顺序,可以使用ConcatMap。
ConcatMap
concatMap和flatMap的区别在于 它能保证顺序
Observable.fromArray(1,10,20)
.concatMap(new Function<Integer, ObservableSource<Integer>>() {
@Override
public ObservableSource<Integer> apply(Integer integer) throws Exception {
return Observable.range(integer, 2);
}
}).subscribe(e -> Log.e("TAG", "" + e));
05-19 18:11:19.070 26841-26841/com.example.rxjava.rxjavademo E/TAG: 1
05-19 18:11:19.070 26841-26841/com.example.rxjava.rxjavademo E/TAG: 2
05-19 18:11:19.070 26841-26841/com.example.rxjava.rxjavademo E/TAG: 10
05-19 18:11:19.070 26841-26841/com.example.rxjava.rxjavademo E/TAG: 11
05-19 18:11:19.070 26841-26841/com.example.rxjava.rxjavademo E/TAG: 20
05-19 18:11:19.070 26841-26841/com.example.rxjava.rxjavademo E/TAG: 21
buffer系列
buffer的作用是定时或定量缓存打包事件,将其发射到下游,而不是一次一个发射!
count变体
//buffer(count,skip) 第一个参数count代表每次缓存的数量,skip表示下一次缓存的间隔,
//如skip是1 表示下一次从2开始缓存,可以不写默认skip等于count
Observable.just(1,2,3,4,5)
.buffer(3)
.subscribe(integers -> {
Log.e("TAG", "integers");
for (Integer integer : integers) {
Log.e("TAG", "i = " + integer);
}
});
05-22 10:36:58.989 24586-24586/com.example.rxjava.rxjavademo E/TAG: integers
05-22 10:36:58.989 24586-24586/com.example.rxjava.rxjavademo E/TAG: i = 1
05-22 10:36:58.989 24586-24586/com.example.rxjava.rxjavademo E/TAG: i = 2
05-22 10:36:58.989 24586-24586/com.example.rxjava.rxjavademo E/TAG: i = 3
05-22 10:36:58.989 24586-24586/com.example.rxjava.rxjavademo E/TAG: integers
05-22 10:36:58.989 24586-24586/com.example.rxjava.rxjavademo E/TAG: i = 4
05-22 10:36:58.989 24586-24586/com.example.rxjava.rxjavademo E/TAG: i = 5
Observable.just(1, 2, 3, 4, 5)
.buffer(3, 2)
.subscribe(integers -> {
Log.e("TAG", "integers");
for (Integer integer : integers) {
Log.e("TAG", "i = " + integer);
}
});
05-22 10:39:54.359 25158-25158/com.example.rxjava.rxjavademo E/TAG: i = 1
05-22 10:39:54.359 25158-25158/com.example.rxjava.rxjavademo E/TAG: i = 2
05-22 10:39:54.359 25158-25158/com.example.rxjava.rxjavademo E/TAG: i = 3
05-22 10:39:54.359 25158-25158/com.example.rxjava.rxjavademo E/TAG: integers
05-22 10:39:54.359 25158-25158/com.example.rxjava.rxjavademo E/TAG: i = 3
05-22 10:39:54.359 25158-25158/com.example.rxjava.rxjavademo E/TAG: i = 4
05-22 10:39:54.359 25158-25158/com.example.rxjava.rxjavademo E/TAG: i = 5
05-22 10:39:54.359 25158-25158/com.example.rxjava.rxjavademo E/TAG: integers
05-22 10:39:54.359 25158-25158/com.example.rxjava.rxjavademo E/TAG: i = 5
time变体
//interval表示每隔500ms发射一个事件
Observable.interval(500, TimeUnit.MILLISECONDS, Schedulers.io())
.take(10)//表示只取前面10个事件
.buffer(1, TimeUnit.SECONDS, 2)//每隔1s 打包一次事件将其发射到下游,count是每次打包的最大数量
.subscribe(new Consumer<List<Long>>() {
@Override
public void accept(List<Long> longs) throws Exception {
for (Long aLong : longs) {
Log.e("TAG", "" + aLong);
}
}
});
过滤类操作符
distinct
去掉重复的事件
Observable.just(1,1,2,1,3,2)
.distinct()
.subscribe(integer -> Log.e("TAG", "" + integer));
05-19 18:26:17.620 29437-29437/com.example.rxjava.rxjavademo E/TAG: 1
05-19 18:26:17.620 29437-29437/com.example.rxjava.rxjavademo E/TAG: 2
05-19 18:26:17.620 29437-29437/com.example.rxjava.rxjavademo E/TAG: 3
Filter
接收一个参数,过滤掉不符合条件的
Observable.just(1,1,2,1,3,2)
.filter(integer -> integer == 2)
.subscribe(integer -> Log.e("TAG", "" + integer));
05-19 18:30:02.920 30098-30098/com.example.rxjava.rxjavademo E/TAG: 2
05-19 18:30:02.920 30098-30098/com.example.rxjava.rxjavademo E/TAG: 2
elementAt
只返回指定下标的事件,如果没有这一项,就返回默认值
Observable.just(1,2,3)
.elementAt(5, 0)//只发射第6个数据, 若找不到就发射默认值0
.subscribe(integer -> Log.e("TAG", "" + integer));//0
Observable.just(1,2,3)
.elementAt(2)//只发射第三个数据,若找不到就不发射
.subscribe(integer -> Log.e("TAG", "" + integer));//3
skip系列
变体count系列,丢弃被观察者发射的前N项数据
Observable.range(0, 5)
.skip(3)//丢弃前3个数据
.subscribe(r -> Log.e("TAG", "" + r));
05-19 18:18:51.170 28109-28109/com.example.rxjava.rxjavademo E/TAG: 3
05-19 18:18:51.170 28109-28109/com.example.rxjava.rxjavademo E/TAG: 4
变体time系列, 丢弃Observable开始的那段时间发射 的数据:
//interval 表示在io线程 每隔500ms 发射一个事件
Observable.interval(500, TimeUnit.MILLISECONDS, Schedulers.io())
.skip(3000, TimeUnit.MILLISECONDS)//丢弃前3000ms发射的数据
.subscribe(aLong -> Log.e("TAG", "" +aLong));
05-19 22:57:01.959 5268-5287/com.example.hp.test1 E/tag: 5
05-19 22:57:02.459 5268-5287/com.example.hp.test1 E/tag: 6
05-19 22:57:02.958 5268-5287/com.example.hp.test1 E/tag: 7
05-19 22:57:03.459 5268-5287/com.example.hp.test1 E/tag: 8
...
还有skipLast,take系列等用法相似。
合并类操作符
Zip
使用一个函数组合多个Observable发射的数据集合,然后再发射这个结果。如果多个Observable发射的数据量不一样,则以最少的Observable为标准进行压合!
Observable<Integer> observable1=Observable.just(1,2,3,4);
Observable<Integer> observable2=Observable.just(1,2);
Observable<String> observable3=Observable.just("s1", "s2", "s3");
Observable.zip(observable1, observable2, observable3, new Function3<Integer, Integer, String, String>() {
@Override
public String apply(Integer integer, Integer integer2, String s) throws Exception {
return integer - integer + s;
}
}).subscribe(s -> Log.e("TAG", s));
05-22 11:44:16.635 4269-4269/? E/TAG: 0s1
05-22 11:44:16.635 4269-4269/? E/TAG: 0s2
merge/mergeWith
Observable<Integer> observable1=Observable.just(1,2);
Observable<Integer> observable2=Observable.just(5);
//下面两中用法等价
Observable.merge(observable1, observable2)
.subscribe(integer -> Log.e("TAG", "" + integer));
Log.e("TAG", "------------------");
observable1.mergeWith(observable2)
.subscribe(integer -> Log.e("TAG", "" + integer));
05-22 11:55:38.675 7026-7026/com.example.rxjava.rxjavademo E/TAG: 1
05-22 11:55:38.675 7026-7026/com.example.rxjava.rxjavademo E/TAG: 2
05-22 11:55:38.675 7026-7026/com.example.rxjava.rxjavademo E/TAG: 5
05-22 11:55:38.675 7026-7026/com.example.rxjava.rxjavademo E/TAG: ------------------
05-22 11:55:38.675 7026-7026/com.example.rxjava.rxjavademo E/TAG: 1
05-22 11:55:38.675 7026-7026/com.example.rxjava.rxjavademo E/TAG: 2
05-22 11:55:38.675 7026-7026/com.example.rxjava.rxjavademo E/TAG: 5
startWith/startWithArray
在Observable开始发射他们的数据之前,startWith()通过传递一个参数来先发射一个数据序列
Observable.just("data1")
.startWith("data2")
.startWithArray("data3", "data4")
.subscribe(s -> Log.e("TAG", s));
05-22 15:42:37.295 10308-10308/com.example.rxjava.rxjavademo E/TAG: data3
05-22 15:42:37.295 10308-10308/com.example.rxjava.rxjavademo E/TAG: data4
05-22 15:42:37.295 10308-10308/com.example.rxjava.rxjavademo E/TAG: data2
05-22 15:42:37.295 10308-10308/com.example.rxjava.rxjavademo E/TAG: data1
限于篇幅过长,下一篇文章RxJava2.x学习教程(三)常用操作符,会接着介绍剩下的操作符!