RxJava学习使用——操作符

RxJava 学习使用——操作符

下面开始学习 Rxjava 的操作符,了解Rxjava 更对的用途,让Rxjava更好为我们所用。

废话不多说上码

1.RxJava最基本用法(不带操作符)

//创建被观察者
    Observable observable=createObservable();
//创建观察者
    Subscriber subscriber=createSubscriber();

    mText.append("开始订阅,准备观察...\n");
//事实上,observable不止可以订阅subscriber,也可以订阅ActionX()
    observable.subscribe(subscriber);

1.1 相关方法

 private Subscriber createSubscriber() {
    //创建观察者
    Subscriber subscriber=new Subscriber<String>() {
        @Override
        public void onCompleted() {
            mText.append("执行观察者中的onCompleted()...\n");
            mText.append("订阅完毕,结束观察...\n");
        }
        @Override
        public void onError(Throwable e) {
        }
        @Override
        public void onNext(String s) {
            mText.append("执行观察者中的onNext()...\n");
            mText.append(s+"...\n");
        }
    };
    return  subscriber;
}

1.2相关方法

private Observable createObservable(){
    //创建被观察者,这是最正常的创建方法
    Observable observable=Observable.create(new Observable.OnSubscribe<String>(){
        @Override
        public void call(Subscriber<? super String> subscriber) {
            subscriber.onNext("一二三四五");
            subscriber.onNext("上山打老虎");
            subscriber.onNext("老虎一发威");
            subscriber.onNext("武松就发怵");
            subscriber.onCompleted();
        }
    });
    //想要图方便,可以这样创建
    //from(T[])
//        String [] kk={"一二三四五","上山打老虎","老虎一发威","武松就发怵"};
//        Observable observable=Observable.from(kk);

    //或者这样
    //just(T...)
//        Observable observable=Observable.just("一二三四五","上山打老虎","老虎一发威","武松就发怵");

    return observable;
}

2 操作符 map

        Observable.from(number)           //之前提到的创建Observable方法
              .map(new Func1<Integer, Boolean>() {//Func1方法泛型中的<两个参数>
                  @Override
                  public Boolean call(Integer integer) {
                      mText.append("\n\n map()  Integer--->Boolean");
                      return (integer<3);
                  }
              })
              .subscribe(new Action1<Boolean>() {
                @Override
                public void call(Boolean aBoolean) {
                    mText.append("\n观察到输出结果:\n");
                    mText.append(aBoolean.toString());
                }
            });

2.1 发现map方法中new了一个Func1(之能是Func1),Func1的泛型中有两个参数类型,第一个是传入的类型,第二个是返回的类型。

2.2 我们可以在call方法中对 返回值进行控制了

3.线程调度

可以根据不同的步骤进行 线程的切换

  Observable.create(new Observable.OnSubscribe<Drawable>(){
        @Override
        public void call(Subscriber<? super Drawable> subscriber) {
           sb.append(" Observable.create(): 线程: "+Thread.currentThread().getName()+"\n\n");
            Drawable dd=getResources().getDrawable(R.mipmap.gril);
            subscriber.onNext(dd);
            subscriber.onCompleted();
        }
    }).subscribeOn(Schedulers.io())
      .observeOn(Schedulers.newThread())
      .map(new Func1<Drawable, ImageView>() {
          @Override
          public ImageView call(Drawable drawable) {
              sb.append("map():  drawable -->imageview 的线程: "+Thread.currentThread().getName()+"\n\n");
              ImageView img=new ImageView(RxSchuderActivity.this);
              LinearLayout.LayoutParams params= new LinearLayout.LayoutParams(LinearLayout.LayoutParams.WRAP_CONTENT, LinearLayout.LayoutParams.WRAP_CONTENT);
              img.setLayoutParams(params);
              img.setImageDrawable(drawable);
              return img;
          }
      }).observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Action1<ImageView>() {
            @Override
            public void call(ImageView imageView) {
                sb.append("call(): 线程: "+Thread.currentThread().getName()+"\n");
                mText.setText(sb);
                mLinearlayout.addView(imageView);

            }
        }); 

4.flatMap

将第一次的返回值,再进行一次订阅并处理,所以会看到两个Call方法

    Observable.from(getSchoolClass())
            .flatMap(new Func1<SchoolClass, Observable<Student>>() {
                @Override
                public Observable<Student> call(SchoolClass schoolClass) {
                    //将Student列表使用from方法一个一个发出去
                    return Observable.from(schoolClass.getStudents());
                }
            })
            .subscribe(new Action1<Student>() {
                @Override
                public void call(Student student) {
                    mText.append("打印单个学生信息:\n");
                    mText.append("name:"+student.name+"    age: "+student.age+"\n");
                }
            });

5 合并

Observable.merge(obs1,obs2)

(两个任务合并执行,并且在全部处理完成后,显示结果。)

    Observable obs1=Observable.create(new Observable.OnSubscribe<String>(){

        @Override
        public void call(Subscriber<? super String> subscriber) {
            try {
                Thread.sleep(2000);
                subscriber.onNext(" aaa");
                subscriber.onCompleted();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }).subscribeOn(Schedulers.newThread());

    Observable obs2=Observable.create(new Observable.OnSubscribe<String>(){

        @Override
        public void call(Subscriber<? super String> subscriber) {
            try {
                Thread.sleep(2100);
                subscriber.onNext("bbb");
                subscriber.onCompleted();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }).subscribeOn(Schedulers.newThread());

    Observable.merge(obs1,obs2)
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Subscriber<String>() {
                StringBuffer sb=new StringBuffer();
                @Override
                public void onCompleted() {
                    mText.append("两个任务都处理完毕!!\n");
                    mText.append("更新数据:"+sb+"\n");
                }
                @Override
                public void onError(Throwable e) {
                }
                @Override
                public void onNext(String s) {
                    sb.append( s+",");
                    mText.append("得到一个数据:"+s+"\n");
                }
            });
}

6 RXJAVA的BINGDING

(将欢声控件原生与Rxjava 实现 点击 监听等)

        RxTextView.textChanges(mEdit)
            //在一次事件发生后的一段时间内没有新操作,则发出这次事件
            .debounce(1000,TimeUnit.MILLISECONDS)
            //转换线程
            .observeOn(Schedulers.newThread())
            //通过输入的数据,来匹配"数据库"中的数据从而提示。。
            .map(new Func1<CharSequence, List<String>>() {
                List<String> list=new ArrayList<String>();
                @Override
                public List<String> call(CharSequence charSequence) {

                    if (charSequence.toString().contains("1")){
                        for (int i=0;i<5;i++){
                            list.add("11"+i);
                        }
                    }
                    return list;
                }
            })
            //由于我不想要listl列表,所以使用了flatMap来分解成一个一个的数据发送
            .flatMap(new Func1<List<String>, Observable<String>>() {
                @Override
                public Observable<String> call(List<String> strings) {

                    return Observable.from(strings);
                }
            })
            //这里切换成主线程,不然没法操作组件
            .observeOn(AndroidSchedulers.mainThread())
            //这里做一些过滤动作
            .filter(new Func1<String, Boolean>() {
                @Override
                public Boolean call(String s) {
                    return !mText.getText().toString().contains(s);
                }
            })
            //订阅
            .subscribe(new Action1<String>() {
                @Override
                public void call(String s) {
                    //这里展示提示数据
                    mText.append(s + "\n");
                }
            }, new Action1<Throwable>() {
                @Override
                public void call(Throwable throwable) {
                    Log.w("DDDDDDDD",throwable.getMessage().toString());
                }
            });
    mBtn.setText("连续点击防误触");
    RxView.clicks(mBtn)
            //防误触(设定点击后500ms内丢弃新事件,或者说点击后500ms毫秒无响应)
            .throttleFirst(500, TimeUnit.MILLISECONDS)
            .subscribe(new Action1<Void>() {
                //这就相当于OnClickListener中的OnClick方法回调
                @Override
                public void call(Void aVoid) {
                   mText.append("\n 防误触 测试  \n");
                }
            });

7 过滤 FILTER

(常用的请求过滤方式)

        Integer[] integers={1,2,3,4,5,6,7,8,9,10};
    Observable.from(integers)
            .filter(new Func1<Integer, Boolean>() {
                @Override
                public Boolean call(Integer integer) {
                    return integer%2!=0;
                }
            })
            .subscribe(new Action1<Integer>() {
                @Override
                public void call(Integer integer) {
                    mText.append(integer.toString()+",");
                }
            }); 

8 take 与 doOnNext()

take(4) 获取过滤 后返回结果的 前4个,对这四个还可子进行部分获取
doOnNext() 在 onNext之前加一次操作。

Observable.from(number)
              .filter(new Func1<Integer, Boolean>() {
                  @Override
                  public Boolean call(Integer integer) {
                      return integer%2!=0;
                  }
              })
                //取前四个 结果
                .take(4)
                //取前四个中的后两个
                .takeLast(2)
                .doOnNext(new Action1<Integer>() {  //在每次的返回前,在加一步
                    @Override
                    public void call(Integer integer) {
                        mText.append("before onNext()\n");
                    }
                })
                .subscribe(new Action1<Integer>() {
                    @Override
                    public void call(Integer integer) {
                        mText.append("onNext()--->"+integer+"\n");
                    }
                });

9 interval 间隔执行任务 (当定时器来用)

        //interval()是运行在computation Scheduler线程中的,因此需要转到主线程
    mSubscription=Observable.interval(1, TimeUnit.SECONDS)
                .observeOn(AndroidSchedulers.mainThread())
              .subscribe(new Action1<Long>() {
                  @Override
                  public void call(Long aLong) {
                      mText.setText(aLong+"");
                  }
              });

取消订阅

     if (mSubscription!=null && !mSubscription.isUnsubscribed()){
                     mSubscription.unsubscribe();
                 }

10 排序 (toSortedList)

.toSortedList() 还有其他API带研究

        Observable.from(words)
              .toSortedList()
               .flatMap(new Func1<List<Integer>, Observable<Integer>>() {
                   @Override
                   public Observable<Integer> call(List<Integer> strings) {
                       return Observable.from(strings);
                   }
               })
              .subscribe(new Action1<Integer>() {
                  @Override
                  public void call(Integer strings) {
                      mText.append(strings+"\n");
                  }
              });

11 CONNECT 模式

普通的模式

Observable  observable= Observable.from(integer);
    Action1 a1=new Action1<Integer>(){
        @Override
        public void call(Integer o) {
            mText.append("观察者A  收到:  "+o+"\n");
        }
    };
    Action1 a2=new Action1<Integer>(){
        @Override
        public void call(Integer o) {
            mText.append("观察者B  收到:  "+o+"\n");
        }
    };

    observable.subscribe(a1);
    observable.subscribe(a2)

CONNECT 模式
(连接之后两个任务一任执行一次)

     ConnectableObservable  observable= 
            Observable.from(integer)
            .publish();//将一个Observable转换为一个可连接的Observable

    Action1 a1=new Action1<Integer>(){
        @Override
        public void call(Integer o) {
            mText.append("观察者A  收到:  "+o+"\n");
        }
    };
    Action1 a2=new Action1<Integer>(){
        @Override
        public void call(Integer o) {
            mText.append("观察者B  收到:  "+o+"\n");
        }
    };

    observable.subscribe(a1);
    observable.subscribe(a2);
    observable.connect();

12 timestamp 操作符

        Observable.from(words)
            .timestamp()
           // .timestamp(Schedulers.io()) 可指定线程环境,如果指定到子线程,请在最后切换成主线程
            .subscribe(new Action1<Timestamped<Integer>>() {
                @Override
                public void call(Timestamped<Integer> integerTimestamped) {
                    SimpleDateFormat sdf=new SimpleDateFormat("yyyy-MM-dd-HH:mm:ss");
                    mText.append("value: "+integerTimestamped.getValue()+"       time:   ");
                    mText.append(sdf.format(new Date(integerTimestamped.getTimestampMillis()))+"\n");

                }
            });
发布了26 篇原创文章 · 获赞 6 · 访问量 7812

猜你喜欢

转载自blog.csdn.net/weixin_37558974/article/details/82597579