来源:https://www.jianshu.com/p/1f4867ce3c01
1.什么是Fowable
Fowable也是被观察者,和Subscriber构成一对观察者和被观察者
2.为什么引入Fowable
在rxjava中经常会遇到一种情况就是被观察者发送消息十分迅速,导致观察者不能即时响应消息。例如:
Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> e) throws Exception { while (true){ e.onNext(1); } } }) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { Thread.sleep(2000); System.out.println(integer); } });
例子中被观察者无限产生事件,消费者每隔2秒消费一个事件,造成事件堆积,最后造成OOM,Fowable就是为了处理这个问题,这个现象称为Backpressure背压,即生产者生生的速度大于消费者消费的速度,Flowable就是为了解决Backpressure问题而产生的。
3.处理Backpressure的策略
处理Backpressure的策略仅仅是处理Subscriber接收事件的方式,并不影响Flowable发送事件的方式,即使采用了Backpressure的策略,Flowable原来以什么样的速度产生事件,现在还是什么样的速度产生事件,主要处理的是Subscriber接收事件的方式
1. 什么情况下发生Backpressure
如果生产者和消费者在同一个线程情况下,每产生一个事件就会通知消费者,等消费者消费完毕,再产生下一个事件,所以同步情况下不存在Backpressure问题。
如果生产者和消费者不在一个线程,如果生产者速度大于消费者速度,就会产生backpressure问题,即在异步情况下。
2. 具体策略
ERROR:
在backpressure问题时直接抛出一个异常MissingBackpressureException
Flowable<Integer> flowable = Flowable.create(new FlowableOnSubscribe<Integer>() { @Override public void subscribe(FlowableEmitter<Integer> emitter) throws Exception { Log.d(TAG, "emit 1"); emitter.onNext(1); Log.d(TAG, "emit 2"); emitter.onNext(2); Log.d(TAG, "emit 3"); emitter.onNext(3); Log.d(TAG, "emit complete"); emitter.onComplete(); } }, BackpressureStrategy.ERROR); //增加了一个参数 Subscriber<Integer> subscriber = new Subscriber<Integer>() { @Override public void onSubscribe(Subscription s) { Log.d(TAG, "onSubscribe"); s.request(Long.MAX_VALUE); } @Override public void onNext(Integer integer) { Log.d(TAG, "onNext: " + integer); } @Override public void onError(Throwable t) { Log.w(TAG, "onError: ", t); } @Override public void onComplete() { Log.d(TAG, "onComplete"); } }; flowable.subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(subscriber);
上述代码创建了一个Flowable和一个Subscriber,不同的是在onSubscribe(Subscription s)中传递过来的是Subscripiton而不再是disposable了,也是用于切断观察者和被观察者直接按的联系,调用Subscription.cancel方法即可,不同的是Subscription增加了一个void request(long n)方法,上述代码中:
s.request(Long.MAX_VALUE);
这个方法用来向生产者申请可以消费的事件数量,这个可以根据本身的消费能力进行消费事件,这是因为Flowable在设计的时候采用了一种新的思路也就是响应式拉取的方式,你要去多少,我便传给你多少
在异步调用的时候,rxjava有个缓存池,用来缓存消费者暂时处理不了缓存下来的数据,缓存池的大小为128,只能缓存128个事件,无论request中传入的数字比128大还是小,缓存池中在刚开始都会存入128个事件,如果本身没有那么多事件就不会存128个,在ERROR策略下,如果缓存池溢出,就会立刻抛出MissingBackpressureException异常。
BUFFER
即把默认容量为128的缓存池换成一个大的缓存池,支持很多的数据,这样,消费者通过request传入一个很大的数据,生产者也会生产事件,并把处理不了的事件缓存,但是这种方式比较消耗内存,
DROP
当消费者处理不了的时候就丢弃,消费者通过request传入其需求n,然后生产着把n个事件传递给消费者供其消费,其他消费不掉的事件就丢弃。
LATEST
基本和DROP一致,消费者通过request传入需求n,然后生产者把n个事件传递给消费者供其消费,其他消费不掉的事件就丢弃,唯一的区别是LATEST总能使消费者能够接收到生产者产生的最后一个事件
除了flowable对象通过create对象自己创建,可以指定Backpressure策略外,如果Flowable不是自己创建的,可以采用onBackpressureBuffer onBackpressureDrop onBackpressureLatest方式指定。
例子:
Flowable.just(1) .onBackpressureBuffer() .observeOn(AndroidSchedulers.mainThread()) .subscribeOn(Schedulers.io()) .subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { } });