RxJava 学习使用——操作符
下面开始学习 Rxjava 的操作符,了解Rxjava 更对的用途,让Rxjava更好为我们所用。
废话不多说上码
1.RxJava最基本用法(不带操作符)
//创建被观察者
Observable observable=createObservable();
//创建观察者
Subscriber subscriber=createSubscriber();
mText.append("开始订阅,准备观察...\n");
//事实上,observable不止可以订阅subscriber,也可以订阅ActionX()
observable.subscribe(subscriber);
1.1 相关方法
private Subscriber createSubscriber() {
//创建观察者
Subscriber subscriber=new Subscriber<String>() {
@Override
public void onCompleted() {
mText.append("执行观察者中的onCompleted()...\n");
mText.append("订阅完毕,结束观察...\n");
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(String s) {
mText.append("执行观察者中的onNext()...\n");
mText.append(s+"...\n");
}
};
return subscriber;
}
1.2相关方法
private Observable createObservable(){
//创建被观察者,这是最正常的创建方法
Observable observable=Observable.create(new Observable.OnSubscribe<String>(){
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("一二三四五");
subscriber.onNext("上山打老虎");
subscriber.onNext("老虎一发威");
subscriber.onNext("武松就发怵");
subscriber.onCompleted();
}
});
//想要图方便,可以这样创建
//from(T[])
// String [] kk={"一二三四五","上山打老虎","老虎一发威","武松就发怵"};
// Observable observable=Observable.from(kk);
//或者这样
//just(T...)
// Observable observable=Observable.just("一二三四五","上山打老虎","老虎一发威","武松就发怵");
return observable;
}
2 操作符 map
Observable.from(number) //之前提到的创建Observable方法
.map(new Func1<Integer, Boolean>() {//Func1方法泛型中的<两个参数>
@Override
public Boolean call(Integer integer) {
mText.append("\n\n map() Integer--->Boolean");
return (integer<3);
}
})
.subscribe(new Action1<Boolean>() {
@Override
public void call(Boolean aBoolean) {
mText.append("\n观察到输出结果:\n");
mText.append(aBoolean.toString());
}
});
2.1 发现map方法中new了一个Func1(之能是Func1),Func1的泛型中有两个参数类型,第一个是传入的类型,第二个是返回的类型。
2.2 我们可以在call方法中对 返回值进行控制了
3.线程调度
可以根据不同的步骤进行 线程的切换
Observable.create(new Observable.OnSubscribe<Drawable>(){
@Override
public void call(Subscriber<? super Drawable> subscriber) {
sb.append(" Observable.create(): 线程: "+Thread.currentThread().getName()+"\n\n");
Drawable dd=getResources().getDrawable(R.mipmap.gril);
subscriber.onNext(dd);
subscriber.onCompleted();
}
}).subscribeOn(Schedulers.io())
.observeOn(Schedulers.newThread())
.map(new Func1<Drawable, ImageView>() {
@Override
public ImageView call(Drawable drawable) {
sb.append("map(): drawable -->imageview 的线程: "+Thread.currentThread().getName()+"\n\n");
ImageView img=new ImageView(RxSchuderActivity.this);
LinearLayout.LayoutParams params= new LinearLayout.LayoutParams(LinearLayout.LayoutParams.WRAP_CONTENT, LinearLayout.LayoutParams.WRAP_CONTENT);
img.setLayoutParams(params);
img.setImageDrawable(drawable);
return img;
}
}).observeOn(AndroidSchedulers.mainThread())
.subscribe(new Action1<ImageView>() {
@Override
public void call(ImageView imageView) {
sb.append("call(): 线程: "+Thread.currentThread().getName()+"\n");
mText.setText(sb);
mLinearlayout.addView(imageView);
}
});
4.flatMap
将第一次的返回值,再进行一次订阅并处理,所以会看到两个Call方法
Observable.from(getSchoolClass())
.flatMap(new Func1<SchoolClass, Observable<Student>>() {
@Override
public Observable<Student> call(SchoolClass schoolClass) {
//将Student列表使用from方法一个一个发出去
return Observable.from(schoolClass.getStudents());
}
})
.subscribe(new Action1<Student>() {
@Override
public void call(Student student) {
mText.append("打印单个学生信息:\n");
mText.append("name:"+student.name+" age: "+student.age+"\n");
}
});
5 合并
Observable.merge(obs1,obs2)
(两个任务合并执行,并且在全部处理完成后,显示结果。)
Observable obs1=Observable.create(new Observable.OnSubscribe<String>(){
@Override
public void call(Subscriber<? super String> subscriber) {
try {
Thread.sleep(2000);
subscriber.onNext(" aaa");
subscriber.onCompleted();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).subscribeOn(Schedulers.newThread());
Observable obs2=Observable.create(new Observable.OnSubscribe<String>(){
@Override
public void call(Subscriber<? super String> subscriber) {
try {
Thread.sleep(2100);
subscriber.onNext("bbb");
subscriber.onCompleted();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).subscribeOn(Schedulers.newThread());
Observable.merge(obs1,obs2)
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<String>() {
StringBuffer sb=new StringBuffer();
@Override
public void onCompleted() {
mText.append("两个任务都处理完毕!!\n");
mText.append("更新数据:"+sb+"\n");
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(String s) {
sb.append( s+",");
mText.append("得到一个数据:"+s+"\n");
}
});
}
6 RXJAVA的BINGDING
(将欢声控件原生与Rxjava 实现 点击 监听等)
RxTextView.textChanges(mEdit)
//在一次事件发生后的一段时间内没有新操作,则发出这次事件
.debounce(1000,TimeUnit.MILLISECONDS)
//转换线程
.observeOn(Schedulers.newThread())
//通过输入的数据,来匹配"数据库"中的数据从而提示。。
.map(new Func1<CharSequence, List<String>>() {
List<String> list=new ArrayList<String>();
@Override
public List<String> call(CharSequence charSequence) {
if (charSequence.toString().contains("1")){
for (int i=0;i<5;i++){
list.add("11"+i);
}
}
return list;
}
})
//由于我不想要listl列表,所以使用了flatMap来分解成一个一个的数据发送
.flatMap(new Func1<List<String>, Observable<String>>() {
@Override
public Observable<String> call(List<String> strings) {
return Observable.from(strings);
}
})
//这里切换成主线程,不然没法操作组件
.observeOn(AndroidSchedulers.mainThread())
//这里做一些过滤动作
.filter(new Func1<String, Boolean>() {
@Override
public Boolean call(String s) {
return !mText.getText().toString().contains(s);
}
})
//订阅
.subscribe(new Action1<String>() {
@Override
public void call(String s) {
//这里展示提示数据
mText.append(s + "\n");
}
}, new Action1<Throwable>() {
@Override
public void call(Throwable throwable) {
Log.w("DDDDDDDD",throwable.getMessage().toString());
}
});
mBtn.setText("连续点击防误触");
RxView.clicks(mBtn)
//防误触(设定点击后500ms内丢弃新事件,或者说点击后500ms毫秒无响应)
.throttleFirst(500, TimeUnit.MILLISECONDS)
.subscribe(new Action1<Void>() {
//这就相当于OnClickListener中的OnClick方法回调
@Override
public void call(Void aVoid) {
mText.append("\n 防误触 测试 \n");
}
});
7 过滤 FILTER
(常用的请求过滤方式)
Integer[] integers={1,2,3,4,5,6,7,8,9,10};
Observable.from(integers)
.filter(new Func1<Integer, Boolean>() {
@Override
public Boolean call(Integer integer) {
return integer%2!=0;
}
})
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
mText.append(integer.toString()+",");
}
});
8 take 与 doOnNext()
take(4) 获取过滤 后返回结果的 前4个,对这四个还可子进行部分获取
doOnNext() 在 onNext之前加一次操作。
Observable.from(number)
.filter(new Func1<Integer, Boolean>() {
@Override
public Boolean call(Integer integer) {
return integer%2!=0;
}
})
//取前四个 结果
.take(4)
//取前四个中的后两个
.takeLast(2)
.doOnNext(new Action1<Integer>() { //在每次的返回前,在加一步
@Override
public void call(Integer integer) {
mText.append("before onNext()\n");
}
})
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
mText.append("onNext()--->"+integer+"\n");
}
});
9 interval 间隔执行任务 (当定时器来用)
//interval()是运行在computation Scheduler线程中的,因此需要转到主线程
mSubscription=Observable.interval(1, TimeUnit.SECONDS)
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Action1<Long>() {
@Override
public void call(Long aLong) {
mText.setText(aLong+"");
}
});
取消订阅
if (mSubscription!=null && !mSubscription.isUnsubscribed()){
mSubscription.unsubscribe();
}
10 排序 (toSortedList)
.toSortedList() 还有其他API带研究
Observable.from(words)
.toSortedList()
.flatMap(new Func1<List<Integer>, Observable<Integer>>() {
@Override
public Observable<Integer> call(List<Integer> strings) {
return Observable.from(strings);
}
})
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer strings) {
mText.append(strings+"\n");
}
});
11 CONNECT 模式
普通的模式
Observable observable= Observable.from(integer);
Action1 a1=new Action1<Integer>(){
@Override
public void call(Integer o) {
mText.append("观察者A 收到: "+o+"\n");
}
};
Action1 a2=new Action1<Integer>(){
@Override
public void call(Integer o) {
mText.append("观察者B 收到: "+o+"\n");
}
};
observable.subscribe(a1);
observable.subscribe(a2)
CONNECT 模式
(连接之后两个任务一任执行一次)
ConnectableObservable observable=
Observable.from(integer)
.publish();//将一个Observable转换为一个可连接的Observable
Action1 a1=new Action1<Integer>(){
@Override
public void call(Integer o) {
mText.append("观察者A 收到: "+o+"\n");
}
};
Action1 a2=new Action1<Integer>(){
@Override
public void call(Integer o) {
mText.append("观察者B 收到: "+o+"\n");
}
};
observable.subscribe(a1);
observable.subscribe(a2);
observable.connect();
12 timestamp 操作符
Observable.from(words)
.timestamp()
// .timestamp(Schedulers.io()) 可指定线程环境,如果指定到子线程,请在最后切换成主线程
.subscribe(new Action1<Timestamped<Integer>>() {
@Override
public void call(Timestamped<Integer> integerTimestamped) {
SimpleDateFormat sdf=new SimpleDateFormat("yyyy-MM-dd-HH:mm:ss");
mText.append("value: "+integerTimestamped.getValue()+" time: ");
mText.append(sdf.format(new Date(integerTimestamped.getTimestampMillis()))+"\n");
}
});