Rxjava2操作符入门
1. 概述:
用官网的一句话:”a library for composing asynchronous and event-based programs using observable sequences for the Java VM”(一个在 Java VM 上使用可观测的序列来组成异步的、基于事件的程序的库)。这就是 RxJava ,概括得非常精准。也就是咱们常说的链式编程
2. Rxjava的好处
异步操作很关键的一点是程序的简洁性,因为在调度过程比较复杂的情况下,异步代码经常会既难写也难被读懂。 Android 创造的 AsyncTask 和Handler ,其实都是为了让异步代码更加简洁。RxJava的优势也是简洁,但它的简洁的与众不同之处在于,随着程序逻辑变得越来越复杂,它依然能够保持简洁(本段内容摘自扔物线的博客)。
3. 观察者和被观察者
- Observable(被观察者)/Observer(观察者)
- Flowable(被观察者)/Subscriber(观察者) (2.0出现的支持背压)
4. 简单使用
发射源有多少个onNext就会发射多少次,onComplete 和 onError是冲突的两个方法,有你没我,有我没你
如果在onComplete或者onError调用OnNext方法不会再起作用
//创建被观察者
Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onComplete();
e.onNext(3);
//e.onError(new Throwable());
}
});
//创建观察者
Observer<Integer> observer = new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.i(TAG,"onSubscribe订阅了");
}
@Override
public void onNext(Integer integer) {
Log.i(TAG,"onNext"+integer);
}
@Override
public void onError(Throwable e) {
Log.i(TAG,"onError");
}
@Override
public void onComplete() {
Log.i(TAG,"onComplete");
}
};
// 开始订阅
observable.subscribe(observer);
打印出来的Log为
onSubscribe订阅了
onNext1
onNext2
onComplete
4.1 Observer(观察者)中的方法
- onSubscribe(Disposable d)
当订阅到被观察者的时候调用 , Disposable 用来解除订阅的,防止内存泄漏 - onNext(T t)
被观察者发送OnNext方法的时候调用 - onComplete()
当被观察者调用onComplete方法时执行 - onError(Throwable e)
当被观察者调用onError方法时执行
4.2 subscribe()方法内可以传递的东西
Consumer <? super T> onNext 表示被观察者只关注onNext
Consumer <? super T> onNext, Consumer<? super Throwable> onError) 表示被观察者只关注onNext 和 onError
Consumer <? super T> onNext, Consumer<? super Throwable> onError, Action onComplete) 表示被观察者只关注onNext 和 onError 和 onComplete
Consumer <? super T> onNext, Consumer<? super Throwable> onError, Action onComplete, Consumer <? super Disposable> onSubscribe) 表示被观察者四个方法都关注
5. 大致的操作符分为以下几点:
当然了下面的也不是所有的操作符
创建型 | Create | Just | fromIterable | Timer | Interval |
转化型 | Map | FlatMap | Buffer | Scan | Window |
过滤型 | Filter | Distince | Skip | Take | Last |
组合型 | Zip | Join | And | Switch | Merge |
错误处理性 | Retry | Catch | |||
辅助型 | SubscribeOn | ObserveOn | Timer | Interval | DoOnNext |
条件和布尔 | All | SkipUntil | TakeUntil | Contains | Amb |
算数和聚合型 | Conact | Count | Max | Min | Sum |
连接型 | Connect | Publish | Replay | RefCount | |
异步操作 | Start | ToAsync | StartFuture | FromAction | FromCallable |
阻塞操作 | ForEach | Firsh | Last | MostRecent | Next |
字符串操作 | Split | Decode | Encode | Join | Form |
6. 创建型(Creating): 也就是创建 Observable (被观察者)
6.1 Create (表示只发送OnNext方法)
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
}
}).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.i(TAG,integer+"");
}
});
打印出来
12-12 05:17:00.307 19635-19635/niezhiyang.cn.rxjava2_android_samples I/CreatActivity: 1
12-12 05:17:00.307 19635-19635/niezhiyang.cn.rxjava2_android_samples I/CreatActivity: 2
12-12 05:17:00.307 19635-19635/niezhiyang.cn.rxjava2_android_samples I/CreatActivity: 3
6.2 just (将传入的参数依次发送出来)
Observable.just(1,2,3).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.i(TAG,integer+"");
}
});
打印出来
12-12 05:17:32.349 19635-19635/niezhiyang.cn.rxjava2_android_samples I/CreatActivity: 1
12-12 05:17:32.349 19635-19635/niezhiyang.cn.rxjava2_android_samples I/CreatActivity: 2
12-12 05:17:32.349 19635-19635/niezhiyang.cn.rxjava2_android_samples I/CreatActivity: 3
6.3 fromIterable (将Iterable中的对象依次发送出去)
同样 fromArray 是将 数组 中的数据依次发送出去
ArrayList<String > arrayList = new ArrayList<>();
for(int i = 0;i<3;i++) {
arrayList.add(""+i+i);
}
Observable.fromIterable(arrayList).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.i(TAG,s+"");
}
});
打印出来
12-12 05:17:46.458 19635-19635/niezhiyang.cn.rxjava2_android_samples I/CreatActivity: 00
12-12 05:17:46.458 19635-19635/niezhiyang.cn.rxjava2_android_samples I/CreatActivity: 11
12-12 05:17:46.458 19635-19635/niezhiyang.cn.rxjava2_android_samples I/CreatActivity: 22
6.4 Timer (它在一个给定的延迟后发射一个特殊的值,等同于Android中Handler的postDelay( )方法)
// DAYS,HOURS,MICROSECONDS,MILLISECONDS,MINUTES,NANOSECONDS,SECONDS;
// 天 小时 微秒 毫秒 分钟 纳秒 秒
final long start = System.currentTimeMillis();
Observable.timer(1000, TimeUnit.MILLISECONDS).subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
long end = System.currentTimeMillis();
Log.i(TAG,"时间差:"+(end-start)+"ms");
}
});
打印出来
12-12 05:20:22.483 20471-20629/niezhiyang.cn.rxjava2_android_samples I/CreatActivity: 时间差:1008ms
6.5 Interval (创建一个按固定时间间隔发射整数序列)
可以用来当做计时器,或者间隔性请求网络数据
Observable.interval(1000, TimeUnit.MILLISECONDS).subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
Log.i(TAG,""+aLong);
}
});
打印出来
12-12 05:25:35.379 21915-21971/niezhiyang.cn.rxjava2_android_samples I/CreatActivity: 0
12-12 05:25:36.379 21915-21971/niezhiyang.cn.rxjava2_android_samples I/CreatActivity: 1
12-12 05:25:37.379 21915-21971/niezhiyang.cn.rxjava2_android_samples I/CreatActivity: 2
12-12 05:25:38.379 21915-21971/niezhiyang.cn.rxjava2_android_samples I/CreatActivity: 3
12-12 05:25:39.379 21915-21971/niezhiyang.cn.rxjava2_android_samples I/CreatActivity: 4
12-12 05:25:40.379 21915-21971/niezhiyang.cn.rxjava2_android_samples I/CreatActivity: 5
...
...
...
6.6 repeat (创建一个重复发射特定数据的Observable)
可以用来当做计时器,或者间隔性请求网络数据
Observable.just(1).repeat(2).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer i) throws Exception {
mCount++;
Log.i(TAG,"第:"+mCount+"次"+"数据为:"+i);
}
});
打印出来
12-12 05:25:56.585 21915-21915/niezhiyang.cn.rxjava2_android_samples I/CreatActivity: 第:1次数据为:1
12-12 05:25:56.585 21915-21915/niezhiyang.cn.rxjava2_android_samples I/CreatActivity: 第:2次数据为:1
7. 既然讲了创建被观察者(Observable)和观察者(Observer), 那么先讲一下Schedulers线程调度器
如果Observable默认的是在主线程中,Observer默认跟随Observable的线程
- Schedulers.computation()
计算所使用的 Scheduler。这个计算指的是 CPU 密集型计算,即不会被 I/O 等操作限制性能的操作,例如图形的计算。这个 Scheduler 使用的固定的线程池,大小为 CPU 核数。不要把 I/O 操作放在 computation() 中,否则 I/O 操作的等待时间会浪费 CPU。
- Schedulers.newThread()
开启一个新的线程
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
Log.i(TAG, "发布的线程是:" + Thread.currentThread().getName());
}
})
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.newThread())
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.i(TAG, "订阅的线程是:" + Thread.currentThread().getName());
}
});
打印出来
12-13 06:59:25.085 11730-12398/niezhiyang.cn.rxjava2_android_samples I/SchedulersActivity: 发布的线程是:RxCachedThreadScheduler-2
12-13 06:59:25.086 11730-12573/niezhiyang.cn.rxjava2_android_samples I/SchedulersActivity: 订阅的线程是:RxNewThreadScheduler-1
- Schedulers.io()
主要用于一些耗时操作,比如读写文件,数据库存取,网络交互等。
这个调度器根据需要,增加或者减少线程池中的线程数量。需要注意的是Schedulers.i0()中的线程池数量是无限制大的,大量的I/0操作将创建许多线程,我们需要在性能和线程数量中做出取舍。
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
Log.i(TAG, "发布的线程是:" + Thread.currentThread().getName());
}
})
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.io())
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.i(TAG, "订阅的线程是:" + Thread.currentThread().getName());
}
});
打印出来
12-13 06:58:22.448 11730-12398/niezhiyang.cn.rxjava2_android_samples I/SchedulersActivity: 发布的线程是:RxCachedThreadScheduler-2
12-13 06:58:22.448 11730-12399/niezhiyang.cn.rxjava2_android_samples I/SchedulersActivity: 订阅的线程是:RxCachedThreadScheduler-3
- AndroidSchedulers.mainThread() Android中专用的,指定的操作在Android的主线程(UI线程中)运行
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
Log.i(TAG, "发布的线程是:" + Thread.currentThread().getName());
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.i(TAG, "订阅的线程是:" + Thread.currentThread().getName());
}
});
打印出来
12-13 06:54:57.969 11730-11782/niezhiyang.cn.rxjava2_android_samples I/SchedulersActivity: 发布的线程是:RxCachedThreadScheduler-1
12-13 06:54:57.970 11730-11730/niezhiyang.cn.rxjava2_android_samples I/SchedulersActivity: 订阅的线程是:main