RxJava学习 - 11. Switching, Throttling, Windowing, and Buffering
一个Observable的生产速度可能比Observer的消费速度快。并发的时候,Observable链的不同operators运行在不同的Schedulers上。慢操作的后面会排队,产生瓶颈。
可以使用Flowable处理瓶颈。但是,不是每个源都可以被backpressured。你不能让Observable.interval()(甚至Flowable.interval())变慢,因为这些emissions是时间敏感的。用户的输入事件,比如点击按钮,也是不能被backpressured。
幸好,下面介绍的这些operators,可以不使用backpressure应付快速产生的源,或者无法使用backpressure的时候。
Buffering
buffer()收集emissions,批量发射(一个list或者其他集合类型)。可以是固定的buffer,也可以是一个时间窗口,甚至被另一个Observable分割。
Fixed-size buffering
这个简单的例子,buffer()接受count参数,以固定尺寸,成批发射:
import io.reactivex.Observable;
public class Launcher {
public static void main(String[] args) {
Observable.range(1, 50)
.buffer(8)
.subscribe(System.out::println);
}
}
输出是
[1, 2, 3, 4, 5, 6, 7, 8]
[9, 10, 11, 12, 13, 14, 15, 16]
[17, 18, 19, 20, 21, 22, 23, 24]
[25, 26, 27, 28, 29, 30, 31, 32]
[33, 34, 35, 36, 37, 38, 39, 40]
[41, 42, 43, 44, 45, 46, 47, 48]
[49, 50]
也可以提供第二个参数bufferSupplier,把元素放到一个自定义的集合,比如HashSet:
import io.reactivex.Observable;
import java.util.HashSet;
public class Launcher {
public static void main(String[] args) {
Observable.range(1, 50)
.buffer(8, HashSet::new)
.subscribe(System.out::println);
}
}
还可以提供一个skip参数,指定开始一个新的buffer前,要跳过多少元素。如果skip等于count,skip没有影响。
如果不相等,事情变得很有趣。比如,你buffer两个emissions,但是跳过三个:
import io.reactivex.Observable;
public class Launcher {
public static void main(String[] args) {
Observable.range(1, 10)
.buffer(2, 3)
.subscribe(System.out::println);
}
}
输出是
[1, 2]
[4, 5]
[7, 8]
[10]
如果skip小于count,就得到一个有趣的rolling buffers。比如buffer的大小是3,skip是1:
import io.reactivex.Observable;
public class Launcher {
public static void main(String[] args) {
Observable.range(1, 10)
.buffer(3, 1)
.subscribe(System.out::println);
}
}
输出是
[1, 2, 3]
[2, 3, 4]
[3, 4, 5]
[4, 5, 6]
[5, 6, 7]
[6, 7, 8]
[7, 8, 9]
[8, 9, 10]
[9, 10]
[10]
下面的例子,使用buffer(2, 1)发射,然后使用filter()过滤掉最后一个:
import io.reactivex.Observable;
public class Launcher {
public static void main(String[] args) {
Observable.range(1, 10)
.buffer(2, 1)
.filter(c -> c.size() == 2)
.subscribe(System.out::println);
}
}
Time-based buffering
buffer()也可以使用固定的时间间隔。下面的代码,源每300毫秒发射,每个缓冲的list包含3个或者4个元素:
import io.reactivex.Observable;
import java.util.concurrent.TimeUnit;
public class Launcher {
public static void main(String[] args) {
Observable.interval(300, TimeUnit.MILLISECONDS)
.map(i -> (i + 1) * 300) // map to elapsed time
.buffer(1, TimeUnit.SECONDS)
.subscribe(System.out::println);
sleep(4000);
}
public static void sleep(int millis) {
try {
Thread.sleep(millis);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
也有可选的timeskip参数,它控制每个buffer开始的时间。
还可以提供第三个参数count,控制buffer的最大size。无论到时间了,还是buffer满了,都会导致buffer发射。如果时间窗口关闭之前,buffer满了,会发射空的buffer:
import io.reactivex.Observable;
import java.util.concurrent.TimeUnit;
public class Launcher {
public static void main(String[] args) {
Observable.interval(300, TimeUnit.MILLISECONDS)
.map(i -> (i + 1) * 300) // map to elapsed time
.buffer(1, TimeUnit.SECONDS, 2)
.subscribe(System.out::println);
sleep(5000);
}
public static void sleep(int millis) {
try {
Thread.sleep(millis);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
输出是
[300, 600]
[900]
[1200, 1500]
[1800]
[2100, 2400]
[2700]
[3000, 3300]
[3600, 3900]
[]
[4200, 4500]
[4800]
基于时间的buffer(),使用computation Scheduler。
Boundary-based buffering
更强大的buffer()变种是接受另一个Observable作为boundary参数。其他Observable发射的类型不重要。重要的是,每当它发射,就开始另一个buffer。
比如前面的例子,我们使用每秒的Observable.interval()作为每300毫秒的Observable.interval()的边界:
import io.reactivex.Observable;
import java.util.concurrent.TimeUnit;
public class Launcher {
public static void main(String[] args) {
Observable<Long> cutOffs =
Observable.interval(1, TimeUnit.SECONDS);
Observable.interval(300, TimeUnit.MILLISECONDS)
.map(i -> (i + 1) * 300) // map to elapsed time
.buffer(cutOffs)
.subscribe(System.out::println);
sleep(5000);
}
public static void sleep(int millis) {
try {
Thread.sleep(millis);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
输出是
[300, 600, 900]
[1200, 1500, 1800]
[2100, 2400, 2700]
[3000, 3300, 3600, 3900]
[4200, 4500, 4800]
Windowing
window()和buffer()几乎一样,只是它把元素缓冲进其他Observables,而不是集合。它返回一个Observable<Observable>。每个Observable emission会缓存emissions,订阅以后flush他们(很像GroupedObservable)。这样,emissions有效了就发射,而不是list满了才发射。
和buffer()一样,每个批也可以是固定大小的,时间间隔的,或者来自另一个Observable。
Fixed-size windowing
我们修改以前的例子,使用window()把50个整数缓冲进长度为8的list。我可以响应式地转换每个批次成为一个非集合,比如使用“|”级联的字符串:
import io.reactivex.Observable;
public class Launcher {
public static void main(String[] args) {
Observable.range(1, 50)
.window(8)
.flatMapSingle(obs -> obs.reduce("", (total, next) ->
total + (total.equals("") ? "" : "|") + next))
.subscribe(System.out::println);
}
}
就像buffer(),可以提供skip参数,表示开始一个新窗口前,需要跳过多少emissions。下面的例子,窗口大小是2,跳过3个元素。然后接受每个窗口Observable,reduce成级联的字符串:
import io.reactivex.Observable;
public class Launcher {
public static void main(String[] args) {
Observable.range(1, 50)
.window(2, 3)
.flatMapSingle(obs -> obs.reduce("", (total, next) ->
total + (total.equals("") ? "" : "|") + next))
.subscribe(System.out::println);
}
}
Time-based windowing
下面,有一个每300毫秒发射的Observable,每一秒切成一个分离的Observables,然后字符串级联:
import io.reactivex.Observable;
import java.util.concurrent.TimeUnit;
public class Launcher {
public static void main(String[] args) {
Observable.interval(300, TimeUnit.MILLISECONDS)
.map(i -> (i + 1) * 300) // map to elapsed time
.window(1, TimeUnit.SECONDS)
.flatMapSingle(obs -> obs.reduce("", (total, next) ->
total + (total.equals("") ? "" : "|") + next))
.subscribe(System.out::println);
sleep(5000);
}
public static void sleep(int millis) {
try {
Thread.sleep(millis);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
Boundary-based windowing
import io.reactivex.Observable;
import java.util.concurrent.TimeUnit;
public class Launcher {
public static void main(String[] args) {
Observable<Long> cutOffs =
Observable.interval(1, TimeUnit.SECONDS);
Observable.interval(300, TimeUnit.MILLISECONDS)
.map(i -> (i + 1) * 300) // map to elapsed time
.window(cutOffs)
.flatMapSingle(obs -> obs.reduce("", (total, next) ->
total + (total.equals("") ? "" : "|") + next))
.subscribe(System.out::println);
sleep(5000);
}
public static void sleep(int millis) {
try {
Thread.sleep(millis);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
Throttling
buffer()和window()把emissions分批放进集合或者Observables,这样做一般是合并而部署忽略emissions。而throttle(),emissions发生得太快时,就忽略一部分。当假设快速的emissions是冗余的或者不想要的时候,还是很有用的,比如重复点击button。
下面的例子,我们有三个Observable.interval()源,分别以100毫秒、300毫秒和2000毫秒的间隔发射。我们只接受第一个源的前十个元素,第二个的三个,第三个的两个元素。我们使用Observable.concat()把他们连到一起:
import io.reactivex.Observable;
import java.util.concurrent.TimeUnit;
public class Launcher {
public static void main(String[] args) {
Observable<String> source1 = Observable.interval(100, TimeUnit.MILLISECONDS)
.map(i -> (i + 1) * 100) // map to elapsed time
.map(i -> "SOURCE 1: " + i)
.take(10);
Observable<String> source2 = Observable.interval(300, TimeUnit.MILLISECONDS)
.map(i -> (i + 1) * 300) // map to elapsed time
.map(i -> "SOURCE 2: " + i)
.take(3);
Observable<String> source3 = Observable.interval(2000, TimeUnit.MILLISECONDS)
.map(i -> (i + 1) * 2000) // map to elapsed time
.map(i -> "SOURCE 3: " + i)
.take(2);
Observable.concat(source1, source2, source3)
.subscribe(System.out::println);
sleep(6000);
}
public static void sleep(int millis) {
try {
Thread.sleep(millis);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
接下来,我们使用throttle(),只选择其中一部分,忽略其他的。
throttleLast() / sample()
throttleLast()(它的别名是sample())只发射每个固定时间间隔之内的最后的元素。
Observable.concat(source1, source2, source3)
.throttleLast(1, TimeUnit.SECONDS)
.subscribe(System.out::println);
输出是
SOURCE 1: 900
SOURCE 2: 900
SOURCE 3: 2000
可以看到,发射了每个一秒的最后一个emission。
如果想以更大的时间间隔节流,你会得到更少的emissions:
Observable.concat(source1, source2, source3)
.throttleLast(2, TimeUnit.SECONDS)
.subscribe(System.out::println);
输出是
SOURCE 2: 900
SOURCE 3: 2000
如果减少时间间隔,会得到更多emissions:
Observable.concat(source1, source2, source3)
.throttleLast(500, TimeUnit.MILLISECONDS)
.subscribe(System.out::println);
输出是
SOURCE 1: 400
SOURCE 1: 900
SOURCE 2: 300
SOURCE 2: 900
SOURCE 3: 2000
throttleFirst()
throttleFirst()只发射固定时间间隔的第一个元素:
Observable.concat(source1, source2, source3)
.throttleFirst(1, TimeUnit.SECONDS)
.subscribe(System.out::println);
输出是
SOURCE 1: 100
SOURCE 2: 300
SOURCE 3: 2000
SOURCE 3: 4000
throttleFirst()和throttleLast()使用computation Scheduler,你可以使用第三个参数指定自己的Scheduler。
throttleWithTimeout() / debounce()
当快速发射的时候,throttleWithTimout()不发射任何东西,出现一个“沉默期”以后,就发射最后一个emission。
它接受时间间隔参数,表示多长时间没收到emissions,就发射最后一个emission。比如前面的例子,我们的沉默期是一秒:
import io.reactivex.Observable;
import java.util.concurrent.TimeUnit;
public class Launcher {
public static void main(String[] args) {
Observable<String> source1 = Observable.interval(100, TimeUnit.MILLISECONDS)
.map(i -> (i + 1) * 100) // map to elapsed time
.map(i -> "SOURCE 1: " + i)
.take(10);
Observable<String> source2 = Observable.interval(300, TimeUnit.MILLISECONDS)
.map(i -> (i + 1) * 300) // map to elapsed time
.map(i -> "SOURCE 2: " + i)
.take(3);
Observable<String> source3 = Observable.interval(2000, TimeUnit.MILLISECONDS)
.map(i -> (i + 1) * 2000) // map to elapsed time
.map(i -> "SOURCE 3: " + i)
.take(2);
Observable.concat(source1, source2, source3)
.throttleWithTimeout(1, TimeUnit.SECONDS)
.subscribe(System.out::println);
sleep(6000);
}
public static void sleep(int millis) {
try {
Thread.sleep(millis);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
输出是
SOURCE 2: 900
SOURCE 3: 2000
SOURCE 3: 4000
source3启动的时候,source2的900是最后一个emission,因为source3的第一个2秒不会发射,900超过了一秒的沉默期,被发射了。
接下来是2000,因为一秒之内没有发射,所以它也被发射了。又一个两秒以后,发射了4000。
Switching
switchMap()感觉很像flatMap(),但是有个重要的不同点:它会发射最后一个Observable的最后一个emission,处置任何前面的进行中的Observables。换个说法,它允许你取消一个正在工作的Observable,切换到一个新的,防止陈旧的、冗余的处理。
比如,要发射九个字符串,每个都随机延迟0-2000毫秒(模拟每一个的运算):
import io.reactivex.Observable;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.ThreadLocalRandom;
public class Launcher {
public static void main(String[] args) {
Observable<String> items = Observable.just("Alpha", "Beta", "Gamma", "Delta", "Epsilon",
"Zeta", "Eta", "Theta", "Iota");
//delay each String to emulate an intense calculation
Observable<String> processStrings = items.concatMap(s ->
Observable.just(s)
.delay(randomSleepTime(), TimeUnit.MILLISECONDS)
);
processStrings.subscribe(System.out::println);
//keep application alive for 20 seconds
sleep(20000);
}
public static int randomSleepTime() {
//returns random sleep time between 0 to 2000 milliseconds
return ThreadLocalRandom.current().nextInt(2000);
}
public static void sleep(int millis) {
try {
Thread.sleep(millis);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
如果我们想每5秒运行一次该过程,但是我们想cancel(或者叫dispose())先前的处理实例,只运行最后一个。可以使用switchMap()。我们增加另一个Observable.interval(),每5秒发射,然后使用switchMap(),处置当前的正在处理的Observable:
import io.reactivex.Observable;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.ThreadLocalRandom;
public class Launcher {
public static void main(String[] args) {
Observable<String> items = Observable.just("Alpha", "Beta", "Gamma", "Delta", "Epsilon",
"Zeta", "Eta", "Theta", "Iota");
//delay each String to emulate an intense calculation
Observable<String> processStrings = items.concatMap(s ->
Observable.just(s)
.delay(randomSleepTime(), TimeUnit.MILLISECONDS)
);
//run processStrings every 5 seconds, and kill each previous instance to start next
Observable.interval(5, TimeUnit.SECONDS)
.switchMap(i ->
processStrings.doOnDispose(() -> System.out.println("Disposing! Starting next"))
).subscribe(System.out::println);
//keep application alive for 20 seconds
sleep(20000);
}
public static int randomSleepTime() {
//returns random sleep time between 0 to 2000 milliseconds
return ThreadLocalRandom.current().nextInt(2000);
}
public static void sleep(int millis) {
try {
Thread.sleep(millis);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
为了使switchMap()有效工作,把emissions发射进switchMap()的线程不能占用,完成switchMap()的内部工作。这意味着你可能不得不在switchMap()内使用observeOn()或者subscribeOn(),在不同的线程上做不同的工作。如果switchMap()内的operations很难停下来,你可能得使用unsubscribeOn(),不要让触发线程做处置工作。