说明:
concat 将两个及以上的可观察者,进行有序的发射
Concatenates elements of each ObservableSource provided via an Iterable sequence into a single sequence
of elements without interleaving them.
方法预览:
//接收一个可观察者
public static <T> Observable<T> concat(ObservableSource<? extends ObservableSource<? extends T>> sources)
//接收两个可观察者
public static <T> Observable<T> concat(ObservableSource<? extends T> source1, ObservableSource<? extends T> source2)
//接收三个可观察者
public static <T> Observable<T> concat(ObservableSource<? extends T> source1, ObservableSource<? extends T> source2, ObservableSource<? extends T> source3)
//接收四个可观察者
public static <T> Observable<T> concat(ObservableSource<? extends T> source1, ObservableSource<? extends T> source2,ObservableSource<? extends T> source3, ObservableSource<? extends T> source4)
//接收一个集合,如arrylist
public static <T> Observable<T> concat(Iterable<? extends ObservableSource<? extends T>> sources)
从重载的方法可以看出,concat可以接收1到4个可观察者,或者一个可观察者的集合
例子1:
public void concat(View view){
Observable observable1 = Observable.just(1,2,3,4,5,6);
Observable observable2 = Observable.just(7,8);
ArrayList list = new ArrayList();
Observable.concat(list)
.subscribe(new Observer() {
@Override
public void onSubscribe(Disposable d) {
Log.e("RxTest","onSubscribe");
}
@Override
public void onNext(Object o) {
Log.e("RxTest","onNext "+(int)o);
}
@Override
public void onError(Throwable e) {
Log.e("RxTest","onError "+e.getMessage().toString());
}
@Override
public void onComplete() {
Log.e("RxTest","onComplete");
}
});
}
打印结果:
onSubscribe
onNext 1
onNext 2
onNext 3
onNext 4
onNext 5
onNext 6
onNext 7
onNext 8
onComplete
可以看到发射的数据有序的,如果需要在数据全部发射完,再执行某个动作,可以在onComplete()方法中。
例子2,使用Arrylist传递:
public void concat1(View view){
ArrayList<Observable<Integer>> list = new ArrayList<>();
Observable observable1 = Observable.just(1,2);
Observable observable2 = Observable.just(3,4);
Observable observable3 = Observable.just(5,6);
Observable observable4 = Observable.just(7,8);
Observable observable5 = Observable.just(9,10);
list.add(observable1);
list.add(observable2);
list.add(observable3);
list.add(observable4);
list.add(observable5);
Observable.concat(list)
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer o) throws Exception {
Log.e("RxTest","accept "+o);
}
});
}
打印结果:
accept 1
accept 2
accept 3
accept 4
accept 5
accept 6
accept 7
accept 8
accept 9
accept 10