RxJava2 Flowable 静态操作符

RxJava的学习成本其实蛮高的,我写RxJava这个系列是希望缩短学习RxJava的时间,我这里完全根据官网来学习一遍,写用例,做笔记,对于对英文不太感冒的同学,我相信看我的博客可以舒服不少!

国庆两天假初步整理了一下Flowable的静态操作符,后续将会继续写Flowable的实例操作符,ObservableSingleCompletableMaybe操作符以及对各操作符的分类和总结,使用场景,目标是可以当做RxJava的字典来查,并且快速学习看懂。希望喜欢的同学收藏点赞,给我鼓励和批评,谢谢!

目录

 

1 Flowable简介

2 Flowable的静态作符

1 amb

2 ambArray

3 bufferSize

4 combineLatest

5 combineLatestDelayError

6 Concat 

7 concatArray

8 concatArrayDelayError

9 concatArrayEager

10 concatArrayEagerError

11 concatDelayError

12 concatEager

13 create

14 defer

15 empty

16 error

17 fromArray

18 fromCallable

19 fromFuture

20 fromIterable

21 fromPublisher

22 generate

23 interval

24 intervalRange

25 just

26 merge

27 mergeArray

28 mergeDelayError

29 never

30 range

31 rangeLong

32 sequenceEqual

33 switchOnNext

34 switchOnNextDelayError

35 timer

36 unsafeCreate

37 using

38 zip、zipArray、zipIterable


1 Flowable简介

见上一篇:RxJava2系列 (一) 五大重要角色基本功能介绍 

2 Flowable的静态作符

1 amb

static <T> Flowable<T>

amb(Iterable<? extends Publisher<? extends T>> sources)

参数传入的是Publisher的集合,Iterable是Collection的父类,Collection是List的父类

给定两个或多个源Publisher,仅从这些Publisher中的第一个发出所有项目以发出项目或通知

1.1 amb图解

注意并不是一定是排在第一位的20、40、60获得优先权,可能有时候Publisher会有延迟操作

1.2 amb测试用例

测试

    private void doAmb() {

        List<Flowable<Integer>> flowables = new ArrayList<Flowable<Integer>>();
        flowables.add(Flowable.just(20,40,60).delay(1, TimeUnit.SECONDS));
        flowables.add(Flowable.just(1,2,3));
        flowables.add(Flowable.just(0,0,0));

        Flowable
                .amb(flowables)
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        System.out.println("value is " + integer);
                    }
                });
    }

测试结果:
10-02 21:11:27.958 14733-14733/hq.demo.net I/System.out: value is 1
10-02 21:11:27.958 14733-14733/hq.demo.net I/System.out: value is 2
10-02 21:11:27.958 14733-14733/hq.demo.net I/System.out: value is 3
10-02 21:11:30.970 14733-14763/hq.demo.net I/System.out: Done!

从上面例子来看,也就是在一个以Flowable为元素的集合中,那个元素最先开始发射数据,那么它只发射其中首先发射数据或通知(onError或onCompleted)的那个Flowable的中的数据流,而其他的Flowable将被丢弃。

2 ambArray

static <T> Flowable<T>

ambArray(Publisher<? extends T>... sources)

参数传入的是Publisher的数组

给定两个或多个源Publisher,仅从这些Publisher中的第一个发出所有项目以发出项目或通知

2.1 图解

其实ambArray跟amb的用法类似,只是换成以Publisher为元素的数组,所以上面的图解与amb相同

2.2 测试用例

private void doAmbArray() {
        Log.d("#Flowable#","do ambArray###############");
        Flowable[] flowables = {Flowable.just(20,40,60).delay(1,TimeUnit.SECONDS),Flowable.just(1,2,3),Flowable.just(0,0,0),Flowable.just(1,2,3)};
        Flowable.ambArray(flowables).subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                System.out.println("accept:"+integer);
            }
        });
    }


结果输出:
09-30 20:58:23.684 5885-5885/hq.demo.net D/#Flowable#: do ambArray###############
09-30 20:58:23.716 5885-5885/hq.demo.net I/System.out: accept:1
09-30 20:58:23.716 5885-5885/hq.demo.net I/System.out: accept:2
09-30 20:58:23.716 5885-5885/hq.demo.net I/System.out: accept:3

3 bufferSize

static int

bufferSize()

返回大多数异步运算符使用的默认内部缓冲区大小

3.1  bufferSize图解

无,只是返回一个内部缓冲区大小的int值,返回值默认大小128

3.2 bufferSize测试用例

  private void doBufferSize() {
        Log.d("#Flowable#", "do bufferSize###############");
        int bufferSize = Flowable.bufferSize();
        System.out.println("bufferSize = " + bufferSize);
    }

结果输出:
09-30 21:16:07.242 6381-6381/hq.demo.net D/#Flowable#: do bufferSize###############
09-30 21:16:07.242 6381-6381/hq.demo.net I/System.out: bufferSize = 128

4 combineLatest

combineLatest一共有13个重载方法,说明的都是一个功能,就是自定义一个函数来,根据函数规则处理两个以上被观察源发出的消息,得到一个结果,再把这个结果发射出去,只是这些重载方法涵盖的比较全面,传入参数可以使数组(动态数组、已知长度数组)、集合、2个到9个发布源。

static <T,R> Flowable<R>

combineLatest(Function<? super Object[],? extends R> combiner, Publisher<? extends T>... sources)

当一个项目由两个以上的Publisher中的任何一个发出时,通过指定的函数组合每个Publisher发出的最新项目,并根据此函数的结果发出最终项目

static <T,R> Flowable<R>

combineLatest(Iterable<? extends Publisher<? extends T>> sources, Function<? super Object[],? extends R> combiner)

通过发出一个项目来合并源发布者的集合,该项目在每次从任何源发布者接收项目时聚合每个源发布者的最新值,其中此聚合由指定的函数定义。

static <T,R> Flowable<R>

combineLatest(Iterable<? extends Publisher<? extends T>> sources, Function<? super Object[],? extends R> combiner, int bufferSize)

通过发出一个项目来合并源发布者的集合,该项目在每次从任何源发布者接收项目时聚合每个源发布者的最新值,其中此聚合由指定的函数定义。

static <T,R> Flowable<R>

combineLatest(Publisher<? extends T>[] sources, Function<? super Object[],? extends R> combiner)

通过发出一个项目来合并源发布者的集合,该项目在每次从任何源发布者接收项目时聚合每个源发布者的最新值,其中此聚合由指定的函数定义。

static <T,R> Flowable<R>

combineLatest(Publisher<? extends T>[] sources, Function<? super Object[],? extends R> combiner, int bufferSize)

通过发出一个项目来合并源发布者的集合,该项目在每次从任何源发布者接收项目时聚合每个源发布者的最新值,其中此聚合由指定的函数定义。

注意:上面连着的4个函数都是一个意思,其中两个重载方法能够指定缓存大小

static <T1,T2,R> Flowable<R>

combineLatest(Publisher<? extends T1> source1, Publisher<? extends T2> source2, BiFunction<? super T1,? super T2,? extends R> combiner)

通过发出一个项目来合并两个源发布者,每次从任一源发布者接收项目时,该项目都会聚合每个源发布者的最新值,其中此聚合由指定的函数定义。

<T1,T2,T3,T4,T5,T6,T7,T8,T9,R>

这里省略了其他7个重载方法,说的都是一个意思

每次从任何源发布者接收项目时,通过发出一个聚合每个源发布者的最新值的项目来合并3(4、5、6、7、8、9)个源发布者,其中此聚合由指定的函数定义。

4.1 combineLatest图解

4.2 combineLatest测试用例 

这个方法是一个多重重载的静态方法,这里只举一个最简单的例子,根据上图做2个发射源的发出的消息的合并

 private void doCombineLatest() {
        Log.d("#Flowable#", "do combineLatest###############");
        Flowable sourceA = Flowable.just(1, 2, 3);
        Flowable sourceB = Flowable.just(4, 5, 6);
        Flowable sourceC = Flowable.just(7, 8, 9);

        Flowable.combineLatest(sourceA, sourceB, sourceC, new Function3<Integer,Integer,Integer, String>() {
            @Override
            public String apply(Integer o, Integer o2, Integer o3) throws Exception {
                return (o + o2 + o3) + "";
            }
        }).subscribe(new Consumer<String>() {
            @Override
            public void accept(String o) throws Exception {
                System.out.println("accept:value="+o);
            }
        });
    }


测试结果:
09-30 23:56:40.523 11747-11747/hq.demo.net D/#Flowable#: do combineLatest###############
09-30 23:56:40.523 11747-11747/hq.demo.net I/System.out: accept:value=16
09-30 23:56:40.523 11747-11747/hq.demo.net I/System.out: accept:value=17
09-30 23:56:40.524 11747-11747/hq.demo.net I/System.out: accept:value=18

combineLatestDelayError

combineLatestDelayError在combineLatest的基础上增加了对error的延迟处理,让error延迟到所有项目被发射完毕

static <T,R> Flowable<R>

combineLatestDelayError(Function<? super Object[],? extends R> combiner, int bufferSize, Publisher<? extends T>... sources)

通过发出一个项来合并源发布者的集合,该项在每次从任何源发布者接收项时聚合每个源发布者的最新值,其中此聚合由指定的函数定义并延迟来自源的任何错误 直到所有源发布者终止。

static <T,R> Flowable<R>

combineLatestDelayError(Function<? super Object[],? extends R> combiner, Publisher<? extends T>... sources)

通过发出一个项来合并源发布者的集合,该项在每次从任何源发布者接收项时聚合每个源发布者的最新值,其中此聚合由指定的函数定义并延迟来自源的任何错误 直到所有源发布者终止。

static <T,R> Flowable<R>

combineLatestDelayError(Iterable<? extends Publisher<? extends T>> sources, Function<? super Object[],? extends R> combiner)

通过发出一个项来合并源发布者的集合,该项在每次从任何源发布者接收项时聚合每个源发布者的最新值,其中此聚合由指定的函数定义并延迟来自源的任何错误 直到所有源发布者终止。

static <T,R> Flowable<R>

combineLatestDelayError(Iterable<? extends Publisher<? extends T>> sources, Function<? super Object[],? extends R> combiner, int bufferSize)

通过发出一个项来合并源发布者的集合,该项在每次从任何源发布者接收项时聚合每个源发布者的最新值,其中此聚合由指定的函数定义并延迟来自源的任何错误 直到所有源发布者终止。

static <T,R> Flowable<R>

combineLatestDelayError(Publisher<? extends T>[] sources, Function<? super Object[],? extends R> combiner)

通过发出一个项目来合并源发布者的集合,该项目在每次从任何源发布者接收项目时聚合每个源发布者的最新值,其中此聚合由指定的函数定义。

static <T,R> Flowable<R>

combineLatestDelayError(Publisher<? extends T>[] sources, Function<? super Object[],? extends R> combiner, int bufferSize)

通过发出一个项来合并源发布者的集合,该项在每次从任何源发布者接收项时聚合每个源发布者的最新值,其中此聚合由指定的函数定义并延迟来自源的任何错误 直到所有源发布者终止。

5.1 combineLatestDelayError图解

同4.1

5.2 combineLatestDelayError测试用例

6 Concat 

static <T> Flowable<T>

concat(Iterable<? extends Publisher<? extends T>> sources)

将通过Iterable序列提供的以Publisher为元素的集合中的每一个Publisher连接到单个元素序列中,而不进行交错。

static <T> Flowable<T>

concat(Publisher<? extends Publisher<? extends T>> sources)

返回一个Flowable,它一个接一个地发出源Publisher发出的每个发布者发出的项,而不进行交错。

static <T> Flowable<T>

concat(Publisher<? extends Publisher<? extends T>> sources, int prefetch)

返回一个Flowable,它一个接一个地发出源Publisher发出的每个项,而不进行交错。

static <T> Flowable<T>

concat(Publisher<? extends T> source1, Publisher<? extends T> source2)

返回一个Flowable,它一个接一个地发出2个Publishers发出的项,而不进行交错。

static <T> Flowable<T>

concat(Publisher<? extends T> source1, Publisher<? extends T> source2, Publisher<? extends T> source3)

返回一个Flowable,它一个接一个地发出3个Publishers发出的项,而不进行交错。

static <T> Flowable<T>

concat(Publisher<? extends T> source1, Publisher<? extends T> source2, Publisher<? extends T> source3,Publisher<? extends T> source4)

返回一个Flowable,它一个接一个地发出4个发布者发出的项目,而不进行交错。

6.1 Concat图解

6.2 Concat测试用例


测试1:
private void doConcat() {
        Flowable<Integer> sourceA = Flowable.just(1, 2, 3);
        Flowable<Integer> sourceB = Flowable.just(11, 12, 13);
        Flowable<Integer> sourceC = Flowable.just(21, 22, 23);
        List<Publisher<Integer>> source = new ArrayList<>();
        source.add(sourceA);
        source.add(sourceB);
        source.add(sourceC);

        Flowable.concat(source).subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                System.out.println("value:" + integer);
            }
        });
    }

测试1结果:
10-01 10:04:39.391 28084-28084/hq.demo.net I/System.out: value:1
10-01 10:04:39.392 28084-28084/hq.demo.net I/System.out: value:2
10-01 10:04:39.392 28084-28084/hq.demo.net I/System.out: value:3
10-01 10:04:39.392 28084-28084/hq.demo.net I/System.out: value:11
10-01 10:04:39.393 28084-28084/hq.demo.net I/System.out: value:12
10-01 10:04:39.393 28084-28084/hq.demo.net I/System.out: value:13
10-01 10:04:39.393 28084-28084/hq.demo.net I/System.out: value:21
10-01 10:04:39.396 28084-28084/hq.demo.net I/System.out: value:22
10-01 10:04:39.397 28084-28084/hq.demo.net I/System.out: value:23


测试2:
private void doConca2() {
        Flowable<String> sourceA = Flowable.just("A", "B", "C");
        Flowable<String> sourceB = Flowable.just("X", "Y", "Z");
        Flowable<String> sourceC = Flowable.just("H", "I", "J");
        Flowable<Publisher<String>> publishers = Flowable.just(sourceA,sourceB,sourceC);

        Flowable.concat(publishers).subscribe(new Consumer<String>() {
            @Override
            public void accept(String v) throws Exception {
                System.out.println("value:" + v);
            }
        });
    }

测试2结果:
10-01 10:11:38.014 28482-28482/hq.demo.net I/System.out: value:A
10-01 10:11:38.014 28482-28482/hq.demo.net I/System.out: value:B
10-01 10:11:38.014 28482-28482/hq.demo.net I/System.out: value:C
10-01 10:11:38.014 28482-28482/hq.demo.net I/System.out: value:X
10-01 10:11:38.014 28482-28482/hq.demo.net I/System.out: value:Y
10-01 10:11:38.015 28482-28482/hq.demo.net I/System.out: value:Z
10-01 10:11:38.015 28482-28482/hq.demo.net I/System.out: value:H
10-01 10:11:38.015 28482-28482/hq.demo.net I/System.out: value:I
10-01 10:11:38.015 28482-28482/hq.demo.net I/System.out: value:J

测试3
private void doConcat3() {
        System.out.println("doConcat3----------------->");
        Flowable<String> sourceA = Flowable.just("A", "B", "C");
        Flowable<String> sourceB = Flowable.just("X", "Y", "Z");
        Flowable<String> sourceC = Flowable.just("H", "I", "J");

        Flowable.concat(sourceC, sourceA, sourceB).subscribe(new Consumer<String>() {
            @Override
            public void accept(String v) throws Exception {
                System.out.println("value:" + v);
            }
        });
    }

测试3结果:
10-01 10:17:35.711 28715-28715/hq.demo.net I/System.out: doConcat3----------------->
10-01 10:17:35.713 28715-28715/hq.demo.net I/System.out: value:H
10-01 10:17:35.713 28715-28715/hq.demo.net I/System.out: value:I
10-01 10:17:35.713 28715-28715/hq.demo.net I/System.out: value:J
10-01 10:17:35.713 28715-28715/hq.demo.net I/System.out: value:A
10-01 10:17:35.713 28715-28715/hq.demo.net I/System.out: value:B
10-01 10:17:35.713 28715-28715/hq.demo.net I/System.out: value:C
10-01 10:17:35.713 28715-28715/hq.demo.net I/System.out: value:X
10-01 10:17:35.713 28715-28715/hq.demo.net I/System.out: value:Y
10-01 10:17:35.713 28715-28715/hq.demo.net I/System.out: value:Z

6.3 Concat总结

通过上面的测试我们发现,无论是集合(元素Publisher),还是Publisher(源元素Publisher),还是one by one的传入Publisher,concat操作符就是将每个Publisher按照顺序一个个串起来,然后根据他们发射项目按顺序发射

7 concatArray

static <T> Flowable<T>

concatArray(Publisher<? extends T>... sources)

连接可变数量的Publisher源。

7.1 concatArray图解 

同6.1

7.2 concatArray测试用例

测试代码
private void doConcatArray() {
        System.out.println("doConcatArray----------------->");
        Flowable<String> sourceA = Flowable.just("A", "B", "C");
        Flowable<String> sourceB = Flowable.just("X", "Y", "Z");
        Flowable<String> sourceC = Flowable.just("H", "I", "J");
        Flowable[] flowables = {sourceA,sourceB,sourceC};

        Flowable.concatArray(sourceC, sourceA).subscribe(new Consumer<String>() {
            @Override
            public void accept(String v) throws Exception {
                System.out.println("value:" + v);
            }
        });
        System.out.println("换个姿势再来一次----------------->");
        Flowable.concatArray(flowables).subscribe(new Consumer<String>() {
            @Override
            public void accept(String v) throws Exception {
                System.out.println("value:" + v);
            }
        });
    }

测试结果:
10-01 10:28:03.883 29369-29369/hq.demo.net I/System.out: doConcatArray----------------->
10-01 10:28:03.884 29369-29369/hq.demo.net I/System.out: value:H
10-01 10:28:03.884 29369-29369/hq.demo.net I/System.out: value:I
10-01 10:28:03.884 29369-29369/hq.demo.net I/System.out: value:J
10-01 10:28:03.884 29369-29369/hq.demo.net I/System.out: value:A
10-01 10:28:03.884 29369-29369/hq.demo.net I/System.out: value:B
10-01 10:28:03.884 29369-29369/hq.demo.net I/System.out: value:C
10-01 10:28:03.884 29369-29369/hq.demo.net I/System.out: 换个姿势再来一次----------------->
10-01 10:28:03.884 29369-29369/hq.demo.net I/System.out: value:A
10-01 10:28:03.884 29369-29369/hq.demo.net I/System.out: value:B
10-01 10:28:03.884 29369-29369/hq.demo.net I/System.out: value:C
10-01 10:28:03.884 29369-29369/hq.demo.net I/System.out: value:X
10-01 10:28:03.884 29369-29369/hq.demo.net I/System.out: value:Y
10-01 10:28:03.884 29369-29369/hq.demo.net I/System.out: value:Z
10-01 10:28:03.884 29369-29369/hq.demo.net I/System.out: value:H
10-01 10:28:03.884 29369-29369/hq.demo.net I/System.out: value:I
10-01 10:28:03.884 29369-29369/hq.demo.net I/System.out: value:J

7.3 concatArray总结

concatArray其实和concat一样,只是传入的是个以Publisher为元素的数组

8 concatArrayDelayError

static <T> Flowable<T>

concatArrayDelayError(Publisher<? extends T>... sources)

连接可变数量的Publisher源并延迟其中任何一个的错误,直到所有项目被发射完毕为止。

8.1 concatArrayDelayError图解 

同concat

8.2 concatArrayDelayError测试用例

测试代码:
private void doConcatArrayDelayError() {
        System.out.println("doconcatArrayDelayError----------------->");
        Flowable<String> sourceA = Flowable.just("A", "B", "C");
        Flowable<String> sourceB = Flowable.just("X", "Y", "Z");
        Flowable[] flowables = {sourceA,sourceB};
        Flowable.concatArrayDelayError(flowables).subscribe(new Consumer<String>() {
            @Override
            public void accept(String v) throws Exception {
                System.out.println("value:" + v);
            }
        });
        doConcatArrayEager();
    }

测试结果:

10-01 10:34:23.845 29655-29655/hq.demo.net I/System.out: doconcatArrayDelayError----------------->
10-01 10:34:23.847 29655-29655/hq.demo.net I/System.out: value:A
10-01 10:34:23.847 29655-29655/hq.demo.net I/System.out: value:B
10-01 10:34:23.847 29655-29655/hq.demo.net I/System.out: value:C
10-01 10:34:23.848 29655-29655/hq.demo.net I/System.out: value:X
10-01 10:34:23.848 29655-29655/hq.demo.net I/System.out: value:Y
10-01 10:34:23.848 29655-29655/hq.demo.net I/System.out: value:Z

8.3 concatArrayDelayError测试结果说明

从测试结果来看鱼concat并没有什么什么不同,只是其在发送项目时如果遇到error并不会停止发射,而是继续发射下一条,直到所有项目被发射完毕,但是concat活着concatArray如果遇到error则会停下来。

9 concatArrayEager

static <T> Flowable<T>

concatArrayEager(int maxConcurrency, int prefetch, Publisher<? extends T>... sources)

将一组Publishers急切地连接成一个单独的值流。

static <T> Flowable<T>

concatArrayEager(Publisher<? extends T>... sources)

将一组Publishers急切地连接成一个单独的值流。

9.1 concatArrayEager图解

同conact

9.2 concatArrayEager测试用例

测试代码:
private void doConcatArrayEager() {
        System.out.println("doConcatArrayEager----------------->");
        Flowable<String> sourceA = Flowable.just("A", "C", "B");
        Flowable<String> sourceB = Flowable.just("Z", "Y", "X");
        Flowable[] flowables = {sourceA,sourceB};
        Flowable.concatArrayEager(flowables).subscribe(new Consumer<String>() {
            @Override
            public void accept(String v) throws Exception {
                System.out.println("value:" + v);
            }
        });
    }

测试结果:
10-01 11:00:51.710 30507-30507/hq.demo.net I/System.out: doConcatArrayEager----------------->
10-01 11:00:51.716 30507-30507/hq.demo.net I/System.out: value:A
10-01 11:00:51.716 30507-30507/hq.demo.net I/System.out: value:C
10-01 11:00:51.716 30507-30507/hq.demo.net I/System.out: value:B
10-01 11:00:51.716 30507-30507/hq.demo.net I/System.out: value:Z
10-01 11:00:51.716 30507-30507/hq.demo.net I/System.out: value:Y
10-01 11:00:51.716 30507-30507/hq.demo.net I/System.out: value:X

9.3 concatArrayEager测试说明

从测试结果看不出与conactArray有何不同,应该体现在Eager这个单词上,热切的,急切的

源码注释上说:

急切连接意味着一旦订阅者订阅,该运操作就会订阅所有源发布者。该运算符会缓存这些发布者发出的值,每当前一个发布者发射完它的数据后,就会按发射顺序排列

10 concatArrayEagerError

static <T> Flowable<T>

concatArrayEagerDelayError(int maxConcurrency, int prefetch, Publisher<? extends T>... sources)

将发布者数组急切地连接到单个值流中,并延迟所有错误,直到所有源发射数据终止。

static <T> Flowable<T>

concatArrayEagerDelayError(Publisher<? extends T>... sources)

Concatenates an array of Publishers eagerly into a single stream of values and delaying any errors until all sources terminate.

10.1 concatArrayEagerError图解

同conact

10.2 concatArrayEagerError测试用例

略,同上面遇到的所有以Error结尾的操作符一样,都是延迟error,直到所有数据发射完毕

11 concatDelayError

static <T> Flowable<T>

concatDelayError(Iterable<? extends Publisher<? extends T>> sources)

通过一个接一个地订阅每个发布者,将Iterable发布者序列连接成一个序列,一次一个,并延迟任何错误,直到所有内部发布者终止。

static <T> Flowable<T>

concatDelayError(Publisher<? extends Publisher<? extends T>> sources)

通过一个接一个地订阅每个内部发布者,将发布者的发布者序列连接成单个序列,一次一个,并延迟任何错误,直到所有内部和外部发布者终止。

static <T> Flowable<T>

concatDelayError(Publisher<? extends Publisher<? extends T>> sources, int prefetch, boolean tillTheEnd)

通过一个接一个地订阅每个内部发布者,将发布者的发布者序列连接成单个序列,一次一个,并延迟任何错误,直到所有内部和外部发布者终止。

11.1 concatDelayError图解

同concat

11.2 concatDelayError测试用例

略,与concat一样,只是对error做了延迟操作

12 concatEager

static <T> Flowable<T>

concatEager(Iterable<? extends Publisher<? extends T>> sources)

将发布者的集合中的发布者一个个热切地连接到单个值流中。

static <T> Flowable<T>

concatEager(Iterable<? extends Publisher<? extends T>> sources, int maxConcurrency, int prefetch)

将发布者的集合中的发布者一个个热切地连接到单个值流中,可设置最大并发数

static <T> Flowable<T>

concatEager(Publisher<? extends Publisher<? extends T>> sources)

将以发布者为项目的发布者序列急切地连接到单个值流中。

static <T> Flowable<T>

concatEager(Publisher<? extends Publisher<? extends T>> sources, int maxConcurrency, int prefetch)

将以发布者为项目的发布者序列急切地连接到单个值流中。

12.1 concatEager图解

同concat

12.2 concatEager测试用例

同concat

13 create

static <T> Flowable<T>

create(FlowableOnSubscribe<T> source, BackpressureStrategy mode)

Provides an API (via a cold Flowable) that bridges the reactive world with the callback-style, generally non-backpressured world.

提供一个API(通过冷Flowable),将响应式世界与回调式模式联系起来,一般没有背压。

简单点理解就是,create这个操作符就是创建一个Flowable

13.1 冷Flowable 与 热Flowable

所谓的冷Flowable就是只有在被订阅者订阅的时候才会发射数据Flowable,没有订阅者订阅则不发射数据,什么时候订阅者开始订阅,什么时候发射数据,所有的订阅者无论时间先后,收到的数据都是相同的。

相对的热Flowable,只要被创建了,不管有没有订阅者订阅都会发射数据的Flowable,当订阅者订阅后,如果之前有发射数据,那么订阅者将不会收到Flowable之前发射的数据。

13.1 create测试用例

测试代码
    private void doCreate() {
        Flowable.create(new FlowableOnSubscribe<String>() {
            @Override
            public void subscribe(FlowableEmitter<String> e) throws Exception {
                e.onNext("Hello");
                e.onNext("World");
            }
        }, BackpressureStrategy.BUFFER).subscribe(new Consumer<String>() {
            @Override
            public void accept(String v) throws Exception {

                System.out.println("accept,value = " + v);
            }
        });

    }

测试结果
10-01 13:11:06.221 908-908/hq.demo.net I/System.out: accept,value = Hello
10-01 13:11:06.222 908-908/hq.demo.net I/System.out: accept,value = World

14 defer

static <T> Flowable<T>

defer(Callable<? extends Publisher<? extends T>> supplier).

返回一个Flowable,它调用Publisher工厂为每个订阅的新订阅者创建一个Publisher。 也就是说,对于每个订阅者,订阅者观察的实际发布者由工厂功方法决定。

注意defer会为每一个订阅者单独创建一个Publisher实例,一对一的关系。

14.1 defer图解

14.2、defer测试用例

测试代码
private void doDefer() {
        Flowable<String> flowable = Flowable.defer(new Callable<Publisher<String>>() {
            @Override
            public Publisher<String> call() throws Exception {
                System.out.println("创建Publisher");
                return Flowable.just("A", "B", "C");
            }
        });

        flowable.subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                System.out.println(s);
            }
        });

        flowable.subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                System.out.println(s);
            }
        });
    }





测试结果
10-01 14:13:08.125 2673-2673/hq.demo.net I/System.out: 创建Publisher
10-01 14:13:08.127 2673-2673/hq.demo.net I/System.out: A
10-01 14:13:08.127 2673-2673/hq.demo.net I/System.out: B
10-01 14:13:08.127 2673-2673/hq.demo.net I/System.out: C
10-01 14:13:08.128 2673-2673/hq.demo.net I/System.out: 创建Publisher
10-01 14:13:08.128 2673-2673/hq.demo.net I/System.out: A
10-01 14:13:08.128 2673-2673/hq.demo.net I/System.out: B
10-01 14:13:08.129 2673-2673/hq.demo.net I/System.out: C

14.3 defer说明

一个被观察者可以被多个订阅者订阅,如果需要每个被观察者被订阅的时候都去重新创建(也就是一对一),可以选defer操作符,从上面测试用例的输出也可以得出结论,创建一个Publisher,并且发射完成后才会再创建下一个实例。

15 empty

static <T> Flowable<T>

empty()

返回一个Flowable,它不向订阅服务器发送任何项,并立即调用其onComplete方法。

15.1 empty图解

15.2 empty测试用例

private void doEmpty() {
        Flowable.empty().subscribe(new Subscriber<Object>() {
            @Override
            public void onSubscribe(Subscription s) {
                System.out.println("onSubscribe:" + s);
            }

            @Override
            public void onNext(Object o) {
                System.out.println("onNext:" + o.toString());
            }

            @Override
            public void onError(Throwable t) {
                System.out.println("onError:"+ t.getMessage());
            }

            @Override
            public void onComplete() {

                System.out.println("onComplete");
            }
        });
    }

结果输出:
09-30 20:06:06.207 4312-4312/hq.demo.net I/System.out: onSubscribe:EmptySubscription
09-30 20:06:06.207 4312-4312/hq.demo.net I/System.out: onComplete


 

16 error

static <T> Flowable<T>

error(Callable<? extends Throwable> supplier)

返回一个Flowable,当订阅者订阅它时,它调用订阅者的onError方法。

static <T> Flowable<T>

error(Throwable throwable)

返回一个Flowable,当订阅者订阅它时,它调用订阅者的onError方法。

16.1 error图解

16.2 error测试用例

测试代码
private void doError() {
        Flowable.error(new RuntimeException("test error")).subscribe(new Subscriber<Object>() {
            @Override
            public void onSubscribe(Subscription s) {
                System.out.println("onSubscribe," + s);
            }

            @Override
            public void onNext(Object o) {
                System.out.println("onNext," + o.toString());
            }

            @Override
            public void onError(Throwable t) {
                System.out.println("onError:"+t.getMessage());
            }

            @Override
            public void onComplete() {
                System.out.println("onComplete");
            }
        });
    }



测试结果:
10-01 14:33:18.146 3242-3242/hq.demo.net I/System.out: onSubscribe,EmptySubscription
10-01 14:33:18.147 3242-3242/hq.demo.net I/System.out: onError:test error

16.3 error测试用例说明

从测试用例我们可以得出结论,error创建的Flowable只会发出error通知,回调观察者的onError接口

17 fromArray

static <T> Flowable<T>

fromArray(T... items)

将数组转换为一个Publisher,它发送到项目就是数组中的元素。

17.1 fromArray图解

17.2 fromArray测试用例

测试代码:
private void doFromArray() {
        String[] dataSource = {"hello","rxjava"};
        Flowable.fromArray(dataSource).subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                System.out.println(""+s);
            }
        });
    }

测试结果:
10-01 14:47:33.958 3843-3843/hq.demo.net I/System.out: hello
10-01 14:47:33.958 3843-3843/hq.demo.net I/System.out: rxjava

17.3 fromArray测试用例说明

测试用例说明通过fromArray操作符可以以一个数组创建一个Publisher,而数组中的元素就是Publisher发射的数据;

18 fromCallable

static <T> Flowable<T>

fromCallable(Callable<? extends T> supplier)

返回一个Flowable,当订阅者订阅它时,调用指定的函数,然后发出从该函数返回的值。

18.1 fromCallable图解

18.2 fromCallable测试用例

测试代码
private void doFromCallable() {
        Flowable.fromCallable(new Callable<String>() {
            @Override
            public String call() throws Exception {
                return "国庆节快乐!";
            }
        }).subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                System.out.println(s);
            }
        });
    }


测试结果
10-01 15:02:06.183 4374-4374/hq.demo.net I/System.out: 国庆节快乐!

18.3 fromCallable

其实fromCallable就是一个创建Flowable的操作符,被创建的Flowable发射的值就是fromCallable指定的函数的返回值

19 fromFuture

static <T> Flowable<T>

fromFuture(Future<? extends T> future)

将Future转换为Publisher。

static <T> Flowable<T>

fromFuture(Future<? extends T> future, long timeout, TimeUnit unit)

将Future转换为Publisher,并可以设置Future的超时时间。

static <T> Flowable<T>

fromFuture(Future<? extends T> future, long timeout, TimeUnit unit, Scheduler scheduler)

将Future转换为Publisher,并在Future上超时。

static <T> Flowable<T>

fromFuture(Future<? extends T> future, Scheduler scheduler)

将在指定调度程序上运行的Future转换为Publisher。

19.1 fromFuture图解

同from

19.2 fromFuture测试用例

测试代码
private void doFromFuture() {
        Flowable publisher = Flowable.fromFuture(new Future<String>() {
            @Override
            public boolean cancel(boolean mayInterruptIfRunning) {
                return false;
            }

            @Override
            public boolean isCancelled() {
                return false;
            }

            @Override
            public boolean isDone() {
                return false;
            }

            @Override
            public String get() throws InterruptedException, ExecutionException {
                return "future is good";
            }

            @Override
            public String get(long timeout, @NonNull TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
                System.out.println("timeout is " + timeout);
                
                return "future is good, but time too long";
            }
        },3, TimeUnit.SECONDS);

        publisher.subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                System.out.println("value is " + s);
            }
        });

    }


测试结果
10-01 15:25:28.716 5369-5369/hq.demo.net I/System.out: timeout is 3
10-01 15:25:28.716 5369-5369/hq.demo.net I/System.out: value is future is good, but time too long

如果去掉 timeout和时间单位使用另一个重载方法得到的结果是


10-01 15:25:29.434 5369-5369/hq.demo.net I/System.out: value is future is good

20 fromIterable

static <T> Flowable<T>

fromIterable(Iterable<? extends T> source) 

将Iterable序列转换为Publisher,其发出的项目便是这个序列中的元素。Iterable序列指的其实就是集合,Collection 继承了 Iterable接口。

20.1 fromIterable图解

同from图解

20.2 fromIterable测试用例

测试代码
 private void doFromIterable() {
        List<String> values = new ArrayList<>();
        values.add("Hello");
        values.add("RxJava");
        Flowable.fromIterable(values)
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(String s) throws Exception {
                        System.out.println("" + s);
                    }
                });
    }


测试结果:
10-01 15:40:52.636 5810-5810/hq.demo.net I/System.out: Hello
10-01 15:40:52.636 5810-5810/hq.demo.net I/System.out: RxJava

21 fromPublisher

static <T> Flowable<T>

fromPublisher(Publisher<? extends T> source)

如果不是Flowable,则将任意Reactive-Streams Publisher转换为Flowable。

21.1 fromPublisher图解

21.2 fromPublisher测试用例

21.3 fromPublisher说明

fromPublisher操作符就是将其他Publisher的子类转换成Flowable

22 generate

static <T,S> Flowable<T>

generate(Callable<S> initialState, BiConsumer<S,Emitter<T>> generator)

返回一个冷,同步,有状态和背压感知的值生成器。

static <T,S> Flowable<T>

generate(Callable<S> initialState, BiConsumer<S,Emitter<T>> generator, Consumer<? super S> disposeState)

返回一个冷,同步,有状态和背压感知的值生成器。

static <T,S> Flowable<T>

generate(Callable<S> initialState, BiFunction<S,Emitter<T>,S> generator)

返回一个冷,同步,有状态和背压感知的值生成器。

static <T,S> Flowable<T>

generate(Callable<S> initialState, BiFunction<S,Emitter<T>,S> generator, Consumer<? super S> disposeState)

返回一个冷,同步,有状态和背压感知的值生成器。

static <T> Flowable<T>

generate(Consumer<Emitter<T>> generator)

返回一个冷,同步,有状态和背压感知的值生成器。

22.1 generate测试用例

测试代码
private void doGenerate() {
        Flowable flowable  = Flowable.generate(new Consumer<Emitter<String>>() {
            @Override
            public void accept(Emitter<String> emitter) throws Exception {
                emitter.onNext("Hello God!");
            }
        });

        flowable.subscribe(new Consumer<String>() {
            @Override
            public void accept(String v) throws Exception {
                System.out.println("" + v);
            }
        });
    }


测试结果
10-02 00:29:02.820 4300-4300/hq.demo.net I/System.out: Hello God!
10-02 00:29:03.978 4300-4300/hq.demo.net I/chatty: uid=10080(hq.demo.net) identical 65535 lines
10-02 00:29:04.395 4300-4300/hq.demo.net I/chatty: uid=10080(hq.demo.net) identical 18800 lines
10-02 00:29:04.395 4300-4300/hq.demo.net I/System.out: Hello God!
10-02 00:29:04.395 4300-4300/hq.demo.net I/System.out: Hello God!
10-02 00:29:05.715 4300-4300/hq.demo.net I/chatty: uid=10080(hq.demo.net) identical 65535 lines
10-02 00:29:05.972 4300-4300/hq.demo.net I/chatty: uid=10080(hq.demo.net) identical 13601 lines
10-02 00:29:05.972 4300-4300/hq.demo.net I/System.out: Hello God!
10-02 00:29:05.972 4300-4300/hq.demo.net I/System.out: Hello God!
10-02 00:29:07.253 4300-4300/hq.demo.net I/chatty: uid=10080(hq.demo.net) identical 65535 lines
10-02 00:29:08.399 4300-4300/hq.demo.net I/chatty: uid=10080(hq.demo.net) identical 65535 lines
10-02 00:29:09.435 4300-4300/hq.demo.net I/chatty: uid=10080(hq.demo.net) identical 50905 lines
10-02 00:29:09.435 4300-4300/hq.demo.net I/System.out: Hello God!
10-02 00:29:09.435 4300-4300/hq.demo.net I/System.out: Hello God!
10-02 00:29:09.435 4300-4300/hq.demo.net I/chatty: uid=10080(hq.demo.net) identical 11 lines
10-02 00:29:09.435 4300-4300/hq.demo.net I/System.out: Hello God!
10-02 00:29:09.435 4300-4300/hq.demo.net I/System.out: Hello God!
10-02 00:29:09.435 4300-4300/hq.demo.net I/System.out: Hello God!
10-02 00:29:09.435 4300-4300/hq.demo.net I/System.out: Hello God!
10-02 00:29:09.435 4300-4300/hq.demo.net I/chatty: uid=10080(hq.demo.net) identical 1 line
10-02 00:29:09.435 4300-4300/hq.demo.net I/System.out: Hello God!
10-02 00:29:09.435 4300-4300/hq.demo.net I/System.out: Hello God!
10-02 00:29:28.752 4300-4300/hq.demo.net I/System.out: Hello God!
...还在继续输出不停止

22.2 generate 总结

generate操作符创建Flowable时,传入的其中一个参数是Consumer,其accept接口返回一个Emitter,通过它发射一个字符串“Hello God!”,如果下面没有flowable.subscribe(Consumer)订阅,是不会发射“Hello God!”,而且这里会不停的同步发射,产生背压现象,也就是为什么会又不停输出。

generate生成的Flowable具有背压敏感性

23 interval

static Flowable<Long>

interval(long initialDelay, long period, TimeUnit unit)

返回一个Flowable,它在初始化后延迟initialDelay开始发射数据,从0L开始,每间隔period时间发射一次,每次+1

static Flowable<Long>

interval(long initialDelay, long period, TimeUnit unit, Scheduler scheduler)

返回一个Flowable,它在初始化后延迟initialDelay开始发射数据,从0L开始,每间隔period时间发射一次,每次+1,通过传入的Scheduler来调度

static Flowable<Long>

interval(long period, TimeUnit unit)

返回一个Flowable,它在初始化后延迟period开始发射数据,从0L开始,每间隔period时间发射一次,每次+1,注意如果初始延迟时间不设置,会使用period,可以看源码

static Flowable<Long>

interval(long period, TimeUnit unit, Scheduler scheduler)

返回一个Flowable,它在初始化后延迟period开始发射数据,从0L开始,每间隔period时间发射一次,每次+1,通过传入的Scheduler来调度

23.1 interval图解

23.2 interval测试用例

测试代码

private void doInterval() {
        Flowable.interval(10,1,TimeUnit.SECONDS).subscribe(new Consumer<Long>() {
            @Override
            public void accept(Long aLong) throws Exception {
                System.out.println("aLong = " + aLong);
            }
        });
    }

10-02 01:14:27.415 6768-7055/hq.demo.net I/System.out: aLong = 0
10-02 01:14:28.414 6768-7055/hq.demo.net I/System.out: aLong = 1
10-02 01:14:29.414 6768-7055/hq.demo.net I/System.out: aLong = 2
... ...每隔1秒钟会发射一次,第一个参数是初始化后,第一次发射延迟10秒,第二个参数是间隔时间,从0开始每隔1秒钟发射一次,数值增加1

23.2 interval测试用例说明

interval里面封装了一个Scheduler,可以执行调度,interval相关的操作符也就是做定时,重复操作用的,重载方法中也可以自定义一个Scheduler传进去。一旦使用这个操作符不做其他操作,它会不停运行。这里涉及到另外一个操作符take,后续会讲到。

interval的重载方法功能头差不多,只是设置不同。

24 intervalRange

static Flowable<Long>

intervalRange(long start, long count, long initialDelay, long period, TimeUnit unit)

返回一个Flowable,在初始化后延迟initialDelay开始发射数据,每间隔period发射一次,数据从start开始,每次+1,共发射count次

static Flowable<Long>

intervalRange(long start, long count, long initialDelay, long period, TimeUnit unit,Scheduler scheduler)

返回一个Flowable,在初始化后延迟initialDelay开始发射数据,每间隔period发射一次,数据从start开始,每次+1,共发射count次,使用传入的调度器scheduler操作

24.1 intervalRange图解

与interval是一样的,只是限定了次数

24.2 intervalRange测试用例

测试代码
private void doIntervalRange() {
        System.out.println("######doIntervalRange#####");
        Flowable.intervalRange(5, 6, 3, 5,TimeUnit.SECONDS).subscribe(new Consumer<Long>() {
            @Override
            public void accept(Long aLong) throws Exception {
                System.out.println("aLong = " + aLong);
            }
        });
    }


测试结果:
10-02 01:38:02.302 8242-8242/hq.demo.net I/System.out: ######doIntervalRange#####
10-02 01:38:05.372 8242-8317/hq.demo.net I/System.out: aLong = 5
10-02 01:38:10.371 8242-8317/hq.demo.net I/System.out: aLong = 6
10-02 01:38:15.370 8242-8317/hq.demo.net I/System.out: aLong = 7
10-02 01:38:20.371 8242-8317/hq.demo.net I/System.out: aLong = 8
10-02 01:38:25.370 8242-8317/hq.demo.net I/System.out: aLong = 9
10-02 01:38:30.370 8242-8317/hq.demo.net I/System.out: aLong = 10

25 just

just有10个重载方法,参数个数从1个到10个,返回的发布者将一个个将这1到10个项目发射出去

static <T> Flowable<T>

just(T item1, T item2, T item3, T item4, T item5, T item6, T item7, T item8, T item9, T item10)

将十个项目转换为发出这些项目的发布者。

25.1 图解 

25.2 测试用例

测试 
private void doJust() {
        Flowable.just(1, 2,3,4,5,6,7,8,9,10).subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                System.out.println("accept value is "+ integer);
            }
        });
    }

测试结果:
10-02 20:15:02.435 11483-11483/hq.demo.net I/System.out: accept value is 1
10-02 20:15:02.435 11483-11483/hq.demo.net I/System.out: accept value is 2
10-02 20:15:02.435 11483-11483/hq.demo.net I/System.out: accept value is 3
10-02 20:15:02.435 11483-11483/hq.demo.net I/System.out: accept value is 4
10-02 20:15:02.435 11483-11483/hq.demo.net I/System.out: accept value is 5
10-02 20:15:02.435 11483-11483/hq.demo.net I/System.out: accept value is 6
10-02 20:15:02.435 11483-11483/hq.demo.net I/System.out: accept value is 7
10-02 20:15:02.435 11483-11483/hq.demo.net I/System.out: accept value is 8
10-02 20:15:02.435 11483-11483/hq.demo.net I/System.out: accept value is 9
10-02 20:15:02.435 11483-11483/hq.demo.net I/System.out: accept value is 10
10-02 20:15:05.459 11483-11576/hq.demo.net I/System.out: Done!

26 merge

static <T> Flowable<T>

merge(Iterable<? extends Publisher<? extends T>> sources)

将一个以源发布者为元素的集合合并为一个Publisher,而不进行任何转换

static <T> Flowable<T>

merge(Iterable<? extends Publisher<? extends T>> sources, int maxConcurrency)

将一个以源发布者为元素的集合合并为一个Publisher,而不进行任何转换,同时限制这些发布者的最大并发订阅数。

static <T> Flowable<T>

merge(Iterable<? extends Publisher<? extends T>> sources, int maxConcurrency, int bufferSize)

将一个源发布者的集合合并为一个Publisher,而不进行任何转换,同时限制这些发布者的最大并发订阅数,并且可以设置缓存大小

static <T> Flowable<T>

merge(Publisher<? extends Publisher<? extends T>> sources)

将一个可发布多个源发布者的发布者合并为单个Publisher,该发布者发布这些源发布者发出的项目,而不进行任何转换

static <T> Flowable<T>

merge(Publisher<? extends Publisher<? extends T>> sources, int maxConcurrency)

将一个可发布多个源发布者的发布者合并为单个Publisher,该发布者发布这些源发布者发出的项目,而不进行任何转换,同时限制这些发布者的最大并发订阅数。

static <T> Flowable<T>

merge(Publisher<? extends T> source1, Publisher<? extends T> source2)

将2个发布者合并为一个发布者,无需任何转换。

static <T> Flowable<T>

merge(Publisher<? extends T> source1, Publisher<? extends T> source2, Publisher<? extends T> source3)

将3个发布者展平为一个发布者,无需任何转换。

static <T> Flowable<T>

merge(Publisher<? extends T> source1, Publisher<? extends T> source2, Publisher<? extends T> source3,Publisher<? extends T> source4)

将4个发布者展平为一个发布者,无需任何转换。

26.1 merge图解

26.2 merge测试用例

测试用例
private void doMerge() {
        System.out.println("######doMerge#####");
        List<Publisher<Integer>> publishers = new ArrayList<>();
        publishers.add(Flowable.just(1,2));
        publishers.add(Flowable.just(9,10,11,12,13));
        publishers.add(Flowable.just(6,7));

        Flowable.merge(publishers).subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                System.out.println("integer = " + integer);
            }
        });
    }

测试结果
10-02 01:52:11.088 8774-8774/hq.demo.net I/System.out: ######doMerge#####
10-02 01:52:11.153 8774-8774/hq.demo.net I/System.out: integer = 1
10-02 01:52:11.153 8774-8774/hq.demo.net I/System.out: integer = 2
10-02 01:52:11.153 8774-8774/hq.demo.net I/System.out: integer = 9
10-02 01:52:11.153 8774-8774/hq.demo.net I/System.out: integer = 10
10-02 01:52:11.153 8774-8774/hq.demo.net I/System.out: integer = 11
10-02 01:52:11.154 8774-8774/hq.demo.net I/System.out: integer = 12
10-02 01:52:11.154 8774-8774/hq.demo.net I/System.out: integer = 13
10-02 01:52:11.154 8774-8774/hq.demo.net I/System.out: integer = 6
10-02 01:52:11.154 8774-8774/hq.demo.net I/System.out: integer = 7

26.2 merge说明

可见merge操作符其实就是将n个Publisher连接起来合并成一个,并按照源发射顺序发射数据,merge参数中传入的Publisher参数形式或者是多个Publisher,或则是Publisher的集合等

27 mergeArray

与merge功能一样,只是传入数据的形式使用的是数组

static <T> Flowable<T>

mergeArray(int maxConcurrency, int bufferSize, Publisher<? extends T>... sources)

将以Publisher为元素的数组合并成一个Publisher,无需任何转换,同时限制对这些Publisher的并发订阅数量。

static <T> Flowable<T>

mergeArray(Publisher<? extends T>... sources)

将以Publisher为元素的数组合并成一个Publisher,无需任何转换,

27.1 mergeArray图解

28 mergeDelayError

static <T> Flowable<T>

mergeDelayError(Iterable<? extends Publisher<? extends T>> sources)

Flattens an Iterable of Publishers into one Publisher, in a way that allows a Subscriber to receive all successfully emitted items from each of the source Publishers without being interrupted by an error notification from one of them.

将一个以源发布者为元素的集合合并为一个Publisher,允许订阅者从每个源发布者接收所有成功发出的项目,而不会被其中一个发布者的错误通知中断。

static <T> Flowable<T>

mergeDelayError(Iterable<? extends Publisher<? extends T>> sources, int maxConcurrency)

将一个以源发布者为元素的集合合并为一个Publisher,允许订阅者从每个源发布者接收所有成功发出的项目,而不会被其中一个发布者的错误通知中断,并控制最大并发数

static <T> Flowable<T>

mergeDelayError(Iterable<? extends Publisher<? extends T>> sources, int maxConcurrency, int bufferSize)

将一个以源发布者为元素的集合合并为一个Publisher,允许订阅者从每个源发布者接收所有成功发出的项目,而不会被其中一个发布者的错误通知中断,并控制最大并发数和缓存大小

static <T> Flowable<T>

mergeDelayError(Publisher<? extends Publisher<? extends T>> sources)

Flattens a Publisher that emits Publishers into one Publisher, in a way that allows a Subscriber to receive all successfully emitted items from all of the source Publishers without being interrupted by an error notification from one of them.

将以Publisher为发布源的Publisher中的所有Publisher合并成一个Flowable,使订阅者能够从所有Publisher接收所有成功发出的项目,而不会被其中一个发布者的错误通知中断。

static <T> Flowable<T>

mergeDelayError(Publisher<? extends Publisher<? extends T>> sources, int maxConcurrency)

将以Publisher为发布源的Publisher中的所有Publisher合并成一个Flowable,使订阅者能够从所有Publisher接收所有成功发出的项目,而不会被其中一个发布者的错误通知中断,并控制最大并发数

static <T> Flowable<T>

mergeDelayError(Publisher<? extends T> source1, Publisher<? extends T> source2)

将2个发布者合并为一个发布者,使订阅者能够从所有Publisher接收所有成功发出的项目,而不会被其中一个发布者的错误通知中断

static <T> Flowable<T>

mergeDelayError(Publisher<? extends T> source1, Publisher<? extends T> source2, Publisher<? extends T> source3)

将3个发布者合并为一个发布者,使订阅者能够从所有Publisher接收所有成功发出的项目,而不会被其中一个发布者的错误通知中断

static <T> Flowable<T>

mergeDelayError(Publisher<? extends T> source1, Publisher<? extends T> source2, Publisher<? extends T> source3, Publisher<? extends T> source4)

将4个发布者合并为一个发布者,使订阅者能够从所有Publisher接收所有成功发出的项目,而不会被其中一个发布者的错误通知中断

28.1 mergeDelayError图解

28.2 mergeDelayError测试用例

这里不举例,只需要知道在merger的基础上增加延迟所有错误操作,不中断发送数据就可以了

28.3 mergeDelayError图说明

mergeDelayError在merge的基础上增加了对发射数据出错的处理,以免中断发射

29 never

static <T> Flowable<T>

never()

返回一个永远不会向订阅者发送任何项目或通知的Flowable。

29.1 never图解

29.2 never说明

目前还找不到好的例子来说明

30 range

static Flowable<Integer>

range(int start, int count) 返回一个Flowable,它发出指定范围内的整数序列。

30.1 range图解

30.2 range测试用例

测试代码
private void doRange() {
        System.out.println("######doRange#####");
        Flowable.range(4,3).subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                System.out.println("integer = " + integer);
            }
        });
    }


测试结果:
10-02 02:22:16.640 9733-9733/hq.demo.net I/System.out: ######doRange#####
10-02 02:22:16.715 9733-9733/hq.demo.net I/System.out: integer = 4
10-02 02:22:16.716 9733-9733/hq.demo.net I/System.out: integer = 5
10-02 02:22:16.716 9733-9733/hq.demo.net I/System.out: integer = 6

31 rangeLong

static Flowable<Long>

rangeLong(long start, long count) 返回一个Flowable,它在指定范围内发出一系列Longs。

31.1 rangeLong图解

31.1 rangeLong说明

与range一样,只是数值类型是Long

32 sequenceEqual

static <T> Single<Boolean>

sequenceEqual(Publisher<? extends T> source1, Publisher<? extends T> source2)

返回一个Single,比较每个Publisher成对发出的项目,看两个Publisher序列是否相同,根据是否全部两两相等,发出一个布尔值。

static <T> Single<Boolean>

sequenceEqual(Publisher<? extends T> source1, Publisher<? extends T> source2, BiPredicate<? super T,? super T> isEqual)

返回一个Single,根据指定的相等函数成对地比较每个Publisher发出的项目,根据是否全部两两相等,发出一个布尔值。

static <T> Single<Boolean>

sequenceEqual(Publisher<? extends T> source1, Publisher<? extends T> source2, BiPredicate<? super T,? super T> isEqual, int bufferSize)

返回一个Single,根据指定的相等函数成对地比较每个Publisher发出的项目,根据是否全部两两相等,发出一个布尔值,可设置缓存大小

static <T> Single<Boolean>

sequenceEqual(Publisher<? extends T> source1, Publisher<? extends T> source2, int bufferSize)

返回一个Single,比较每个Publisher成对发出的项目,看两个Publisher序列是否相同,根据是否全部两两相等,发出一个布尔值,可设置缓存大小

32.1 sequenceEqual图解

32.2 sequenceEqual测试用例

测试代码
private void doSequenceEqual() {
        System.out.println("######doSequenceEqual#####");
        Flowable.sequenceEqual(Flowable.just(9,10, 11), Flowable.just(9, 10, 11))
                .subscribe(new Consumer<Boolean>() {
                    @Override
                    public void accept(Boolean aBoolean) throws Exception {

                        System.out.println("aBoolean:" + aBoolean);
                    }
                });
        Flowable.sequenceEqual(Flowable.just(8,10, 11), Flowable.just(9, 10, 11))
                .subscribe(new Consumer<Boolean>() {
                    @Override
                    public void accept(Boolean aBoolean) throws Exception {

                        System.out.println("aBoolean:" + aBoolean);
                    }
                });

    }

测试结果
10-02 02:47:41.220 11096-11096/hq.demo.net I/System.out: ######doSequenceEqual#####
10-02 02:47:41.289 11096-11096/hq.demo.net I/System.out: aBoolean:true
10-02 02:47:41.290 11096-11096/hq.demo.net I/System.out: aBoolean:false

32.2 sequenceEqual说明

从测试用例可以知道,必须Publisher中所有的值相等才会返回true,数目不等,大小不等不都行,不过当指定比较的函数的时候另当别论,相等的条件由,指定函数决定。

33 switchOnNext

static <T> Flowable<T>

switchOnNext(Publisher<? extends Publisher<? extends T>> sources)

将发出FlowableFlowable转换为单个Flowable,该Flowable发出最近发出的FlowableS发出的项目

static <T> Flowable<T>

switchOnNext(Publisher<? extends Publisher<? extends T>> sources, int bufferSize)

将发出FlowableFlowable转换为单个Flowable,该Flowable发出最近发出的FlowableS发出的项目,可设置缓存大小.

33.1 switchOnNext图解

33.2 switchOnNext测试用例

测试代码
private void doSwitchOnNext() {
        System.out.println("######doSwitchOnNext#####");
        Flowable sourceFlowable = Flowable.just(Flowable.just(8,10, 11),Flowable.just(5,10, 11), Flowable.just(10,20, 30));
        Flowable.switchOnNext(sourceFlowable).subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer value) throws Exception {
                System.out.println("value:" + value);
            }
        });
    }



测试结果
10-02 03:01:01.374 11544-11544/hq.demo.net I/System.out: value:8
10-02 03:01:01.374 11544-11544/hq.demo.net I/System.out: value:10
10-02 03:01:01.374 11544-11544/hq.demo.net I/System.out: value:11
10-02 03:01:01.375 11544-11544/hq.demo.net I/System.out: value:5
10-02 03:01:01.375 11544-11544/hq.demo.net I/System.out: value:10
10-02 03:01:01.375 11544-11544/hq.demo.net I/System.out: value:11
10-02 03:01:01.375 11544-11544/hq.demo.net I/System.out: value:10
10-02 03:01:01.377 11544-11544/hq.demo.net I/System.out: value:20
10-02 03:01:01.377 11544-11544/hq.demo.net I/System.out: value:30

33.3 switchOnNext解析

从测试结果上看switchOnNext实际上是将一系列的Publishers合成了一个,并且将最近他们发射的项目发射出去

34 switchOnNextDelayError

static <T> Flowable<T>

switchOnNextDelayError(Publisher<? extends Publisher<? extends T>> sources)

将发出FlowableFlowable转换为单个Flowable,该Flowable发出最近发出的FlowableS发出的项目,将发射中遇到的所有错误延迟到所有项目发射结束后

static <T> Flowable<T>

switchOnNextDelayError(Publisher<? extends Publisher<? extends T>> sources, int prefetch)

Converts a Publisher that emits Publishers into a Publisher that emits the items emitted by the most recently emitted of 将发出FlowableFlowable转换为单个Flowable,该Flowable发出最近发出的FlowableS发出的项目,将发射中遇到的所有错误延迟到所有项目发射结束后,prefetch表示预取出来的项目个数

34.1 switchOnNextDelayError图解

同switch,不过中途遇到error不会中断

34.2 switchOnNextDelayError 测试用例

35 timer

static Flowable<Long>

timer(long delay, TimeUnit unit)

Returns a Flowable that emits 0L after a specified delay, and then completes.

返回在指定的延迟后发出0L的Flowable,然后完成。

static Flowable<Long>

timer(long delay, TimeUnit unit, Scheduler scheduler)

Returns a Flowable that emits 0L after a specified delay, on a specified Scheduler, and then completes.

35.1 timer

35.2 timer测试用例

测试代码
 private void doTimer() {
        System.out.println("######doTimer#####");
        Flowable.timer(3, TimeUnit.SECONDS)
                .subscribe(new Consumer<Long>() {
                    @Override
                    public void accept(Long aLong) throws Exception {
                        System.out.println("value:" + aLong);
                    }
                });
    }

测试结果
10-02 03:20:22.653 12464-12464/hq.demo.net I/System.out: ######doTimer#####
10-02 03:20:25.655 12464-12532/hq.demo.net I/System.out: value:0

时间间隔操作符timer,可以指定一段时间发送数据(固定值0L):

35.2 timer测试说明

测试用例会延迟3秒后发射数据,timer操作符有个重载方法可以接受一个参数Scheduler

36 unsafeCreate

static <T> Flowable<T>

unsafeCreate(Publisher<T> onSubscribe)

Create a Flowable by wrapping a Publisher which has to be implemented according to the Reactive-Streams specification by handling backpressure and cancellation correctly; no safeguards are provided by the Flowable itself.

通过正确处理背压和取消来包装必须根据Reactive-Streams规范实现的发布者来创建Flowable; Flowable本身没有提供任何保护措施。

36 unsafeCreate分析

unsafeCreate其实就是对Publisher进行一个包装然后返回一个正确处理了背压和取消

37 using

static <T,D> Flowable<T>

using(Callable<? extends D> resourceSupplier, Function<? super D,? extends Publisher<? extends T>> sourceSupplier, Consumer<? super D> resourceDisposer)

Constructs a Publisher that creates a dependent resource object which is disposed of on cancellation.

构造一个创建依赖资源对象的Publisher,该对象在取消时被处理掉。

static <T,D> Flowable<T>

using(Callable<? extends D> resourceSupplier, Function<? super D,? extends Publisher<? extends T>> sourceSupplier, Consumer<? super D> resourceDisposer, boolean eager)

如果已将disposeEagerly设置为true并且在终止之前未发生取消,则构造一个创建依赖资源对象的Publisher,该对象在终止之前处理。

37.1 using图解

37.1 using测试用例

 
private void doUsing() {
        System.out.println("######doUsing#####");
        Flowable.using(new Callable<String>() {
            @Override
            public String call() throws Exception {
                return "Hello friend!";
            }
        }, new Function<String, Publisher<String>>() {
            @Override
            public Publisher<String> apply(String s) throws Exception {
                return Flowable.just(s + "Welcome to Beijing");
            }
        }, new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                System.out.println(s);
            }
        }).subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                System.out.println("accept:" + s);
            }
        });
    }


测试结果
10-02 14:36:08.677 22766-22766/hq.demo.net I/System.out: ######doUsing#####
10-02 14:36:08.740 22766-22766/hq.demo.net I/System.out: accept:Hello friend!Welcome to Beijing
10-02 14:36:08.740 22766-22766/hq.demo.net I/System.out: Hello friend!

37.2 using说明

其实using函数就是指定函数处理callback返回的值,返回一个新的Publisher

38 zip、zipArray、zipIterable

static <T,R> Flowable<R>

zip(Iterable<? extends Publisher<? extends T>> sources, Function<? super Object[],? extends R> zipper)

返回一个Flowable,它发出指定函数的运算结果,该函数会将Publisher集合中的Publisher的项目按顺序取出来按照函数组合运算,并返回一个结果,这个结果便是返回的Flowable将要发出的项目

static <T,R> Flowable<R>

zip(Publisher<? extends Publisher<? extends T>> sources, Function<? super Object[],? extends R> zipper)

返回一个Flowable,它发出指定函数的运算结果,该函数会将Publisher中的Publisher的项目按顺序取出来,然后按顺序取出这些Publisher中的项目按照函数组合运算,并返回一个结果,这个结果便是返回的Flowable将要发出的项目

static <T1,T2,R> Flowable<R>

zip(Publisher<? extends T1> source1, Publisher<? extends T2> source2, BiFunction<? super T1,? super T2,? extends R> zipper)

返回一个Flowable,它发出指定函数的运算结果,该函数会将两个Publisher中的项目按顺序取出来组合运算

static <T1,T2,R> Flowable<R>

zip(Publisher<? extends T1> source1, Publisher<? extends T2> source2, BiFunction<? super T1,? super T2,? extends R> zipper, boolean delayError)

返回一个Flowable,它发出指定函数的运算结果,该函数会将两个Publisher中的项目按顺序取出来组合运算,通过设置delayError来决定是否延迟处理error

static <T1,T2,R> Flowable<R>

zip(Publisher<? extends T1> source1, Publisher<? extends T2> source2, BiFunction<? super T1,? super T2,? extends R> zipper, boolean delayError, int bufferSize)

返回一个Flowable,它发出指定的组合函数的元素结果,该函数应用于2个其他发布者按顺序发出的两个项目的组合,通过设置delayError来决定是否延迟处理error,以及设置bufferSize的大小

这里有该项的多个重载最多传入9个Publisher源进行指定运算,(source1 ... source9)

static <T,R> Flowable<R>

zipArray(Function<? super Object[],? extends R> zipper, boolean delayError, int bufferSize, Publisher<? extends T>... sources)

Returns a Flowable that emits the results of a specified combiner function applied to combinations of items emitted, in sequence, by an array of other Publishers.

返回一个Flowable,它发出指定函数zipper的运算结果,该函数会将Publisher数组中的Publisher取出来,并按顺序取出Publisher的项目来按照函数要求进行运算,并返回一个结果,这个结果便是返回的Flowable将要发出的项目

static <T,R> Flowable<R>

zipIterable(Iterable<? extends Publisher<? extends T>> sources, Function<? super Object[],? extends R> zipper, boolean delayError, int bufferSize)

返回一个Flowable,它发出指定函数的运算结果,该函数会将Publisher集合中的Publisher的项目按顺序取出来按照函数组合运算,并返回一个结果,这个结果便是返回的Flowable将要发出的项目,并可以通过设置delayError决定是否延迟处理error,设置bufferSize的大小

38 zip图解

38 zip测试用例

测试代码 
 private void donZipList() {
        List<Publisher<Integer>> publishers = new ArrayList<>();
        publishers.add(Flowable.just(1, 2));
        publishers.add(Flowable.just(9, 10, 11, 12, 13));
        publishers.add(Flowable.just(6, 7));
        Flowable.zip(Flowable.just(1, 2),Flowable.just(9, 10, 11, 12, 13),Flowable.just(6, 7), new Function3<Integer, Integer, Integer, Integer>() {
            @Override
            public Integer apply(Integer integer, Integer integer2, Integer integer3) throws Exception {
                return integer + integer2 + integer3;
            }
        });
    }


测试结果:
10-02 13:22:48.300 20083-20083/hq.demo.net I/System.out: ######donZip#####
10-02 13:22:48.369 20083-20083/hq.demo.net I/System.out: result:16
10-02 13:22:48.369 20083-20083/hq.demo.net I/System.out: result:19

38 zip解析

多个Publisher压缩成单个的操作可以使用zip操作符,如果多个Publisher数量不同,则以少的为基准,可以使用Funtion来自定义zipper函数,

如上面的测试用例结算的是1+9+6= 6,2+10+7=17,以少的为基准

其他重载方法,以及zipArray、zipIterable跟上面测试用例原理一样


 

猜你喜欢

转载自blog.csdn.net/weixin_36709064/article/details/82919785