去年这个时候刚刚接触RxJava整理的笔记,先放上来,有空再改改。
Rxjava学习笔记
(一)简单入门
1.Rxjava是一个在 Java VM 上使用可观测的序列来组成异步的、基于事件的程序的库。
优点是简洁,并且随着程序逻辑变得越来越复杂,它依然能够保持简洁。
RxJava 的异步实现,是通过一种扩展的观察者模式来实现的。Rxjava基本原理就是观察者(ObServer)通过(Subscribe)订阅被观察者(Observable),看一个简单的例子就像是一个button的点击事件一样
Button的点击事是button.setOnClickListener(newOnClickListener()),buttont通过setOnClickListener()来告知OnClickListener正在点击,然后实现OnClickListener中的onclick()方法。
RxJava 有四个基本概念:Observable (可观察者,即被观察者)、 Observer (观察者)、 subscribe (订阅)、事件。Observable和 Observer 通过 subscribe() 方法实现订阅关系,从而 Observable 可以在需要的时候发出事件来通知 Observer。
Rxjava的代码模式就是observable.subscribe(observer)与button点击事件相似。
除了 Observer 接口之外,RxJava 还内置了一个实现了 Observer 的抽象类:Subscriber。 Subscriber 对 Observer 接口进行了一些扩展,但他们的基本使用方式是完全一样的,所以也可以写成observable.subscribe(subscriber )。只不过Observer相对Subscriber少了onStart()和unsubscribe()两种方法。
与传统观察者模式不同, RxJava 的事件回调方法除了普通事件 onNext() (相当于 onClick() / onEvent())之外,还定义了两个特殊的事件:onCompleted() 和 onError()。
2创建观察者Observer
Observer<String> observer = newObserver<String>() {
@Override
public void onCompleted() {
//任务完成时出发
}
@Override
public void onError(Throwable e) {
//队列发生异常时出发
}
@Override
public void onNext(String s) {
//普通事件回调
}
}
3.创建被观察者Observable
Observable observable =Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext(string);
}
})
4.通过Subscibe完成双方之间的订阅关系进行关联
observable.subscribe(observer)
Observable.subscribe(Subscriber) 的内部实现的核心代码:
public Subscription subscribe(Subscribersubscriber) {
subscriber.onStart();
onSubscribe.call(subscriber);
return subscriber;
}
可以看到,subscriber() 做了3件事:
(1)调用Subscriber.onStart() 。这个方法在前面已经介绍过,是一个可选的准备方法。
(2)调用 Observable 中的OnSubscribe.call(Subscriber) 。在这里,事件发送的逻辑开始运行。从这也可以看出,在 RxJava 中, Observable 并不是在创建的时候就立即开始发送事件,而是在它被订阅的时候,即当 subscribe() 方法执行的时候。
(3)将传入的 Subscriber 作为Subscription 返回。这是为了方便 unsubscribe()
unsubscribe()是Subscriber 实现的另一个接口Subscription中的方法,在这个方法被调用后,Subscriber 将不再接收事件。一般在这个方法调用前,可以使用 isUnsubscribed() 先判断一下状态。 unsubscribe() 这个方法很重要,因为在 subscribe() 之后, Observable 会持有 Subscriber 的引用,这个引用如果不能及时被释放,将有内存泄露的风险。所以最好保持一个原则:要在不再使用的时候尽快在合适的地方(例如 onPause() onStop() 等方法中)调用 unsubscribe() 来解除引用关系,以避免内存泄露的发生。
(二)继续深入
1.Create() 方法是 RxJava 最基本的创造事件序列的方法。基于这个方法, RxJava 还提供了一些方法用来快捷创建事件队列,例如:
(1) .just(T...)//将传入的参数依次发送
Observable observable = Observable.just(“str1”,”str2”,”str3”)
(2) .from(T[])/.from(Iterable<?extends T>) //将传入的数组或 Iterable 拆分成具体对象后,依次发送出来
String[] strs = {"小明", "小红", "小芳"};
Observable observable = Observable.from(strs);
或者ArrayList<String> strLists = new ArrayList<>();
strLists.add("小明");
strLists.add("小明1");
strLists.add("小明2");
Observable observable = Observable.from(strLists);
2.除了subscribe(Observer) 和 subscribe(Subscriber) ,subscribe() 还支持不完整定义的回调,RxJava会自动根据定义创建出 Subscriber 。形式如下:
Action1<String>onNextAction = new Action1<String>() {
// onNext()
@Override
public void call(String s) {
Log.d(tag, s);
}
};
Action1<Throwable>onErrorAction = new Action1<Throwable>() {
// onError()
@Override
public void call(Throwable throwable) {
// Error handling
}
};
Action0onCompletedAction = new Action0() {
// onCompleted()
@Override
public void call() {
Log.d(tag, "completed");
}
};
// 自动创建 Subscriber ,并使用 onNextAction 来定义 onNext()
observable.subscribe(onNextAction);
// 自动创建 Subscriber ,并使用 onNextAction 和 onErrorAction 来定义 onNext() 和 onError()
observable.subscribe(onNextAction,onErrorAction);
// 自动创建 Subscriber ,并使用 onNextAction、 onErrorAction 和 onCompletedAction 来定义 onNext()、 onError() 和 onCompleted()
observable.subscribe(onNextAction,onErrorAction, onCompletedAction);
3.变换
1)操作符
操作符就是为了解决对Observable对象的变换的问题,操作符用于在Observable和最终的Subscriber之间修改Observable发出的事件。RxJava提供了很多很有用的操作符。
(1)map
事件对象的直接变换
例子:
Observable.just("Hello")
.map(new Func1<String, String>() {
@Override
public String call(String s) {
return s + " ——By xj";
}
}).subscribe(new Action1<String>() {
@Override
public void call(String s) {
System.out.println(s);
}
})
如果不看map里面的内容,这段代码的作用就是打印just传入的string“hello”,那么加上map就是将原来的“hello”转换成另一个字符串“hello——By xj”然后再打印出来。这个地方涉及到了一个Func1 的类。它和 Action1 非常相似,也是 RxJava 的一个接口,用于包装含有一个参数的方法。 Func1 和 Action 的区别在于, Func1 包装的是有返回值的方法。另外,和 ActionX 一样, FuncX 也有多个,用于不同参数个数的方法。FuncX 和 ActionX 的区别在 FuncX 包装的是有返回值的方法。
这个地方写的例子是String任然转换成String,但是其实map是可以转换类型的,在call中处理转换。
map进阶:
首先假设这么一种需求:假设有一个数据结构『学生』,现在需要打印出一组学生的名字。
实现方法:
Student[] students = new Student[2];
Student s1 = new Student();
Student s1 = new Student();
Student[0 ] = s1;
Student[1 ] = s2;
Observable.from(students)
.map(new Func1<Student, String>() {
@Override
public String call(Student student) {
return student.getName();
}
}).subscribe(new Action1<String>() {
@Override
public void call(String name) {
Log.d(tag, name)
}
})
如果每个Student中都有一个ArrayList<Course>表示该学生所学课程集合,那么我们要打印出这个学生所学课程的名字的话我们可以在Subscriber的onNext中用for循环打印
实现如下:
Observable.from(students)
.subscribe(new Subscriber<Student>(){
@Override
public void onNext(Student student) {
List<Course> courses =student.getCourses();
for (int i = 0; i < courses.size();i++) {
Course course = courses.get(i);
Log.d(tag, course.getName());
}
}
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
});
问题:
这个地方为什么不能用map,在map中用for循环,然后Fun1中返回值类型为AssayList<Course>,然后在onNext中使用,除了更加复杂外,还有没有别的理由,是因为map一对一的原因吗。
如果我不想在 Subscriber 中使用 for 循环,而是希望Subscriber 中直接传入单个的 Course 对象,这个地方是不能用map的转换的,因为map是一对一的转换,这个地方要求的是一对多的转换。那么rxjava还提供了另一个操作符——flatMap()
(2)flatMap
Observable.from(students)
.flatMap(new Func1<Student, Observable<Course>>() {
@Override
public Observable<Course> call(Student student) {
returnObservable.from(student.getCourses());
}
})
.subscribe(new Subscriber<Course>() {
@Override
public void onNext(Course course) {
Log.d(tag, course.getName());
}
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
});
flatMap() 和 map() 有一个相同点:
它也是把传入的参数转化之后返回另一个对象。
和 map() 不同的是,flatMap() 中返回的是个 Observable 对象,并且这个 Observable 对象并不是被直接发送到了 Subscriber 的回调方法中。 flatMap() 的原理是这样的:
a. 使用传入的事件对象创建一个Observable 对象;
b. 并不发送这个 Observable, 而是将它激活,于是它开始发送事件;
c. 每一个创建出来的 Observable 发送的事件,都被汇入同一个 Observable ,而这个 Observable 负责将这些事件统一交给 Subscriber 的回调方法。
这三个步骤,把事件拆成了两级,通过一组新创建的 Observable 将初始的对象『铺平』之后通过统一路径分发了下去。而这个『铺平』就是 flatMap() 所谓的 flat。
2)变换原理(没怎么看懂,但是网上说都不建议开发者自定义Operator 来直接使用 lift(),而是建议尽量使用已有的 lift() 包装方法(如 map() flatMap() 等)进行组合来实现需求,因为直接使用 lift() 非常容易发生一些难以发现的错误)
变换虽然功能各有不同,但实质上都是针对事件序列的处理和再发送。而在 RxJava 的内部,它们是基于同一个基础的变换方法: lift(Operator)。lift() 的内部实现核心代码如下:
public <R> Observable<R>lift(Operator<? extends R, ? super T> operator) {
return Observable.create(new OnSubscribe<R>() {
@Override
public void call(Subscriber subscriber) {
Subscriber newSubscriber = operator.call(subscriber);
newSubscriber.onStart();
onSubscribe.call(newSubscriber);
}
});
}
它生成了一个新的 Observable 并返回,而且创建新 Observable 所用的参数 OnSubscribe 的回调方法 call() 中的实现看起来和前面讲过的 Observable.subscribe() 一样,但实际上并不相同。
subscribe() 中这句话的 onSubscribe 指的是 Observable 中的 onSubscribe 对象,这个没有问题,但是 lift() 之后的情况就复杂了点。
当含有 lift() 时:
a.lift() 创建了一个 Observable 后,加上之前的原始Observable,已经有两个 Observable 了;
b.而同样地,新 Observable 里的新OnSubscribe 加上之前的原始 Observable 中的原始 OnSubscribe,也就有了两个 OnSubscribe;
c.当用户调用经过 lift() 后的 Observable 的 subscribe() 的时候,使用的是 lift() 所返回的新的 Observable ,于是它所触发的 onSubscribe.call(subscriber),也是用的新 Observable 中的新 OnSubscribe,即在 lift() 中生成的那个 OnSubscribe;
d.而这个新 OnSubscribe 的 call() 方法中的onSubscribe ,就是指的原始 Observable 中的原始 OnSubscribe ,在这个 call() 方法里,新 OnSubscribe 利用 operator.call(subscriber) 生成了一个新的 Subscriber(Operator 就是在这里,通过自己的call() 方法将新 Subscriber 和原始 Subscriber 进行关联,并插入自己的『变换』代码以实现变换),然后利用这个新 Subscriber 向原始 Observable 进行订阅。
这样就实现了 lift() 过程,有点像一种代理机制,通过事件拦截和处理实现事件序列的变换。
精简掉细节的话,也可以这么说:在 Observable执行了 lift(Operator) 方法之后,会返回一个新的 Observable,这个新的 Observable 会像一个代理一样,负责接收原始的Observable 发出的事件,并在处理后发送给 Subscriber。
举一个具体的 Operator 的实现。下面这是一个将事件中的 Integer 对象转换成 String 的例子:
observable.lift(newObservable.Operator<String, Integer>() {
@Override
public Subscriber<? super Integer> call(final Subscriber<?super String> subscriber) {
// 将事件序列中的 Integer 对象转换为 String 对象
return new Subscriber<Integer>() {
@Override
public void onNext(Integer integer) {
subscriber.onNext(""+ integer);
}
@Override
public void onCompleted() {
subscriber.onCompleted();
}
@Override
public void onError(Throwable e) {
subscriber.onError(e);
}
};
}
});
3) compose: 对 Observable 整体的变换
除了 lift() 之外, Observable 还有一个变换方法叫做 compose(Transformer)。它和 lift() 的区别在于, lift() 是针对事件项和事件序列的,而 compose() 是针对 Observable 自身进行变换。举个例子,假设在程序中有多个 Observable ,并且他们都需要应用一组相同的 lift() 变换
public class LiftAllTransformer implementsObservable.Transformer<Integer, String> {
@Override
public Observable<String> call(Observable<Integer>observable) {
return observable
.lift1()
.lift2()
.lift3()
.lift4();
}
}
...
Transformer liftAll = newLiftAllTransformer();
observable1.compose(liftAll).subscribe(subscriber1);
observable2.compose(liftAll).subscribe(subscriber2);
observable3.compose(liftAll).subscribe(subscriber3);
observable4.compose(liftAll).subscribe(subscriber4);
(三)线程控制——Scheduler
在不指定线程的情况下, RxJava 遵循的是线程不变的原则,即:在哪个线程调用 subscribe(),就在哪个线程生产事件;在哪个线程生产事件,就在哪个线程消费事件。如果需要切换线程,就需要用到 Scheduler (调度器)。除了灵活的变换,RxJava 另一个优点就是线程的自由控制。
1. Scheduler 的 API
在RxJava 中,Scheduler ——调度器,相当于线程控制器,RxJava 通过它来指定每一段代码应该运行在什么样的线程。RxJava 已经内置了几个 Scheduler ,它们已经适合大多数的使用场景:
(1) Schedulers.immediate(): 直接在当前线程运行,相当于不指定线程。这是默认的 Scheduler。
(2) Schedulers.newThread(): 总是启用新线程,并在新线程执行操作。
(3) Schedulers.io(): I/O 操作(读写文件、读写数据库、网络信息交互等)所使用的 Scheduler。行为模式和 newThread() 差不多,区别在于 io() 的内部实现是是用一个无数量上限的线程池,可以重用空闲的线程,因此多数情况下 io() 比 newThread() 更有效率。不要把计算工作放在 io() 中,可以避免创建不必要的线程。
(4) Schedulers.computation(): 计算所使用的 Scheduler。这个计算指的是 CPU 密集型计算,即不会被 I/O 等操作限制性能的操作,例如图形的计算。这个 Scheduler 使用的固定的线程池,大小为 CPU 核数。不要把 I/O 操作放在 computation() 中,否则 I/O 操作的等待时间会浪费 CPU。
(5)另外, Android 还有一个专用的 AndroidSchedulers.mainThread(),它指定的操作将在 Android 主线程运行。
有了这几个 Scheduler ,就可以使用 subscribeOn() 和 observeOn() 两个方法来对线程进行控制了。
* subscribeOn(): 指定 subscribe() 所发生的线程,即Observable.OnSubscribe 被激活时所处的线程。或者叫做事件产生的线程。
* observeOn(): 指定 Subscriber 所运行在的线程。或者叫做事件消费的线程
例子代码:
Observable.just(1, 2, 3,4)
.subscribeOn(Schedulers.io()) // 指定 subscribe() 发生在 IO 线程
.observeOn(AndroidSchedulers.mainThread())// 指定 Subscriber 的回调发生在主线程
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer number) {
Log.d(tag, "number:" +number);
}
});
如果使用map() flatMap() 等变换方法,可多次切换线程
因为observeOn() 指定的是 Subscriber 的线程,而这个 Subscriber 不一定是subscribe() 参数中的 Subscriber ,而是 observeOn() 执行时的当前 Observable 所对应的 Subscriber ,即它的直接下级 Subscriber 。换句话说,observeOn() 指定的是它之后的操作所在的线程。因此如果有多次切换线程的需求,只要在每个想要切换线程的位置调用一次 observeOn() 即可。例子:
Observable.just(1, 2, 3, 4) // IO 线程,由 subscribeOn() 指定
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.newThread())
.map(mapOperator) // 新线程,由observeOn() 指定
.observeOn(Schedulers.io())
.map(mapOperator2) // IO 线程,由observeOn() 指定
.observeOn(AndroidSchedulers.mainThread)
.subscribe(subscriber); // Android 主线程,由 observeOn() 指定
不同于 observeOn() , subscribeOn() 的位置放在哪里都可以,但它是只能调用一次的。
2. Scheduler 的原理
subscribeOn() 和 observeOn() 的内部实现,也是用的 lift()
subscribeOn() 和 observeOn() 都做了线程切换的工作,不同的是, subscribeOn() 的线程切换发生在 OnSubscribe 中,即在它通知上一级 OnSubscribe 时,这时事件还没有开始发送,因此 subscribeOn() 的线程控制可以从事件发出的开端就造成影响;而 observeOn() 的线程切换则发生在它内建的 Subscriber 中,即发生在它即将给下一级 Subscriber 发送事件时,因此 observeOn() 控制的是它后面的线程。
3. doOnSubscribe()
Subscriber 的 onStart() 可以用作流程开始前的初始化。然而 onStart() 由于在 subscribe() 发生时就被调用了,因此不能指定线程,而是只能执行在 subscribe() 被调用时的线程。这就导致如果 onStart() 中含有对线程有要求的代码(例如在界面上显示一个 ProgressBar,这必须在主线程执行),将会有线程非法的风险,因为有时你无法预测 subscribe() 将会在什么线程执行
而与 Subscriber.onStart() 相对应的,有一个方法 Observable.doOnSubscribe()。它和 Subscriber.onStart() 同样是在 subscribe() 调用后而且在事件发送前执行,但区别在于它可以指定线程。默认情况下, doOnSubscribe() 执行在 subscribe() 发生的线程;而如果在 doOnSubscribe() 之后有 subscribeOn() 的话,它将执行在离它最近的 subscribeOn() 所指定的线程。
代码示例:
Observable.create(onSubscribe)
.subscribeOn(Schedulers.io())
.doOnSubscribe(new Action0() {
@Override
public void call() {
progressBar.setVisibility(View.VISIBLE); // 需要在主线程执行
}
})
.subscribeOn(AndroidSchedulers.mainThread()) // 指定主线程
.observeOn(AndroidSchedulers.mainThread())
.subscribe(subscriber);
在 doOnSubscribe()的后面跟一个 subscribeOn() ,就能指定准备工作的线程了
Rxjava学习笔记二
1)throttleFirst 防抖动,在一定时间间隔内丢弃新的事件,例如按钮点击事件防止快速连续点击
2)操作符fromCallable
fromCallable和just分别实现:
使用fromCallable操作符
public void run() {
Observable<String> o =Observable.fromCallable(new Callable<String>() {
@Override
public String call() throws Exception {
return getMessage();
}
});
o.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(getSubscriber());
}
使用just操作符
public void run() {
Observable<String> o =Observable.just(getMessage());
o.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(getSubscriber());
}
使用fromCallable时,getMessage是在子线程执行,而just是在主线程执行。
使用Observable.fromCallable()方法有两点好处:
1.获取要发送的数据的代码只会在有Observer订阅之后执行。
2.获取数据的代码可以在子线程中执行。