RxJava2笔记(5)—操作符Creating

前言

本文主要过一遍RxJava的Creating操作符
Rx官网地址:http://reactivex.io/documentation/operators.html

正文

create

  • 该操作符是所有创建操作符的根,其他操作符都是基于此操作符来实现的。

    package zj.com.creating;

    import io.reactivex.Observable;
    import io.reactivex.ObservableEmitter;
    import io.reactivex.ObservableOnSubscribe;
    import io.reactivex.Observer;
    import io.reactivex.disposables.Disposable;

    public class Create {

        private static String TAG = "Create";

        public static void main(String[] args) {

            Observable.create(new ObservableOnSubscribe<String>() {

                @Override
                public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                    emitter.onNext("RxJava " + TAG + " 操作符 ....  1");
                    emitter.onNext("RxJava " + TAG + " 操作符 ....  2");
                    emitter.onNext("RxJava " + TAG + " 操作符 ....  3");
                }

            }).subscribe(new Observer<String>() {
                @Override
                public void onSubscribe(Disposable d) {
                    System.out.println("RxJava " + TAG + "onSubscribe  __ ");
                }

                @Override
                public void onNext(String s) {
                    System.out.println("RxJava " + TAG + "onNext  __ " + s);
                }

                @Override
                public void onError(Throwable e) {
                    System.out.println("RxJava " + TAG + "onError  __ ");
                }

                @Override
                public void onComplete() {
                    System.out.println("RxJava " + TAG + "onComplete  __ ");
                }
            });
        }

    }

运行结果:

RxJava CreateonSubscribe  __ 
RxJava CreateonNext  __ RxJava Create 操作符 ....  1
RxJava CreateonNext  __ RxJava Create 操作符 ....  2
RxJava CreateonNext  __ RxJava Create 操作符 ....  3

just

这个操作符相当于一个Create的变种,他相对Create操作符而言,写法更加简便。他提供了多个构造。


    package zj.com.creating;

    import io.reactivex.Observable;
    import io.reactivex.Observer;
    import io.reactivex.disposables.Disposable;

    public class Just {

        private static String TAG = "Just";

        public static void main(String[] args) {

            Observable.just(TAG + " 操作符 ....  3",TAG + " 操作符 ....  2",TAG + " 操作符 ....  3").subscribe(new Observer<String>() {
                @Override
                public void onSubscribe(Disposable d) {
                    System.out.println("RxJava " + TAG + "onSubscribe  __ ");
                }

                @Override
                public void onNext(String s) {
                    System.out.println("RxJava " + TAG + "onNext  __ " + s);
                }

                @Override
                public void onError(Throwable e) {
                    System.out.println("RxJava " + TAG + "onError  __ ");
                }

                @Override
                public void onComplete() {
                    System.out.println("RxJava " + TAG + "onComplete  __ ");
                }
            });
        }
    }

运行结果:


RxJava JustonSubscribe  __ 
RxJava JustonNext  __ Just 操作符 ....  3
RxJava JustonNext  __ Just 操作符 ....  2
RxJava JustonNext  __ Just 操作符 ....  3
RxJava JustonComplete  __ 

from

  • convert various other objects and data types into Observables
    将其他各式各样的对象和数据类型转换成Observables

  • 看看他有哪些实现方法

    可以看到挺多的,然后,这个方法跟rx1.0还不一样,Rx1.0是重载的形式,它这个2.0直接细化到了api方法名上面了。

  • 本例先从简单的入手,fromArray,fromIterable…,至于这个Feture这个入参,这牵涉到与”Runnable、Callable、Future、FutureTask”这一系列的内容,本文暂时不展开,等研究透彻之后再补上这块内容。

  • 示例:

    package zj.com.creating;

    import java.util.ArrayList;

    import io.reactivex.Observable;
    import io.reactivex.Observer;
    import io.reactivex.disposables.Disposable;

    public class From {

        private static String TAG = "From";

        public static void main(String[] args) {

            ArrayList list = new ArrayList();
            for (int i = 0; i < 7; i++) {
                list.add("星期" + i);
            }

            Observable.fromIterable(list).subscribe(new Observer<String>() {
                @Override
                public void onSubscribe(Disposable d) {
                    System.out.println("RxJava " + TAG + "onSubscribe  __ ");
                }

                @Override
                public void onNext(String s) {
                    System.out.println("RxJava " + TAG + "onNext  __ " + s);
                }

                @Override
                public void onError(Throwable e) {
                    System.out.println("RxJava " + TAG + "onError  __ ");
                }

                @Override
                public void onComplete() {
                    System.out.println("RxJava " + TAG + "onComplete  __ ");
                }
            });

        }

    }

运行结果:


RxJava FromonSubscribe  __ 
RxJava FromonNext  __ 星期0
RxJava FromonNext  __ 星期1
RxJava FromonNext  __ 星期2
RxJava FromonNext  __ 星期3
RxJava FromonNext  __ 星期4
RxJava FromonNext  __ 星期5
RxJava FromonNext  __ 星期6
RxJava FromonComplete  __ 

Defer

  • do not create the Observable until the observer subscribes, and create a fresh Observable for each observer
    在observer订阅之前(也就是调用subscribe方法之前)都不创建Observable对象,然后当产生订阅关系的时候,才会为每一个obser创新创建一个Observable。

下边我们来进行验证。

首先看到这个defer就一个方法,没有重载函数。它的入参是泛型Callable


    @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.NONE)
    public static <T> Observable<T> defer(Callable<? extends ObservableSource<? extends T>> supplier) {
        ObjectHelper.requireNonNull(supplier, "supplier is null");
        return RxJavaPlugins.onAssembly(new ObservableDefer<T>(supplier));
    }
  • 示例代码1:

        deferObservable = Observable.just(valueStr);

        valueStr = "22222222222";

        deferObservable.subscribe(new Observer<String>() {

            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onNext(String s) {

                System.out.println(TAG + "________" + s);

            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {

            }

        });

运行结果:

Defer________1111111111111
  • 示例代码2:


        deferObservable = Observable.defer(new Callable<ObservableSource<String>>() {

            @Override
            public ObservableSource<String> call() throws Exception {

                return Observable.just(valueStr);

            }
        });

        valueStr = "22222222222";

        deferObservable.subscribe(new Observer<String>() {

            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onNext(String s) {

                System.out.println(TAG + "________" + s);

            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {

            }

        });

运行结果:

Defer________22222222222

可以看出用defer,因为在subscribe之前,Observable并没有创建,所以对其进行赋值是有效的,所以输出结果是修改之后的22222。而直接用create或者just,在一开始初始化的时候Observable对象就已经创建了,所以再次对其赋值已经没用,所以最后输出的结果是一开始的111111。

Range

  • create an Observable that emits a particular range of sequential integers
    创建一个发射特定连续证书区间的Observable

  • range does not by default operate on any particular Scheduler, but there is a variant that allows you to set the Scheduler by passing one in as a parameter.
    range不会再特定的调度器上起作用,但是有个入参允许你将一个调度器作为参数传递进来设置

  • 可以看到range这个方法重载挺多

代码示例1:


        Observable.range(1, 10).subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
                System.out.println("RxJava " + TAG + "onSubscribe  __ ");
            }

            @Override
            public void onNext(Integer s) {
                System.out.println("RxJava " + TAG + "onNext  __ " + s);
            }

            @Override
            public void onError(Throwable e) {
                System.out.println("RxJava " + TAG + "onError  __ ");
            }

            @Override
            public void onComplete() {
                System.out.println("RxJava " + TAG + "onComplete  __ ");
            }
        });

输出:


    RxJava RangeonSubscribe  __ 
    RxJava RangeonNext  __ 1
    RxJava RangeonNext  __ 2
    RxJava RangeonNext  __ 3
    RxJava RangeonNext  __ 4
    RxJava RangeonNext  __ 5
    RxJava RangeonNext  __ 6
    RxJava RangeonNext  __ 7
    RxJava RangeonNext  __ 8
    RxJava RangeonNext  __ 9
    RxJava RangeonNext  __ 10
    RxJava RangeonComplete  __ 

代码示例2:

最复杂的一个method:


   /**
     * Signals a range of long values, the first after some initial delay and the rest periodically after.
     * <p>
     * The sequence completes immediately after the last value (start + count - 1) has been reached.
     * <p>
     * <img width="640" height="195" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/intervalRange.s.png" alt="">     * <dl>
     *  <dt><b>Scheduler:</b></dt>
     *  <dd>you provide the {@link Scheduler}.</dd>
     * </dl>
     * @param start that start value of the range
     * @param count the number of values to emit in total, if zero, the operator emits an onComplete after the initial delay.
     * @param initialDelay the initial delay before signalling the first value (the start)
     * @param period the period between subsequent values
     * @param unit the unit of measure of the initialDelay and period amounts
     * @param scheduler the target scheduler where the values and terminal signals will be emitted
     * @return the new Observable instance
     */

    @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.CUSTOM)
    public static Observable<Long> intervalRange(long start, long count, long initialDelay, long period, TimeUnit unit, Scheduler scheduler) {
        if (count < 0) {
            throw new IllegalArgumentException("count >= 0 required but it was " + count);
        }

        if (count == 0L) {
            return Observable.<Long>empty().delay(initialDelay, unit, scheduler);
        }

        long end = start + (count - 1);
        if (start > 0 && end < 0) {
            throw new IllegalArgumentException("Overflow! start + count is bigger than Long.MAX_VALUE");
        }
        ObjectHelper.requireNonNull(unit, "unit is null");
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");

        return RxJavaPlugins.onAssembly(new ObservableIntervalRange(start, end, Math.max(0L, initialDelay), Math.max(0L, period), unit, scheduler));
    }

看下参数:

     * //start ->range的start值
     * @param start that start value of the range
     * 
     * //count ->总共发射数值的总量,如果为0,就会在delay参数之后直接调用onComplete方法
     * @param count the number of values to emit in total, if zero, the operator emits an onComplete after the initial delay.
     * 
     * //initialDelay 首个值开始发射之前的延迟时间
     * @param initialDelay the initial delay before signalling the first value (the start)
     * 
     * //period 值之间的时间间隙
     * @param period the period between subsequent values
     * 
     * //unit 测量过后的延迟时间的单位以及周期总量
     * @param unit the unit of measure of the initialDelay and period amounts
     * 
     * //scheduler 指定调度器
     * @param scheduler the target scheduler where the values and terminal signals will be emitted

意思就是:延迟initialDelay个unit单位后,以period为周期,依次发射count个以start为初始值并递增的数字。

代码


    //从1开始输出10个数据,延迟0秒执行,每隔1秒执行一次:
    Observable.intervalRange(0, 11, 0, 1, TimeUnit.SECONDS/*,Schedulers.newThread()*/).subscribe(new Observer<Long>() {

        @Override
        public void onSubscribe(Disposable d) {
            System.out.println("RxJava " + TAG + "onSubscribe  __ ");
        }

        @Override
        public void onNext(Long s) {
            System.out.println("RxJava " + TAG + "onNext  __ " + s);
        }

        @Override
        public void onError(Throwable e) {
            System.out.println("RxJava " + TAG + "onError  __ ");
        }

        @Override
        public void onComplete() {
            System.out.println("RxJava " + TAG + "onComplete  __ ");
        }

    });


    //这行是在main方法内才需要用到
    try {
        Thread.sleep(20000); // if this is missing then the JVM exits immediately and the new thread is stopped.
    } catch (InterruptedException e) {
        e.printStackTrace();
    }

Empty/Never/Throw

Empty

这个操作符是直接调onComplete的。

  Observable.<String>empty().subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                System.out.println("RxJava " + TAG + "onSubscribe  __ ");
            }

            @Override
            public void onNext(String s) {
                System.out.println("RxJava " + TAG + "onNext  __ " + s);
            }

            @Override
            public void onError(Throwable e) {
                System.out.println("RxJava " + TAG + "onError  __ ");
            }

            @Override
            public void onComplete() {
                System.out.println("RxJava " + TAG + "onComplete  __ ");
            }
        });
RxJava EmptyonSubscribe  __ 
RxJava EmptyonComplete  __ 

Never
这个操作符是创建一个什么都不做的Observable。(但是一旦订阅会调onSubscribe)


     Observable.<String>never().subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                System.out.println("RxJava " + TAG + "onSubscribe  __ ");
            }

            @Override
            public void onNext(String s) {
                System.out.println("RxJava " + TAG + "onNext  __ " + s);
            }

            @Override
            public void onError(Throwable e) {
                System.out.println("RxJava " + TAG + "onError  __ ");
            }

            @Override
            public void onComplete() {
                System.out.println("RxJava " + TAG + "onComplete  __ ");
            }
        });

Throw
这个操作符是创建一个直接调onError的Observable。


   Observable.<String>error(new Callable<Throwable>() {
            @Override
            public Throwable call() throws Exception {
                return null;
            }
        }).subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                System.out.println("RxJava " + TAG + "onSubscribe  __ ");
            }

            @Override
            public void onNext(String s) {
                System.out.println("RxJava " + TAG + "onNext  __ " + s);
            }

            @Override
            public void onError(Throwable e) {
                System.out.println("RxJava " + TAG + "onError  __ ");
            }

            @Override
            public void onComplete() {
                System.out.println("RxJava " + TAG + "onComplete  __ ");
            }
        });

输出结果:

RxJava ThrowonSubscribe  __ 
RxJava ThrowonError  __ 

Interval

  • create an Observable that emits a sequence of integers spaced by a given time interval
    创建一个发送序列带时间间隔的Observable。

还记得我们刚才写了一个rangeInterverl吗? 类似于定时器那个,大同小异。


   Observable.interval(1,TimeUnit.SECONDS).subscribe(new Observer<Long>() {

            @Override
            public void onSubscribe(Disposable d) {
                System.out.println("RxJava " + TAG + "onSubscribe  __ ");
            }

            @Override
            public void onNext(Long s) {
                System.out.println("RxJava " + TAG + "onNext  __ " + s);
            }

            @Override
            public void onError(Throwable e) {
                System.out.println("RxJava " + TAG + "onError  __ ");
            }

            @Override
            public void onComplete() {
                System.out.println("RxJava " + TAG + "onComplete  __ ");
            }

        });

这个方法会一直运行,直到当前线程被杀死。


RxJava IntervalonSubscribe  __ 
RxJava IntervalonNext  __ 0
RxJava IntervalonNext  __ 1
RxJava IntervalonNext  __ 2
RxJava IntervalonNext  __ 3
RxJava IntervalonNext  __ 4
RxJava IntervalonNext  __ 5
RxJava IntervalonNext  __ 6
RxJava IntervalonNext  __ 7
RxJava IntervalonNext  __ 8
RxJava IntervalonNext  __ 9
RxJava IntervalonNext  __ 10
RxJava IntervalonNext  __ 11

...

Repeat

repeate不会创建一个Observable,它是一个辅助操作符。譬如,一个just操作符创建对象并发射,然后这个.repeate(3),表示重复发射3轮。

    Observable.just(TAG + " 操作符 ....  3",TAG + " 操作符 ....  2",TAG + " 操作符 ....  1").repeat(3).subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                System.out.println("RxJava " + TAG + "onSubscribe  __ ");
            }

            @Override
            public void onNext(String s) {
                System.out.println("RxJava " + TAG + "onNext  __ " + s);
            }

            @Override
            public void onError(Throwable e) {
                System.out.println("RxJava " + TAG + "onError  __ ");
            }

            @Override
            public void onComplete() {
                System.out.println("RxJava " + TAG + "onComplete  __ ");
            }
        });

输出结果:

RxJava RepeatonSubscribe  __ 
RxJava RepeatonNext  __ Repeat 操作符 ....  3
RxJava RepeatonNext  __ Repeat 操作符 ....  2
RxJava RepeatonNext  __ Repeat 操作符 ....  1
RxJava RepeatonNext  __ Repeat 操作符 ....  3
RxJava RepeatonNext  __ Repeat 操作符 ....  2
RxJava RepeatonNext  __ Repeat 操作符 ....  1
RxJava RepeatonNext  __ Repeat 操作符 ....  3
RxJava RepeatonNext  __ Repeat 操作符 ....  2
RxJava RepeatonNext  __ Repeat 操作符 ....  1
RxJava RepeatonComplete  __ 

Timer

  • 创建一个Observable,它在一个给定的延迟后发射一个值,需要注意,它只发射一次,所以有别于之前用的TimerTask。timer()只是用来创建一个Observable,并延迟发送一次的操作符,并不会按周期执行。

   Observable.<String>timer(5, TimeUnit.SECONDS).subscribe(new Observer<Long>() {
            @Override
            public void onSubscribe(Disposable d) {
                System.out.println(TAG + "_________" +"onSubscribe");
            }

            @Override
            public void onNext(Long aLong) {
                System.out.println(TAG + "_____onNext____" +aLong);
            }

            @Override
            public void onError(Throwable e) {
                System.out.println(TAG + "_________" +"onError");
            }

            @Override
            public void onComplete() {
                System.out.println(TAG + "_________" +"onComplete");
            }
        });

输出:

Timer_________onSubscribe
Timer_____onNext____0
Timer_________onComplete

Start

这个start也不是一个创建对象的操作符,他就跟repeat一样,是一个辅助操作符。它主要是在发射之前插入一个我们定义好的元素在发射队列的头。

具体有这么几个方法:


    private static String TAG = "Start";

    public static void main(String[] args) {

        Observable.just(TAG + " 操作符 ....  3",TAG + " 操作符 ....  2",TAG + " 操作符 ....  3").startWith("乎乎").subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                System.out.println("RxJava " + TAG + "onSubscribe  __ ");
            }

            @Override
            public void onNext(String s) {
                System.out.println("RxJava " + TAG + "onNext  __ " + s);
            }

            @Override
            public void onError(Throwable e) {
                System.out.println("RxJava " + TAG + "onError  __ ");
            }

            @Override
            public void onComplete() {
                System.out.println("RxJava " + TAG + "onComplete  __ ");
            }
        });
    }

输出:

RxJava StartonSubscribe  __ 
RxJava StartonNext  __ 乎乎
RxJava StartonNext  __ Start 操作符 ....  3
RxJava StartonNext  __ Start 操作符 ....  2
RxJava StartonNext  __ Start 操作符 ....  3
RxJava StartonComplete  __ 

Thanks

https://blog.csdn.net/jdsjlzx/article/details/52912701
https://www.imooc.com/video/15533

Demo

想获取demo的朋友:

这个链接是需要3分的,分多的朋友想散分给小弟可以下载这个链接。小弟先谢啦。
https://download.csdn.net/download/user11223344abc/10404664

当然我也会给出免分的链接,让像我这样没分的朋友也可以下载demo,我的github:
https://github.com/zj614android/AopDemo

猜你喜欢

转载自blog.csdn.net/user11223344abc/article/details/80257235