本文参考:https://www.jianshu.com/p/e4c6d7989356
3.8 疏通水缸的几种方法
水缸中存放的数据太多了,总是会堵的,也就是内存溢出,那有什么办法来避免这种情况?
(1)将要放入水缸中的数据只放一部分进去,这样就避免了水缸堵塞
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
for (int i = 0; ; i++) {
emitter.onNext(i);
}
}
}).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread())
.filter(new Predicate<Integer>() {
@Override
public boolean test(Integer integer) throws Exception {
return integer % 1000 == 0;
}
}).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.i(TAG, "accept: "+integer);
}
});
这里使用了filter过滤操作符。从上面的代码及结果图中可以看出,过滤后的内存占用好了很多,基本避免了OOM的情况。
也可以根据时间对数据进行采样选择,
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
for (int i = 0; ; i++) {
emitter.onNext(i);
}
}
}).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.sample(2, TimeUnit.SECONDS)
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.i(TAG, "accept: " + integer);
}
});
结果图如下所示:
这里使用了sample操作符,这个操作符每隔指定的时间就从上游中取出一个事件发送给下游,从结果图中也可以看出,并没有发生OOM等情况。
(2)降低发送速度,让下游有时间去处理
这个很容易理解,具体代码如下:
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
for (int i = 0; ; i++) {
emitter.onNext(i);
Thread.sleep(2000);
}
}
}).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.i(TAG, "accept: " + integer);
}
});
从结果图中可以看出,内存占用很正常,没有发生OOM的情况
3.9 Flowable
Flowable可以看作是Observable的进化版,既然是进化版,那肯定有比Observable厉害的地方,它比Observable多了流速控制。既然Flowable进化了,Observer自然也会变,它进化成了Subscriber,但连接它们之间的依旧还是subscribe()。
先看一个正常的Flowable使用指南。
Flowable<Integer> flowable = Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
emitter.onComplete();
}
}, BackpressureStrategy.ERROR);
Subscriber<Integer> subscriber = new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
Log.i(TAG, "onSubscribe: ");
mSubscription = s;
mSubscription.request(Long.MAX_VALUE);
}
@Override
public void onNext(Integer integer) {
Log.i(TAG, "onNext: " + integer);
}
@Override
public void onError(Throwable t) {
Log.i(TAG, "onError: "+ t);
}
@Override
public void onComplete() {
Log.i(TAG, "onComplete: ");
}
};
flowable.subscribe(subscriber);
1558356633.590 3979-3979/? I/RxJavaTestHaHa: onSubscribe:
1558356633.591 3979-3979/? I/RxJavaTestHaHa: onNext: 1
1558356633.591 3979-3979/? I/RxJavaTestHaHa: onNext: 2
1558356633.591 3979-3979/? I/RxJavaTestHaHa: onNext: 3
1558356633.591 3979-3979/? I/RxJavaTestHaHa: onComplete:
使用方法与Observable和Observer基本相似,但是也多了几行新的代码。
首先是在创建Flowabel时多了BackpressureStrategy.ERROR这个参数,就是选择背压的策略,现在选择的是在出现上下游流速不均衡的时候直接抛出一个异常,这个异常就是著名的MissingBackpressureException。剩下的四种策略在以后会提到。
还有就是在接收端不再是Disposable了,而是Subscription。它们都是上下游中间的一个开关, 之前我们说调用Disposable.dispose()方法可以切断水管, 同样的调用Subscription.cancel()也可以切断水管, 不同的地方在于Subscription增加了一个void request(long n)方法。
这是因为Flowable在设计时采取了响应式拉取的方式,下游根据自身处理情况拉取数据。如果将代码中的request去掉会变成什么样子。
I/RxJavaTestHaHa: onError: io.reactivex.exceptions.MissingBackpressureException: create: could not emit value due to lack of requests
果不其然报错了,上游发送第一个事件后下游就抛出了MissingBackpressureException异常,这是因为下游没有调用request,上游就认为下游没有处理事件的能力,而这又是一个同步的订阅,既然下游处理不了,那上游不可能一直等待吧, 如果是这样,万一这两根水管工作在主线程里,界面不就卡死了吗,因此只能抛个异常来提醒我们。
在同步中,如果没有调用request方法会报错,那异步会怎么样,我们来试一下。
Flowable<Integer> flowable = Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
Log.i(TAG, "subscribe: 1");
emitter.onNext(1);
Log.i(TAG, "subscribe: 2");
emitter.onNext(2);
Log.i(TAG, "subscribe: 3");
emitter.onNext(3);
emitter.onComplete();
}
}, BackpressureStrategy.ERROR);
Subscriber<Integer> subscriber = new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
Log.i(TAG, "onSubscribe: ");
mSubscription = s;
}
@Override
public void onNext(Integer integer) {
Log.i(TAG, "onNext: " + integer);
}
@Override
public void onError(Throwable t) {
Log.i(TAG, "onError: "+ t);
}
@Override
public void onComplete() {
Log.i(TAG, "onComplete: ");
}
};
flowable.subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(subscriber);
I/RxJavaTestHaHa: onSubscribe:
I/RxJavaTestHaHa: subscribe: 1
I/RxJavaTestHaHa: subscribe: 2
I/RxJavaTestHaHa: subscribe: 3
在异步中,我们没有调用request方法,但是却没有报错,而且上游成功的发送了数据,这是因为在Flowable里默认有一个大小为128的水缸,当上下游工作在不同的线程中时,上游就会先把事件发送到这个水缸中, 因此,下游虽然没有调用request,但是上游在水缸中保存着这些事件,只有当下游调用request时,才从水缸里取出事件发给下游。
我们用两段代码来试一下这个大小是不是128。
Flowable<Integer> flowable = Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
for (int i = 0; i < 128; i++) {
Log.i(TAG, "subscribe: " + i);
emitter.onNext(i);
}
}
}, BackpressureStrategy.ERROR);
Subscriber<Integer> subscriber = new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
Log.i(TAG, "onSubscribe: ");
mSubscription = s;
}
@Override
public void onNext(Integer integer) {
Log.i(TAG, "onNext: " + integer);
}
@Override
public void onError(Throwable t) {
Log.i(TAG, "onError: "+ t);
}
@Override
public void onComplete() {
Log.i(TAG, "onComplete: ");
}
};
flowable.subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(subscriber);
Flowable<Integer> flowable = Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
for (int i = 0; i < 129; i++) {
Log.i(TAG, "subscribe: " + i);
emitter.onNext(i);
}
}
}, BackpressureStrategy.ERROR);
Subscriber<Integer> subscriber = new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
Log.i(TAG, "onSubscribe: ");
mSubscription = s;
}
@Override
public void onNext(Integer integer) {
Log.i(TAG, "onNext: " + integer);
}
@Override
public void onError(Throwable t) {
Log.i(TAG, "onError: "+ t);
}
@Override
public void onComplete() {
Log.i(TAG, "onComplete: ");
}
};
flowable.subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(subscriber);
果然是128,当发送的数据量为129时会报错。
I/RxJavaTestHaHa: onError: io.reactivex.exceptions.MissingBackpressureException: create: could not emit value due to lack of requests