RxJava2.x学习教程(二)常用操作符

上一篇是对RxJava基本概念的介绍RxJava2.x学习教程(一)基本概念,本文主要介绍常用操作符!操作符官方doc,里面有对各类操作符的讲解!

创建类的操作符

这里写图片描述
Create
最常用的操作符,用于创建一个具有发射事件能力的被观察者

   Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                emitter.onNext("Hello");
                emitter.onNext("World");
                emitter.onComplete();
            }
        }).subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                Log.e("TAG", s);
            }
        });

java8箭头函数版:

   Observable.create((ObservableOnSubscribe<String>) emitter -> {
            emitter.onNext("Hello");
            emitter.onNext("World");
            emitter.onComplete();
        }).subscribe(s ->
            Log.e("TAG", s)
        );
05-19 15:25:11.290 7170-7170/com.example.rxjava.rxjavademo E/TAG: Hello
05-19 15:25:11.290 7170-7170/com.example.rxjava.rxjavademo E/TAG: World

just
just:只是简单的原样发射,可将数组或Iterable当做单个数据。它接受一至九个参数:

  String[] arr = new String[]{"ss1", "ss2", "ss3"};
  List<String> stringList = Arrays.asList(arr);

  Observable.just(arr).subscribe(new Consumer<String[]>() {
       @Override
       public void accept(String[] s) throws Exception {
           Log.e("TAG", "" + s.length);
       }
   });

  Observable.just("ss1", "ss2", "ss3").subscribe(s -> Log.e("TAG", s));

  Observable.just(stringList).subscribe(s -> Log.e("TAG", "" + s.size()));
05-19 15:40:31.740 8273-8273/com.example.rxjava.rxjavademo E/TAG: 3
05-19 15:40:31.740 8273-8273/com.example.rxjava.rxjavademo E/TAG: ss1
05-19 15:40:31.740 8273-8273/com.example.rxjava.rxjavademo E/TAG: ss2
05-19 15:40:31.740 8273-8273/com.example.rxjava.rxjavademo E/TAG: ss3
05-19 15:40:31.740 8273-8273/com.example.rxjava.rxjavademo E/TAG: 3

from系列
from系列可以将数组或Iterable的元素拿出来,做成多个事件进行发射

 String[] arr = new String[]{"ss1", "ss2", "ss3"};
 List<String> stringList = Arrays.asList(arr);

 Observable.fromArray(arr).subscribe(s -> Log.e("TAG", "array: " + s));

 Observable.fromIterable(stringList).subscribe(s -> Log.e("TAG", "list: " + s));

 // fromCallable接收一个函数,并将该函的返回值装入事件进行发射   
 Observable.fromCallable(() -> Arrays.asList(arr)).subscribe(s -> {
     Log.e("TAG", "" + s.size());
 });
05-19 15:47:47.870 8647-8647/com.example.rxjava.rxjavademo E/TAG: array: ss1
05-19 15:47:47.870 8647-8647/com.example.rxjava.rxjavademo E/TAG: array: ss2
05-19 15:47:47.870 8647-8647/com.example.rxjava.rxjavademo E/TAG: array: ss3
05-19 15:47:47.880 8647-8647/com.example.rxjava.rxjavademo E/TAG: list: ss1
05-19 15:47:47.880 8647-8647/com.example.rxjava.rxjavademo E/TAG: list: ss2
05-19 15:47:47.880 8647-8647/com.example.rxjava.rxjavademo E/TAG: list: ss3
05-19 15:47:47.880 8647-8647/com.example.rxjava.rxjavademo E/TAG: 3

查看源码可以知道Observable.just("ss1", "ss2", "ss3")... 底层调用的就是Observable.fromArray...

Interval

//每隔500ms 在io线程发射一次事件
 Observable.interval(500, TimeUnit.MILLISECONDS, Schedulers.io())
          .subscribe(new Consumer<Long>() {
              @Override
              public void accept(Long aLong) throws Exception {
                  Log.e("TAG", "" + aLong);
              }
          });
05-22 11:26:57.209 1174-1227/com.example.rxjava.rxjavademo E/TAG: 0
05-22 11:26:57.719 1174-1227/com.example.rxjava.rxjavademo E/TAG: 1
05-22 11:26:58.209 1174-1227/com.example.rxjava.rxjavademo E/TAG: 2
...

除了以上,还有一些别的可以创建被观察者的操作符,如:

empty:创建一个不发射任何数据但是正常终止的被观察者
never:创建一个不发射数据也不终止的被观察者
error:创建一个不发射数据以一个错误终止的被观察者
timer 在延迟一段给定的时间后发射一个数字0
Observable.timer(3000, TimeUnit.MILLISECONDS).subscribe(aLong -> Log.e("TAG", "" + aLong));//0
range 其创建的被观察者 可以发射一个指定范围的整数序列
Observable.range(10, 3).subscribe(s -> Log.e("TAG", "" + s));//10  11  12
...

转化类操作符

转化类操作符,可以将一个被观察者通过某种方法,转化为另外的被观察者!
这里写图片描述
Map
map的作用是将每一个发射的事件,应用于一个函数,从而可以对事件做相应处理,事件数量不会变化:

 Observable.create((ObservableOnSubscribe<Integer>) emitter -> {
      emitter.onNext(1);
      emitter.onNext(2);
      emitter.onNext(3);
  }).map(integer -> {
      //将数字减一 ,并转化为字符串
      return "this integer = " + --integer;
  }).subscribe(s -> {
      //接收map处理后的事件
      Log.e("TAG", "" + s);
  });
05-19 16:25:03.220 10865-10865/com.example.rxjava.rxjavademo E/TAG: this integer = 0
05-19 16:25:03.220 10865-10865/com.example.rxjava.rxjavademo E/TAG: this integer = 1
05-19 16:25:03.220 10865-10865/com.example.rxjava.rxjavademo E/TAG: this integer = 2

实质上,map就是将一个被观察者,通过某种函数关系,转化为另一个被观察者!

FlatMap
flatMap将一个被观察者转化为多个被观察者,然后再将他们合并到一个被观察者中,比较拗口,看例子:

扫描二维码关注公众号,回复: 1099591 查看本文章
Observable.just(1,2,3)//这里创建一个被观察者 发射多个事件
          .flatMap(new Function<Integer, ObservableSource<String>>() {
                 @Override
                 public ObservableSource<String> apply(Integer integer) throws Exception {
                  这里,每进来一个事件,就创建返回一个新的被观察者,并发射自己的事件
                     Log.e("TAG", "apply: " + integer);
                     return Observable.just(integer + "---", "item2---");
                 }
             })//最后上面产生的多个被观察者都被合并成一个,将他们的事件发送到下游观察者
             .subscribe(s -> Log.e("TAG","event: " +  s));
05-19 17:54:39.950 23528-23528/com.example.rxjava.rxjavademo E/TAG: apply: 1
05-19 17:54:39.950 23528-23528/com.example.rxjava.rxjavademo E/TAG: event: 1---
05-19 17:54:39.950 23528-23528/com.example.rxjava.rxjavademo E/TAG: event: item2---
05-19 17:54:39.950 23528-23528/com.example.rxjava.rxjavademo E/TAG: apply: 2
05-19 17:54:39.950 23528-23528/com.example.rxjava.rxjavademo E/TAG: event: 2---
05-19 17:54:39.950 23528-23528/com.example.rxjava.rxjavademo E/TAG: event: item2---
05-19 17:54:39.950 23528-23528/com.example.rxjava.rxjavademo E/TAG: apply: 3
05-19 17:54:39.950 23528-23528/com.example.rxjava.rxjavademo E/TAG: event: 3---
05-19 17:54:39.950 23528-23528/com.example.rxjava.rxjavademo E/TAG: event: item2---

flatMap转化后 并不能保证事件的顺序。如果需要保证顺序,可以使用ConcatMap。

ConcatMap
concatMap和flatMap的区别在于 它能保证顺序

Observable.fromArray(1,10,20)
         .concatMap(new Function<Integer, ObservableSource<Integer>>() {
             @Override
             public ObservableSource<Integer> apply(Integer integer) throws Exception {
                 return Observable.range(integer, 2);
             }
         }).subscribe(e -> Log.e("TAG", "" + e));
05-19 18:11:19.070 26841-26841/com.example.rxjava.rxjavademo E/TAG: 1
05-19 18:11:19.070 26841-26841/com.example.rxjava.rxjavademo E/TAG: 2
05-19 18:11:19.070 26841-26841/com.example.rxjava.rxjavademo E/TAG: 10
05-19 18:11:19.070 26841-26841/com.example.rxjava.rxjavademo E/TAG: 11
05-19 18:11:19.070 26841-26841/com.example.rxjava.rxjavademo E/TAG: 20
05-19 18:11:19.070 26841-26841/com.example.rxjava.rxjavademo E/TAG: 21

buffer系列
buffer的作用是定时或定量缓存打包事件,将其发射到下游,而不是一次一个发射!

count变体

        //buffer(count,skip) 第一个参数count代表每次缓存的数量,skip表示下一次缓存的间隔,
        //如skip是1 表示下一次从2开始缓存,可以不写默认skip等于count
        Observable.just(1,2,3,4,5)
                .buffer(3)
                .subscribe(integers -> {
                    Log.e("TAG", "integers");
                    for (Integer integer : integers) {
                        Log.e("TAG", "i = " + integer);
                    }
                });
05-22 10:36:58.989 24586-24586/com.example.rxjava.rxjavademo E/TAG: integers
05-22 10:36:58.989 24586-24586/com.example.rxjava.rxjavademo E/TAG: i = 1
05-22 10:36:58.989 24586-24586/com.example.rxjava.rxjavademo E/TAG: i = 2
05-22 10:36:58.989 24586-24586/com.example.rxjava.rxjavademo E/TAG: i = 3
05-22 10:36:58.989 24586-24586/com.example.rxjava.rxjavademo E/TAG: integers
05-22 10:36:58.989 24586-24586/com.example.rxjava.rxjavademo E/TAG: i = 4
05-22 10:36:58.989 24586-24586/com.example.rxjava.rxjavademo E/TAG: i = 5
Observable.just(1, 2, 3, 4, 5)
        .buffer(3, 2)
        .subscribe(integers -> {
            Log.e("TAG", "integers");
            for (Integer integer : integers) {
                Log.e("TAG", "i = " + integer);
            }
        });
05-22 10:39:54.359 25158-25158/com.example.rxjava.rxjavademo E/TAG: i = 1
05-22 10:39:54.359 25158-25158/com.example.rxjava.rxjavademo E/TAG: i = 2
05-22 10:39:54.359 25158-25158/com.example.rxjava.rxjavademo E/TAG: i = 3
05-22 10:39:54.359 25158-25158/com.example.rxjava.rxjavademo E/TAG: integers
05-22 10:39:54.359 25158-25158/com.example.rxjava.rxjavademo E/TAG: i = 3
05-22 10:39:54.359 25158-25158/com.example.rxjava.rxjavademo E/TAG: i = 4
05-22 10:39:54.359 25158-25158/com.example.rxjava.rxjavademo E/TAG: i = 5
05-22 10:39:54.359 25158-25158/com.example.rxjava.rxjavademo E/TAG: integers
05-22 10:39:54.359 25158-25158/com.example.rxjava.rxjavademo E/TAG: i = 5

time变体

//interval表示每隔500ms发射一个事件
 Observable.interval(500, TimeUnit.MILLISECONDS, Schedulers.io())
          .take(10)//表示只取前面10个事件
          .buffer(1, TimeUnit.SECONDS, 2)//每隔1s 打包一次事件将其发射到下游,count是每次打包的最大数量
          .subscribe(new Consumer<List<Long>>() {
              @Override
              public void accept(List<Long> longs) throws Exception {
                  for (Long aLong : longs) {
                      Log.e("TAG", "" + aLong);
                  }
              }
          });

过滤类操作符

这里写图片描述
distinct
去掉重复的事件

 Observable.just(1,1,2,1,3,2)
         .distinct()
         .subscribe(integer -> Log.e("TAG", "" + integer));
05-19 18:26:17.620 29437-29437/com.example.rxjava.rxjavademo E/TAG: 1
05-19 18:26:17.620 29437-29437/com.example.rxjava.rxjavademo E/TAG: 2
05-19 18:26:17.620 29437-29437/com.example.rxjava.rxjavademo E/TAG: 3

Filter
接收一个参数,过滤掉不符合条件的

 Observable.just(1,1,2,1,3,2)
         .filter(integer -> integer == 2)
         .subscribe(integer -> Log.e("TAG", "" + integer));
05-19 18:30:02.920 30098-30098/com.example.rxjava.rxjavademo E/TAG: 2
05-19 18:30:02.920 30098-30098/com.example.rxjava.rxjavademo E/TAG: 2

elementAt
只返回指定下标的事件,如果没有这一项,就返回默认值

Observable.just(1,2,3)
        .elementAt(5, 0)//只发射第6个数据, 若找不到就发射默认值0
        .subscribe(integer -> Log.e("TAG", "" + integer));//0
Observable.just(1,2,3)
        .elementAt(2)//只发射第三个数据,若找不到就不发射
        .subscribe(integer -> Log.e("TAG", "" + integer));//3

skip系列
变体count系列,丢弃被观察者发射的前N项数据

 Observable.range(0, 5)
         .skip(3)//丢弃前3个数据
         .subscribe(r -> Log.e("TAG", "" + r));
05-19 18:18:51.170 28109-28109/com.example.rxjava.rxjavademo E/TAG: 3
05-19 18:18:51.170 28109-28109/com.example.rxjava.rxjavademo E/TAG: 4

变体time系列, 丢弃Observable开始的那段时间发射 的数据:

//interval 表示在io线程 每隔500ms 发射一个事件
Observable.interval(500, TimeUnit.MILLISECONDS, Schedulers.io())
                .skip(3000, TimeUnit.MILLISECONDS)//丢弃前3000ms发射的数据
                .subscribe(aLong -> Log.e("TAG", "" +aLong));
05-19 22:57:01.959 5268-5287/com.example.hp.test1 E/tag: 5
05-19 22:57:02.459 5268-5287/com.example.hp.test1 E/tag: 6
05-19 22:57:02.958 5268-5287/com.example.hp.test1 E/tag: 7
05-19 22:57:03.459 5268-5287/com.example.hp.test1 E/tag: 8
...

还有skipLast,take系列等用法相似。

合并类操作符

这里写图片描述
Zip
使用一个函数组合多个Observable发射的数据集合,然后再发射这个结果。如果多个Observable发射的数据量不一样,则以最少的Observable为标准进行压合!

 Observable<Integer>  observable1=Observable.just(1,2,3,4);
 Observable<Integer>  observable2=Observable.just(1,2);
 Observable<String>  observable3=Observable.just("s1", "s2", "s3");
 Observable.zip(observable1, observable2, observable3, new Function3<Integer, Integer, String, String>() {
     @Override
     public String apply(Integer integer, Integer integer2, String s) throws Exception {
         return integer - integer + s;
     }
 }).subscribe(s -> Log.e("TAG", s));
05-22 11:44:16.635 4269-4269/? E/TAG: 0s1
05-22 11:44:16.635 4269-4269/? E/TAG: 0s2

merge/mergeWith

Observable<Integer>  observable1=Observable.just(1,2);
Observable<Integer>  observable2=Observable.just(5);
//下面两中用法等价
Observable.merge(observable1, observable2)
         .subscribe(integer -> Log.e("TAG", "" + integer));
 Log.e("TAG", "------------------");
observable1.mergeWith(observable2)
        .subscribe(integer -> Log.e("TAG", "" + integer));
05-22 11:55:38.675 7026-7026/com.example.rxjava.rxjavademo E/TAG: 1
05-22 11:55:38.675 7026-7026/com.example.rxjava.rxjavademo E/TAG: 2
05-22 11:55:38.675 7026-7026/com.example.rxjava.rxjavademo E/TAG: 5
05-22 11:55:38.675 7026-7026/com.example.rxjava.rxjavademo E/TAG: ------------------
05-22 11:55:38.675 7026-7026/com.example.rxjava.rxjavademo E/TAG: 1
05-22 11:55:38.675 7026-7026/com.example.rxjava.rxjavademo E/TAG: 2
05-22 11:55:38.675 7026-7026/com.example.rxjava.rxjavademo E/TAG: 5

startWith/startWithArray
在Observable开始发射他们的数据之前,startWith()通过传递一个参数来先发射一个数据序列

Observable.just("data1")
       .startWith("data2")
       .startWithArray("data3", "data4")
       .subscribe(s -> Log.e("TAG", s));
05-22 15:42:37.295 10308-10308/com.example.rxjava.rxjavademo E/TAG: data3
05-22 15:42:37.295 10308-10308/com.example.rxjava.rxjavademo E/TAG: data4
05-22 15:42:37.295 10308-10308/com.example.rxjava.rxjavademo E/TAG: data2
05-22 15:42:37.295 10308-10308/com.example.rxjava.rxjavademo E/TAG: data1

限于篇幅过长,下一篇文章RxJava2.x学习教程(三)常用操作符,会接着介绍剩下的操作符!

猜你喜欢

转载自blog.csdn.net/chaoyangsun/article/details/80373203