RxJava学习 - 6. Disposing
当你subscribe()一个Observable打算接收emissions的时候,增加了一个流处理这些emissions。当然,这样做使用了资源。当我们这样做的时候,我们想处置(dispose of)这些资源,这样它们能被垃圾回收。有限的Observables调用onComplete()就是典型的安全处理办法。如果你用的是无限的或者长时间运行的Observables,你可能会希望明确地停止emissions,处置订阅相关的所有内容。事实上,你不再需要的活动的订阅,垃圾回收器是无能为力的,明确地disposal可以防止内存泄漏。
Disposable链接了Observable和活动的Observer,你能调用它的dispose()方法停止emissions,处置Observer使用的全部资源。它还有一个isDisposed()方法,
指示它是否已经被处理掉了:
package io.reactivex.disposables;
public interface Disposable {
void dispose();
boolean isDisposed();
}
当你提供onNext()、onComplete()和onError()的lambdas作为subscribe()方法的参数,它实际上返回一个Disposable。
你可以使用它,在任何时候,使用它的dispose()方法停止emissions。例如,你能在5秒钟以后停止接收Observable.interval()的emissions:
import io.reactivex.Observable;
import io.reactivex.disposables.Disposable;
import java.util.concurrent.TimeUnit;
public class Launcher {
public static void main(String[] args) {
Observable<Long> seconds =
Observable.interval(1, TimeUnit.SECONDS);
Disposable disposable =
seconds.subscribe(l -> System.out.println("Received: " + l));
//sleep 5 seconds
sleep(5000);
//dispose and stop emissions
disposable.dispose();
//sleep 5 seconds to prove
//there are no more emissions
sleep(5000);
}
public static void sleep(int millis) {
try {
Thread.sleep(millis);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
这里,我们让Observable.interval()运行5秒,它有一个Observer,我们保存了subscribe()方法返回的Disposable。然后我们能调用这个Disposable的dispose()方法,停止处理,释放它使用的任何资源。然后,我们sleep了5秒,证明不再产生emissions。
Handling a Disposable within an Observer
通过onSubscribe()方法,在Observer的实现里传递Disposable。你可以实现自己的Observer,使用onNext()、onComplete()或者onError()访问Disposable。
它们调用dispose(),Observer不再想要emissions:
Observer<Integer> myObserver = new Observer<Integer>() {
private Disposable disposable;
@Override
public void onSubscribe(Disposable disposable) {
this.disposable = disposable;
}
@Override
public void onNext(Integer value) {
//has access to Disposable
}
@Override
public void onError(Throwable e) {
//has access to Disposable
}
@Override
public void onComplete() {
//has access to Disposable
}
};
上面的Disposable从源发射,沿着chain到Observer,所以,chain内的每一步都可以访问这个Disposable。
注意,把Observer传给subscribe()方法,是void,不返回Disposable,因为它假设这个Observer会处理它。如果你不想明确地处理这个Disposable,而是让RxJava处理它(这可能是个好主意,除非你有理由控制它),你能扩展ResourceObserver当作你的Observer,它使用默认的Disposable处理。把它传给subscribeWith(),你会得到默认的Disposable:
import io.reactivex.Observable;
import io.reactivex.disposables.Disposable;
import io.reactivex.observers.ResourceObserver;
import java.util.concurrent.TimeUnit;
public class Launcher {
public static void main(String[] args) {
Observable<Long> source =
Observable.interval(1, TimeUnit.SECONDS);
ResourceObserver<Long> myObserver = new
ResourceObserver<Long>() {
@Override
public void onNext(Long value) {
System.out.println(value);
}@Override
public void onError(Throwable e) {
e.printStackTrace();
}
@Override
public void onComplete() {
System.out.println("Done!");
}
};
//capture Disposable
Disposable disposable = source.subscribeWith(myObserver);
}
}
Using CompositeDisposable
如果你需要管理几个订阅,并处置它们,可以使用CompositeDisposable。它实现了Disposable,在内部保存了一个disposables的集合,
这样你能一次处置全部订阅:
import io.reactivex.Observable;
import io.reactivex.disposables.CompositeDisposable;
import io.reactivex.disposables.Disposable;
import java.util.concurrent.TimeUnit;
public class Launcher {
private static final CompositeDisposable disposables = new CompositeDisposable();
public static void main(String[] args) {
Observable<Long> seconds =
Observable.interval(1, TimeUnit.SECONDS);
//subscribe and capture disposables
Disposable disposable1 =
seconds.subscribe(l -> System.out.println("Observer 1: " + l));
Disposable disposable2 =
seconds.subscribe(l -> System.out.println("Observer 2: " + l));
//put both disposables into CompositeDisposable
disposables.addAll(disposable1, disposable2);
//sleep 5 seconds
sleep(5000);
//dispose all disposables
disposables.dispose();
//sleep 5 seconds to prove
//there are no more emissions
sleep(5000);
}
public static void sleep(int millis) {
try {
Thread.sleep(millis);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
Handling Disposal with Observable.create()
如果你的Observable.create()返回一个长时间运行的,或者无限的Observable,你应该经常检查ObservableEmitter的isDisposed()方法,看你是否能继续发射emissions。
下面的例子,你应该使用Observable.range(),但是为了做示例,就在Observable.create()里循环发射整数吧。在发射每个整数前,你应该确保ObservableEmitter指示没被调用disposal:
import io.reactivex.Observable;
public class Launcher {
public static void main(String[] args) {
Observable<Integer> source =
Observable.create(observableEmitter -> {
try {
for (int i = 0; i < 1000; i++) {
while (!observableEmitter.isDisposed()) {
observableEmitter.onNext(i);
}
if (observableEmitter.isDisposed())
return;
}
observableEmitter.onComplete();
} catch (Throwable e) {
observableEmitter.onError(e);
}
});
}
}
如果你的Observable.create()使用了资源,你应该处理资源的disposal,防止泄漏。ObservableEmitter有setCancellable()和setDisposable()方法。
在我们前面的例子里,当disposal发生时,应该从ObservableValue删除ChangeListener。我们能提供一个lambda来setCancellable(),在dispose()前,它将执行下面的动作:
private static <T> Observable<T> valuesOf(final ObservableValue<T> fxObservable) {
return Observable.create(observableEmitter -> {
//emit initial state
observableEmitter.onNext(fxObservable.getValue());
//emit value changes uses a listener
final ChangeListener<T> listener =
(observableValue, prev, current) ->
observableEmitter.onNext(current);
//add listener to ObservableValue
fxObservable.addListener(listener);
//Handle disposing by specifying cancellable
observableEmitter.setCancellable(() ->
fxObservable.removeListener(listener));
});
}