WebFlux series: Reactor features

Creating a new Flux

just

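Specifies all the elements the sequence contains. The resulting Flux completes automatically after publishing these elements, so it is a finite sequence.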

public static <T> Flux<T> just(T... data);
public static <T> Flux<T> just(T data);

empty()

Creates a sequence that contains no elements and only publishes the completion signal.

public static <T> Flux<T> empty();

error(Throwable error)

Creates a sequence that contains only an error signal.

public static <T> Flux<T> error(Throwable error);
public static <T> Flux<T> error(Supplier<? extends Throwable> errorSupplier);
public static <O> Flux<O> error(Throwable throwable, boolean whenRequested);

never()

Creates a sequence that emits no signals at all: no elements, no completion, and no error.

public static <T> Flux<T> never();

range(int start, int count)

Creates a sequence of count Integer objects, starting from start.

public static Flux<Integer> range(int start, int count);

defer

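The actual Publisher is decided only at subscription time: the supplier is invoked once for each subscriber.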

public static <T> Flux<T> defer(Supplier<? extends Publisher<T>> supplier);
public static <T> Flux<T> deferWithContext(Function<Context, ? extends Publisher<T>> supplier);
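
A minimal sketch of what "decided at subscription time" means, assuming reactor-core is on the classpath. The class name DeferDemo and the counter are illustrative and not part of the original. Because the supplier passed to defer runs once per subscription, two subscriptions to the same Flux observe two different values.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Flux;

class DeferDemo {
    // Returns what two successive subscribers observe from the same deferred Flux.
    public static List<Integer> subscribeTwice() {
        AtomicInteger counter = new AtomicInteger();
        // The supplier is not called here; it runs again for every subscription.
        Flux<Integer> deferred = Flux.defer(() -> Flux.just(counter.incrementAndGet()));
        Integer first = deferred.blockFirst();
        Integer second = deferred.blockFirst();
        return Arrays.asList(first, second);
    }

    public static void main(String[] args) {
        System.out.println(subscribeTwice()); // [1, 2]
    }
}
```

Had the Flux been built with Flux.just(counter.incrementAndGet()) directly, the counter would be read once at assembly time and both subscribers would see the same value.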

Converting from other sources

A Flux can be created from another Publisher, an array, an Iterable, or a Stream.

public static <T> Flux<T> from(Publisher<? extends T> source);
public static <T> Flux<T> fromArray(T[] array);
public static <T> Flux<T> fromIterable(Iterable<? extends T> it);
public static <T> Flux<T> fromStream(Stream<? extends T> s);
public static <T> Flux<T> fromStream(Supplier<Stream<? extends T>> streamSupplier);
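
The factory methods above can be sketched as follows, assuming reactor-core is available. FromDemo and the sample values are hypothetical. All three sources here are synchronous, so each subscribe call finishes before the next line runs.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Stream;
import reactor.core.publisher.Flux;

class FromDemo {
    // Collects the elements emitted by a Flux built from an array, an Iterable, and a Stream.
    public static List<String> fromSources() {
        List<String> out = new ArrayList<>();
        Flux.fromArray(new String[] {"a", "b"}).subscribe(out::add);
        Flux.fromIterable(Arrays.asList("c", "d")).subscribe(out::add);
        // A Stream can be consumed only once, so this Flux supports a single subscriber.
        Flux.fromStream(Stream.of("e", "f")).subscribe(out::add);
        return out;
    }

    public static void main(String[] args) {
        System.out.println(fromSources()); // [a, b, c, d, e, f]
    }
}
```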

Transformation


public final <V> Flux<V> transform(Function<? super Flux<T>, ? extends Publisher<V>> transformer);
public final <V> Flux<V> transformDeferred(Function<? super Flux<T>, ? extends Publisher<V>> transformer);
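// Transforms this Flux into an arbitrary target type P
public final <P> P as(Function<? super Flux<T>, P> transformer);
// Casts each element to another type
public final <E> Flux<E> cast(Class<E> clazz);
// compose is lazy: the transformer is applied per subscriber (deprecated in favor of transformDeferred since Reactor 3.3)
public final <V> Flux<V> compose(Function<? super Flux<T>, ? extends Publisher<V>> transformer);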
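
As a rough illustration of transform, the sketch below packages a filter-and-map chain as a plain method and applies it to a Flux. It assumes reactor-core is on the classpath; TransformDemo and evenLabels are hypothetical names introduced only for this example.

```java
import java.util.List;
import reactor.core.publisher.Flux;

class TransformDemo {
    // A reusable operator chain, written once as an ordinary method (hypothetical helper).
    static Flux<String> evenLabels(Flux<Integer> source) {
        return source.filter(i -> i % 2 == 0).map(i -> "even:" + i);
    }

    public static List<String> run() {
        return Flux.range(1, 6)
                // transform applies the function to the Flux itself, at assembly time.
                .transform(TransformDemo::evenLabels)
                .collectList()
                .block();
    }

    public static void main(String[] args) {
        System.out.println(run()); // [even:2, even:4, even:6]
    }
}
```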

Emitting elements on a timer

Emits Long values at a fixed interval.

public static Flux<Long> interval(Duration period);
public static Flux<Long> interval(Duration delay, Duration period);
public static Flux<Long> interval(Duration period, Scheduler timer);
public static Flux<Long> interval(Duration delay, Duration period, Scheduler timer);

Example:

        Flux.interval(Duration.ofMillis(200)).take(10)
                .subscribe(i -> System.out.println("  ->"+i + ":" + Instant.now().toString()));
        
        Thread.sleep(3000);

Result:

  ->0:2020-12-06T06:25:00.006Z
  ->1:2020-12-06T06:25:00.191Z
  ->2:2020-12-06T06:25:00.391Z
  ->3:2020-12-06T06:25:00.592Z
  ->4:2020-12-06T06:25:00.793Z
  ->5:2020-12-06T06:25:00.995Z
  ->6:2020-12-06T06:25:01.196Z
  ->7:2020-12-06T06:25:01.399Z
  ->8:2020-12-06T06:25:01.600Z
  ->9:2020-12-06T06:25:01.804Z

General-purpose methods

generate()

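generate() produces a Flux sequence synchronously, one element at a time. The sequence is produced by calling next(), complete(), and error(Throwable) on the SynchronousSink that is supplied. "One at a time" means that next() may be called at most once per invocation of the generator. In some cases the generation is stateful and needs a state object. For that, use the other form, generate(Callable<S> stateSupplier, BiFunction<S, SynchronousSink<T>, S> generator), where stateSupplier provides the initial state. During generation the state is passed to generator as its first argument, and the generator can modify it for use in the next round.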

public static <T> Flux<T> generate(Consumer<SynchronousSink<T>> generator);
public static <T, S> Flux<T> generate(Callable<S> stateSupplier, BiFunction<S, SynchronousSink<T>, S> generator);
public static <T, S> Flux<T> generate(Callable<S> stateSupplier, BiFunction<S, SynchronousSink<T>, S> generator, Consumer<? super S> stateConsumer);

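In the code below, the first generator emits a single value with next() and then ends the sequence with complete(). Without the call to complete(), the result would be an infinite sequence. In the second generator the state object is an ArrayList. Each emitted value is a random number, which is also added to the ArrayList. Once 10 numbers have been produced, complete() ends the sequence.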

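// Stateless generator: emits a single value, then completes.
Flux.generate(sink -> {
    sink.next("Hello");
    sink.complete();
}).subscribe(System.out::println);

// Stateful generator: the ArrayList is the state handed to each round.
final Random random = new Random();
Flux.generate(ArrayList::new, (list, sink) -> {
    int value = random.nextInt(100);
    list.add(value);
    sink.next(value);
    // Complete once 10 random numbers have been produced.
    if (list.size() == 10) {
        sink.complete();
    }
    return list;
}).subscribe(System.out::println);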

create()

create() differs from generate() in that it uses a FluxSink. FluxSink supports both synchronous and asynchronous emission, and it can emit multiple elements in a single invocation.

 public static <T> Flux<T> create(Consumer<? super FluxSink<T>> emitter);
 public static <T> Flux<T> create(Consumer<? super FluxSink<T>> emitter, OverflowStrategy backpressure);
 

In the code below, all 10 elements are produced in a single invocation.

Flux.create(sink -> {
    // A single invocation of the emitter pushes all 10 elements.
    for (int i = 0; i < 10; i++) {
        sink.next(i);
    }
    sink.complete();
}).subscribe(System.out::println);

Miscellaneous

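// Gives the sequence a name
public final Flux<T> name(String name);
// Tags the sequence with a key/value pair
public final Flux<T> tag(String key, String value);
// Programmatically creates a Flux that can emit multiple elements from a single-threaded producer through the FluxSink API.
public static <T> Flux<T> push(Consumer<? super FluxSink<T>> emitter);
public static <T> Flux<T> push(Consumer<? super FluxSink<T>> emitter, OverflowStrategy backpressure);
// Maps each value from the two Publishers to a time window and combines the values whose windows overlap, using a BiFunction to merge each pair.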
public final <TRight, TLeftEnd, TRightEnd, R> Flux<R> join(
			Publisher<? extends TRight> other,
			Function<? super T, ? extends Publisher<TLeftEnd>> leftEnd,
			Function<? super TRight, ? extends Publisher<TRightEnd>> rightEnd,
			BiFunction<? super T, ? super TRight, ? extends R> resultSelector
	) ;

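// Activates metrics for this sequence if an instrumentation facade (Micrometer) is on the classpath; otherwise it is a no-op.
public final Flux<T> metrics();
// Splits the sequence into parallel rails; call runOn(Scheduler) on the result to actually process them on multiple threads.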
public final ParallelFlux<T> parallel();
public final ParallelFlux<T> parallel(int parallelism);
public final ParallelFlux<T> parallel(int parallelism, int prefetch);

Operators

Combining multiple Flux

combineLatest

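This operator combines the most recently emitted element of every publisher into one new element (it takes the latest from each publisher and merges them).

It does not produce the Cartesian product of the publishers, however. Instead, each newly emitted element triggers one combination. Nothing is combined until every publisher has emitted at least one element.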

public static <T, V> Flux<V> combineLatest(Function<Object[], V> combinator, Publisher<? extends T>... sources);
public static <T, V> Flux<V> combineLatest(Function<Object[], V> combinator, int prefetch,
			Publisher<? extends T>... sources);
public static <T1, T2, V> Flux<V> combineLatest(Publisher<? extends T1> source1,
			Publisher<? extends T2> source2,
			BiFunction<? super T1, ? super T2, ? extends V> combinator);
public static <T1, T2, T3, V> Flux<V> combineLatest(Publisher<? extends T1> source1,
			Publisher<? extends T2> source2,
			Publisher<? extends T3> source3,
			Function<Object[], V> combinator);
 ... .... .... 
 public static <T, V> Flux<V> combineLatest(Iterable<? extends Publisher<? extends T>> sources,
			Function<Object[], V> combinator);
public static <T, V> Flux<V> combineLatest(Iterable<? extends Publisher<? extends T>> sources,
			int prefetch,
			Function<Object[], V> combinator);
            

Example 1

        Flux c = Flux.range(3, 6).map(i ->  new DataVo(i,"A") ) ;
        Flux d = Flux.range(5, 6).map(i -> new DataVo(i,"B")) ;
        Flux f = Flux.range(4, 6).map(i -> new DataVo(i,"C")) ;

        Flux.combineLatest( Arrays::toString
                ,c, d,f). subscribe(i -> System.out.println("  ->"+i));

        Thread.sleep(2000);

Result:

  ->[[A,8], [B,10], [C,4]]
  ->[[A,8], [B,10], [C,5]]
  ->[[A,8], [B,10], [C,6]]
  ->[[A,8], [B,10], [C,7]]
  ->[[A,8], [B,10], [C,8]]
  ->[[A,8], [B,10], [C,9]]

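Because there is no delay, the publishers finish emitting one after another. Only when the third publisher starts emitting does every publisher have an element, so only then does combining actually happen. The result pairs the last element of A and the last element of B with each element of C.

Example 2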

        Flux c = Flux.range(3, 6).map(i ->  new DataVo(i,"A") ).delayElements(Duration.ofMillis(90));
        Flux d = Flux.range(5, 6).map(i -> new DataVo(i,"B")).delayElements(Duration.ofMillis(80));
        Flux f = Flux.range(4, 6).map(i -> new DataVo(i,"C")).delayElements(Duration.ofMillis(50));

        Flux.combineLatest( Arrays::toString
                ,c, d,f). subscribe(i -> System.out.println("  ->"+i));

        Thread.sleep(2000);

Result:

C,4 is emitted at 50 ms, B,5 at 80 ms, and A,3 at 90 ms, so the first combined element is [[A,3], [B,5], [C,4]].

C,5 is emitted at 100 ms, C,6 at 150 ms, and B,6 at 160 ms. …

  ->[[A,3], [B,5], [C,4]]
  ->[[A,3], [B,5], [C,5]]
  ->[[A,3], [B,5], [C,6]]
  ->[[A,3], [B,6], [C,6]]
  ->[[A,4], [B,6], [C,6]]
  ->[[A,4], [B,6], [C,7]]
  ->[[A,4], [B,7], [C,7]]
  ->[[A,5], [B,7], [C,7]]
  ->[[A,5], [B,7], [C,8]]
  ->[[A,5], [B,7], [C,9]]
  ->[[A,5], [B,8], [C,9]]
  ->[[A,6], [B,8], [C,9]]
  ->[[A,6], [B,9], [C,9]]
  ->[[A,7], [B,9], [C,9]]
  ->[[A,7], [B,10], [C,9]]
  ->[[A,8], [B,10], [C,9]]

concat

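concat combines multiple Flux by appending them in publisher order: the next publisher is appended only after the previous one has completed.

concatWithValues takes a list of elements as its arguments.

concatDelayError ensures that all data is emitted and that the error is delivered last.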

public static <T> Flux<T> concat(Iterable<? extends Publisher<? extends T>> sources);
public static <T> Flux<T> concat(Publisher<? extends Publisher<? extends T>> sources);
public static <T> Flux<T> concat(Publisher<? extends Publisher<? extends T>> sources, int prefetch);
public static <T> Flux<T> concat(Publisher<? extends T>... sources);
public final Flux<T> concatWithValues(T... values);
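public final Flux<T> concatWith(Publisher<? extends T> other);
// An error does not interrupt the main sequence; it is propagated only after the remaining sources have had a chance to be concatenated.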
public static <T> Flux<T> concatDelayError(Publisher<? extends Publisher<? extends T>> sources);
public static <T> Flux<T> concatDelayError(Publisher<? extends Publisher<? extends T>> sources, int prefetch);
public static <T> Flux<T> concatDelayError(Publisher<? extends Publisher<? extends
			T>> sources, boolean delayUntilEnd, int prefetch);
public static <T> Flux<T> concatDelayError(Publisher<? extends T>... sources);

concat example

        Flux<Integer> a = Flux.range(1, 4);
        Flux b = Flux.range(5, 8);
        Flux c = a.map(i -> "[1]" + i).delayElements(Duration.ofMillis(30));
        Flux d = b.map(i -> "[2]" + i).delayElements(Duration.ofMillis(20));
        Flux.concat(c, d).subscribe(i -> System.out.print("  ->"+i));
        Thread.sleep(1000);

Result:

->[1]1  ->[1]2  ->[1]3  ->[1]4  ->[2]5  ->[2]6  ->[2]7  ->[2]8  ->[2]9  ->[2]10  ->[2]11  ->[2]12

concatMap

Maps each element to a Publisher, then concatenates the resulting publishers in order.

public final <V> Flux<V> concatMap(Function<? super T, ? extends Publisher<? extends V>>
			mapper);
public final <V> Flux<V> concatMap(Function<? super T, ? extends Publisher<? extends V>>
			mapper, int prefetch);
public final <V> Flux<V> concatMapDelayError(Function<? super T, ? extends Publisher<? extends V>> mapper);
public final <V> Flux<V> concatMapDelayError(Function<? super T, ? extends Publisher<?
			extends V>> mapper, int prefetch);
public final <V> Flux<V> concatMapDelayError(Function<? super T, ? extends Publisher<?
			extends V>> mapper, boolean delayUntilEnd, int prefetch);
public final <R> Flux<R> concatMapIterable(Function<? super T, ? extends Iterable<? extends R>> mapper);
public final <R> Flux<R> concatMapIterable(Function<? super T, ? extends Iterable<? extends R>> mapper, int prefetch);
public final Flux<T> concatWith(Publisher<? extends T> other);			

Example

        Flux.range(3, 3).concatMap(t -> Flux.range(7, t)
                .map(i -> "element:" + t + " generated:" + i))
                .subscribe(i -> System.out.println("  ->" + i));

        Thread.sleep(2000);

Result:

  ->element:3 generated:7
  ->element:3 generated:8
  ->element:3 generated:9
  ->element:4 generated:7
  ->element:4 generated:8
  ->element:4 generated:9
  ->element:4 generated:10
  ->element:5 generated:7
  ->element:5 generated:8
  ->element:5 generated:9
  ->element:5 generated:10
  ->element:5 generated:11

merge

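merge combines publishers like concat, but it interleaves elements in the order they are emitted over time: whichever element is emitted first is merged first.

mergeSequential differs from merge: the output follows the order of the publishers, delivering all elements of one publisher before those of the next, much like concat. Unlike concat, the inner publishers are subscribed to eagerly.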

public static <T> Flux<T> merge(Publisher<? extends Publisher<? extends T>> source);
public static <T> Flux<T> merge(Publisher<? extends Publisher<? extends T>> source, int concurrency);
public static <T> Flux<T> merge(Publisher<? extends Publisher<? extends T>> source, int concurrency, int prefetch);
public static <I> Flux<I> merge(Iterable<? extends Publisher<? extends I>> sources);
public static <I> Flux<I> merge(Publisher<? extends I>... sources);
public static <I> Flux<I> merge(int prefetch, Publisher<? extends I>... sources);
public static <I> Flux<I> mergeDelayError(int prefetch, Publisher<? extends I>... sources);
public static <I extends Comparable<? super I>> Flux<I> mergeOrdered(Publisher<? extends I>... sources);
public static <T> Flux<T> mergeOrdered(Comparator<? super T> comparator, Publisher<? extends T>... sources);
public static <T> Flux<T> mergeOrdered(int prefetch, Comparator<? super T> comparator, Publisher<? extends T>... sources);
public static <T> Flux<T> mergeSequential(Publisher<? extends Publisher<? extends T>> sources);
public static <T> Flux<T> mergeSequential(Publisher<? extends Publisher<? extends T>> sources,
			int maxConcurrency, int prefetch);
public static <T> Flux<T> mergeSequentialDelayError(Publisher<? extends Publisher<? extends T>> sources,			int maxConcurrency, int prefetch);
public static <I> Flux<I> mergeSequential(Publisher<? extends I>... sources);
public static <I> Flux<I> mergeSequential(int prefetch, Publisher<? extends I>... sources);
public static <I> Flux<I> mergeSequentialDelayError(int prefetch, Publisher<? extends I>... sources);
public static <I> Flux<I> mergeSequential(Iterable<? extends Publisher<? extends I>> sources);
public static <I> Flux<I> mergeSequential(Iterable<? extends Publisher<? extends I>> sources,
			int maxConcurrency, int prefetch);
public static <I> Flux<I> mergeSequentialDelayError(Iterable<? extends Publisher<? extends I>> sources,int maxConcurrency, int prefetch);	
public final Flux<T> mergeOrderedWith(Publisher<? extends T> other,
			Comparator<? super T> otherComparator);
public final Flux<T> mergeWith(Publisher<? extends T> other);

merge example:

        Flux<Integer> a = Flux.range(1, 6);
        Flux b = Flux.range(5, 8);

        Flux c = a.map(i -> "[1]" + i).delayElements(Duration.ofMillis(30));
        Flux d = b.map(i -> "[2]" + i).delayElements(Duration.ofMillis(20));

        Flux.merge(c, d).subscribe(i -> System.out.print("  ->"+i));

        Thread.sleep(1000);

Result:

->[2]5  ->[1]1  ->[2]6  ->[1]2  ->[2]7  ->[1]3  ->[2]8  ->[1]4  ->[2]9  ->[1]5  ->[2]10  ->[1]6  ->[2]11  ->[2]12

mergeSequential example:

        Flux<Integer> a = Flux.range(3, 6);
        Flux<Integer> b = Flux.range(5, 6);
        Flux<Integer> e = Flux.range(4, 6);

        Flux c = a.map(i ->  new DataVo(i,"1") ).delayElements(Duration.ofMillis(30));
        Flux d = b.map(i -> new DataVo(i,"2")).delayElements(Duration.ofMillis(20));
        Flux f = e.map(i -> new DataVo(i,"3")).delayElements(Duration.ofMillis(20));

        Flux.mergeSequential(c, d,f). subscribe(i -> System.out.print("  ->"+i));

        Thread.sleep(1000);

Result:

 ->[1,3]  ->[1,4]  ->[1,5]  ->[1,6]  ->[1,7]  ->[1,8]  ->[2,5]  ->[2,6]  ->[2,7]  ->[2,8]  ->[2,9]  ->[2,10]  ->[3,4]  ->[3,5]  ->[3,6]  ->[3,7]  ->[3,8]  ->[3,9]

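mergeOrdered example

mergeOrdered merges the sources into an ordered sequence by repeatedly picking the smallest of the values currently available from each source, as defined by the Comparator. It is not a full sort, because it never considers a whole sequence at once. When values are equal, the order of the publishers decides.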

        Flux<Integer> a = Flux.range(3, 6);
        Flux<Integer> b = Flux.range(5, 6);
        Flux<Integer> e = Flux.range(4, 6);

        Flux c = a.map(i ->  new DataVo(i,"1") ).delayElements(Duration.ofMillis(30));
        Flux d = b.map(i -> new DataVo(i,"2")).delayElements(Duration.ofMillis(20));
        Flux f = e.map(i -> new DataVo(i,"3")).delayElements(Duration.ofMillis(20));

        //Comparator.comparing(DataVo::getData)
        Flux.mergeOrdered(DataVo::compareTo,c, d,f). subscribe(i -> System.out.print("  ->"+i));

        Thread.sleep(1000);

Result:

 ->[1,3]  ->[1,4]  ->[3,4]  ->[1,5]  ->[2,5]  ->[3,5]  ->[1,6]  ->[2,6]  ->[3,6]  ->[1,7]  ->[2,7]  ->[3,7]  ->[1,8]  ->[2,8]  ->[3,8]  ->[2,9]  ->[3,9]  ->[2,10]

zip

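zip combines multiple Publishers into one output stream through a combinator, pairing elements one-to-one by position. Every Publisher must supply an element for each combination, so the output stops as soon as any Publisher runs out of elements.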

public static <T1, T2, O> Flux<O> zip(Publisher<? extends T1> source1,
			Publisher<? extends T2> source2,
			final BiFunction<? super T1, ? super T2, ? extends O> combinator);
public static <T1, T2> Flux<Tuple2<T1, T2>> zip(Publisher<? extends T1> source1, Publisher<? extends T2> source2);
public static <T1, T2, T3> Flux<Tuple3<T1, T2, T3>> zip(Publisher<? extends T1> source1,
			Publisher<? extends T2> source2,
			Publisher<? extends T3> source3);
... ... ... 
public static <O> Flux<O> zip(Iterable<? extends Publisher<?>> sources,
			final Function<? super Object[], ? extends O> combinator);
public static <O> Flux<O> zip(Iterable<? extends Publisher<?>> sources,
			int prefetch,
			final Function<? super Object[], ? extends O> combinator);
public static <I, O> Flux<O> zip(final Function<? super Object[], ? extends O> combinator, Publisher<? extends I>... sources);
public static <I, O> Flux<O> zip(final Function<? super Object[], ? extends O> combinator,
			int prefetch,
			Publisher<? extends I>... sources);
public static <TUPLE extends Tuple2, V> Flux<V> zip(Publisher<? extends
			Publisher<?>> sources,
			final Function<? super TUPLE, ? extends V> combinator);
public final <T2> Flux<Tuple2<T, T2>> zipWith(Publisher<? extends T2> source2);
public final <T2, V> Flux<V> zipWith(Publisher<? extends T2> source2,
        final BiFunction<? super T, ? super T2, ? extends V> combinator);
public final <T2, V> Flux<V> zipWith(Publisher<? extends T2> source2,
        int prefetch,
        BiFunction<? super T, ? super T2, ? extends V> combinator);
public final <T2> Flux<Tuple2<T, T2>> zipWith(Publisher<? extends T2> source2, int prefetch);
public final <T2> Flux<Tuple2<T, T2>> zipWithIterable(Iterable<? extends T2> iterable);
public final <T2, V> Flux<V> zipWithIterable(Iterable<? extends T2> iterable,
        BiFunction<? super T, ? super T2, ? extends V> zipper);
                    

Example

        Flux c = Flux.range(3, 6).map(i -> new DataVo(i, "A")).delayElements(Duration.ofMillis(90));
        Flux d = Flux.range(5, 4).map(i -> new DataVo(i, "B")).delayElements(Duration.ofMillis(80));
        Flux f = Flux.range(4, 6).map(i -> new DataVo(i, "C")).delayElements(Duration.ofMillis(50));
        
        Flux.zip(c, d, f)
                .subscribe(i -> System.out.println("  ->" + i));

        Thread.sleep(2000);

Result:

  ->[[A,3],[B,5],[C,4]]
  ->[[A,4],[B,6],[C,5]]
  ->[[A,5],[B,7],[C,6]]
  ->[[A,6],[B,8],[C,7]]

repeat

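// Resubscribes indefinitely each time the source completes.
public final Flux<T> repeat();
// Resubscribes after each completion as long as the predicate returns true.
public final Flux<T> repeat(BooleanSupplier predicate);
// Resubscribes numRepeat times, so the source is played numRepeat + 1 times in total.
public final Flux<T> repeat(long numRepeat) ;
public final Flux<T> repeat(long numRepeat, BooleanSupplier predicate);
// Resubscribes when the companion Publisher returned by repeatFactory emits, in response to completion signals.
public final Flux<T> repeatWhen(Function<Flux<Long>, ? extends Publisher<?>> repeatFactory);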
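
A small sketch of repeat(long), assuming reactor-core is on the classpath; RepeatDemo and the sample values are illustrative. With repeat(2) the source is resubscribed twice after its first completion, so its elements appear three times.

```java
import java.util.List;
import reactor.core.publisher.Flux;

class RepeatDemo {
    public static List<String> run() {
        return Flux.just("a", "b")
                .repeat(2)       // one original run plus 2 resubscriptions
                .collectList()
                .block();
    }

    public static void main(String[] args) {
        System.out.println(run()); // [a, b, a, b, a, b]
    }
}
```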

cache

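Turns this Flux into a hot source and caches the most recently emitted signals for later subscribers. Without arguments it retains an unbounded number of onNext signals. Completion and error signals are replayed as well.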

public final Flux<T> cache();
public final Flux<T> cache(int history);
public final Flux<T> cache(Duration ttl);
public final Flux<T> cache(Duration ttl, Scheduler timer);
public final Flux<T> cache(int history, Duration ttl, Scheduler timer);

Example 1

Subscribers B and C each start with the 5 most recently emitted elements, replayed from the cache.

    Flux a =  Flux.range(1,20).delayElements(Duration.ofMillis(100))
                .cache(5);
        a.map(e-> "A->" + e + " ") .subscribe(System.out::print);
        Thread.sleep(1000);
        a.map(e -> "B->" + e + " ") .subscribe(System.out::print);
        Thread.sleep(1000);
        a.map(e -> "C->" + e + " ") .subscribe(System.out::print);
        Thread.sleep(3000);

Output:

A->1 A->2 A->3 A->4 A->5 A->6 A->7 A->8 A->9 B->5 B->6 B->7 B->8 B->9 A->10 B->10 A->11 B->11 A->12 B->12 A->13 B->13 A->14 B->14 A->15 B->15 A->16 B->16 A->17 B->17 A->18 B->18 C->14 C->15 C->16 C->17 C->18 A->19 B->19 C->19 A->20 B->20 C->20 

Example 2

        Flux a =  Flux.range(1,10).delayElements(Duration.ofMillis(100))
                .cache(Duration.ofMillis(80));
        a.map(e-> "A->" + e + " ") .subscribe(System.out::print);
        Thread.sleep(600);
        a.map(e -> "B->" + e + " ") .subscribe(System.out::print);
        Thread.sleep(200);
        a.map(e -> "C->" + e + " ") .subscribe(System.out::print);

Output:

A->1 A->2 A->3 A->4 A->5 B->5 A->6 B->6 A->7 B->7 C->7 A->8 B->8 C->8 A->9 B->9 C->9 A->10 B->10 C->10 

Behavior

doOnNext, doOnError, doOnCancel, and the like each attach a side effect that is triggered when the corresponding signal occurs.

public final Flux<T> cancelOn(Scheduler scheduler);

public final Flux<T> doAfterTerminate(Runnable afterTerminate);
// Side effect triggered when the Flux is cancelled.
public final Flux<T> doOnCancel(Runnable onCancel);
// Side effect triggered when the Flux completes successfully.
public final Flux<T> doOnComplete(Runnable onComplete);
public final <R> Flux<T> doOnDiscard(final Class<R> type, final Consumer<? super R> discardHook);
// Side effect triggered for every signal.
public final Flux<T> doOnEach(Consumer<? super Signal<T>> signalConsumer);
public final Flux<T> doOnError(Consumer<? super Throwable> onError);
public final <E extends Throwable> Flux<T> doOnError(Class<E> exceptionType,
			final Consumer<? super E> onError);
public final Flux<T> doOnError(Predicate<? super Throwable> predicate,
			final Consumer<? super Throwable> onError);
// Side effect triggered when the Flux emits an item.
public final Flux<T> doOnNext(Consumer<? super T> onNext);
public final Flux<T> doOnRequest(LongConsumer consumer);
public final Flux<T> doOnSubscribe(Consumer<? super Subscription> onSubscribe);
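// Triggered when the Flux terminates, either by completing or with an error.
public final Flux<T> doOnTerminate(Runnable onTerminate);
// Side effect triggered before the Flux is subscribed to, i.e. the first event after assembly.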
public final Flux<T> doFirst(Runnable onFirst);
public final Flux<T> doFinally(Consumer<SignalType> onFinally);
public final <R> Flux<R> handle(BiConsumer<? super T, SynchronousSink<R>> handler);
public Flux<T> hide();

Example

 Flux.just(5,6).doOnNext( e-> System.out.println("--next:" + e))
                .doOnComplete(()->{System.out.println("--complete");})
                .doOnSubscribe(e-> e.toString())
                .doOnCancel(()-> System.out.println("--cancel"))
                .doOnEach(s -> System.out.println("--signal:" + s.getType()) )
                .subscribe(System.out::println)
                ;

        Thread.sleep(1000);

Output:

--next:5
--signal:onNext
5
--next:6
--signal:onNext
6
--complete
--signal:onComplete

Aggregation operations

collect

public final <E> Mono<E> collect(Supplier<E> containerSupplier, BiConsumer<E, ? super T> collector);
public final <R, A> Mono<R> collect(Collector<? super T, A, ? extends R> collector);
public final Mono<List<T>> collectList();
public final <K, V> Mono<Map<K, V>> collectMap(Function<? super T, ? extends K> keyExtractor,
			Function<? super T, ? extends V> valueExtractor);
public final <K, V> Mono<Map<K, V>> collectMap(
			final Function<? super T, ? extends K> keyExtractor,
			final Function<? super T, ? extends V> valueExtractor,
			Supplier<Map<K, V>> mapSupplier);
public final <K> Mono<Map<K, Collection<T>>> collectMultimap(Function<? super T, ? extends K> keyExtractor);
public final <K, V> Mono<Map<K, Collection<V>>> collectMultimap(Function<? super T, ? extends K> keyExtractor,Function<? super T, ? extends V> valueExtractor);
public final <K, V> Mono<Map<K, Collection<V>>> collectMultimap(
			final Function<? super T, ? extends K> keyExtractor,
			final Function<? super T, ? extends V> valueExtractor,
			Supplier<Map<K, Collection<V>>> mapSupplier);
public final Mono<List<T>> collectSortedList();
public final Mono<List<T>> collectSortedList(@Nullable Comparator<? super T> comparator);
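
Two of the collect variants can be sketched as below, assuming reactor-core is available. CollectDemo and the sample data are hypothetical. Each variant returns a Mono that emits a single container once the source completes.

```java
import java.util.Collection;
import java.util.List;
import java.util.Map;
import reactor.core.publisher.Flux;

class CollectDemo {
    // Collects all elements into a list sorted by natural order.
    public static List<Integer> sorted() {
        return Flux.just(3, 1, 2).collectSortedList().block();
    }

    // Groups the elements by a key; each key maps to every element that produced it.
    public static Map<Integer, Collection<String>> byLength() {
        return Flux.just("a", "bb", "cc").collectMultimap(String::length).block();
    }

    public static void main(String[] args) {
        System.out.println(sorted());            // [1, 2, 3]
        System.out.println(byLength().get(2));   // [bb, cc]
    }
}
```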

reduce

Works like Stream.reduce.

public final Mono<T> reduce(BiFunction<T, T, T> aggregator);
public final <A> Mono<A> reduce(A initial, BiFunction<A, ? super T, A> accumulator);
public final <A> Mono<A> reduceWith(Supplier<A> initial, BiFunction<A, ? super T, A> accumulator) ;

Example

        Flux.range(1,99).reduce( (i,j)-> i+j).subscribe(i -> System.out.print("  -A->" + i));
        System.out.println("\n-------------------");

        Flux.range(1,99).reduce(100, (i,j)-> i+j).subscribe(i -> System.out.print("  -B->" + i));
        System.out.println("\n-------------------");
        Thread.sleep(3000);

Result:

  -A->4950
-------------------
  -B->5050
-------------------

distinct

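distinct uses a HashSet internally to track the elements already seen. It keeps the first occurrence of each element and drops every later duplicate.

distinctUntilChanged does not need a HashSet: it only compares each element with the one immediately before it. It removes consecutive repetitions and keeps the first element of each run. Duplicates that are not adjacent are not removed.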

public final Flux<T> distinct();
public final <V> Flux<T> distinct(Function<? super T, ? extends V> keySelector);
public final <V, C extends Collection<? super V>> Flux<T> distinct(
			Function<? super T, ? extends V> keySelector,
			Supplier<C> distinctCollectionSupplier);
public final <V, C> Flux<T> distinct(
			Function<? super T, ? extends V> keySelector,
			Supplier<C> distinctStoreSupplier,
			BiPredicate<C, V> distinctPredicate,
			Consumer<C> cleanup);
public final Flux<T> distinctUntilChanged();
public final <V> Flux<T> distinctUntilChanged(Function<? super T, ? extends V> keySelector);
public final <V> Flux<T> distinctUntilChanged(Function<? super T, ? extends V> keySelector,
			BiPredicate<? super V, ? super V> keyComparator);

Example

        Flux.just(1, 2, 3, 4, 5, 5, 4, 3, 2, 1).distinct(t -> t)
                .subscribe(i -> System.out.print("  -A->" + i));
        System.out.println("\n-------------------");

        Flux.just(1, 2, 3, 4, 5, 5, 4, 3, 2, 1).distinctUntilChanged(t -> t)
                .subscribe(i -> System.out.print("  -A->" + i));
        System.out.println("\n-------------------");

        Thread.sleep(3000);

Result:

  -A->1  -A->2  -A->3  -A->4  -A->5
-------------------
  -A->1  -A->2  -A->3  -A->4  -A->5  -A->4  -A->3  -A->2  -A->1
-------------------

group by

public final <K> Flux<GroupedFlux<K, T>> groupBy(Function<? super T, ? extends K> keyMapper);
public final <K> Flux<GroupedFlux<K, T>> groupBy(Function<? super T, ? extends K> keyMapper, int prefetch);
public final <K, V> Flux<GroupedFlux<K, V>> groupBy(Function<? super T, ? extends K> keyMapper,
			Function<? super T, ? extends V> valueMapper);
public final <K, V> Flux<GroupedFlux<K, V>> groupBy(Function<? super T, ? extends K> keyMapper,
			Function<? super T, ? extends V> valueMapper, int prefetch);
public final <TRight, TLeftEnd, TRightEnd, R> Flux<R> groupJoin(
			Publisher<? extends TRight> other,
			Function<? super T, ? extends Publisher<TLeftEnd>> leftEnd,
			Function<? super TRight, ? extends Publisher<TRightEnd>> rightEnd,
			BiFunction<? super T, ? super Flux<TRight>, ? extends R> resultSelector
	);
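
A hedged sketch of groupBy, assuming reactor-core is on the classpath; GroupByDemo and the key names are made up for illustration. Each GroupedFlux is itself a Flux, so it is collected into a list and labelled with its key. The final sort is there only to make the printed order deterministic.

```java
import java.util.List;
import reactor.core.publisher.Flux;

class GroupByDemo {
    public static List<String> run() {
        return Flux.range(1, 6)
                // Route each element into the group named by its key.
                .groupBy(i -> i % 2 == 0 ? "even" : "odd")
                // Consume every group; unconsumed groups would stall the source.
                .flatMap(group -> group.collectList().map(list -> group.key() + "=" + list))
                .collectSortedList()
                .block();
    }

    public static void main(String[] args) {
        System.out.println(run()); // [even=[2, 4, 6], odd=[1, 3, 5]]
    }
}
```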
    

scan

Works like reduce, but emits every intermediate result immediately.

public final Flux<T> scan(BiFunction<T, T, T> accumulator);
public final <A> Flux<A> scan(A initial, BiFunction<A, ? super T, A> accumulator);
public final <A> Flux<A> scanWith(Supplier<A> initial, BiFunction<A, ? super T, A>
			accumulator);
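
To make the difference from reduce concrete, here is a minimal sketch assuming reactor-core is available; ScanDemo is a hypothetical class name. Where reduce would emit only the final sum, scan emits each running total, and the seeded variant emits the seed first.

```java
import java.util.List;
import reactor.core.publisher.Flux;

class ScanDemo {
    // Running totals without a seed: the first element is emitted as is.
    public static List<Integer> runningSum() {
        return Flux.range(1, 5).scan(Integer::sum).collectList().block();
    }

    // Running totals with a seed: the seed itself is the first emitted value.
    public static List<Integer> seededSum() {
        return Flux.range(1, 5).scan(100, Integer::sum).collectList().block();
    }

    public static void main(String[] args) {
        System.out.println(runningSum()); // [1, 3, 6, 10, 15]
        System.out.println(seededSum());  // [100, 101, 103, 106, 110, 115]
    }
}
```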

Others

public final Mono<Long> count();

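Error handling

Backpressure

Backpressure is the ability to limit how fast a Publisher sends data when the data would otherwise arrive faster than the Subscriber can process it. By default a Subscriber asks the Publisher to push as much data as it has, as fast as it can.

In essence, backpressure resembles the window-based flow control in TCP: the consumer reports how much data it is ready to receive, and the producer supplies that amount. The demand can be reported by the Subscriber itself each time it finishes processing data, or by an external component that monitors the Subscriber's consumption and reports demand based on what it observes.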
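
The first feedback style, where the Subscriber reports demand itself after each element, can be sketched with Reactor's BaseSubscriber. This is an illustration under the assumption that reactor-core is on the classpath; BackpressureDemo and the log format are hypothetical. doOnRequest records every request that reaches the source.

```java
import java.util.ArrayList;
import java.util.List;
import org.reactivestreams.Subscription;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;

class BackpressureDemo {
    public static List<String> run() {
        List<String> log = new ArrayList<>();
        Flux.range(1, 3)
                .doOnRequest(n -> log.add("request(" + n + ")"))
                .subscribe(new BaseSubscriber<Integer>() {
                    @Override
                    protected void hookOnSubscribe(Subscription subscription) {
                        request(1); // ask for the first element only, not for everything
                    }

                    @Override
                    protected void hookOnNext(Integer value) {
                        log.add("onNext(" + value + ")");
                        request(1); // processing finished: signal readiness for one more
                    }
                });
        return log;
    }

    public static void main(String[] args) {
        // Every onNext entry is preceded by a request(1) entry.
        run().forEach(System.out::println);
    }
}
```

Overriding hookOnSubscribe matters here: the default implementation requests an unbounded amount, which is the "push everything" behavior described above.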

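Backpressure strategies

A backpressure strategy is the action the Publisher takes when the Subscriber cannot request more data in time.

The available strategies are buffer, error, drop, and latest. The default strategy is buffer.

Backpressure strategy methods

Choose a strategy with onBackpressureBuffer, onBackpressureError, onBackpressureDrop, or onBackpressureLatest.

// Buffer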
public final Flux<T> onBackpressureBuffer();
public final Flux<T> onBackpressureBuffer(int maxSize);
public final Flux<T> onBackpressureBuffer(int maxSize, Consumer<? super T> onOverflow);
public final Flux<T> onBackpressureBuffer(int maxSize, BufferOverflowStrategy bufferOverflowStrategy);
public final Flux<T> onBackpressureBuffer(int maxSize, Consumer<? super T> onBufferOverflow,
			BufferOverflowStrategy bufferOverflowStrategy);
public final Flux<T> onBackpressureBuffer(Duration ttl, int maxSize, Consumer<? super T> onBufferEviction);
public final Flux<T> onBackpressureBuffer(Duration ttl, int maxSize, Consumer<? super T> onBufferEviction, Scheduler scheduler);
// Drop
public final Flux<T> onBackpressureDrop() ;
public final Flux<T> onBackpressureDrop(Consumer<? super T> onDropped);
public final Flux<T> onBackpressureError() ;
public final Flux<T> onBackpressureLatest();

event

// On error, skips the failing element and continues with the rest of the original sequence
public final Flux<T> onErrorContinue(BiConsumer<Throwable, Object> errorConsumer);
public final <E extends Throwable> Flux<T> onErrorContinue(Class<E> type, BiConsumer<Throwable, Object> errorConsumer);
public final <E extends Throwable> Flux<T> onErrorContinue(Predicate<E> errorPredicate,
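			BiConsumer<Throwable, Object> errorConsumer);
// Reverts to the default mode, in which an error terminates the sequence, for operators upstream of a downstream onErrorContinue
public final Flux<T> onErrorStop();
// Maps the error to a different Throwable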
public final Flux<T> onErrorMap(Function<? super Throwable, ? extends Throwable> mapper);
public final <E extends Throwable> Flux<T> onErrorMap(Class<E> type,
			Function<? super E, ? extends Throwable> mapper);
public final Flux<T> onErrorMap(Predicate<? super Throwable> predicate,
			Function<? super Throwable, ? extends Throwable> mapper);
// On error, switches to a fallback Publisher
public final Flux<T> onErrorResume(Function<? super Throwable, ? extends Publisher<? extends T>> fallback);
public final <E extends Throwable> Flux<T> onErrorResume(Class<E> type,
			Function<? super E, ? extends Publisher<? extends T>> fallback);
public final Flux<T> onErrorResume(Predicate<? super Throwable> predicate,
			Function<? super Throwable, ? extends Publisher<? extends T>> fallback);
// On error, emits a default value and completes
public final Flux<T> onErrorReturn(T fallbackValue) ;
public final <E extends Throwable> Flux<T> onErrorReturn(Class<E> type, T fallbackValue);
public final Flux<T> onErrorReturn(Predicate<? super Throwable> predicate, T fallbackValue);
public final Flux<T> onTerminateDetach() ;

Example

        Flux.just(1,0,7).map(i-> 5/i).onErrorReturn(10).subscribe(i -> System.out.print("  -A->" + i));
        System.out.println("\n-------------------");
        Flux.just(1,0,7).map(i-> 5/i).onErrorReturn(ArithmeticException.class ,10).subscribe(i -> System.out.print("  -B->" + i));
        System.out.println("\n-------------------");
        Flux.just(1,0,7).map(i-> 5/i).onErrorResume(e->  Flux.just(4)).subscribe(i -> System.out.print("  -C->" + i));
        System.out.println("\n-------------------");
        Flux.just(1,0,7).map(i-> 5/i).onErrorContinue((e,o)->  System.out.println("--original:" +o)).subscribe(i -> System.out.print("  -D->" + i));
        System.out.println("\n-------------------");
        Flux.just(1,0,7).map(i-> 5/i).onErrorMap(e-> new Exception("CHU YI 0")).subscribe(i -> System.out.print("  -E->" + i));
        System.out.println("\n-------------------");

Output:

  -A->5  -A->10
-------------------
  -B->5  -B->10
-------------------
  -C->5  -C->4
-------------------
  -D->5--original:0
  -D->0
-------------------
  -E->5
reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.Exception: CHU YI 0
Caused by: java.lang.Exception: CHU YI 0

retry

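// Retries indefinitely
public final Flux<T> retry();
// Retries up to numRetries times
public final Flux<T> retry(long numRetries);
// The variants below are marked as deprecated, except retryWhen(Retry), which is their replacement.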
public final Flux<T> retry(Predicate<? super Throwable> retryMatcher);
public final Flux<T> retry(long numRetries, Predicate<? super Throwable> retryMatcher);
public final Flux<T> retryWhen(Function<Flux<Throwable>, ? extends Publisher<?>> whenFactory);
public final Flux<T> retryWhen(Retry retrySpec) ;
public final Flux<T> retryBackoff(long numRetries, Duration firstBackoff);
public final Flux<T> retryBackoff(long numRetries, Duration firstBackoff, Duration maxBackoff);
public final Flux<T> retryBackoff(long numRetries, Duration firstBackoff, Duration maxBackoff, Scheduler backoffScheduler);
public final Flux<T> retryBackoff(long numRetries, Duration firstBackoff, Duration maxBackoff, double jitterFactor);
public final Flux<T> retryBackoff(long numRetries, Duration firstBackoff, Duration maxBackoff, double jitterFactor, Scheduler backoffScheduler);
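
A minimal sketch of retry(long), assuming reactor-core is on the classpath. RetryDemo and the failing source are invented for illustration: the source fails on its first two subscriptions and succeeds on the third, so retry(2) is just enough to reach the successful attempt.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Flux;

class RetryDemo {
    public static List<Integer> run() {
        AtomicInteger attempts = new AtomicInteger();
        return Flux.defer(() -> {
                    int attempt = attempts.incrementAndGet();
                    // Hypothetical flaky source: the first two attempts fail.
                    return attempt < 3
                            ? Flux.<Integer>error(new IllegalStateException("attempt " + attempt))
                            : Flux.just(attempt);
                })
                .retry(2) // resubscribe on error, at most twice
                .collectList()
                .block();
    }

    public static void main(String[] args) {
        System.out.println(run()); // [3]
    }
}
```

With retry(1) the second failure would no longer be retried and block() would throw the IllegalStateException instead.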

using

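Similar to try-with-resources in Java: a resource is created for each subscriber and cleaned up when the sequence terminates or is cancelled.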

public static <T, D> Flux<T> using(Callable<? extends D> resourceSupplier, Function<? super D, ? extends  Publisher<? extends T>> sourceSupplier, Consumer<? super D> resourceCleanup);
public static <T, D> Flux<T> using(Callable<? extends D> resourceSupplier, Function<? super D, ? extends Publisher<? extends T>> sourceSupplier, Consumer<? super D> resourceCleanup, boolean eager);
public static <T, D> Flux<T> usingWhen(Publisher<D> resourceSupplier,
			Function<? super D, ? extends Publisher<? extends T>> resourceClosure,
			Function<? super D, ? extends Publisher<?>> asyncCleanup);
public static <T, D> Flux<T> usingWhen(Publisher<D> resourceSupplier,
			Function<? super D, ? extends Publisher<? extends T>> resourceClosure,
			Function<? super D, ? extends Publisher<?>> asyncComplete,
			Function<? super D, ? extends Publisher<?>> asyncError);
public static <T, D> Flux<T> usingWhen(Publisher<D> resourceSupplier,
			Function<? super D, ? extends Publisher<? extends T>> resourceClosure,
			Function<? super D, ? extends Publisher<?>> asyncComplete,
			Function<? super D, ? extends Publisher<?>> asyncError,
			//the operator itself accepts null for asyncCancel, but we won't in the public API
			Function<? super D, ? extends Publisher<?>> asyncCancel);
public static <T, D> Flux<T> usingWhen(Publisher<D> resourceSupplier,
			Function<? super D, ? extends Publisher<? extends T>> resourceClosure,
			Function<? super D, ? extends Publisher<?>> asyncComplete,
			BiFunction<? super D, ? super Throwable, ? extends Publisher<?>> asyncError,
			//the operator itself accepts null for asyncCancel, but we won't in the public API
			Function<? super D, ? extends Publisher<?>> asyncCancel);
            

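Example

Flux.using(
        // Acquire the resource: here, a Disposable that logs when it is released.
        () -> new Disposable() {
            @Override
            public void dispose() {
                System.out.println("do dispose");
            }

            @Override
            public String toString() {
                return "DISPOSABLE";
            }
        },
        // Build the sequence from the resource.
        disposable -> Flux.just(disposable.toString()),
        // Release the resource once the sequence terminates.
        Disposable::dispose
).subscribe(System.out::println);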

Output:

DISPOSABLE
do dispose

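dematerialize, materialize

materialize() converts every upstream signal into a Signal object and delivers it as an onNext(Signal). As a direct consequence, error and completion signals arrive as ordinary values, which makes it convenient to inspect and handle errors in the middle of a chain. dematerialize() does the reverse: it turns a sequence of Signal objects back into the corresponding Reactive Streams signals, so the service can keep running after an error has been handled.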

public final <X> Flux<X> dematerialize() ;
public final Flux<Signal<T>> materialize() ;
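
A minimal sketch of the round trip described above. The `failing` source and the class name are made up for this illustration; the point is only that the error shows up as a plain value that can be filtered before `dematerialize()` restores normal signals.

```java
import java.util.List;
import reactor.core.publisher.Flux;

final class MaterializeDemo {
    // Hypothetical source: two values followed by an error.
    static Flux<Integer> failing() {
        return Flux.just(1, 2).concatWith(Flux.error(new IllegalStateException("boom")));
    }

    // Every signal, including the error, arrives as an ordinary onNext(Signal).
    static List<String> signalTypes() {
        return failing().materialize()
                .map(signal -> signal.getType().name())
                .collectList()
                .block();
    }

    // Drop the error while it is a plain value, then restore a normal sequence.
    static List<Integer> valuesOnly() {
        return failing().materialize()
                .filter(signal -> !signal.isOnError())
                .<Integer>dematerialize()
                .collectList()
                .block();
    }

    public static void main(String[] args) {
        System.out.println(signalTypes()); // [ON_NEXT, ON_NEXT, ON_ERROR]
        System.out.println(valuesOnly());  // [1, 2]
    }
}
```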

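Publishing and subscribing (hot and cold sequences)

A cold sequence starts anew for every subscriber: no matter when a subscriber subscribes, it receives all the messages the sequence produces.

A hot sequence keeps producing messages regardless of its subscribers; a subscriber only receives the messages produced after it subscribed.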



public final Disposable subscribe();
public final Disposable subscribe(Consumer<? super T> consumer);
public final Disposable subscribe(@Nullable Consumer<? super T> consumer, Consumer<? super Throwable> errorConsumer);
public final Disposable subscribe(
			@Nullable Consumer<? super T> consumer,
			@Nullable Consumer<? super Throwable> errorConsumer,
			@Nullable Runnable completeConsumer);
public final Disposable subscribe(
			@Nullable Consumer<? super T> consumer,
			@Nullable Consumer<? super Throwable> errorConsumer,
			@Nullable Runnable completeConsumer,
			@Nullable Consumer<? super Subscription> subscriptionConsumer);
public final Disposable subscribe(
			@Nullable Consumer<? super T> consumer,
			@Nullable Consumer<? super Throwable> errorConsumer,
			@Nullable Runnable completeConsumer,
			@Nullable Context initialContext);
public final void subscribe(Subscriber<? super T> actual) ;
public abstract void subscribe(CoreSubscriber<? super T> actual);
public final Flux<T> subscriberContext(Context mergeContext);
public final Flux<T> subscriberContext(Function<Context, Context> doOnContext) ;
public final Flux<T> subscribeOn(Scheduler scheduler);
public final Flux<T> subscribeOn(Scheduler scheduler, boolean requestOnSeparateThread);
public final <E extends Subscriber<? super T>> E subscribeWith(E subscriber);

// Used for backpressure.
public final Flux<T> limitRate(int prefetchRate);
public final Flux<T> limitRate(int highTide, int lowTide);
public final Flux<T> limitRequest(long requestCap);
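// Returns a Flux that multicasts the source to all current subscribers: it subscribes to the source when the first subscriber arrives and cancels the source once all subscribers have cancelled.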
public final Flux<T> share() ;
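
The three-callback `subscribe` overload listed above can be sketched as follows. This is an illustration only; `SubscribeDemo` and its helper methods are hypothetical names, and the sources are synchronous so the callbacks have all run by the time `subscribe` returns.

```java
import java.util.ArrayList;
import java.util.List;
import reactor.core.publisher.Flux;

final class SubscribeDemo {
    // Records every callback invoked by subscribe(consumer, errorConsumer, completeConsumer).
    static List<String> run(Flux<Integer> source) {
        List<String> events = new ArrayList<>();
        source.subscribe(
                value -> events.add("next:" + value),
                error -> events.add("error:" + error.getMessage()),
                () -> events.add("complete"));
        return events;
    }

    static List<String> completed() {
        return run(Flux.just(1, 2));
    }

    static List<String> failed() {
        return run(Flux.just(1).concatWith(Flux.error(new IllegalStateException("boom"))));
    }

    public static void main(String[] args) {
        System.out.println(completed()); // [next:1, next:2, complete]
        System.out.println(failed());    // [next:1, error:boom]
    }
}
```

A sequence ends with either the error callback or the completion callback, never both.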

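Turning an existing cold sequence into a hot one

Calling publish() is enough to obtain a hot ConnectableFlux. Keep in mind that the source is only subscribed once connect() is called: attach the initial subscribers first, then call connect(). Subscribers added afterwards only see the elements emitted from that point on.

public final ConnectableFlux<T> publish();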
public final ConnectableFlux<T> publish(int prefetch);
public final <R> Flux<R> publish(Function<? super Flux<T>, ? extends Publisher<?
			extends R>> transform);
public final <R> Flux<R> publish(Function<? super Flux<T>, ? extends Publisher<?
			extends R>> transform, int prefetch);
public final Mono<T> publishNext();
public final Flux<T> publishOn(Scheduler scheduler);
public final Flux<T> publishOn(Scheduler scheduler, int prefetch);
public final Flux<T> publishOn(Scheduler scheduler, boolean delayError, int prefetch);
final Flux<T> publishOn(Scheduler scheduler, boolean delayError, int prefetch, int lowTide);

Example
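
A minimal sketch of publish() and connect(), assuming a synchronous `Flux.range` source so that the result is deterministic. `PublishDemo` is a hypothetical name. Nothing is emitted until `connect()` is called, and both subscribers then share one subscription to the source.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import reactor.core.publisher.ConnectableFlux;
import reactor.core.publisher.Flux;

final class PublishDemo {
    // Returns [elements seen by A before connect, elements seen by A, elements seen by B].
    static List<List<Integer>> run() {
        List<Integer> a = new ArrayList<>();
        List<Integer> b = new ArrayList<>();

        ConnectableFlux<Integer> hot = Flux.range(1, 3).publish();
        hot.subscribe(a::add);
        hot.subscribe(b::add);

        // The source has not been subscribed yet, so nothing has arrived.
        List<Integer> beforeConnect = new ArrayList<>(a);

        // Subscribes to the source once and multicasts to A and B.
        hot.connect();
        return Arrays.asList(beforeConnect, a, b);
    }

    public static void main(String[] args) {
        System.out.println(run()); // [[], [1, 2, 3], [1, 2, 3]]
    }
}
```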

Building a hot sequence with a Processor

Calling publish() on a Processor yields a hot stream; calling onNext on the same Processor instance then feeds data into that stream.

replay

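Turns the Flux into a hot source and caches elements so that later subscribers receive them too. history limits how many elements are kept, ttl limits how long they are kept.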

public final ConnectableFlux<T> replay();
public final ConnectableFlux<T> replay(int history);
public final ConnectableFlux<T> replay(Duration ttl);
public final ConnectableFlux<T> replay(int history, Duration ttl);
public final ConnectableFlux<T> replay(Duration ttl, Scheduler timer) ;
public final ConnectableFlux<T> replay(int history, Duration ttl, Scheduler timer);
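
The caching behaviour can be sketched as below. This assumes `autoConnect()` on the returned ConnectableFlux, which connects on the first subscription; `ReplayDemo` is a hypothetical name. The late subscriber only gets the last two elements because `history` is 2.

```java
import java.util.Arrays;
import java.util.List;
import reactor.core.publisher.Flux;

final class ReplayDemo {
    // Returns [elements seen by the first subscriber, elements seen by a late subscriber].
    static List<List<Integer>> run() {
        Flux<Integer> cached = Flux.range(1, 5).replay(2).autoConnect();

        // The first subscription triggers the connection and sees everything.
        List<Integer> first = cached.collectList().block();

        // The source has already completed; this subscriber is served from the 2-element history.
        List<Integer> late = cached.collectList().block();

        return Arrays.asList(first, late);
    }

    public static void main(String[] args) {
        System.out.println(run()); // [[1, 2, 3, 4, 5], [4, 5]]
    }
}
```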

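Time-based operations

timeout

If no element is emitted within the given duration, a TimeoutException is signalled, or the sequence switches to the fallback publisher when one is provided.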

public final Flux<T> timeout(Duration timeout);
public final Flux<T> timeout(Duration timeout, @Nullable Publisher<? extends T> fallback);
public final Flux<T> timeout(Duration timeout,
			@Nullable Publisher<? extends T> fallback,
			Scheduler timer);
public final <U> Flux<T> timeout(Publisher<U> firstTimeout);
public final <U, V> Flux<T> timeout(Publisher<U> firstTimeout,
			Function<? super T, ? extends Publisher<V>> nextTimeoutFactory) ;
private final <U, V> Flux<T> timeout(Publisher<U> firstTimeout,
        Function<? super T, ? extends Publisher<V>> nextTimeoutFactory,
        String timeoutDescription);
public final <U, V> Flux<T> timeout(Publisher<U> firstTimeout,
        Function<? super T, ? extends Publisher<V>> nextTimeoutFactory, Publisher<? extends T>
        fallback) ;
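
A minimal sketch of both outcomes, using `Flux.never()` as a source that is guaranteed to time out. The 50 ms duration and the class name are arbitrary choices for this illustration.

```java
import java.time.Duration;
import java.util.concurrent.TimeoutException;
import reactor.core.publisher.Flux;

final class TimeoutDemo {
    // With a fallback publisher, the sequence switches to it when the timeout fires.
    static String withFallback() {
        return Flux.<String>never()
                .timeout(Duration.ofMillis(50), Flux.just("fallback"))
                .blockFirst();
    }

    // Without a fallback, a TimeoutException is signalled; here it is turned into a value.
    static String withoutFallback() {
        return Flux.<String>never()
                .timeout(Duration.ofMillis(50))
                .onErrorResume(TimeoutException.class, e -> Flux.just("timed out"))
                .blockFirst();
    }

    public static void main(String[] args) {
        System.out.println(withFallback());
        System.out.println(withoutFallback());
    }
}
```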

			

delay

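delayElements delays the emission of each element by the given duration, so consecutive elements end up at least that far apart. delaySequence shifts the whole sequence by the duration while keeping the original spacing between elements, and delaySubscription postpones the subscription itself.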

public final Flux<T> delayElements(Duration delay);
public final Flux<T> delayElements(Duration delay, Scheduler timer);
public final Flux<T> delaySequence(Duration delay);
public final Flux<T> delaySequence(Duration delay, Scheduler timer);
public final Flux<T> delayUntil(Function<? super T, ? extends Publisher<?>> triggerProvider);
public final Flux<T> delaySubscription(Duration delay) ;
public final Flux<T> delaySubscription(Duration delay, Scheduler timer);
public final <U> Flux<T> delaySubscription(Publisher<U> subscriptionDelay);

Example

        Flux.range(1,10).map(i-> "A-" + i + " ")
                .timestamp().subscribe(System.out::print);
        System.out.println("\n---------------------");
        Flux.range(1,10).map(i-> "B-" + i + " ").delayElements(Duration.ofMillis(1000))
                .timestamp().subscribe(System.out::print);

        Thread.sleep(10000);

Output (B-10 is missing because the 10-second sleep ends shortly before the tenth element is due):

[1607507005108,A-1 ][1607507005108,A-2 ][1607507005108,A-3 ][1607507005108,A-4 ][1607507005108,A-5 ][1607507005108,A-6 ][1607507005108,A-7 ][1607507005108,A-8 ][1607507005108,A-9 ][1607507005108,A-10 ]
---------------------
[1607507006177,B-1 ][1607507007185,B-2 ][1607507008195,B-3 ][1607507009206,B-4 ][1607507010211,B-5 ][1607507011219,B-6 ][1607507012229,B-7 ][1607507013235,B-8 ][1607507014242,B-9 ]

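Element-related operators

filter

Tests each value in the sequence against a predicate and keeps the ones that pass, like Stream.filter.

filterWhen works like filter, but with an asynchronous predicate: each element is mapped to a Publisher<Boolean> and is kept only if that publisher emits true. Elements waiting for their result are held in an internal buffer, whose size can be set with bufferSize.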

public final Flux<T> filter(Predicate<? super T> p);
public final Flux<T> filterWhen(Function<? super T, ? extends Publisher<Boolean>> asyncPredicate);
public final Flux<T> filterWhen(Function<? super T, ? extends Publisher<Boolean>> asyncPredicate,
			int bufferSize);

Example
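
A minimal sketch contrasting the two operators. `isMultipleOfThree` is a hypothetical asynchronous check standing in for something like a remote lookup; the 5 ms delay only makes it asynchronous.

```java
import java.time.Duration;
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

final class FilterDemo {
    // Synchronous predicate.
    static List<Integer> evens() {
        return Flux.range(1, 10).filter(i -> i % 2 == 0).collectList().block();
    }

    // Hypothetical asynchronous predicate: the answer arrives later as a Mono<Boolean>.
    static Mono<Boolean> isMultipleOfThree(int i) {
        return Mono.just(i % 3 == 0).delayElement(Duration.ofMillis(5));
    }

    // Asynchronous predicate: an element is kept only if its Mono emits true.
    static List<Integer> multiplesOfThree() {
        return Flux.range(1, 10).filterWhen(FilterDemo::isMultipleOfThree).collectList().block();
    }

    public static void main(String[] args) {
        System.out.println(evens());            // [2, 4, 6, 8, 10]
        System.out.println(multiplesOfThree()); // [3, 6, 9]
    }
}
```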


take

public final Flux<T> take(long n);
// Takes elements for the given time span, however many arrive in that window.
public final Flux<T> take(Duration timespan);
public final Flux<T> take(Duration timespan, Scheduler timer);
public final Flux<T> takeLast(int n);
// Takes elements until one satisfies the predicate, then stops (that element is included).
public final Flux<T> takeUntil(Predicate<? super T> predicate);
public final Flux<T> takeUntilOther(Publisher<?> other);
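// Takes elements while the predicate holds and stops at the first element that fails it (that element is excluded). If the very first element fails, nothing is emitted.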
public final Flux<T> takeWhile(Predicate<? super T> continuePredicate);

Example

        Flux.range(1,10).takeUntil(e -> e> 3).subscribe(i -> System.out.print("  ->" + i));
        System.out.println("\n-------------------");
        Flux.range(1,10).takeWhile(e -> e<6).subscribe(i -> System.out.print("  ->" + i));
        System.out.println("\n-------------------");
        Flux.range(1,10).takeWhile(e -> e>6).subscribe(i -> System.out.print("  ->" + i));
        System.out.println("\n-------------------");

        Thread.sleep(3000);

Output:

  ->1  ->2  ->3  ->4
-------------------
  ->1  ->2  ->3  ->4  ->5
-------------------

-------------------

skip

public final Flux<T> skip(long skipped);
public final Flux<T> skip(Duration timespan);
public final Flux<T> skip(Duration timespan, Scheduler timer);
// Skips the last n elements.
public final Flux<T> skipLast(int n);
// Skips elements until the first one that satisfies the predicate; from then on nothing is skipped.
public final Flux<T> skipUntil(Predicate<? super T> untilPredicate);
// Skips elements until the other publisher emits onNext or onComplete.
public final Flux<T> skipUntilOther(Publisher<?> other);
// Skips elements while the predicate holds; from the first element that fails it, nothing is skipped.
public final Flux<T> skipWhile(Predicate<? super T> skipPredicate);

Example

        Flux.range(1,10).skip(5).subscribe(i -> System.out.print("  ->" + i));
        System.out.println("\n-------------------");
        Flux.range(1,10).skipLast(5).subscribe(i -> System.out.print("  ->" + i));
        System.out.println("\n-------------------");
        Flux.interval(Duration.ofMillis(30)).take(10).skip(Duration.ofMillis(150)).subscribe(i -> System.out.print("  -->" + i));
        System.out.println("\n-------------------");

        Flux.range(1,10).skipUntil(e -> e>6).subscribe(i -> System.out.print("  ->" + i));
        System.out.println("\n-------------------");
        Flux.range(1,10).skipUntil(e -> e<6).subscribe(i -> System.out.print("  ->" + i));
        System.out.println("\n-------------------");

        Flux.range(1,10).skipWhile(e -> e>6).subscribe(i -> System.out.print("  ->" + i));
        System.out.println("\n-------------------");
        Flux.range(1,10).skipWhile(e -> e<6).subscribe(i -> System.out.print("  ->" + i));
        System.out.println("\n-------------------");

        // The other publisher only emits onNext after 300 ms, by which time range has already completed, so every element is skipped.
        Flux.range(1,10).skipUntilOther(Flux.just(8).delayElements(Duration.ofMillis(300))).subscribe(i -> System.out.print("  .->" + i));
        System.out.println("\n-------------------");

        Thread.sleep(3000);

Output (the interval-based example emits asynchronously, so its elements appear last):

  ->6  ->7  ->8  ->9  ->10
-------------------
  ->1  ->2  ->3  ->4  ->5
-------------------

-------------------
  ->7  ->8  ->9  ->10
-------------------
  ->1  ->2  ->3  ->4  ->5  ->6  ->7  ->8  ->9  ->10
-------------------
  ->1  ->2  ->3  ->4  ->5  ->6  ->7  ->8  ->9  ->10
-------------------
  ->6  ->7  ->8  ->9  ->10
-------------------
  
-------------------
  -->5  -->6  -->7  -->8  -->9

sample

// Periodic sampling: emits the last value the Flux produced within each time window.
public final Flux<T> sample(Duration timespan);
// Samples whenever the other publisher emits an element; the most recent element is taken.
public final <U> Flux<T> sample(Publisher<U> sampler);
// Like sample, but takes the first element of each time window.
public final Flux<T> sampleFirst(Duration timespan);
public final <U> Flux<T> sampleFirst(Function<? super T, ? extends Publisher<U>> samplerFactory);
public final <U> Flux<T> sampleTimeout(Function<? super T, ? extends Publisher<U>> throttlerFactory);
public final <U> Flux<T> sampleTimeout(Function<? super T, ? extends Publisher<U>>
			throttlerFactory, int maxConcurrency) ;

Example

        Flux.interval(Duration.ofMillis(200)).take(10).sample(Duration.ofMillis(500)).subscribe(i -> System.out.print("  -A->" + i));
        System.out.println("\n-------------------");

        Flux.interval(Duration.ofMillis(200)).take(10).sampleFirst(Duration.ofMillis(500)).subscribe(i -> System.out.print("  -B->" + i));
        System.out.println("\n-------------------");
        Flux.interval(Duration.ofMillis(200)).take(10).sample(Flux.just(1,2,3,4,5).delayElements(Duration.ofMillis(300))).subscribe(i -> System.out.print("  -C->" + i));
        System.out.println("\n-------------------");

        Thread.sleep(3000);

Output (the three examples run concurrently, so their elements interleave after the separators):

-------------------

-------------------

-------------------
  -B->0  -C->0  -A->1  -C->2  -B->3  -C->3  -A->4  -C->5  -B->6  -A->6  -C->6  -A->9  -B->9

single

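Expects the publisher to emit exactly one element; emitting more than one is an error.

// Emits the single element; signals an error if the publisher is empty.
public final Mono<T> single() ;
// Uses the default value if the publisher is empty.
public final Mono<T> single(T defaultValue);
// Completes empty if the publisher has no element.
public final Mono<T> singleOrEmpty();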
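
The different outcomes can be sketched as follows. `outcome` is a hypothetical helper that renders a Mono as its value, the word "empty", or the simple class name of the error it signals.

```java
import java.util.Arrays;
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

final class SingleDemo {
    // Renders a Mono as "value:x", "empty", or the simple class name of its error.
    static <T> String outcome(Mono<T> mono) {
        return mono.map(v -> "value:" + v)
                .defaultIfEmpty("empty")
                .onErrorResume(e -> Mono.just(e.getClass().getSimpleName()))
                .block();
    }

    static List<String> outcomes() {
        return Arrays.asList(
                outcome(Flux.just(1).single()),                  // exactly one element
                outcome(Flux.<Integer>empty().single()),         // empty source
                outcome(Flux.<Integer>empty().single(42)),       // empty source, default value
                outcome(Flux.<Integer>empty().singleOrEmpty()),  // empty source, stays empty
                outcome(Flux.just(1, 2).single()));              // more than one element
    }

    public static void main(String[] args) {
        outcomes().forEach(System.out::println);
    }
}
```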

element

// Picks the first Publisher to emit any signal (onNext/onError/onComplete) and replays all signals from that Publisher, effectively behaving like the fastest of the competing sources.
public static <I> Flux<I> first(Publisher<? extends I>... sources);
public static <I> Flux<I> first(Iterable<? extends Publisher<? extends I>> sources);
// Emits the element at the given index.
public final Mono<T> elementAt(int index);
public final Mono<T> elementAt(int index, T defaultValue);
// Whether the sequence contains the given value, or has any element at all.
public final Mono<Boolean> hasElement(T value);
public final Mono<Boolean> hasElements();
public final Flux<Tuple2<Long, T>> index() ;
public final <I> Flux<I> index(BiFunction<? super Long, ? super T, ? extends I> indexMapper);
// Emits the last value of the sequence.
public final Mono<T> last();
public final Mono<T> last(T defaultValue);
// Emits the first element.
public final Mono<T> next();
// Emits true if every element satisfies the predicate.
public final Mono<Boolean> all(Predicate<? super T> predicate);
// Emits true if at least one element satisfies the predicate.
public final Mono<Boolean> any(Predicate<? super T> predicate);
public final Flux<T> defaultIfEmpty(T defaultV) ;
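
A minimal sketch exercising several of the operators above on one small sequence. `ElementDemo` and its method names are hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import reactor.core.publisher.Flux;

final class ElementDemo {
    static Flux<String> letters() {
        return Flux.just("a", "b", "c");
    }

    static List<Object> summary() {
        return Arrays.<Object>asList(
                letters().elementAt(1).block(),                              // element at index 1
                letters().elementAt(9, "none").block(),                      // index out of range: default
                letters().next().block(),                                    // first element
                letters().last().block(),                                    // last element
                letters().hasElement("b").block(),                           // contains "b"?
                letters().hasElements().block(),                             // non-empty?
                letters().all(s -> s.length() == 1).block(),                 // all match?
                letters().any(s -> s.equals("z")).block(),                   // any match?
                Flux.<String>empty().defaultIfEmpty("none").blockFirst());   // default for an empty Flux
    }

    // index() pairs each element with its 0-based position.
    static List<String> indexed() {
        return letters().index()
                .map(t -> t.getT1() + ":" + t.getT2())
                .collectList()
                .block();
    }

    public static void main(String[] args) {
        System.out.println(summary());
        System.out.println(indexed()); // [0:a, 1:b, 2:c]
    }
}
```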

Inserting elements

// Prepends elements to the beginning of the sequence.
public final Flux<T> startWith(Iterable<? extends T> iterable);
public final Flux<T> startWith(T... values);
public final Flux<T> startWith(Publisher<? extends T> publisher);

Example

        Flux.just(1, 2, 3, 4, 5).startWith(8, 9, 10).subscribe(i -> System.out.print("  -A->" + i));
        System.out.println("\n-------------------");
        Flux.just(1, 2, 3, 4, 5).startWith(Arrays.asList(8,9,10)).subscribe(i -> System.out.print("  -B->" + i));
        System.out.println("\n-------------------");
        Flux.just(1, 2, 3, 4, 5).startWith(Flux.just(8,9,10)).subscribe(i -> System.out.print("  -C->" + i));


        Thread.sleep(3000);

Output:

  -A->8  -A->9  -A->10  -A->1  -A->2  -A->3  -A->4  -A->5
-------------------
  -B->8  -B->9  -B->10  -B->1  -B->2  -B->3  -B->4  -B->5
-------------------
  -C->8  -C->9  -C->10  -C->1  -C->2  -C->3  -C->4  -C->5

switch

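Mirrors the most recently emitted publisher and switches as soon as the next publisher is emitted. Elements from a publisher that has been switched away from are no longer emitted.

// Switches to each new inner publisher as soon as the outer publisher emits it.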
public static <T> Flux<T> switchOnNext(Publisher<? extends Publisher<? extends T>> mergedPublishers);
public static <T> Flux<T> switchOnNext(Publisher<? extends Publisher<? extends T>> mergedPublishers, int prefetch);
public final <V> Flux<V> switchOnFirst(BiFunction<Signal<? extends T>, Flux<T>, Publisher<? extends V>> transformer);
public final <V> Flux<V> switchOnFirst(BiFunction<Signal<? extends T>, Flux<T>, Publisher<? extends V>> transformer, boolean cancelSourceOnComplete);
// If the Flux completes without emitting any data, switches to another publisher.
public final Flux<T> switchIfEmpty(Publisher<? extends T> alternate);
// Maps each element to a publisher and switches to the newest one.
public final <V> Flux<V> switchMap(Function<? super T, Publisher<? extends V>> fn);
public final <V> Flux<V> switchMap(Function<? super T, Publisher<? extends V>> fn, int prefetch);

Example 1

        Flux.switchOnNext(
                Flux.just(
                        Flux.interval(Duration.ofMillis(50)).take(10).map(i-> "B-" + i + " ")
                        , Flux.interval(Duration.ofMillis(50)).take(10).map(i-> "A-" + i + " ")
                        , Flux.interval(Duration.ofMillis(100)).take(10).map(i-> "C-" + i + " ")
                ).delayElements(Duration.ofMillis(200))
        ).subscribe(System.out::print);

Output:

B-0 B-1 B-2 A-0 A-1 A-2 C-0 C-1 C-2 C-3 C-4 C-5 C-6 C-7 C-8 C-9

Example 2

        Flux.switchOnNext(
                Flux.just(
                        Flux.interval(Duration.ofMillis(50)).take(10).map(i-> "B-" + i + " ").log()
                        , Flux.interval(Duration.ofMillis(50)).take(10).map(i-> "A-" + i + " ").log()
                        , Flux.interval(Duration.ofMillis(100)).take(10).map(i-> "C-" + i + " ").log()
                ).delayElements(Duration.ofMillis(200))
        ).subscribe(System.out::print);

Output:

18:29:22.696 [parallel-1] INFO reactor.Flux.Map.1 - onSubscribe(FluxMap.MapSubscriber)
18:29:22.700 [parallel-1] INFO reactor.Flux.Map.1 - request(32)
18:29:22.768 [parallel-2] INFO reactor.Flux.Map.1 - onNext(B-0 )
B-0 18:29:22.810 [parallel-2] INFO reactor.Flux.Map.1 - onNext(B-1 )
B-1 18:29:22.857 [parallel-2] INFO reactor.Flux.Map.1 - onNext(B-2 )
B-2 18:29:22.904 [parallel-2] INFO reactor.Flux.Map.1 - onNext(B-3 )
18:29:22.904 [parallel-3] INFO reactor.Flux.Map.1 - cancel()
18:29:22.905 [parallel-3] INFO reactor.Flux.Map.2 - onSubscribe(FluxMap.MapSubscriber)
18:29:22.905 [parallel-3] INFO reactor.Flux.Map.2 - request(32)
18:29:22.965 [parallel-4] INFO reactor.Flux.Map.2 - onNext(A-0 )
A-0 18:29:23.012 [parallel-4] INFO reactor.Flux.Map.2 - onNext(A-1 )
A-1 18:29:23.059 [parallel-4] INFO reactor.Flux.Map.2 - onNext(A-2 )
A-2 18:29:23.121 [parallel-4] INFO reactor.Flux.Map.2 - onNext(A-3 )
18:29:23.121 [parallel-5] INFO reactor.Flux.Map.2 - cancel()
18:29:23.121 [parallel-5] INFO reactor.Flux.Map.3 - onSubscribe(FluxMap.MapSubscriber)
18:29:23.121 [parallel-5] INFO reactor.Flux.Map.3 - request(32)
18:29:23.230 [parallel-6] INFO reactor.Flux.Map.3 - onNext(C-0 )
C-0 18:29:23.322 [parallel-6] INFO reactor.Flux.Map.3 - onNext(C-1 )
C-1 18:29:23.430 [parallel-6] INFO reactor.Flux.Map.3 - onNext(C-2 )
C-2 18:29:23.524 [parallel-6] INFO reactor.Flux.Map.3 - onNext(C-3 )
C-3 18:29:23.633 [parallel-6] INFO reactor.Flux.Map.3 - onNext(C-4 )
C-4 18:29:23.727 [parallel-6] INFO reactor.Flux.Map.3 - onNext(C-5 )
C-5 18:29:23.822 [parallel-6] INFO reactor.Flux.Map.3 - onNext(C-6 )
C-6 18:29:23.930 [parallel-6] INFO reactor.Flux.Map.3 - onNext(C-7 )
C-7 18:29:24.024 [parallel-6] INFO reactor.Flux.Map.3 - onNext(C-8 )
C-8 18:29:24.132 [parallel-6] INFO reactor.Flux.Map.3 - onNext(C-9 )
C-9 18:29:24.133 [parallel-6] INFO reactor.Flux.Map.3 - onComplete()

Example 3

        Flux.range(1,10)
                .switchMap( e-> Flux.just( 100 + e, 1000 + e ))
                .map( e-> "--> " + e)
                .subscribe(System.out::print);

Output:

--> 101--> 1001--> 102--> 1002--> 103--> 1003--> 104--> 1004--> 105--> 1005--> 106--> 1006--> 107--> 1007--> 108--> 1008--> 109--> 1009--> 110--> 1010

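Ignoring element values

// Ignores onNext signals and only propagates the termination signal.
public final Mono<T> ignoreElements();
// Picks whichever of this Flux and the other publisher emits a signal first, and replays all signals from it.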
public final Flux<T> or(Publisher<? extends T> other);

then


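// Returns a Mono<Void> that completes when this sequence completes.
public final Mono<Void> then();
// Waits for this sequence to complete, then emits the result of the other Mono.
public final <V> Mono<V> then(Mono<V> other);
// Waits for this sequence to complete, then for the other publisher to complete.
public final Mono<Void> thenEmpty(Publisher<Void> other);
// Lets this Flux complete, then plays the other Publisher.
public final <V> Flux<V> thenMany(Publisher<V> other);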

Example

        // Never subscribed, so this line produces no output.
        Flux.interval(Duration.ofMillis(100)).take(10).then().log();
        Flux.interval(Duration.ofMillis(100)).take(10).then(Mono.just(100)).subscribe(i -> System.out.print("  -B->" + i));
        // thenEmpty returns a Mono<Void>, which never emits a value, so -C-> is never printed.
        Flux.empty().thenEmpty(Flux.just()).subscribe(i -> System.out.print("  -C->" + i));
        Flux.interval(Duration.ofMillis(100)).take(10).thenMany(Flux.just(1,2,3,4,5)).subscribe(i -> System.out.print("  -D->" + i));

Output:

 -B->100  -D->1  -D->2  -D->3  -D->4  -D->5

sort

public final Flux<T> sort();
public final Flux<T> sort(Comparator<? super T> sortFunction);
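
A short sketch of both overloads; `SortDemo` is a hypothetical name. Since sorting needs every element, nothing is emitted before the source completes, so these operators only make sense on finite sequences.

```java
import java.util.Comparator;
import java.util.List;
import reactor.core.publisher.Flux;

final class SortDemo {
    // Natural ordering.
    static List<Integer> ascending() {
        return Flux.just(3, 1, 2).sort().collectList().block();
    }

    // Explicit comparator.
    static List<Integer> descending() {
        return Flux.just(3, 1, 2).sort(Comparator.reverseOrder()).collectList().block();
    }

    public static void main(String[] args) {
        System.out.println(ascending());  // [1, 2, 3]
        System.out.println(descending()); // [3, 2, 1]
    }
}
```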

map

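flatMap works like Stream.flatMap: the mapper turns each element into a Publisher, and flatMap subscribes to these inner publishers and merges their elements into a single Flux. Elements from different inner publishers may interleave.

The concurrency and prefetch parameters of flatMap apply to the outer and the inner sequences respectively. Each value is used for the first request; afterwards the operator checks and adjusts the request amounts internally.

// The mapper takes an element of type T and returns a Publisher<? extends R>.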
public final <R> Flux<R> flatMap(Function<? super T, ? extends Publisher<? extends R>> mapper);
public final <V> Flux<V> flatMap(Function<? super T, ? extends Publisher<? extends V>> mapper, int
			concurrency);
public final <V> Flux<V> flatMap(Function<? super T, ? extends Publisher<? extends V>> mapper, int
			concurrency, int prefetch);
public final <V> Flux<V> flatMapDelayError(Function<? super T, ? extends Publisher<? extends V>> mapper,int concurrency, int prefetch);

public final <R> Flux<R> flatMap(
			@Nullable Function<? super T, ? extends Publisher<? extends R>> mapperOnNext,
			@Nullable Function<? super Throwable, ? extends Publisher<? extends R>> mapperOnError,
			@Nullable Supplier<? extends Publisher<? extends R>> mapperOnComplete);
// The mapper takes an element of type T and returns an Iterable<? extends R>.
public final <R> Flux<R> flatMapIterable(Function<? super T, ? extends Iterable<? extends R>> mapper);
public final <R> Flux<R> flatMapIterable(Function<? super T, ? extends Iterable<? extends R>> mapper, int prefetch);
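// Like flatMap, inner publishers are subscribed eagerly, but their elements are emitted in the order of the source elements (compare mergeSequential).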
public final <R> Flux<R> flatMapSequential(Function<? super T, ? extends
			Publisher<? extends R>> mapper);
public final <R> Flux<R> flatMapSequential(Function<? super T, ? extends
			Publisher<? extends R>> mapper, int maxConcurrency);
public final <R> Flux<R> flatMapSequential(Function<? super T, ? extends
			Publisher<? extends R>> mapper, int maxConcurrency, int prefetch);
public final <R> Flux<R> flatMapSequentialDelayError(Function<? super T, ? extends
			Publisher<? extends R>> mapper, int maxConcurrency, int prefetch);
// Same as Stream.map.
public final <V> Flux<V> map(Function<? super T, ? extends V> mapper);
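
The ordering difference between flatMap and flatMapSequential can be sketched as below. `slowTimesTen` is a hypothetical asynchronous call in which smaller inputs answer later; the delays are arbitrary. The flatMap result depends on timing, so no particular order is promised for it.

```java
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

final class FlatMapDemo {
    // Hypothetical async call: 1 answers after 90 ms, 2 after 60 ms, 3 after 30 ms.
    static Mono<Integer> slowTimesTen(int i) {
        return Mono.just(i * 10).delayElement(Duration.ofMillis((4 - i) * 30L));
    }

    // Results are merged as they arrive, typically fastest first.
    static List<Integer> merged() {
        return Flux.just(1, 2, 3).flatMap(FlatMapDemo::slowTimesTen).collectList().block();
    }

    // Inner publishers run concurrently, but results keep the source order.
    static List<Integer> sequential() {
        return Flux.just(1, 2, 3).flatMapSequential(FlatMapDemo::slowTimesTen).collectList().block();
    }

    // Each element is expanded into the items of an Iterable.
    static List<String> iterable() {
        return Flux.just("a,b", "c")
                .flatMapIterable(s -> Arrays.asList(s.split(",")))
                .collectList()
                .block();
    }

    public static void main(String[] args) {
        System.out.println(merged());
        System.out.println(sequential()); // [10, 20, 30]
        System.out.println(iterable());   // [a, b, c]
    }
}
```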

expand

Recursively expands each element into a publisher of further elements and merges everything into a single Flux.

// Depth-first
public final Flux<T> expandDeep(Function<? super T, ? extends Publisher<? extends T>> expander,
			int capacityHint);
public final Flux<T> expandDeep(Function<? super T, ? extends Publisher<? extends T>> expander);
// Breadth-first
public final Flux<T> expand(Function<? super T, ? extends Publisher<? extends T>> expander,
			int capacityHint);
public final Flux<T> expand(Function<? super T, ? extends Publisher<? extends T>> expander) ;
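
The two traversal orders can be sketched on a small tree. The tree and the `children` helper are hypothetical, made up for this illustration: root has children a and b, a has children a1 and a2, and b has child b1.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import reactor.core.publisher.Flux;

final class ExpandDemo {
    // Hypothetical tree: root -> (a, b), a -> (a1, a2), b -> (b1).
    static final Map<String, List<String>> CHILDREN = new HashMap<>();
    static {
        CHILDREN.put("root", Arrays.asList("a", "b"));
        CHILDREN.put("a", Arrays.asList("a1", "a2"));
        CHILDREN.put("b", Arrays.asList("b1"));
    }

    // The expander: maps a node to a publisher of its children (empty for leaves).
    static Flux<String> children(String node) {
        return Flux.fromIterable(CHILDREN.getOrDefault(node, Collections.emptyList()));
    }

    // expand visits the tree level by level.
    static List<String> breadthFirst() {
        return Flux.just("root").expand(ExpandDemo::children).collectList().block();
    }

    // expandDeep follows each branch to its leaves before moving on.
    static List<String> depthFirst() {
        return Flux.just("root").expandDeep(ExpandDemo::children).collectList().block();
    }

    public static void main(String[] args) {
        System.out.println(breadthFirst()); // [root, a, b, a1, a2, b1]
        System.out.println(depthFirst());   // [root, a, a1, a2, b, b1]
    }
}
```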

Miscellaneous

// Keeps only the elements that are instances of the given type, cast to that type.
public final <U> Flux<U> ofType(final Class<U> clazz);

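Splitting

Flux provides two ways of splitting the original sequence into batches, so that a Subscriber can receive several elements at once and process them in bulk.

buffer and window both split the data by a configured batch size or by the length of a time window. They differ as follows:

  • buffer collects elements and only emits them, as a batch, once the configured count is reached or the time window ends.

  • window forwards each element as soon as the Publisher emits it, through the current window, until the configured count is reached or the time window ends.

  • The Flux returned by buffer emits elements of type List, whereas window emits elements of type Flux (backed by UnicastProcessor).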

window

public final Flux<Flux<T>> window(int maxSize);
public final Flux<Flux<T>> window(int maxSize, int skip);
public final Flux<Flux<T>> window(Publisher<?> boundary);

public final Flux<Flux<T>> window(Duration windowingTimespan);
public final Flux<Flux<T>> window(Duration windowingTimespan, Duration openWindowEvery);
public final Flux<Flux<T>> window(Duration windowingTimespan, Scheduler timer);
public final Flux<Flux<T>> window(Duration windowingTimespan, Duration openWindowEvery, Scheduler timer);
public final Flux<Flux<T>> windowTimeout(int maxSize, Duration maxTime);
public final Flux<Flux<T>> windowTimeout(int maxSize, Duration maxTime, Scheduler timer);

public final Flux<Flux<T>> windowUntil(Predicate<T> boundaryTrigger);
public final Flux<Flux<T>> windowUntil(Predicate<T> boundaryTrigger, boolean cutBefore);
public final Flux<Flux<T>> windowUntil(Predicate<T> boundaryTrigger, boolean cutBefore, int prefetch);
public final <V> Flux<Flux<T>> windowUntilChanged();
public final <V> Flux<Flux<T>> windowUntilChanged(Function<? super T, ? super V> keySelector);
public final <V> Flux<Flux<T>> windowUntilChanged(Function<? super T, ? extends V> keySelector,
        BiPredicate<? super V, ? super V> keyComparator);
public final Flux<Flux<T>> windowWhile(Predicate<T> inclusionPredicate);
public final Flux<Flux<T>> windowWhile(Predicate<T> inclusionPredicate, int prefetch);
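
A minimal sketch of `window(int)` next to `buffer(int)`. Each window is itself a Flux, so it is collected into a list here only to make the grouping visible; `WindowDemo` is a hypothetical name.

```java
import java.util.List;
import reactor.core.publisher.Flux;

final class WindowDemo {
    // window emits inner Flux instances; concatMap collects each one in turn.
    static List<List<Integer>> windowsOfThree() {
        return Flux.range(1, 10)
                .window(3)
                .concatMap(Flux::collectList)
                .collectList()
                .block();
    }

    // buffer emits the same groups directly as List instances.
    static List<List<Integer>> buffersOfThree() {
        return Flux.range(1, 10).buffer(3).collectList().block();
    }

    public static void main(String[] args) {
        System.out.println(windowsOfThree()); // [[1, 2, 3], [4, 5, 6], [7, 8, 9], [10]]
        System.out.println(buffersOfThree()); // [[1, 2, 3], [4, 5, 6], [7, 8, 9], [10]]
    }
}
```

The groups are identical; what differs is when the elements become available to the subscriber.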

buffer

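Note that there is not one single buffer: a new buffer object is created every time the condition is met.

The operator discards the current buffer when the sequence is cancelled or fails with an error.

// Collects all elements into a single List (maxSize is Integer.MAX_VALUE).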
public final Flux<List<T>> buffer();
// Collects up to maxSize elements per buffer.
public final Flux<List<T>> buffer(int maxSize);
// Same, with a caller-supplied collection as the buffer.
public final <C extends Collection<? super T>> Flux<C> buffer(int maxSize, Supplier<C> bufferSupplier);
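// Opens a new buffer every skip elements; buffers overlap when skip < maxSize, and elements are dropped when skip > maxSize.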
public final Flux<List<T>> buffer(int maxSize, int skip);
public final <C extends Collection<? super T>> Flux<C> buffer(int maxSize,
			int skip, Supplier<C> bufferSupplier);
// The other publisher sets the boundaries: each time it emits onNext, the current buffer is delivered to the subscriber's onNext.
public final Flux<List<T>> buffer(Publisher<?> other);
public final <C extends Collection<? super T>> Flux<C> buffer(Publisher<?> other, Supplier<C> bufferSupplier);
// Equivalent to using interval(bufferingTimespan) as the boundary: a new buffer is started every bufferingTimespan.
public final Flux<List<T>> buffer(Duration bufferingTimespan);
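// openBufferEvery: how often a new buffer is opened.
// bufferingTimespan: how long each buffer stays open; when it elapses, the buffer is emitted. Elements that arrive while no buffer is open are dropped.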
public final Flux<List<T>> buffer(Duration bufferingTimespan, Duration openBufferEvery);
public final Flux<List<T>> buffer(Duration bufferingTimespan, Scheduler timer);
public final Flux<List<T>> buffer(Duration bufferingTimespan, Duration openBufferEvery, Scheduler timer);
// Emits the buffer when it reaches maxSize or when maxTime elapses, whichever comes first.
public final Flux<List<T>> bufferTimeout(int maxSize, Duration maxTime);
public final <C extends Collection<? super T>> Flux<C> bufferTimeout(int maxSize, Duration maxTime, Supplier<C> bufferSupplier);
public final Flux<List<T>> bufferTimeout(int maxSize, Duration maxTime, Scheduler timer);
// Closes the buffer when an element satisfies the predicate (that element is included in the current buffer).
public final Flux<List<T>> bufferUntil(Predicate<? super T> predicate);
// cutBefore: the matching element goes into the next buffer instead.
public final Flux<List<T>> bufferUntil(Predicate<? super T> predicate, boolean cutBefore);
// Starts a new buffer whenever the element (or its key) differs from the previous one.
public final <V> Flux<List<T>> bufferUntilChanged();
public final <V> Flux<List<T>> bufferUntilChanged(Function<? super T, ? extends V> keySelector);
public final <V> Flux<List<T>> bufferUntilChanged(Function<? super T, ? extends V> keySelector,
			BiPredicate<? super V, ? super V> keyComparator);
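// Buffers elements while the predicate holds; an element that fails the predicate closes the current buffer and is discarded.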
public final Flux<List<T>> bufferWhile(Predicate<? super T> predicate);
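// Opens a buffer each time bucketOpening emits, and closes it when the publisher that closeSelector returns for that opening value emits.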
public final <U, V> Flux<List<T>> bufferWhen(Publisher<U> bucketOpening,
			Function<? super U, ? extends Publisher<V>> closeSelector);
public final <U, V, C extends Collection<? super T>> Flux<C> bufferWhen(Publisher<U> bucketOpening,
			Function<? super U, ? extends Publisher<V>> closeSelector, Supplier<C> bufferSupplier);
            

Example

        Flux.range(1, 20).buffer(5).subscribe(e -> System.out.println("--A-->" + e.toString()));
        Flux.range(1, 20).buffer(5, () -> new ArrayList<>(5)).subscribe(e -> System.out.println("--B-->" + e.toString()));
        Flux.range(1, 10).buffer(5, 2).subscribe(e -> System.out.println("--C-->" + e.toString()));
        Flux.range(1, 10).delayElements(Duration.ofMillis(100)).buffer(Duration.ofMillis(600)).subscribe(e -> System.out.println("--D-->" + e.toString()));
        Flux.range(1, 10).delayElements(Duration.ofMillis(100)).buffer(Duration.ofMillis(200), Duration.ofMillis(600)).subscribe(e -> System.out.println("--E-->" + e.toString()));
        Flux.range(1, 10).delayElements(Duration.ofMillis(100)).bufferTimeout(3, Duration.ofMillis(200)).subscribe(e -> System.out.println("--F-->" + e.toString()));
        Flux.range(1, 10).delayElements(Duration.ofMillis(100)).bufferUntil(e -> e % 3 == 0).subscribe(e -> System.out.println("--G-->" + e.toString()));
        Flux.range(1, 10).delayElements(Duration.ofMillis(100)).bufferUntil(e -> e % 3 == 0, true).subscribe(e -> System.out.println("--H-->" + e.toString()));
        Flux.just(1, 1, 3, 5, 5, 7, 9, 9, 10).delayElements(Duration.ofMillis(100)).bufferUntilChanged().subscribe(e -> System.out.println("--J-->" + e.toString()));
        Flux.range(1, 10).delayElements(Duration.ofMillis(100)).bufferWhile(e -> e % 3 == 0).subscribe(e -> System.out.println("--K-->" + e.toString()));

        Thread.sleep(3000);

Output:

--A-->[1, 2, 3, 4, 5]
--A-->[6, 7, 8, 9, 10]
--A-->[11, 12, 13, 14, 15]
--A-->[16, 17, 18, 19, 20]
--B-->[1, 2, 3, 4, 5]
--B-->[6, 7, 8, 9, 10]
--B-->[11, 12, 13, 14, 15]
--B-->[16, 17, 18, 19, 20]
--C-->[1, 2, 3, 4, 5]
--C-->[3, 4, 5, 6, 7]
--C-->[5, 6, 7, 8, 9]
--C-->[7, 8, 9, 10]
--C-->[9, 10]
--E-->[1, 2]
--H-->[1, 2]
--J-->[1, 1]
--F-->[1, 2]
--G-->[1, 2, 3]
--K-->[3]
--J-->[3]
--F-->[3, 4]
--D-->[1, 2, 3, 4]
--G-->[4, 5, 6]
--J-->[5, 5]
--H-->[3, 4, 5]
--F-->[5, 6]
--K-->[6]
--J-->[7]
--E-->[6, 7]
--F-->[7, 8]
--H-->[6, 7, 8]
--G-->[7, 8, 9]
--J-->[9, 9]
--J-->[10]
--D-->[5, 6, 7, 8, 9, 10]
--K-->[9]
--F-->[9, 10]
--G-->[10]
--H-->[9, 10]

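Back to synchronous code

// Blocks until the first element arrives and returns it (null if the sequence is empty). The return value is the element itself.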
public final T blockFirst();
public final T blockFirst(Duration timeout);
// Blocks until the last element arrives and returns it (null if the sequence is empty).
public final T blockLast();
public final T blockLast(Duration timeout);
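
A minimal sketch of the blocking calls. `BlockDemo` is a hypothetical name; the timeout case assumes that a blocking read that runs out of time fails with an IllegalStateException.

```java
import java.time.Duration;
import reactor.core.publisher.Flux;

final class BlockDemo {
    static Integer first() {
        return Flux.just(1, 2, 3).blockFirst();
    }

    static Integer last() {
        return Flux.just(1, 2, 3).blockLast();
    }

    // An empty sequence yields null rather than an exception.
    static Integer lastOfEmpty() {
        return Flux.<Integer>empty().blockLast();
    }

    // A source that never emits makes the timed variant give up.
    static String firstWithTimeout() {
        try {
            Flux.<Integer>never().blockFirst(Duration.ofMillis(50));
            return "no timeout";
        } catch (IllegalStateException e) {
            return "timed out";
        }
    }

    public static void main(String[] args) {
        System.out.println(first());            // 1
        System.out.println(last());             // 3
        System.out.println(lastOfEmpty());      // null
        System.out.println(firstWithTimeout());
    }
}
```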

to

public final Iterable<T> toIterable();
public final Iterable<T> toIterable(int batchSize);
public final Iterable<T> toIterable(int batchSize, @Nullable Supplier<Queue<T>>
queueProvider);
public final Stream<T> toStream();
public final Stream<T> toStream(int batchSize) ;

Debugging operators

log

Logs every signal the Publisher receives, step by step.

category: selects which logger configuration to use. For example:

       <!-- name must match the category passed to log() -->
        <logger level="info" name="myCategory">
            <AppenderRef ref="Console2"/>
        </logger>

public final Flux<T> log();
public final Flux<T> log(String category);
public final Flux<T> log(@Nullable String category, Level level, SignalType... options);
public final Flux<T> log(@Nullable String category,
			Level level,
			boolean showOperatorLine,
			SignalType... options) ;
public final Flux<T> log(Logger logger);
public final Flux<T> log(Logger logger,
			Level level,
			boolean showOperatorLine,
			SignalType... options);
			

Output:

15:35:06.301 [main] INFO reactor.Flux.Take.1 - onSubscribe(FluxTake.TakeSubscriber)
15:35:06.303 [main] INFO reactor.Flux.Take.1 - request(unbounded)
15:35:06.415 [parallel-1] INFO reactor.Flux.Take.1 - onNext(0)
15:35:06.507 [parallel-1] INFO reactor.Flux.Take.1 - onNext(1)
15:35:06.617 [parallel-1] INFO reactor.Flux.Take.1 - onNext(2)
15:35:06.710 [parallel-1] INFO reactor.Flux.Take.1 - onNext(3)
15:35:06.818 [parallel-1] INFO reactor.Flux.Take.1 - onNext(4)
15:35:06.820 [parallel-1] INFO reactor.Flux.Take.1 - onComplete()

elapsed

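Turns a Flux<T> into a Flux<Tuple2<Long, T>>. Tuple2 is similar to a Pair: it holds the time elapsed since the previous signal (in milliseconds) together with the element itself.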

public final Flux<Tuple2<Long, T>> elapsed();
public final Flux<Tuple2<Long, T>> elapsed(Scheduler scheduler);

Example

Flux.interval(Duration.ofMillis(100)).take(5).elapsed().log().subscribe();

Output:

15:40:23.589 [parallel-1] INFO reactor.Flux.Elapsed.1 - | onNext([123,0])
15:40:23.683 [parallel-1] INFO reactor.Flux.Elapsed.1 - | onNext([94,1])
15:40:23.776 [parallel-1] INFO reactor.Flux.Elapsed.1 - | onNext([93,2])
15:40:23.885 [parallel-1] INFO reactor.Flux.Elapsed.1 - | onNext([109,3])
15:40:23.979 [parallel-1] INFO reactor.Flux.Elapsed.1 - | onNext([94,4])

timestamp

Similar to elapsed, but pairs each element with the current timestamp instead.

public final Flux<Tuple2<Long, T>> timestamp() ;
public final Flux<Tuple2<Long, T>> timestamp(Scheduler scheduler);

checkpoint

public final Flux<T> checkpoint();
public final Flux<T> checkpoint(@Nullable String description, boolean forceStackTrace);
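
checkpoint marks a position in the chain so that an error passing through it carries extra information about where it travelled. The following is a minimal sketch under that assumption; the description "after-division" and the class name are made up, and the exact shape of the extra trace output depends on the Reactor version.

```java
import reactor.core.publisher.Flux;

final class CheckpointDemo {
    // Returns the error that reaches the subscriber, or null if there is none.
    static Throwable failure() {
        try {
            Flux.just(1, 0)
                    .map(i -> 10 / i)                     // fails with ArithmeticException on 0
                    .checkpoint("after-division", false)  // description only, no stack capture
                    .blockLast();
            return null;
        } catch (ArithmeticException e) {
            return e;
        }
    }

    public static void main(String[] args) {
        Throwable error = failure();
        if (error != null) {
            // The printed trace is expected to mention the checkpoint description.
            error.printStackTrace();
        }
    }
}
```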

Schedulers

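Reactor is full of asynchronous calls, which inevitably involve a good deal of thread scheduling. Schedulers provides the following strategies:

  • Schedulers.immediate() - runs on the current thread
  • Schedulers.elastic() - uses an elastic thread pool
  • Schedulers.newElastic("test1") - uses a new elastic thread pool (a name can be given, which helps debugging)
  • Schedulers.single() - a single thread
  • Schedulers.newSingle("test2") - a new single thread (a name can be given, which helps debugging)
  • Schedulers.parallel() - uses a thread pool for parallel work (sized by the number of CPU cores)
  • Schedulers.newParallel("test3") - uses a new thread pool for parallel work (sized by the number of CPU cores; a name can be given, which helps debugging)
  • Schedulers.fromExecutorService(Executors.newScheduledThreadPool(5)) - wraps an existing ExecutorService (the most flexible option)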
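
One way to see which strategy is in effect is to look at the thread name a task runs on. The sketch below assumes the default thread naming, where names start with the scheduler's name followed by a dash; `SchedulerDemo` and its methods are hypothetical.

```java
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

final class SchedulerDemo {
    // Runs a small task on the given scheduler and reports the thread it ran on.
    static String threadOn(Scheduler scheduler) {
        return Mono.fromCallable(() -> Thread.currentThread().getName())
                .subscribeOn(scheduler)
                .block();
    }

    static String onImmediate() { return threadOn(Schedulers.immediate()); }
    static String onSingle()    { return threadOn(Schedulers.single()); }
    static String onParallel()  { return threadOn(Schedulers.parallel()); }

    // A scheduler created with new* is owned by the caller and should be disposed.
    static String onNamedPool() {
        Scheduler custom = Schedulers.newParallel("test3", 2);
        try {
            return threadOn(custom);
        } finally {
            custom.dispose();
        }
    }

    public static void main(String[] args) {
        System.out.println(onImmediate()); // the calling thread
        System.out.println(onSingle());
        System.out.println(onParallel());
        System.out.println(onNamedPool());
    }
}
```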

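publishOn and subscribeOn

publishOn and subscribeOn are mainly used to switch the execution context, that is, the Scheduler.

Within a chain, the position of publishOn matters: it switches the Scheduler for the operators that come after it. The position of subscribeOn, in contrast, makes no difference.

This is because the actual publish-subscribe relationship is only established when a subscriber subscribes.

subscribeOn switches the context in which the subscription happens, and therefore where the source starts emitting. Wherever it appears in the chain, it ends up applying to the whole chain, up to the first publishOn.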

publishOn

    public void usePublishOn() throws InterruptedException {
        Scheduler s = Schedulers.newParallel("parallel-scheduler", 4);
        final Flux<String> flux = Flux
                .range(1, 2)
                // Runs on the subscribing thread (ThreadA).
                .map(i -> 10 + i + ":" + Thread.currentThread())
                .publishOn(s)
                // Runs on a parallel-scheduler thread.
                .map(i -> "value " + i + ":" + Thread.currentThread());

        new Thread(() -> flux.subscribe(System.out::println), "ThreadA").start();
        System.out.println(Thread.currentThread());
        Thread.sleep(5000);
    }

Before publishOn, map runs on ThreadA. After publishOn, map runs on the parallel-scheduler thread pool.

subscribeOn

    public void useSubscribeOn() throws InterruptedException {
        Scheduler s = Schedulers.newParallel("parallel-scheduler", 4);
        final Flux<String> flux = Flux
                .range(1, 2)
                .map(i -> 10 + i + ":" + Thread.currentThread())
                // Applies to the whole chain, regardless of its position.
                .subscribeOn(s)
                .map(i -> "value " + i + ":" + Thread.currentThread());

        new Thread(() -> flux.subscribe(System.out::println), "ThreadA").start();
        Thread.sleep(5000);
    }

Both map calls run on the parallel-scheduler threads that subscribeOn switched to.

Reposted from blog.csdn.net/demon7552003/article/details/111097553