reactor之数据源的产生

Flux和Mono作为数据源,产生数据的方法有好多种,本文中对常用的进行罗列和简述。不足的地方可以去源文档中查看。

空数据源或者特殊数据源

empty

通过empty方法,可以创建一个空的数据流。

Flux.empty();
Mono.empty();

error

通过error方法,可以创建一个错误的数据流。

Flux.error(new RuntimeException("error"));
Mono.error(new RuntimeException("error"));

never

通过never方法,可以创建一个永远不会发出任何信号的数据流。

Flux.never();
Mono.never();

简单的数据源

just

通过just方法,可以创建一个包含固定元素的数据流。

Flux.just("foo", "bar", "foobar");
Mono.just("foo");

range

通过range方法,可以创建一个包含指定范围的整数的数据流。

FLux<Integer> flux = Flux.range(1, 10);

fromArray

通过fromArray方法,可以创建一个包含数组元素的数据流。

Flux.fromArray(new Integer[]{
    
    1, 2, 3});

fromIterable

通过fromIterable方法,可以创建一个包含Iterable元素的数据流。

Flux.fromIterable(Arrays.asList("foo", "bar", "foobar"));

fromStream

通过fromStream方法,可以创建一个包含Stream元素的数据流。

Flux.fromStream(Stream.of("foo", "bar", "foobar"));

fromFuture

通过fromFuture方法,可以创建一个包含CompletableFuture元素的数据流。

Mono.fromFuture(CompletableFuture.completedFuture("foo"));

fromRunnable

通过fromRunnable方法,可以创建一个包含Runnable元素的数据流。

Mono.fromRunnable(() -> System.out.println("foo"));

Interval产生间隔时间发送数据

interval

通过interval方法,可以创建一个每隔固定时间就发送元素的数据流。

Flux.interval(Duration.ofSeconds(1));

intervalMillis

通过intervalMillis方法,可以创建一个每隔固定时间就发送元素的数据流。

Flux.intervalMillis(1000);

高级方法generate和create

generate

通过generate方法,可以自定义数据流的产生。 generate方法同步地,一次产生一个元素。意味着,当一个元素被消费后,才会产生下一个元素。sink是一个SynchronousSink,它提供了next()方法来产生元素。

generate有多个重载方法。

Flux.generate(generator);
Flux.generate(stateSupplier,generator);
/**
 * @param stateSupplier 一个无参函数,返回一个初始状态
 * @param generator 一个BiFunction,接受状态和SynchronousSink,返回新的状态
 * @param stateConsumer 一个Consumer,接受状态,用于清理资源
 */
Flux.generate(stateSupplier,generator,stateConsumer)

具体的demo如下:


Flux.generate(sink->{
    
    
        sink.next("Hello");
        sink.complete();
});
        
Flux.generate(()->0,(state,sink)->{
    
    
        sink.next("3 x "+state+" = "+3*state);
        if(state==10)sink.complete();
        return state+1;
});

create

通过create方法,可以自定义数据流的产生。create方法相对于generate方法更加高级,既可以同步地,一次产生一个元素,也可以异步地,一次产生多个元素。 该方法用到了 FluxSink,后者同样提供 next,error 和
complete 等方法。 与 generate 不同的是,create 不需要状态值,另一方面,它可以在回调中触发 多个事件(即使是在未来的某个时间)。

        //create方法的接口
        Flux.create(Consumer<? super FluxSink<T>>emitter)


        /**
         *
         */
        Flux.create(Consumer<? super FluxSink<T>>emitter,FluxSink.OverflowStrategy overflowStrategy)

假设你有一个监听器 API,它按 chunk 处理数据,有两种事件:(1)一个 chunk 数据准备好的事件;(2)处理结束的事件。如下:

interface MyEventListener<T> {
    
    
    void onDataChunk(List<T> chunk);

    void processComplete();
}

你可以使用 create 方法来实现这个 API:

Flux.create(sink->{
    
    
        MyEventListener<T> listener=new MyEventListener<T>(){
    
    
            @Override
            public void onDataChunk(List<T> chunk){
    
    
                for(T value:chunk){
    
    
                    sink.next(value);
                }
            }
            @Override
            public void processComplete(){
    
    
                sink.complete();
            }
        };
        source.register(listener);
});

此外,既然 create 可以是异步地,并且能够控制背压,你可以通过提供一个 OverflowStrategy 来定义背压行为。

  • IGNORE: 完全忽略下游背压请求,这可能会在下游队列积满的时候导致 IllegalStateException。
  • ERROR: 当下游跟不上节奏的时候发出一个 IllegalStateException 的错误信号。
  • DROP:当下游没有准备好接收新的元素的时候抛弃这个元素。
  • LATEST:让下游只得到上游最新的元素。
  • BUFFER:(默认的)缓存所有下游没有来得及处理的元素(这个不限大小的缓存可能导致 OutOfMemoryError)。

推送(push)模式

在推送模式中,数据源会主动推送数据给下游。这种模式下,数据源是生产者,下游是消费者。 create 的一个变体是 push,适合生成事件流。与 create类似,push 也可以是异步地, 并且能够使用以上各种溢出策略(overflow
strategies)管理背压。每次只有一个生成线程可以调用 next,complete 或 error。

Flux<String> bridge=Flux.push(sink->{
    
    
        myEventProcessor.register(new SingleThreadEventListener<String>(){
    
    
            public void onDataChunk(List<String> chunk){
    
    
                for(String s:chunk){
    
    
                    sink.next(s);
                }
            }
            public void processComplete(){
    
    
                sink.complete();
            }
            public void processError(Throwable e){
    
    
            sink.error(e);
            }
        });
});

推送/拉取(push/pull)混合模式

不像 push,create 可以用于 push 或 pull 模式,因此适合桥接监听器的 的 API,因为事件消息会随时异步地到来。回调方法 onRequest 可以被注册到 FluxSink
以便跟踪请求。这个回调可以被用于从源头请求更多数据,或者通过在下游请求到来 的时候传递数据给 sink 以实现背压管理。这是一种推送/拉取混合的模式, 因为下游可以从上游拉取已经就绪的数据,上游也可以在数据就绪的时候将其推送到下游。

Flux<String> bridge = Flux.create(sink -> {
    
    
    myMessageProcessor.register(
      new MyMessageListener<String>() {
    
    

        public void onMessage(List<String> messages) {
    
    
          for(String s : messages) {
    
    
            sink.next(s); 
          }
        }
    });
    sink.onRequest(n -> {
    
    
        List<String> messages = myMessageProcessor.request(n); 
        for(String s : message) {
    
    
           sink.next(s); 
        }
    });

在上述的文章中,拆选和列举了常见的生成数据源的方式和方法。仅仅是为了方便更好的入门和理解,严谨性和完整性很难保证。如果要进一步学习,或者感觉有谬误,应该去官网查看更多的资料。

猜你喜欢

转载自blog.csdn.net/aofengdaxia/article/details/128906288