RxJava系列教程:
RxJava系列教程之介绍篇(一)
RxJava系列教程之创建篇(二)
RxJava系列教程之变换篇(三)
RxJava系列教程之过滤篇(四)
RxJava系列教程之线程篇(五)
序言
上篇我们介绍了rxjava的意义,和最基本的用法,好吧,其实那根本不算用法,而且写法很复杂,接下来我们来介绍下rxjava的简单创建方式。
被观察者篇
我们先来回顾下上期我们是怎么创建一个被观察者的:
// 被观察者
Observable<String> observable = Observable.create(new OnSubscribe<String>(){
@Override
public void call(Subscriber<? super String> sub)
{
}
});
感觉太多太大了,我们明明就只想传一个简单的数字1,就要写那么大的一片代码,好像有点麻烦啊,没关系,rxjava早已经帮我们做了很多常见操作的封装了。
所以,如果我们只是想传一些简单的参数,我们可以使用just
这个操作符,这是个创建操作符,可以给我们创建一个被观察者,并且你想使用的数据可以直接做为参数传递给just,他的内部会帮你做出一系列的行为,你甚至连onNext都不用写了,并且在行为完毕的时候会调用onCompleted,在执行过程中错误的时候也会调用onError。这里顺带提一句,onCompleted和onError这2个方法只会使用一次,如果onCompleted执行了onError就不会执行,反之亦然。
当我们知道了just这个创建操作符,我们就来用用:
Observable<Integer> observable = Observable.just(1,2,3);
这个结构看上去就爽多了有没有,既然这个just是直接返回一个Observable,那么我们直接来使用不就行了吗,干嘛还要那么创建那么多对象去控制流程,所以:
Observable
.just(1,2,3)
.subscribe(new Observer<Integer>()
{
@Override
public void onCompleted()
{
}
@Override
public void onError(Throwable arg0)
{
}
@Override
public void onNext(Integer i)
{
System.out.println(i);
}
});
是不是看上去要简单多了。但这个just
最多只能在里面传入10个类型相同的参数,如果我们要创建更多的参数呢。
有办法,来看看第二个创建操作符:from
,这个操作符允许在里面传递数组和集合,那么就不受just里面只能传递10个参数的限制了,现在你想传递多少就可以传递多少了,来,我们看看:
// 传递数组
Integer [] arr = {1,2,3};
Observable
.from(arr).subscribe(new Observer<Integer>()
{
@Override
public void onCompleted()
{
}
@Override
public void onError(Throwable arg0)
{
}
@Override
public void onNext(Integer arg0)
{
System.out.println(arg0);
}
});
// 传递集合
ArrayList<Integer> data = new ArrayList<>();
data.add(1);
data.add(2);
data.add(3);
Observable.from(data).subscribe(new Observer<Integer>()
{
@Override
public void onCompleted()
{
}
@Override
public void onError(Throwable arg0)
{
}
@Override
public void onNext(Integer arg0)
{
System.out.println(arg0);
}
});
这2中写法的结果其实都是一样的。
观察者篇
但是我们突发奇想,这个创建被观察者倒是简便了,但是这个观察者看上去有点麻烦啊,好大一坨啊,感觉头轻脚重的,那么观察者有简单的创建方法吗,说实话,这个观察者只有简化,还真没有简便化,怎么理解这个简化,首先我们来看看这个观察者里面的东西,有3个回调函数,分别是:
onNext
onError
onComplete
如果在写代码的时候,我们非常确定这块代码绝对不会报错,并且执行完毕之后也不需要做什么完成后的操作,也就是说我们只想做一个onNext,别的都不想做,这样可以吗?答案是肯定的,当然可以啦!
请容许我引入2个类:Action0
和Action1
,我们先来看看这2个类的区别,最直观的观察方式就是我们直接new一个看看。
new Action0()
{
@Override
public void call()
{
}
};
new Action1<String>()
{
@Override
public void call(String arg0)
{
}
};
我们发现这2个类都是个接口,而且和onNext、onError、onComplete的形状是有点相似的,所以,你懂的,我们再精确一下这2个类的具体外貌,会发现Action0
的回调其实和onComplete
是有点像的,回调的方法都没有参数,而Action1
和onNext
还有onError
是比较像的,回调的函数都有1个参数包含在回调方法中。东西我们已经有了,接下来我们就来看看到底如何使用,既然是在观察者这里做文章,那么我们就只能在订阅那块做文章了。
我们来看看被观察者的另外3中订阅方式:
subscribe(Action1<? super Integer> onNext)
subscribe(Action1<? super Integer> onNext, Action1(Throwable) onError)
subscribe(Action1<? super Integer> onNext, Action1(Throwable) onError, Action0 onComplete)
很明显有没有,我们可以选择只实现哪些方式,所以我们用Action的方式再将上面的例子重新玩一次:
// 传递数组
Integer [] arr = {1,2,3};
Observable
.from(arr).subscribe(new Action1<Integer>()
{
@Override
public void call(Integer arg0)
{
System.out.println(arg0);
}
});
你可以按照自己的需求从上面3中订阅方式中选择一种最合适的方式来实现你的功能,当然,如果你要选择第三种方式,还是直接new Observer
吧。
创建操作符
接下来是一些教科书式的创建操作符介绍:
1. repeat
这个操作符就是可以让事件进行多次订阅
Observable
.just(1,2,3)
.repeat()
.subscribe(new Observer<Integer>() {
@Override
public void onCompleted() {
Log.e(TAG, "onCompleted: " );
}
@Override
public void onError(Throwable throwable) {
}
@Override
public void onNext(Integer integer) {
Log.e(TAG, "onNext: " + integer);
}
});
运行结果:
onNext: 3
onNext: 3
onNext: 3
onNext: 2
onNext: 2
onNext: 3
onNext: 2
onNext: 1
onNext: 3
onNext: 1
onNext: 3
onNext: 1
onNext: 3
onNext: 2
onNext: 3
onNext: 3
onNext: 2
onNext: 3
...
各种乱有木有,说明每组数字事件的发送是有一定时间间隔的,并且开始重复的时候,上一轮的数据不一定完全发送完毕,所以看上去会很乱,当然onCompleted可以无视了,不会执行的。
2. repeatWhen
这个操作符就是在一定条件下再重复一次事件,3秒后再执行一次
例子:
Observable
.just(1,2,3)
.repeatWhen(new Func1<Observable<? extends Void>, Observable<?>>() {
@Override
public Observable<?> call(Observable<? extends Void> observable) {
return Observable.timer(3, TimeUnit.SECONDS);
}
}).subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
Log.e(TAG, "call: " + integer );
}
});
运行结果:
onNext: 1
onNext: 2
onNext: 3
onNext: 1
onNext: 2
onNext: 3
onCompleted:
3. defer
只有当订阅者订阅才创建Observable,为每个订阅创建一个新的Observable,我们先来看看订阅再来创建observable有什么好处,比如你要传一个单例进去,然后get各种值,如果在传递之前,该单例什么都里面什么都没有set,那么去get的时候也get不到任何东西,但是如果你在订阅前给这个单例set了不少值,那么在订阅后就可以得到那些值了,简单的例子:
Observable
.defer(new Func0<Observable<Integer>>() {
@Override
public Observable<Integer> call() {
return Observable.just(1, 2, 3);
}
}).subscribe(new Observer<Integer>() {
@Override
public void onCompleted() {
Log.e(TAG, "onCompleted: ");
}
@Override
public void onError(Throwable throwable) {
}
@Override
public void onNext(Integer integer) {
Log.e(TAG, "onNext: " + integer);
}
});
运行结果:
onNext: 1
onNext: 2
onNext: 3
onCompleted:
4. range
创建一个整数序列的Observable,有2个参数,第一个参数指开始数字,第二关参数指一共有多少个数字,比如我们要创建一个从3开始,一共有5个数字的整数序列的被观察者
Observable
.range(3, 5)
.subscribe(new Observer<Integer>() {
@Override
public void onCompleted() {
Log.e(TAG, "onCompleted: ");
}
@Override
public void onError(Throwable throwable) {
}
@Override
public void onNext(Integer integer) {
Log.e(TAG, "onNext: " + integer );
}
});
运行结果:
onNext: 3
onNext: 4
onNext: 5
onNext: 6
onNext: 7
onCompleted:
5. interval
创建一个按照给定的时间间隔发射整数序列的Observable
例子:
Observable
.interval(1, TimeUnit.SECONDS)
.subscribe(new Observer<Long>() {
@Override
public void onCompleted() {
Log.e(TAG, "onCompleted: " );
}
@Override
public void onError(Throwable throwable) {
}
@Override
public void onNext(Long aLong) {
Log.e(TAG, "onNext: "+aLong );
}
});
运行结果:
onNext: 0
onNext: 1
onNext: 2
onNext: 3
onNext: 4
onNext: 5
onNext: 6
onNext: 7
onNext: 8
onNext: 9
...
onCompleted还是忽视吧
6. timer
创建一个在给定的延时之后发射单个数据的Observable
Observable
.timer(1, TimeUnit.SECONDS).subscribe(new Observer<Long>() {
@Override
public void onCompleted() {
Log.e(TAG, "onCompleted: ");
}
@Override
public void onError(Throwable throwable) {
}
@Override
public void onNext(Long aLong) {
Log.e(TAG, "onNext: " + aLong);
}
});
运行结果:
onNext: 0
onCompleted:
7. empty
创建一个直接完成的Observable
Observable
.empty().subscribe(new Observer<Object>() {
@Override
public void onCompleted() {
Log.e(TAG, "onCompleted: ");
}
@Override
public void onError(Throwable throwable) {
}
@Override
public void onNext(Object o) {
Log.e(TAG, "onNext: " + o.toString() );
}
});
运行结果:
onCompleted:
8. error
创建一个直接异常的Observable
Observable
.error(new NullPointerException("测试空指针"))
.subscribe(new Observer<Object>() {
@Override
public void onCompleted() {
Log.e(TAG, "onCompleted: ");
}
@Override
public void onError(Throwable throwable) {
Log.e(TAG, "onError: " + throwable.getMessage());
}
@Override
public void onNext(Object o) {
Log.e(TAG, "onNext: " + o.toString());
}
});
运行结果:
onError: 测试空指针
9. never
创建一个什么都不做的Observable
Observable
.never()
.subscribe(new Observer<Object>() {
@Override
public void onCompleted() {
Log.e(TAG, "onCompleted: ");
}
@Override
public void onError(Throwable throwable) {
Log.e(TAG, "onError: " + throwable.getMessage());
}
@Override
public void onNext(Object o) {
Log.e(TAG, "onNext: " + o.toString());
}
});
运行结果:
好吧,没有
总结
Observable的创建方式:
1. just:传10个以内的数据
2. from:传数组或者集合
3. repeat:无限重复发送数据
4. repeatWhen:满足某个条件后再发送一次数据
5. defer:订阅后再创建被观察者
6. range:发送一个指定的整型序列
7. interval:按照给定时间间隔无限发送整型序列数据
8. timer:某一时延后发送一次
9. empty:创建一个直接完成的Observable
10. error创建一个直接异常的Observable
11. never创建一个什么都不做的Observable
12. create:(第一章讲的,最基础的创建)
Observer的另类实现方式:
Action0 -> onComplete
Action1 -> onNext、onError
下集预告:本章只是讲解了Observable和Observer的创建方式,其实,重头戏还是没有来,下一章将会讲述rxjava最牛逼的地方,变换,相信看了下一章,你对会rxjava跃跃欲试,敬请期待吧!