上一节我们学习了 RxJava中线程的切换,今天我们将开始学习它强大的操作符,有人会问,操作符是用来干嘛的?在我看来,操作符犹如rxjava的灵魂,它贯穿了被观察者和观察者的整个生命周期,从创建,到进行订阅,到数据传递,到取消订阅等环节。好了,接下来开始我们的学习。
创建操作符
create() 操作符
- 方法预览:
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
- 作用:
创建一个被观察者
- 怎么用
Observable<String> mObservable = Observable.create(new ObservableOnSubscribe<String>() {
emitter.onNext(" 1");
emitter.onNext("2");
emitter.onNext("3");
emitter.onNext("4");
emitter.onComplete();
emitter.onNext(" 1");
emitter.onNext(" 2");
}
});
- 实战
Observable<String> mObservable = Observable.create(new ObservableOnSubscribe<String>() { // 1.创建 Observable对象
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext(" 1");
emitter.onNext("2");
emitter.onNext("3");
emitter.onNext("4");
emitter.onComplete();
emitter.onNext(" 1");
emitter.onNext(" 2");
}
});
Observer<String> mObserver = new Observer<String>() { // 2.创建observer
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "onSubscribe: ");
}
@Override
public void onNext(String s) {
Log.d(TAG, "onNext: " + s);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError: " + e);
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete: ");
}
};
mObservable.subscribe(mObserver);// 3.开始调用
可以看到,通过create() 操作符,我们创建了一个Observable被观察者,重写subscribe() 方法通过 ObservableEmitter将数据发射出去。
just() 操作符
- 方法预览
public static <T> Observable<T> just(T item1, T item2, T item3, T item4, T item5, T item6) { }
- 作用及特点
创建一个被观察者,并发送事件,发送的事件不可以超过10个以上。
- 怎么用
Observable.just(1,2,3,4,5,6).subscribe(new Observer<Integer>() {
private Disposable mDisposable;
@Override
public void onSubscribe(Disposable d) {
mDisposable=d;
Log.d(TAG, "onSubscribe: " + d.isDisposed());
}
@Override
public void onNext(Integer s) {
Log.d(TAG, "onNext: 接收到的数据是:"+s);
if (s>3){
mDisposable.dispose();
}
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
from操作符
RxJava2 中 from 操作符主要操作以下四个,它可以创建发送数组(array)、集合(Iterable) 以及异步任务(future)的 Observable。
//数组
public static <T> Observable<T> fromArray(T... items)
//集合
public static <T> Observable<T> fromIterable(Iterable<? extends T> source)
//异步任务
public static <T> Observable<T> fromFuture(Future<? extends T> future)
//异步任务+超时时间
public static <T> Observable<T> fromFuture(Future<? extends T> future, long timeout, TimeUnit unit)
//异步任务+超时时间+线程调度器
public static <T> Observable<T> fromFuture(Future<? extends T> future, long timeout, TimeUnit unit, Scheduler scheduler)
//异步任务+线程调度器
public static <T> Observable<T> fromFuture(Future<? extends T> future, Scheduler scheduler)
//Reactive Streams中的发布者,使用方式类似create操作符,事件的发送由发布者(被观察者)自行决定
public static <T> Observable<T> fromPublisher(Publisher<? extends T> publisher)
fromArray() 操作符
- 方法预览
public static <T> Observable<T> fromArray(T... items) { ...}
-
作用及特点:
- 和just一样,可以发送数据,将数据以数组的形式进行装载,然后发送,可以发送的数据大于10个。
- 通过查看just()源码可知,just()发送数据本质上其实也是将数据装载在一个数组里,通过fromArray()发送出去,所以说,just()是fromArray() 在发送数据少于10个的情况下精简版。
@CheckReturnValue @NonNull @SchedulerSupport(SchedulerSupport.NONE) public static <T> Observable<T> just(T item1, T item2, T item3, T item4) { ObjectHelper.requireNonNull(item1, "item1 is null"); ObjectHelper.requireNonNull(item2, "item2 is null"); ObjectHelper.requireNonNull(item3, "item3 is null"); ObjectHelper.requireNonNull(item4, "item4 is null"); return fromArray(item1, item2, item3, item4); }
-
怎么用
String[] strArray = {"kotlin","java","python","C++"};
Observable<String>observable1=Observable.fromArray(strArray);
fromIterable() 操作符
- 方法预览
public static <T> Observable<T> fromIterable(Iterable<? extends T> source) {...}
- 作用
创建被观察者,直接发送集合给观察者。我们知道,既然发送的数据可以是单个的,也可以是多个分开发的,还可以是以数组的形式发送的,那么能不能直接发送集合呢,当然可以,通过 formIterable() 操作符就可以实现。
- 怎么用
List<String> list = new ArrayList<>();
list.add("kotlin");
list.add("java");
list.add("C++");
list.add("python");
Observable.fromIterable(list);
fromCallable()操作符
- 方法预览
public static <T> Observable<T> fromCallable(Callable<? extends T> supplier) { }
- 作用
Callable 位于 java.util.concurrent 包下,和 Runnable 类似,但是带有返回值,使用 fromCallable 发出的事件是从主线程发出的,接收 Observable 的发射值要使用 observeOn 切换到 Main 线程接收。
- 怎么用
Observable.fromCallable(new Callable<String>() {
@Override
public String call() throws Exception {
return "测试一下";
}
}).subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(String s) {
System.out.println("onNext():"+"接收到的数据为"+s+"\n所在线程为:"+Thread.currentThread().getName());
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
fromFuture()操作符
public static <T> Observable<T> fromFuture(Future<? extends T> future) {...}
from操作符开始的时候我们介绍过,fromFuture()有四个重载方法,分别是:
//异步任务
public static <T> Observable<T> fromFuture(Future<? extends T> future)
//异步任务+超时时间
public static <T> Observable<T> fromFuture(Future<? extends T> future, long timeout, TimeUnit unit)
//异步任务+超时时间+线程调度器
public static <T> Observable<T> fromFuture(Future<? extends T> future, long timeout, TimeUnit unit, Scheduler scheduler)
//异步任务+线程调度器
public static <T> Observable<T> fromFuture(Future<? extends T> future, Scheduler scheduler)
对于fromFuture(),其实就跟Runnable作用差不多,但不同的是Runnable执行完毕后不会返回结果,而 fromFuture() 会将执行结果返回给观察者。
// 异步任务
FutureTask<Integer> mFutureTask = new FutureTask<>(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
System.out.println("---------开始执行任务------"+Thread.currentThread().getName());
Thread.sleep(3000); // 延时3秒以计算1+2+3+4+...并将结果返回
int sum = 0;
for (int i = 0; i <= 10; i++) {
sum += i;
}
return sum;
}
});
Observable.
fromFuture(mFutureTask)
.doOnSubscribe(new Consumer<Disposable>() {
@Override
public void accept(Disposable disposable) throws Exception {
mFutureTask.run();
}
})
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer integer) {
System.out.println("---------处理完毕并接收数据为:------"+integer+" "+Thread.currentThread().getName());
}
@Override
public void onError(Throwable e) {
System.out.println("---------处理失败了:------"+e+Thread.currentThread().getName());
}
@Override
public void onComplete() {
}
} );
}
在上面的程序中,我们延时了3秒进行数学计算,然后再将结果返回给已订阅的观察者。
好了,今天主要介绍这么多,下一篇将继续学习Rxjava其他强大的操作符。