RxJava操作符——创建操作符

  1. just()
     
    /**
         * just()— 将一个或多个对象转换成发射这个或这些对象的一个Observable
         * just有10个重构方法,可以发射1-10个数据,数据是依次发射的
         * Observable:被观察着
         * observer:观察者
         * Observable.just:被观察者被观察者订阅了
         * Observable(被观察者).just(创建发射器,just直接将数据发送给被观察者)
         * .线程调度器.subscribe(观察者接受数据)
         */
        public void just(View view) {
            Observable.just(1, 2, 3, 4, 5, 6)
                    .subscribeOn(Schedulers.io())//发射线程,被观察着执行线程
                    .unsubscribeOn(AndroidSchedulers.mainThread())//取消订阅所在执行线程
                    .observeOn(AndroidSchedulers.mainThread())//观察着执行线程
                    .subscribe(new Observer<Integer>() {
                        @Override
                        public void onSubscribe(Disposable d) {
    
                        }
    
                        @Override
                        public void onNext(Integer value) {
                            Log.e("observer", "just:" + value);
                        }
    
                        @Override
                        public void onError(Throwable e) {
    
                        }
    
                        @Override
                        public void onComplete() {
    
                        }
                    });
        }

     
  2. create()
     /**
         * create()— 使用一个函数从头创建一个Observable
         * Observable:被观察着
         * observer:观察者
         * Observable.subscribe:被观察者被观察者订阅了
         * Observable(被观察者).create(创建发射器,被观察者在发射器里面发射内容)
         * .线程调度器.subscribe(观察者接受数据)
         */
        public void create(View v) {
    
            Observable.create(new ObservableOnSubscribe<String>() {
                @Override
                public void subscribe(ObservableEmitter<String> e) throws Exception {
    
                    for (int i = 0; i < 6; i++) {
                        e.onNext(i + "");
                    }
                    e.onComplete();
                }
            }).subscribeOn(Schedulers.io())//发射线程,被观察着执行线程
                    .unsubscribeOn(AndroidSchedulers.mainThread())//取消订阅所在执行线程
                    .observeOn(AndroidSchedulers.mainThread())//观察着执行线程
                    .subscribe(new Observer<String>() {
                        @Override
                        public void onSubscribe(Disposable d) {
    
                        }
    
                        @Override
                        public void onNext(String value) {
                            Log.e("observer", "create:" + value);
                        }
    
                        @Override
                        public void onError(Throwable e) {
    
                        }
    
                        @Override
                        public void onComplete() {
    
                        }
                    });
    
        }
    

     
  3. fromIterable()
     /**
         * fromIterable()— 将一个Iterable, 一个Future, 或者一个数组转换成一个Observable,
         * 此方法会依次发送出去
         * Observable:被观察着
         * observer:观察者
         * Observable.just:被观察者被观察者订阅了
         * Observable(被观察者).fromIterable(创建发射器,fromIterable直接将数据发送给被观察者)
         * .线程调度器.subscribe(观察者接受数据)
         */
        public void fromIterable(View view) {
            List<String> iterable = new ArrayList<>();
            for (int i = 1; i <= 6; i++) {
                iterable.add("数据:" + i);
            }
            Observable.fromIterable(iterable)
                    .subscribeOn(Schedulers.io())//发射线程,被观察着执行线程
                    .unsubscribeOn(AndroidSchedulers.mainThread())//取消订阅所在执行线程
                    .observeOn(AndroidSchedulers.mainThread())//观察着执行线程
                    .subscribe(new Observer<String>() {
                        @Override
                        public void onSubscribe(Disposable d) {
    
                        }
    
                        @Override
                        public void onNext(String value) {
                            Log.e("observer", "fromIterable:" + value);
                        }
    
                        @Override
                        public void onError(Throwable e) {
    
                        }
    
                        @Override
                        public void onComplete() {
    
                        }
                    });
    
        }
  4. fromArray
    /**
         * fromArrray()— 将多个个Iterable, 一个Future, 或者多个个数组转换成一个Observable,
         * 此方法会依次发送出去,可以传入多个数据,然后依次发送出去
         * Observable:被观察着
         * observer:观察者
         * Observable.fromArrray:被观察者被观察者订阅了
         * Observable(被观察者).fromArrray(创建发射器,fromArrray直接将数据发送给被观察者)
         * .线程调度器.subscribe(观察者接受数据)
         */
        public void fromArrray(View view) {
            List<String> iterable = new ArrayList<>();
            for (int i = 1; i <= 6; i++) {
                iterable.add("数据:" + i);
            }
            Observable.fromArray(iterable, iterable, iterable)
                    .subscribeOn(Schedulers.io())//发射线程,被观察着执行线程
                    .unsubscribeOn(AndroidSchedulers.mainThread())//取消订阅所在执行线程
                    .observeOn(AndroidSchedulers.mainThread())//观察着执行线程
                    .subscribe(new Observer<List<String>>() {
                        @Override
                        public void onSubscribe(Disposable d) {
    
                        }
    
                        @Override
                        public void onNext(List<String> value) {
                            Log.e("observer", "fromArrray:" + value);
                        }
    
                        @Override
                        public void onError(Throwable e) {
    
                        }
    
                        @Override
                        public void onComplete() {
    
                        }
                    });
    
        }
    
  5. timer()
      /**
         * timer()— 创建一个在给定的延时之后发射单个数据的Observable,数据为0
         * 第一个参数:延时时间,第二个参数:单位
         * Observable:被观察着
         * observer:观察者
         * Observable.timer:被观察者被观察者订阅了
         * Observable(被观察者).timer(创建发射器,timer延时设置时间后发射数据)
         * .线程调度器.subscribe(观察者接受数据)
         */
        public void timer(View view) {
            Observable.timer(2, TimeUnit.SECONDS)
                    .observeOn(AndroidSchedulers.mainThread())
                    .subscribe(new Consumer<Long>() {
                        @Override
                        public void accept(Long aLong) throws Exception {
                            Log.e("observer", "timer:" + aLong);
                        }
                    });
        }
  6. interval()
    /**
         * interval()— 创建一个按照给定的时间间隔发射整数序列的Observable
         * 第一个参数:延时时间,参数二:单位
         * take:执行次数
         * Observable:被观察着
         * observer:观察者
         * Observable.interval:被观察者被观察者订阅了
         * Observable(被观察者).interval(创建发射器,interval延时设置时间后发射数据)
         * .线程调度器.subscribe(观察者接受数据)
         */
        public void interval(View view) {
            Observable.interval(1, TimeUnit.SECONDS)
                    .take(3)
                    .subscribeOn(Schedulers.io())//发射线程,被观察着执行线程
                    .unsubscribeOn(AndroidSchedulers.mainThread())//取消订阅所在执行线程
                    .observeOn(AndroidSchedulers.mainThread())//观察着执行线程
                    .subscribe(new Consumer<Long>() {
                        @Override
                        public void accept(Long aLong) throws Exception {
                            Log.e("observer", "interval:" + aLong);
                        }
                    });
    
        }
    

  7. intervalRange()
     
     /**
         * intervalRange()— 创建一个按照给定的时间间隔发射整数序列的Observable
         * 参数一:起始发送值,参数二:发送次数,参数三:首次发送延迟时间
         * 参数四:每次发送时间间隔,参数五:时间单位
         * take:执行次数
         * Observable:被观察着
         * observer:观察者
         * Observable.intervalRange:被观察者被观察者订阅了
         * Observable(被观察者).intervalRange(创建发射器)
         * .线程调度器.subscribe(观察者接受数据)
         */
        public void intervalRange(View view) {
            Observable.intervalRange(2, 2, 3, 1, TimeUnit.SECONDS)
                    .subscribeOn(Schedulers.io())//发射线程,被观察着执行线程
                    .unsubscribeOn(AndroidSchedulers.mainThread())//取消订阅所在执行线程
                    .observeOn(AndroidSchedulers.mainThread())//观察着执行线程
                    .subscribe(new Consumer<Long>() {
                        @Override
                        public void accept(Long aLong) throws Exception {
                            Log.e("observer", "intervalRange:" + aLong);
                        }
                    });
    
        }
    

  8. range()
      /**
         * range()— 创建一个发射指定范围的整数序列的Observable,无延时发送数据
         * 参数一:起始发送值,参数二:结束发射值
         */
        public void range(View view) {
            Observable.range(2, 6)
                    .subscribeOn(Schedulers.io())//发射线程,被观察着执行线程
                    .unsubscribeOn(AndroidSchedulers.mainThread())//取消订阅所在执行线程
                    .observeOn(AndroidSchedulers.mainThread())//观察着执行线程
                    .subscribe(new Consumer<Integer>() {
                        @Override
                        public void accept(Integer integer) throws Exception {
                            Log.e("observer", "range:" + integer);
    
                        }
                    });
    
        }

     
  9. empty()
     /**
         * 创建一个什么都不做直接通知完成的Observable
         */
        public void empty(View view) {
            Observable.empty()
                    .subscribeOn(Schedulers.io())//发射线程,被观察着执行线程
                    .unsubscribeOn(AndroidSchedulers.mainThread())//取消订阅所在执行线程
                    .observeOn(AndroidSchedulers.mainThread())//观察着执行线程
                    .subscribe(new Observer<Object>() {
                        @Override
                        public void onSubscribe(Disposable d) {
    
                        }
    
                        @Override
                        public void onNext(Object value) {
                            Log.e("observer", "value:" + value);
                        }
    
                        @Override
                        public void onError(Throwable e) {
    
                        }
    
                        @Override
                        public void onComplete() {
                            Log.e("observer", "value:" + "onComplete");
                        }
                    });
    
        }


     
  10. error()
     /**
         * 创建一个什么都不做直接通知错误的Observable
         */
        public void error(View view) {
            Observable.error(new Exception())
                    .subscribeOn(Schedulers.io())//发射线程,被观察着执行线程
                    .unsubscribeOn(AndroidSchedulers.mainThread())//取消订阅所在执行线程
                    .observeOn(AndroidSchedulers.mainThread())//观察着执行线程
                    .subscribe(new Observer<Object>() {
                        @Override
                        public void onSubscribe(Disposable d) {
    
                        }
    
                        @Override
                        public void onNext(Object value) {
                            Log.e("observer", "value:" + value);
                        }
    
                        @Override
                        public void onError(Throwable e) {
    
                        }
    
                        @Override
                        public void onComplete() {
                            Log.e("observer", "value:" + "onComplete");
                        }
                    });
    
        }
    

猜你喜欢

转载自blog.csdn.net/luck_xiang/article/details/89491873