目录
2.事件的发送与接收
鉴于网上有很多从源码角度深入理解 RxJava 的文章,这里就不再做过多重复的分析。我们直接用 RxJava 所提供的设计思想,来看如何实现自己的 RxJava。
众所周知,RxJava 采用的是观察者设计模式。由被观察者通知观察者自己的行为发生了变化,让观察者做出响应。在 RxJava 中,上游的 Observable 扮演了被观察者的角色,它能够发送事件,由下游的观察者 Observer 进行监听,在接收到事件后做出响应。
RxJava的发送和接收原理
来看一个简单的例子。
\\这里用的是 RxJava 1 的最后一个版本 1.3.8
Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
subscriber.onNext(1);
subscriber.onNext(2);
subscriber.onNext(3);
subscriber.onCompleted();
}
})
.subscribe(new Subscriber<Integer>() {
@Override
public void onNext(Integer integer) {
System.out.println("onNext:" + integer);
}
@Override
public void onCompleted() {
System.out.println("onCompleted");
}
@Override
public void onError(Throwable e) {
}
});
使用 Observable.create()
方法创建了一个 Observable 对象,在 call 中调用了三次 subscriber.onNext()
和一次 subscriber.onCompleted()
。我们先不管 create()
方法中传入的参数以及 call()
方法中的 subscriber 是什么,我们可以看到在 subscribe()
方法中传入了一个 Subscriber,它是一个 Observer,相当于我们前面自己实现的 Callback,它定义了接受到事件时的响应。当我们调用了 subscribe()
的时候,上面 OnSubscribe 中的 call()
方法便会开始执行,事件便从上游发送出去了。一旦完成发送,下游的观察者会立即作出响应。可以这么理解,事件的发送是 onNext()
方法的调用,而事件的接收是 onNext()
方法的执行,它们是一个前后的逻辑关系。
那么回到 create()
方法中,Observable.OnSubscribe 是什么呢,它继承自一个名为 Action1 的接口。
public interface Action1<T> extends Action {
void call(T t);
}
Action1 中定义了一个 call()
方法,传入一个泛型参数,没有返回值,它的作用在于封装一个执行方法。事实上,在 RxJava 中还存在的许多同样命名的接口,Action0,Action2,Action3,Action4,它们的区别在于传入的参数个数不同,如 Action5 的定义是这样的。
public interface Action5<T1, T2, T3, T4, T5> extends Action {
void call(T1 t1, T2 t2, T3 t3, T4 t4, T5 t5);
}
Observable 在调用 create()
方法的时候,传入的 OnSubscribe 对象,会被存储在返回的 Observable 对象中,由于 OnSubscribe 中封装了事件的执行方法,所以在 Observable 调用 subscribe()
的时候,就可以通过这个 OnSubscribe 调用自身的 call()
方法。
弄懂了 RxJava 中的事件是如何产生和发送之后,我们就可以来实现自己的事件发送机制。
事件发送
我们先模仿定义一个 MyAction1 接口。
public interface MyAction1<T> {
void call(T t);
}
为了更加贴近 RxJava 的命名,我们重新定义一下 Callback,将其改名为 MyObserver,并添加 onNext()
和 onCompleted()
方法。
public interface MyObserver<T> {
void onNext(T t);
void onCompleted();
void onError(Throwable e);
}
将 AsyncJob 重命名为 MyObservable,同时将 start()
方法改为 subscribe()
方法。 由于我们要新增一些方法,所以它不再是一个抽象类。
public class MyObservable<T> {
private MyAction1<MyObserver<T>> action;
private MyObservable(MyAction1<MyObserver<T>> action){
this.action = action;
}
public void subscribe(MyObserver<T> myObserver) {
action.call(myObserver);
}
public static <T> MyObservable<T> create(MyAction1<MyObserver<T>> action){
return new MyObservable<T>(action);
}
}
可以看到,我们在构造函数里面接收了一个泛型为 MyObserver<T> 类型的 action 并保存。在调用 subscribe()
方法的时候,会调用 action.call()
方法,并传入一个 MyObserver 对象,它实现了对结果的回调。
我们还增加了一个 create()
的静态方法,接收一个 MyAction1<MyObserver<T>> 的参数,它返回了一个含有该 action 的 MyObservable 对象。事实上它只是调用了内部的构造函数,我们完全可以直接从外部调用 new MyObservable()
的方式去创建,但是为了和 RxJava 保持一致,我们采用声明一个静态方法 create()
的方式,并将构造函数声明为 private 。
接着我们来看怎么使用。
MyObservable.create(new MyAction1<MyObserver<Integer>>() {
@Override
public void call(MyObserver<Integer> myObserver) {
myObserver.onNext(1);
myObserver.onNext(2);
myObserver.onNext(3);
myObserver.onCompleted();
}
});
通过调用 MyObservable.create()
方法传入一个匿名内部类 MyAction1
接收
接下来我们调用 subscribe()
方法。
MyObservable.create(new MyAction1<MyObserver<Integer>>() {
@Override
public void call(MyObserver<Integer> myObserver) {
myObserver.onNext(1);
myObserver.onNext(2);
myObserver.onNext(3);
myObserver.onCompleted();
}
})
.subscribe(new MyObserver<Integer>() {
@Override
public void onNext(Integer integer) {
System.out.println("onNext:" + integer);
}
@Override
public void onCompleted() {
System.out.println("onCompleted");
}
@Override
public void onError(Throwable e) {
}
});
一切还是那么熟悉,唯一的不同是我们把 Callback 换成了 MyObserver 。
运行一下,输出结果如下:
onNext:1
onNext:2
onNext:3
onCompleted
操作符 just 的实现
RxJava 不仅支持单一事件的发送,还支持序列事件的发送,来看下面的例子。
Observable.just(1,2,3)
.subscribe(new Subscriber<Integer>() {
@Override
public void onNext(Integer integer) {
System.out.println("onNext:" + integer);
}
@Override
public void onCompleted() {
System.out.println("onCompleted");
}
@Override
public void onError(Throwable e) {
}
});
调用 Observable.just()
方法顺序发送了 1,2,3 三个值,在调用 subscribe()
方法后同样会收到三次 onNext()
和一次 onCompleted()
的回调。
虽然外部隐藏了事件的发送,但是内部的执行原理依旧是不变的。
我们在 MyObservable 中新增一个 just()
方法。
//MyObservable.java
public static <T> MyObservable<T> just(Iterable<T> iterable) {
return new MyObservable<T>(new MyAction1<MyObserver<T>>() {
@Override
public void call(MyObserver<T> myObserver) {
for (T anIterable : iterable) {
myObserver.onNext(anIterable);
}
myObserver.onCompleted();
}
});
}
接收一个可迭代序列,同 craete()
方法一样,我们调用了构造函数,在匿名内部类 MyAction1 的 call()
方法中,遍历序列,调用 onNext()
方法,最后调用 onCompleted()
方法。
使用方式如下。
List<Integer> list = new ArrayList<>();
list.add(1);
list.add(2);
list.add(3);
MyObservable.just(list)
.subscribe(new MyObserver<Integer>() {
@Override
public void onNext(Integer integer) {
System.out.println("onNext:" + integer);
}
@Override
public void onCompleted() {
System.out.println("onCompleted");
}
@Override
public void onError(Throwable e) {
}
});
跟 RxJava 不同的是,我们接收的是一个序列,而它可以直接接收多个相同类型的值。我们可以看下它的方法定义。
public static <T> Observable<T> just(T t1, T t2, T t3) {}
public static <T> Observable<T> just(T t1, T t2, T t3, T t4) {}
public static <T> Observable<T> just(T t1, T t2, T t3, T t4, T t5) {}
可以说是非常暴力了。但是内部实现是一样的,都是对这个序列进行遍历调用 onNext()
方法,最后再调用 onCompleted()
。
知道这个原理之后,我们就可以按照自己想要的方式自行定义我们的操作符,这里不做展开了。
3.映射
在前面 简单的映射 中,我们已经介绍了如何将一个 AsyncJob<T> 映射成一个 AsyncJob<R> 。
现在我们只需要对原来的 map()
修改一下,就能实现 MyObservable 的 map()
方法。
public <R> MyObservable<R> map(Func<T, R> func) {
final MyObservable<T> upstream = this;
return new MyObservable<R>(new MyAction1<MyObserver<R>>() {
@Override
public void call(MyObserver<R> myObserver) {
upstream.start(new MyObserver<T>() {
@Override
public void onNext(T t) {
myObserver.onNext(func.call(t));
}
@Override
public void onCompleted() {
myObserver.onCompleted();
}
@Override
public void onError(Throwable e) {
myObserver.onError(e);
}
});
}
});
}
Func 接口保持不变,唯一需要改变的是将 Callback 接口替换成新的 MyObserver 接口,实现对应的回调方法。
我们再来看一下使用。
MyObservable.create(new MyAction1<MyObserver<Integer>>() {
@Override
public void call(MyObserver<Integer> myObserver) {
myObserver.onNext(1);
myObserver.onNext(2);
myObserver.onNext(3);
myObserver.onCompleted();
}
})
.map(new Func<Integer, String>() {
@Override
public String call(Integer integer) {
return String.valueOf(integer);
}
})
.subscribe(new MyObserver<String>() {
@Override
public void onNext(String string) {
System.out.println("onNext:" + string);
}
@Override
public void onCompleted() {
System.out.println("onCompleted");
}
@Override
public void onError(Throwable e) {
}
});
我们在 create()
和 subscribe()
中间加入 map()
方法,在 call()
中实现了整形变量 integer 到 String 的转换。对于下游的 subscribe()
方法来说,调用它的主体已经从原来的 MyObservable<Integer> 类型转变为 MyObservable<String> 类型。
▷ 自己动手造一个 RxJava(一)—— 理解临时任务对象
▶ 自己动手造一个 RxJava(二)—— 事件的发送、接收与映射
▷ 自己动手造一个 RxJava(三)—— 线程调度