RxJava学习 - 5. Single, Completable, and Maybe
Single
Single实际上只发射一次。它有自己的SingleObserver接口:
interface SingleObserver<T> {
void onSubscribe(Disposable d);
void onSuccess(T value);
void onError(Throwable error);
}
onSuccess()合并了onNext()和onComplete(),只接受一个emission。当你使用subscribe(),可以提供onSuccess()的lambdas,也可以提供可选的onError():
import io.reactivex.Single;
public class Launcher {
public static void main(String[] args) {
Single.just("Hello")
.map(String::length)
.subscribe(System.out::println,
Throwable::printStackTrace);
}
}
一些Observable操作可以产生一个Single。比如,first()将返回一个Single,因为它只关注一个item。它接受默认值(下面的例子里的Nil),如果Observable是空的:
import io.reactivex.Observable;
public class Launcher {
public static void main(String[] args) {
Observable<String> source = Observable.just("Alpha","Beta","Gamma");
source.first("Nil") //returns a Single
.subscribe(System.out::println);
}
}
Single必须有一个emission。最好不要使用Observable.just(“Alpha”),而应该使用Single.just(“Alpha”)。需要的时候,可以把Single转换成Observable,比如使用toObservable()。
如果有0个或者1个emission,你应该使用Maybe。
Maybe
Maybe允许不发射。MaybeObserver的定义如下:
public interface MaybeObserver<T> {
void onSubscribe(Disposable d);
void onSuccess(T value);
void onError(Throwable e);
void onComplete();
}
Maybe把可能的emission传给onSuccess(),无论怎样,都会在完成的时候调用onComplete()。Maybe.just()可以用来增加一个Maybe,发射一个item。
Maybe.empty()增加的Maybe不产生emission:
import io.reactivex.Maybe;
public class Launcher {
public static void main(String[] args) {
// has emission
Maybe<Integer> presentSource = Maybe.just(100);
presentSource.subscribe(s -> System.out.println("Process 1 received: " + s),
Throwable::printStackTrace,
() -> System.out.println("Process 1 done!"));
//no emission
Maybe<Integer> emptySource = Maybe.empty();
emptySource.subscribe(s -> System.out.println("Process 2 received: " + s),
Throwable::printStackTrace,
() -> System.out.println("Process 2 done!"));
}
}
有些Observable操作能生成一个Maybe。一个例子是firstElement(),它类似first(),只是没有发射时返回一个空结果:
import io.reactivex.Maybe;
public class Launcher {
public static void main(String[] args) {
Observable<String> source =
Observable.just("Alpha", "Beta", "Gamma", "Delta", "Epsilon");
source.firstElement().subscribe(
s -> System.out.println("RECEIVED " + s),
Throwable::printStackTrace,
() -> System.out.println("Done!"));
}
}
Completable
当某动作执行时,Completable不接收任何发射。它没有onNext()和onSuccess()方法,只有onError()和onComplete():
interface CompletableObserver<T> {
void onSubscribe(Disposable d);
void onComplete();
void onError(Throwable error);
}
可能不经常使用Completable。可以使用Completable.complete()或者Completable.fromRunnable()快速构造它。前者立即执行onComplete(),
后者执行特定动作之后,再调用onComplete():
import io.reactivex.Completable;
public class Launcher {
public static void main(String[] args) {
Completable.fromRunnable(() -> runProcess())
.subscribe(() -> System.out.println("Done!"));
}
public static void runProcess() {
//run process here
}
}