软件开发中,特别是图形化界面的软件,经常需要一种“观察”的关系,比如界面中的按钮需要观察用户对它的点击操作,进度条需要观察后台下载的情况并实时更新自己显示的进度,界面 1 显示的数据需要根据界面 2 输入的数据的变化而变化,由此前人总结出了一种设计模式:观察者模式。

观察者模式(Observer Pattern) 定义对象之间的一种一对多依赖关系,使得每当一个对象状态发生改变时,其相关依赖对象皆得到通知并被自动更新

在观察者模式中,有两个基本角色:Observable(被观察者),Observer(观察者),它们之间的关系是:观察者 –订阅–> 被观察者;被观察者 –通知–> 观察者;一个被观察者可以对应多个观察者,这种关系类似报社与用户的关系,报社是被观察者,用户是观察者,一个报社可以拥有多个用户,用户需要向报社订阅报纸,那么报社在有新刊出版时就会通知用户并把新刊送到用户家里

更多关于观察者模式介绍与示例代码,可以参考 这篇文章

RxJava 简介

ReactiveX 是 Reactive Extensions 的缩写,一般简写为 Rx,最初是 LINQ 的一个扩展,由微软的架构师 Erik Meijer 领导的团队开发,在 2012 年 11 月开源,Rx 是一个编程模型,目标是提供一致的编程接口,帮助开发者更方便的处理异步数据流,Rx 库支持 .NET、JavaScript 和 C++,Rx 近几年越来越流行了,现在已经支持几乎全部的流行编程语言了,Rx 的大部分语言库由 ReactiveX 这个组织负责维护,比较流行的有 RxJava/RxJS/Rx.NET

总结起来,RxJava 就是是一个实现异步操作的库,它相对于传统的 AsyncTask 和 Handler 具有代码简介,功能强大的优势,可以将很复杂的业务逻辑用简洁的代码实现。网络上关于 RxJava 的介绍以及使用的文章有很多,比如 ReactiveX 文档中文版 是 RxJava 官方文档的中文翻译版,详细的介绍了 RxJava 和它的各种操作,给 Android 开发者的 RxJava 详解是一篇通俗易懂的 RxJava 入门文章,通读全文后能够基本掌握 RxJava 的使用,RxJava 系列 是知乎专栏系列,通过示例代码介绍了 RxJava 的具体使用。

目前 RxJava 已经发布了 2.x 版,相较于 1.x 有部分改进,语法也有一些不一样的地方,本文使用的是 RxJava 2.0.4 版

基础概念

如同观察者模式,在 RxJava 中有两个基本角色:Observable(被观察者),Observer(观察者),它们之间存在 subscribe(订阅) 的关系,订阅的内容是 Event(事件),下面通过代码来描述两者之间的关系

导入依赖

在 Android 平台使用 RxJava,需要导入 RxJava 和针对 Android 平台添加的 RxAndroid 这两个依赖

 
      
compile 'io.reactivex.rxjava2:rxjava:2.0.4'
compile 'io.reactivex.rxjava2:rxandroid:2.0.1'

创建 Observable(被观察者)

Observable 即被观察者。按照报社和用户的关系理解,这里就是创建一个报社,并创建特定新刊然后通知订阅的用户

 
      
Observable<String> observable = Observable.create( new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
Log.i(TAG, "Observable handle in Thread:" + Thread.currentThread().getName());
// do something...
e.onNext( "hello"); // 通知用户
e.onComplete();
}
});

创建 Observer(观察者)

Observer 接收来自被观察者的数据并根据需求处理,按照报社和用户的关系理解,这里就是用户收到报社寄过来的新刊

 
      
Observer<String> observer = new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.i(TAG, "onSubScribe");
}
@Override
public void onNext(String s) {
Log.d(TAG, "onNext: " + s);
Log.i(TAG, "Observer handle in Thread:" + Thread.currentThread().getName());
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError:" + e.toString());
}
@Override
public void onComplete() {
Log.i(TAG, "onComplete");
}
};

订阅

创建好 Observable 和 Observer 后,通过订阅将二者关联起来。当 Observable 产生事件后,会立即通知 Observer:

 
      
observable.subscribe(observer);

运行项目,得到输出结果

 
      
onSubScribe
Observable handle in Thread:main
onNext: hello
Observer handle in Thread:main
onComplete

通过分析可以看到,首先打印出 onSubScribe,这是在回调 onSubscribe 中产生的,这个回调方法是在 RxJava 2.x 新增的,用于取消 Observer 和 Observable 的连接,通过调用 d.dispose() 就能取消它们之间的关联,Observable 以后就不能接收到 Observable 发送的数据了,例如,修改上面 Observer 的代码,在 onSubscribe() 中调用 d.dispose() 方法:

 
      
public void onSubscribe(Disposable d) {
d.dispose();
Log.i(TAG, "onSubScribe");
}

得到的结果是

 
      
onSubScribe
Observable handle in Thread:main

可以看出,在 Observable 产生事件后,Observer 仅仅调用了其 onSubScribe() 方法,并没有调用 onNext() 方法,说明此时已经取消了与 Observable 的关联,以后不再接收这个 Observable 的事件。

在 Android 中,由于“主线程中不能进行耗时操作,以防界面卡顿出现 ANR”,就需要通过异步操作将耗时操作放在子线程中进行,然后将结果回调给主线程,在 RxJava 之前,Android 系统自带的 AsyncTask 和 Handler 就是为了解决这个问题而诞生的,但是它们的使用比较繁琐,如果遇到复杂的逻辑,代码会变得不易于阅读,而 RxJava 通过自上而下的串式代码,流程代码变得非常直观简洁,功能强大而操作简便。当然这主要归功于 RxJava 的线程控制,它可以很方便的指定某一步操作所在的线程,subscribeOn() 用于指定subscribe()发生时所在的线程,observeOn() 用于指定 Observer 回调发生时所在的线程:

 
      
observable
.subscribeOn(Schedulers.io()) // 在子线程中处理
.observeOn(AndroidSchedulers.mainThread()) // 在主线程中接收
.subscribe(observer);

运行结果:

 
      
onSubScribe
Observable handle in Thread:RxCachedThreadScheduler-1
onNext: hello
Observer handle in Thread:main
onComplete

通过输出可以看到 Observable 处理事件是在 “RxCachedThreadScheduler-1” 这个线程完成,而处理回调结果是在 “main” 这个线程中完成的。

更多方法

上面的例子是 RxJava 最基本的使用,在实际开发中,还有很多灵活的使用方法,这里简单总结下

简化订阅

创建一个 Observer,要重写它的 4 个方法,分别是 onSubscribe()onNext()onError()onComplete(),但是并不是所有需求都需要这 4 个方法,因此可以通过匿名内部类简化订阅的方法,在 RxJava 1.x 中,使用 Action 来完成,而在 RxJava 2.x 中,这一操作交给了 Consumer,示例代码:

 
      
observable.subscribe( new Consumer<String>() {
@Override
public void accept(String s) throws Exception { // 等同 onNext()
}
}, new Consumer<Throwable>() { // 等同 onError()
@Override
public void accept(Throwable throwable) throws Exception {
}
}, new Action() { // 等同 onComplete()
@Override
public void run() throws Exception {
}
});

以上 3 个在 subscribe() 中传入的匿名内部类,等同于在创建 Observer 时需要重写的 3 个方法,并且可以根据需要只重写其中某几个,这是因为 subscribe() 有不同参数的重载实现,在使用时根据不同的需求重写不同的重载就可以了。

事件队列

创建一个被观察者的事件,可以通过 Observable.create(),这也是上面代码中的创建方式,显然这种方式只能创建一个单独的事件,RxJava 还提供了可以创建一系列事件的方法,比如:

 
      
Observable<String> observable = Observable.just( "1", "2", "3");

通过 just() 这个方法创建了一个事件队列,将队列中的事件依次发送出去,just() 方法最多能够传入 10 个事件,如果有更多的事件,就可以使用fromArray(T...items) 或者 fromIterable(Iterable<? extends T> source) 来发送数组或者可迭代对象。

 
      
Observable.just( "1", "2", "3")
.subscribe( new Consumer<String>() { // 通过 Consumer 简化了订阅操作,只需要 onNext() 方法
@Override
public void accept(String s) throws Exception {
Log.i(TAG, "accept: " + s);
}
});

运行结果:

 
      
accept: 1
accept: 2
accept: 3

可以看到,通过这种方式可以很灵活的组合多个事件,按照一定顺序发送,并且由于在 RxJava 2.x 中的 Observer 多了一个回调方法 onSubscribe(),更方便了根据业务逻辑决定接受哪些事件,不接受哪些事件。

线程控制

RxJava 中使用 Scheduler 来实现线程切换,默认所有事件的处理和回调都在同一个线程中,除非指明处理和回调的线程。Scheduler 内置了几个 Scheduler:

  • Schedulers.single(): 在主线程中执行操作,这是默认的 Scheduler
  • Schedulers.newThread(): 总是启用新线程,并在新线程执行操作
  • Schedulers.io(): I/O 操作(读写文件、读写数据库、网络信息交互等)所使用的 Scheduler
  • Schedulers.computation(): CPU 密集型计算所使用的 Scheduler
  • Schedulers.trampoline(): 当其他排队的任务完成后,在当前线程排队开始执行

操作符

RxJava 功能强大的原因在于它拥有很多操作符,不同的操作符组合起来就能完成各种个样的需求,这些操作符可以对事件序列进行变换加工,操作符包括:创建操作,变换操作,过滤操作,结合操作,错误处理,辅助操作,条件和布尔操作,算术和聚合操作,异步操作,连接操作,转换操作,阻塞操作,字符串操作,这些操作通过灵活的组合,可以实现非常强大的功能,这里以常用的 map 操作符和 flatMap() 操作符为例,介绍使用方法,其他更多的操作法的介绍和使用,可以参考 RxJava 官方文档中文版

Map 操作符

map() 函数接受一个 Function< inType,outType >(其中 inType 指上面传下来时的数据类型,outType 指输出去的数据类型) 类型的参数,然后把这个函数应用到每一个 Observable 发射的事件上(考虑事件队列),将发射的值转换成期望的值,例如将一组数字转换成字符串,可以这样实现:

 
      
Observable.just( 1, 2, 3, 4)
.map( new Function<Integer, String>() { //将输入的每个事件处理
@Override
public String apply(Integer integer) throws Exception {
return String.valueOf(integer); // 处理的具体逻辑是将 Integer 类型转换成 String 类型
}
})
.subscribe( new Consumer<String>() {
@Override
public void accept(String s) throws Exception { // 可以看到这里回调传入的数据类型已经变成了 Stirng 类型
Log.i( "TAG", "accept: " + s);
}
});

Map 操作符的使用场景有很多,比如通过网络获取到 Json 数据,而需求只需要其中一部分,那么就可以用 Map 操作符在得到 Json 数据后对其解析并取出需要的部分。

FlatMap 操作符

floatMap() 函数将传入的事件包装成一个 Observable 对象,并让这个 Observable 自动发送事件,实现根据需求创建新的事件的功能,例如原本事件队列中只有一个事件“a,b,c,d,e,f,g”,现在要将他按照”,” 切分并发射出去,这样在事件队列中将增加 7 个事件,具体代码如下:

 
      
Observable.just( "a,b,c,d,e,f,g")
.flatMap( new Function<String, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(String s) throws Exception {
//原本是一个事件,这里将其转换成一个拥有 7 个事件的 Observer 并触发
return Observable.fromArray(s.split( ","));
}
})
.subscribe( new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.i( "TAG", "accept: " + s);
}
});

FlatMap 操作符的使用场景也很丰富,比如可以根据上层传下来的结果来创建新的不同事件,这样将事件的创建放在了运行中,使得很多业务逻辑可以方便的实现。

总结

以上是对 RxJava 基础使用的介绍,RxJava 相对于传统的异步操作方法,最大的特点就是简洁且功能强大,灵活使用 RxJava 可以解决很多复杂的实际需求,下面通过一个小例子来看看 RxJava 在实际应用中的效果。

需求:通过对我校(黑龙江大学)校园应用门户网站的抓包分析,掌握了学生用户的登录流程,编写一个 Android 应用模拟登录校园应用门户网站。

登录流程如下:

  1. 访问校园网主页,获取 Cookie 并保存
  2. GET 请求获取登录页面的图片验证码
  3. 向自建的服务器发送图片验证码,得到解析结果(验证码识别过程)
  4. POST 请求带上主页 Cookie,用户名,密码以及验证码字符串提交登录
  5. 对返回结果分析判断是否登录成功

以上流程如果使用传统的 HTTPClient 操作,一个回调嵌套着另一个回调,代码会稍微复杂,而如果使用 RxJava + Retrofit 组合,代码流程就跟需求流程一样,从上往下一步接着一步,这里先给出最终完成的代码,以示代码的简洁

 
      
public void login(HttpResultListener<Boolean> listener,final String uid,final String pwd) {
xywCookieService()
.hljuIndex() //访问应用门户首页,获取 Cookie 并保存
.flatMap( new Func1<ResponseBody, Observable<ResponseBody>>() { // 请求验证码
@Override
public Observable<ResponseBody> call(ResponseBody body) {
return captchaService().requestCaptcha();
}
})
.flatMap( new Func1<ResponseBody, Observable<String>>() { // 解析验证码
@Override
public Observable<String> call(ResponseBody body) {
return parseCaptchaObservable(body);
}
})
.flatMap( new Func1<String, Observable<Boolean>>() { // 登录校园网
@Override
public Observable<Boolean> call(String s) {
return loginHLJUObservable(uid, pwd, s);
}
})
.map( new Func1<Boolean, Boolean>() { // 登录结果
@Override
public Boolean call(Boolean aBoolean) {
return aBoolean;
}
})
.retry( 3)
.subscribeOn(Schedulers.io()) // 后台线程执行
.observeOn(AndroidSchedulers.mainThread()) // 结果在主线程接收
.subscribe( new HttpResultSubscriber<>(listener));
}