Rxjava操作符之过滤操作

前言:

本文将介绍以下过滤类操作符(基于Rxjava2.0):

  • filter
  • ofType
  • take
  • takeLast
  • first
  • firstOrError
  • last
  • lastOrError
  • skip
  • skipLast
  • elementAt
  • elementAtOrError
  • debounce
  • throttleWithTimeout
  • distinct
  • ignoreElements
  • sample
  • throttleLast

1,filter()

  • 描述:Filter操作符使用你指定的一个谓词函数测试数据项,只有通过测试的数据才会被发射。

  • 示意图:

  • 示例:

    Observable.just(1, 2, 3, 4, 5)
            .filter(new Predicate<Integer>() {
                @Override
                public boolean test(Integer integer) throws Exception {
                    return integer > 3;
                }
            })
            .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    System.out.println("accept:" + value);
                }
            });
    

    输出:

2,ofType()

  • 描述:ofTypefilter操作符的一个特殊形式。它过滤一个Observable只返回指定类型的数据。

  • 示意图:

  • 示例:

    Observable.just(1, 2, 3, 4, 5)
            .ofType(Integer.class)
            .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer value) throws Exception {
                    Log.i(TAG, "accept:" + value);
                }
            });
    
    
    Observable.just(1, 2, 3, 4, 5)
            .ofType(String.class)
            .subscribe(new Consumer<String>() {
                @Override
                public void accept(String value) throws Exception {
                    Log.i(TAG, "accept:" + value);
                }
            });     
    

    第一个示例输出1、2、3、4、5这五个数字,而第二个示例将不会输出任何值。

3,take()

  • 描述:使用Take操作符让你可以修改Observable的行为,只返回前面的N项数据,然后发射完成通知,忽略剩余的数据。

    如果你对一个Observable使用take(n)(或它的同义词limit(n))操作符,而那个Observable发射的数据少于N项,那么take操作生成的Observable不会抛异常或发射onError通知,在完成前它只会发射相同的少量数据。

  • 示意图:

  • 示例:

    Observable.just(1, 2, 3, 4, 5)
            .take(3)
            .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer value) throws Exception {
                    System.out.println("accept:" + value);
                }
            });
    

    输出:

4,takeLast()

  • 描述:使用TakeLast操作符修改原始Observable,你可以只发射Observable发射的后N项数据,忽略前面的数据。注意:这会延迟原始Observable发射的任何数据项,直到它全部完成。

  • 示意图:

  • 示例:

    Observable.just(1, 2, 3, 4, 5)
            .takeLast(2)
            .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer value) throws Exception {
                    System.out.println("accept:" + value);
                }
            });
    

    输出:

5,first()

  • 描述:只发射第一项(或者满足某个条件的第一项)数据。此方法的实现为:first(defaultItem),返回第一个数据,如果Observagle没有发射任何数据时发射一个你在参数中指定的默认值。

  • 示意图:

  • 示例:

    Observable.just(1, 2, 3, 4, 5)
            .first(2)
            .subscribe(new Consumer<Object>() {
                @Override
                public void accept(Object value) throws Exception {
                    System.out.println("accept:" + value);
                }
            });
    
    
    Observable.empty()
            .first(2)
            .subscribe(new Consumer<Object>() {
                @Override
                public void accept(Object value) throws Exception {
                    System.out.println("accept:" + value);
                }
            });
    

    第一个输出:1

    第二个输出:2(默认值)

6,firstOrError()

  • 描述:与frist()类似,返回第一个数据,如果Observagle没有发射任何数据时发出一个NoSuchElementException的信号。

7,last()

  • 描述:与first()类似,区别是返回最后一个数据,不再示例。

8,lastOrError()

  • 描述:与firstOrError()类似,返回最后一个数据,如果没有任何数据则发出一个NoSuchElementException

9,skip()

  • 描述:忽略Observable发射的前N项数据,只保留之后的数据。

  • 示意图:

  • 示例:

    Observable.just(1,2,3,4,5)
            .skip(2)
            .subscribe(new Consumer<Object>() {
                @Override
                public void accept(Object value) throws Exception {
                    System.out.println("accept:" + value);
                }
            });
    

    输出:

  • 重载:

    • skip(long,TimeUnit)
    • skip(long,TimeUnit,Scheduler)

10,skipLast()

  • 描述:忽略Observable发射的后N项数据,只保留前面的数据。

    功能使用与skip相同,不再示例。

  • 示意图:

  • 重载:

    • skipLast(long time, TimeUnit unit)
    • skipLast(long time, TimeUnit unit, Scheduler scheduler)
    • skipLast(long time, TimeUnit unit, boolean delayError)
    • skipLast(long time, TimeUnit unit, Scheduler scheduler, boolean delayError)
    • skipLast(long time, TimeUnit unit, Scheduler scheduler, boolean delayError, int bufferSize)

11,elementAt()

  • 描述:只发射第N项数据。注意索引从0开始。

  • 示意图:

  • 示例:

    Observable.just(1,2,3,4,5)
            .elementAt(1)
            .subscribe(new Consumer<Object>() {
                @Override
                public void accept(Object value) throws Exception {
                    System.out.println("accept:" + value);
                }
            });
    

    输出:

  • 重载:elementAt(long index, T defaultItem)

    elementAt(long)相同,区别是如果index超过observable发送数据量的范围,返回一个默认的值。

12,elementAtOrError()

  • 描述:只发射第N项数据,如果不存在则发送NoSuchElementException错误。

13,debounce()

  • 描述:仅在过了一段指定的时间还没发射数据时才发射一个数据。Debounce操作符会过滤掉发射速率过快的数据项。(此操作符常用于按钮防抖动

    注意:这个操作符会会接着最后一项数据发射原始Observable的onCompleted通知,即使这个通知发生在你指定的时间窗口内(从最后一项数据的发射算起)。也就是说,onCompleted通知不会触发限流。

  • 示意图:

  • 示例:

    Observable.intervalRange(0, 5, 0, 100, TimeUnit.MILLISECONDS)
            .debounce(10, TimeUnit.MILLISECONDS)
            .subscribe(new Consumer<Object>() {
                @Override
                public void accept(Object value) throws Exception {
                    System.out.println("accept:" + value);
                }
            });
    
    
    Observable.intervalRange(0, 5, 0, 100, TimeUnit.MILLISECONDS)
            .debounce(110, TimeUnit.MILLISECONDS)
            .subscribe(new Consumer<Object>() {
                @Override
                public void accept(Object value) throws Exception {
                    System.out.println("accept:" + value);
                }
            });
    

    第一个输出:0,1,2,3,4。因为防抖动时间小于数据源发射数据的时间间隔,所以正常接收。

    第二个输出:4。因为防抖动时间还没走完,数据源就发来的下一个数据,这时重新计时,直到最后一个数据4的时候,走完了110ms,最终发送。

  • 重载:debounce(long timeout, TimeUnit unit, Scheduler scheduler)

14,throttleWithTimeout()

  • 描述:与debounce()功能和用法一模一样。实际上此操作符是调用debounce()方法来实现的。

  • 重载:throttleWithTimeout(long timeout, TimeUnit unit, Scheduler scheduler)

15,distinct()

  • 描述:抑制(过滤掉)重复的数据项,只允许还没有发射过的数据项通过。

  • 示意图:

  • 示例:

    Observable.just(1,2,1,1,2,3,3)
            .distinct()
            .subscribe(new Consumer<Object>() {
                @Override
                public void accept(Object value) throws Exception {
                    System.out.println("accept:" + value);
                }
            });
    

    输出:

  • 重载:

    • distinct(Function<? super T, K> keySelector)允许调整判定两个数据不同的标准

    这个说一下,因为还是比较常用的。很多时候我们需要给一个bean类集合去重,那这时候就需要我们自定义去重规则。网上几乎没有这个重载方法的示例,这里我简单写一个,让大家好理解使用。

    示例:

    List<StudentBean> list = new ArrayList<>();
    list.add(new StudentBean("小一", 20));
    list.add(new StudentBean("小二", 22));
    list.add(new StudentBean("小三", 20));
    list.add(new StudentBean("小四", 21));
    list.add(new StudentBean("小五", 21));
    
    Observable.fromIterable(list)
            .distinct(new Function<StudentBean, Integer>() {
                @Override
                public Integer apply(StudentBean bean) throws Exception {
                    //根据年龄去重
                    return bean.getAge();
                }
            })
            .subscribe(new Consumer<StudentBean>() {
                @Override
                public void accept(StudentBean value) throws Exception {
                    System.out.println("accept:" + value.getName());
                }
            });
    

    输出:

    • distinct(Function<? super T, K> keySelector, Callable<? extends Collection<? super K>> collectionSupplier)

16,ignoreElements()

  • 描述:不发射任何数据,只发射Observable的终止通知。

    IgnoreElements操作符抑制原始Observable发射的所有数据,只允许它的终止通知(onError或onCompleted)通过。如果你不关心一个Observable发射的数据,但是希望在它完成时或遇到错误终止时收到通知,你可以对Observable使用ignoreElements操作符,它会确保永远不会调用观察者的onNext()方法。

  • 示意图:

  • 示例:

    Observable.just(1, 2, 3, 4)
            .ignoreElements()
            .subscribe(new CompletableObserver() {
                @Override
                public void onSubscribe(Disposable d) {
                    System.out.println("onSubscribe");
                }
    
                @Override
                public void onComplete() {
                    System.out.println("onComplete");
                }
    
                @Override
                public void onError(Throwable e) {
                    System.out.println("onError");
                }
            });
    

    输出:

17,sample()

  • 描述:定期扫描源Observable产生的结果,在指定的间隔周期内进行采样。

  • 示意图:

  • 示例:

    Observable.interval(1, TimeUnit.SECONDS)
            .sample(2, TimeUnit.SECONDS)
            .subscribe(new Consumer<Long>() {
                @Override
                public void accept(Long aLong) throws Exception {
                    System.out.println("accept:" + aLong);
                }
            });
    

    输出:

  • 重载:

    • sample(long period, TimeUnit unit, boolean emitLast)
    • sample(long period, TimeUnit unit, Scheduler scheduler)
    • sample(long period, TimeUnit unit, Scheduler scheduler, boolean emitLast)
    • sample(ObservableSource<U> sampler)
    • sample(ObservableSource<U> sampler, boolean emitLast)

18,throttleLast()

  • 描述:功能和用法同sample。内部实现为调用sample对应参数的方法。

其他操作符介绍:

Rxjava操作符之创建操作符

Rxjava操作符之变换操作符

Rxjava操作符之结合类操作符

Rxjava操作符之辅助操作

猜你喜欢

转载自blog.csdn.net/wernerzeiss/article/details/81085927