定义基础flux生成方法
private Flux<Integer> flux1() {
//range(start, count) = start, start+1, ..., start+count-1
//1, 2, 3, 4, 5
return Flux.range(1, 5);
}
private Flux<Integer> flux2() {
//6, 7, 8, 9, 10
return Flux.range(6, 5);
}
private Flux<String> hotFlux1() {
//带延迟的flux(如下即每个元素依次延2000ms后再被消费)
//[1]1, [1]2, [1]3, [1]4, [1]5
return flux1().map(i -> "[1]" + i).delayElements(Duration.ofMillis(2000));
}
private Flux<String> hotFlux2() {
//带延迟的flux(如下即每个元素依次延后500ms再被消费)
//[2]6, [2]7, [2]8, [2]9, [2]10
return flux2().map(i -> "[2]" + i).delayElements(Duration.ofMillis(500));
}
Java Api文档图示
示例代码
//merge: 合并流(按照各个流中元素的时间点先后顺序进行消费)
//即在一开始就订阅所有流,然后按时间先后顺序消费流中的元素
Flux.merge(hotFlux1(), hotFlux2())
.subscribe(s -> {
log.info(s);
});
//同merge
hotFlux1().mergeWith(hotFlux2())
.subscribe(s -> {
log.info(s);
});
//mergeSequential: 合并流(先消费完流1的所有元素,再消费流2)
//即一开始就订阅所有流,先消费流1元素,若流2已经产生元素,则按顺序累积流2元素,待流1元素全部消费完后再依次消费流2中元素
Flux.mergeSequential(hotFlux1(), hotFlux2())
.subscribe(s -> {
log.info(s);
});
//concat: 连接流(先消费完流1的所有元素,再消费流2)
//先订阅流1后消费流1,待流1消费完成后再订阅流2、消费流2,即依次订阅流
Flux.concat(hotFlux1(), hotFlux2())
.subscribe(s -> {
log.info(s);
});
int step = 10;
//flatMap: 转换流+合并流(即转换后的流被merge合并)
//即同时订阅多个流,然后按时间先后顺序消费流中的元素
Flux.range(1, 2)
.flatMap(i -> {
Stream<Integer> newStream = IntStream.range(i * step, (i + 1) * step).boxed();
if (i < 2) {
return Flux.fromStream(newStream).delayElements(Duration.ofMillis(500));
}
return Flux.fromStream(newStream).delayElements(Duration.ofMillis(2000));
})
.subscribe(s -> log.info(String.valueOf(s)));
//flatMapSequential: 转换流+合并流(即转换后的流被mergeSequential合并)
//即同时订阅多个流,然后先消费流1,积累流2,直到流1消费完成后再消费流2
Flux.range(1, 2)
.flatMapSequential(i -> {
Stream<Integer> newStream = IntStream.range(i * step, (i + 1) * step).boxed();
if (i < 2) {
return Flux.fromStream(newStream).delayElements(Duration.ofMillis(500));
}
return Flux.fromStream(newStream).delayElements(Duration.ofMillis(2000));
})
.subscribe(s -> log.info(String.valueOf(s)));
//concatMap: 转换流+连接流(即转换后的流被concat连接)
//即先订阅流1消费流1,待流1全部消费完成后再订阅流2消费流2
Flux.range(1, 2)
.concatMap(i -> {
Stream<Integer> newStream = IntStream.range(i * step, (i + 1) * step).boxed();
if (i < 2) {
return Flux.fromStream(newStream).delayElements(Duration.ofMillis(500));
}
return Flux.fromStream(newStream).delayElements(Duration.ofMillis(2000));
})
.subscribe(s -> log.info(String.valueOf(s)));