1. WebFlux 介绍
1.1 简述
Spring WebFlux是Spring Framework 5.0中引入的新的反应式Web框架。 与Spring MVC不同,它不基于 Servlet,完全异步和非阻塞,并通过 Reactor模块实现Reactive Streams规范,可以在诸如Netty,Undertow和Servlet 3.1+容器的服务器上运行
1.2 Reactor中的Mono和Flux
Flux
和 Mono
是 Reactor 中的两个基本概念,其大致含义如下:
Flux
表示的是包含 0 到 N 个元素的异步序列。在该序列中可以包含三种不同类型的消息通知:正常的包含元素的消息、序列结束的消息和序列出错的消息。 当消息通知产生时,订阅者中对应的方法 onNext(), onComplete()和 onError()会被调用Mono
表示的是包含 0 或者 1 个元素的异步序列。 该序列中同样可以包含与 Flux 相同的三种类型的消息通知
Flux 和 Mono 之间可以进行转换。 对一个 Flux 序列进行计数操作,得到的结果是一个 Mono对象,把两个 Mono 序列合并在一起,得到的是一个 Flux 对象。
1.3 两种开发模式
-
注解驱动
该模式与 SpringMVC 的开发模式几乎完全相同,基于注解@RequestMapping 来完成外部请求与内部处理方法的配置路由 -
函数式
基于 Java 8 之后的 lambda 表达式,RouterFunction注入代替@RequestMapping注解用于请求路由和处理
Spring WebFlux 使用HttpMessageReader
和HttpMessageWriter
接口来转换HTTP请求和响应,具体可参考 官方文档传送门
2. WebFlux 服务启动流程
-
SpringApplication
的静态 run() 方法会调用其构造方法,再调用其对象 run()方法// 静态 run() 方法 public static ConfigurableApplicationContext run(Class<?>[] primarySources, String[] args) { return new SpringApplication(primarySources).run(args); } // 构造方法,注意 webApplicationType 属性,决定了应用的类型 public SpringApplication(ResourceLoader resourceLoader, Class<?>... primarySources) { ...... this.webApplicationType = WebApplicationType.deduceFromClasspath(); ...... } // 对象 run 方法 public ConfigurableApplicationContext run(String... args) { StopWatch stopWatch = new StopWatch(); try { ...... // 1. 根据 webApplicationType 属性选择对应的 ApplicationContext 初始化 context = createApplicationContext(); // 2. 调用 AbstractApplicationContext#refresh() 方法创建 Bean refreshContext(context); ...... }
-
SpringApplication#createApplicationContext()
方法根据webApplicationType 属性选择对应的 ApplicationContext 进行初始化。WebFlux 是REACTIVE
类型的应用,则初始化AnnotationConfigReactiveWebServerApplicationContext
protected ConfigurableApplicationContext createApplicationContext() { Class<?> contextClass = this.applicationContextClass; if (contextClass == null) { try { switch (this.webApplicationType) { case SERVLET: contextClass = Class.forName(DEFAULT_SERVLET_WEB_CONTEXT_CLASS); break; case REACTIVE: contextClass = Class.forName(DEFAULT_REACTIVE_WEB_CONTEXT_CLASS); break; default: contextClass = Class.forName(DEFAULT_CONTEXT_CLASS); } } catch (ClassNotFoundException ex) { throw new IllegalStateException( "Unable create a default ApplicationContext, " + "please specify an ApplicationContextClass", ex); } } return (ConfigurableApplicationContext) BeanUtils.instantiateClass(contextClass); }
-
SpringApplication#refreshContext()
方法刷新AnnotationConfigReactiveWebServerApplicationContext
。该类的继承结构向上可追溯到AbstractApplicationContext
,而Spring 框架中所有的容器初始化都将调用到 AbstractApplicationContext#refresh() 方法,这部分可参考 Spring 启动流程源码解析,此处不深入解析,主要关注其中的两个子类实现方法onRefresh()
和finishRefresh()
public void refresh() throws BeansException, IllegalStateException { synchronized (this.startupShutdownMonitor) { ...... try { ...... // Initialize other special beans in specific context subclasses. onRefresh(); ...... // Last step: publish corresponding event. finishRefresh(); } ...... }
-
已知实例化的 ApplicationContext 为
AnnotationConfigReactiveWebServerApplicationContext
,追溯其方法重写,可发现onRefresh()
是在其父类ReactiveWebServerApplicationContext
中实现的protected void onRefresh() { super.onRefresh(); try { createWebServer(); } catch (Throwable ex) { throw new ApplicationContextException("Unable to start reactive web server", ex); } }
-
createWebServer()
方法将在容器初始化的过程中创建ServerManager
实例,该类是一个重要的封装类,其两个成员变量 WebServer 为底层服务器抽象,HttpHandler 为上层方法处理者的抽象,创建WebServer对象时将自身传入WebServer中,作为服务器与上层之间的桥接用于处理请求。此步骤之后具体服务器的创建不做分析,因为不同的服务器可能有不同的实现,以上流程图以NettyWebServer
为例子理解。总结来说,ReactiveWebServerApplicationContext#onRefresh()
方法主要是完成了服务器创建的工作,需注意此时上层的方法处理者(HttpHandler)还没有实质注入到 ServerManager中private void createWebServer() { ServerManager serverManager = this.serverManager; if (serverManager == null) { // getWebServerFactory() 默认为 NettyReactiveWebServerFactory this.serverManager = ServerManager.get(getWebServerFactory()); } initPropertySources(); } // 创建 ServerManager,其内部根据 WebServerFactory 的不同,创建不同的服务器实例 public static ServerManager get(ReactiveWebServerFactory factory) { return new ServerManager(factory); }
-
创建服务器之后,下一步自然是启动服务器。该动作在
ReactiveWebServerApplicationContext#finishRefresh()
方法中触发,关键方法为startReactiveWebServer()
@Override protected void finishRefresh() { super.finishRefresh(); WebServer webServer = startReactiveWebServer(); if (webServer != null) { publishEvent(new ReactiveWebServerInitializedEvent(webServer, this)); } }
-
startReactiveWebServer()
通过ServerManager
实例启动服务器,并将上层的方法处理者(HttpHandler,实例为 HttpWebHandlerAdapter 对象)实际注入到 ServerManager中,便于服务器底层接收到网络请求后将其转化为 HttpRequest,并将请求丢入上层的处理方法。不同服务器实现可能不一样,此处不作具体分析,请参考以上流程图private WebServer startReactiveWebServer() { ServerManager serverManager = this.serverManager; ServerManager.start(serverManager, this::getHttpHandler); return ServerManager.getWebServer(serverManager); } // ServerManager 的 start() 方法 public static void start(ServerManager manager, Supplier<HttpHandler> handlerSupplier) { if (manager != null && manager.server != null) { manager.handler = handlerSupplier.get(); // 启动服务器实例 manager.server.start(); } }
-
服务器启动之后,实际的处理过程为服务器接收到网络请求,通知状态变化生成上层能识别的 HttpRequest 和 HttpResponse后,将二者在
HttpWebHandlerAdapter
中进一步包装为ServerWebExchange
对象,最终将其入参调用DispatcherHandler#handle()
,真正完成请求处理