Gateway实现原理笔记

引入spring-cloud-gateway依赖,查看其META-INF下的spring.factories文件,可以看到以org.springframework.boot.autoconfigure.EnableAutoConfiguration为key,配置了GatewayClassPathWarningAutoConfiguration、GatewayAutoConfiguration、GatewayLoadBalancerClientAutoConfiguration、GatewayRedisAutoConfiguration等value。当SpringBoot项目启动后,会通过自动配置机制(SpringFactoriesLoader,类似于SPI)去加载这些配置类。

在GatewayAutoConfiguration中,通过spring.cloud.gateway.enabled控制网关的开启和关闭,默认开启。同时初始化了各种Bean,包括GatewayFilterFactory(比如AddRequestHeaderGatewayFilterFactory、AddResponseHeaderGatewayFilterFactory等)、全局的GlobalFilter(比如NettyRoutingFilter、NettyWriteResponseFilter)、RoutePredicateFactory(比如AfterRoutePredicateFactory、HostRoutePredicateFactory等)、FilteringWebHandler(其将所有的全局的GlobalFilter作为一个变量globalFilters,用于构建GatewayFilterChain拦截器链)、PropertiesRouteDefinitionLocator(用于从配置文件中,通过GatewayProperties读取配置的所有路由信息RouteDefinition)、CompositeRouteDefinitionLocator(组合多种RouteDefinitionLocator的实现,为RouteDefinitionRouteLocator提供统一入口,调用其getRouteDefinitions方法,实现统一从多种数据来源加载Routes)、RouteDefinitionRouteLocator(用于将RouteDefinition转换成Route)、RoutePredicateHandlerMapping(类似于Spring MVC的RequestMappingHandlerMapping,用于根据请求的url找到对应的路由策略Route,初始化时注入了FilteringWebHandler和CachingRouteLocator)。其中RoutePredicateHandlerMapping是这些类中最重要的Bean。

首先查看Route是如何生成的。以在配置文件中配置spring.cloud.gateway.routes为例,RoutePredicateHandlerMapping.lookupRoute方法是调用了routeLocator(即CachingRouteLocator)的getRoutes()方法来获取Route的,CachingRouteLocator将获取Route的过程委托给了其delegate(CompositeRouteLocator实例对象),CompositeRouteLocator的getRoutes方法则循环调用了具体的RouteLocator的getRoutes方法(如RouteDefinitionRouteLocator)。着重看RouteDefinitionRouteLocator:RouteDefinitionRouteLocator.getRoutes方法调用CompositeRouteDefinitionLocator.getRouteDefinitions方法获取所有的spring.cloud.gateway.routes配置,然后通过convertToRoute方法,将RouteDefinition转换为Route。

/**
 * Produces the gateway routes by converting every route definition supplied by the
 * route definition locator.
 *
 * <p>Unless {@code failOnRouteDefinitionError} is enabled in the gateway properties, a
 * definition that fails during conversion is reported at WARN level and ignored instead of
 * terminating the stream.
 *
 * @return the stream of converted routes
 */
public Flux<Route> getRoutes() {
    // Declared as Flux<Route> rather than a raw Flux so the lambdas below are type-checked
    // against Route and the return needs no unchecked conversion.
    Flux<Route> routes = this.routeDefinitionLocator.getRouteDefinitions().map(this::convertToRoute);
    if (!this.gatewayProperties.isFailOnRouteDefinitionError()) {
        // Log the offending definition and keep going with the remaining ones.
        routes = routes.onErrorContinue((error, obj) -> {
            if (this.logger.isWarnEnabled()) {
                this.logger.warn("RouteDefinition id " + ((RouteDefinition) obj).getId() + " will be ignored. Definition has invalid configs, " + error.getMessage());
            }
        });
    }

    return routes.map((route) -> {
        if (this.logger.isDebugEnabled()) {
            this.logger.debug("RouteDefinition matched: " + route.getId());
        }

        return route;
    });
}
private Route convertToRoute(RouteDefinition routeDefinition) {
    //根据routeDefinition的predicates信息,获取配置的predicates,并进行串联
    AsyncPredicate predicate = this.combinePredicates(routeDefinition);
    //根据routeDefinition的filters信息,获取配置的GatewayFilter
    List gatewayFilters = this.getFilters(routeDefinition);
    return ((AsyncBuilder)Route.async(routeDefinition).asyncPredicate(predicate).replaceFilters(gatewayFilters)).build();
}
List<GatewayFilter> loadGatewayFilters(String id, List<FilterDefinition> filterDefinitions) {
    ArrayList ordered = new ArrayList(filterDefinitions.size());

    for(int i = 0; i < filterDefinitions.size(); ++i) {
        FilterDefinition definition = (FilterDefinition)filterDefinitions.get(i);
        //根据配置的过滤器的名称获取具体的GatewayFilterFactory,比如配置的AddRequestHeader,真正的类为AddRequestHeaderGatewayFilterFactory,具体查看GatewayFilterFactory.name方法        GatewayFilterFactory factory = (GatewayFilterFactory)this.gatewayFilterFactories.get(definition.getName());
        if(factory == null) {
            throw new IllegalArgumentException("Unable to find GatewayFilterFactory with name " + definition.getName());
        }

        if(this.logger.isDebugEnabled()) {
            this.logger.debug("RouteDefinition " + id + " applying filter " + definition.getArgs() + " to " + definition.getName());
        }
        //封装配置的filters的参数信息
        Object configuration = this.configurationService.with(factory).name(definition.getName()).properties(definition.getArgs()).eventFunction((bound, properties) -> {
            return new FilterArgsEvent(this, id, (Map)properties);
        }).bind();
        if(configuration instanceof HasRouteId) {
            HasRouteId gatewayFilter = (HasRouteId)configuration;
            gatewayFilter.setRouteId(id);
        }
        //通过GatewayFliterFactory实现类的apply方法,创建GatewayFilter
        GatewayFilter var9 = factory.apply(configuration);
        if(var9 instanceof Ordered) {
            ordered.add(var9);
        } else {
            ordered.add(new OrderedGatewayFilter(var9, i + 1));
        }
    }

    return ordered;
}

请求网关时,会调用org.springframework.web.reactive.DispatcherHandler的handle(ServerWebExchange exchange)方法。该方法会遍历所有的HandlerMapping对象(处理器映射器),依次调用HandlerMapping对象的getHandler方法(即AbstractHandlerMapping的getHandler),在AbstractHandlerMapping.getHandler方法中会调用AbstractHandlerMapping实现类的getHandlerInternal方法。对于Gateway来说,着重看RoutePredicateHandlerMapping。

/**
 * Dispatches the exchange: asks each handler mapping in turn for a handler, invokes the
 * first handler found, and processes its result.
 *
 * @param exchange the current request/response exchange
 * @return a {@code Mono} for the handling of the exchange; the not-found error when there
 *         are no handler mappings or none of them yields a handler
 */
public Mono<Void> handle(ServerWebExchange exchange) {
    if (this.handlerMappings == null) {
        return this.createNotFoundError();
    }

    return Flux.fromIterable(this.handlerMappings)
            // Mappings are queried sequentially, in iteration order.
            .concatMap(mapping -> mapping.getHandler(exchange))
            // Only the first handler produced is used.
            .next()
            .switchIfEmpty(this.createNotFoundError())
            .flatMap(handler -> this.invokeHandler(exchange, handler))
            .flatMap(result -> this.handleResult(exchange, result));
}
/**
 * Looks up the handler for the exchange via {@code getHandlerInternal} and applies CORS
 * processing to it.
 *
 * @param exchange the current request/response exchange
 * @return the resolved handler, or {@code REQUEST_HANDLED_HANDLER} when CORS processing
 *         rejects the request or the request is a pre-flight request
 */
public Mono<Object> getHandler(ServerWebExchange exchange) {
    return this.getHandlerInternal(exchange).map((handler) -> {
        if (this.logger.isDebugEnabled()) {
            this.logger.debug(exchange.getLogPrefix() + "Mapped to " + handler);
        }

        ServerHttpRequest request = exchange.getRequest();
        // No CORS handling needed: the handler has no CORS configuration source and the
        // request is not a pre-flight request.
        if (!this.hasCorsConfigurationSource(handler) && !CorsUtils.isPreFlightRequest(request)) {
            return handler;
        }

        CorsConfiguration mappingConfig = null;
        if (this.corsConfigurationSource != null) {
            mappingConfig = this.corsConfigurationSource.getCorsConfiguration(exchange);
        }
        CorsConfiguration handlerConfig = this.getCorsConfiguration(handler, exchange);

        // Combine the mapping-level configuration with the handler's own when both may exist.
        CorsConfiguration effectiveConfig;
        if (mappingConfig == null) {
            effectiveConfig = handlerConfig;
        } else {
            effectiveConfig = mappingConfig.combine(handlerConfig);
        }

        boolean rejected = !this.corsProcessor.process(effectiveConfig, exchange);
        if (rejected || CorsUtils.isPreFlightRequest(request)) {
            return REQUEST_HANDLED_HANDLER;
        }

        return handler;
    });
}

在RoutePredicateHandlerMapping的getHandlerInternal方法中,通过lookupRoute方法匹配到对应的Route,并将匹配到的Route保存到ServerWebExchange的属性中,然后返回FilteringWebHandler处理器,由其处理请求。

/**
 * Resolves the handler for the exchange by matching it against the gateway routes.
 *
 * <p>When a route matches, it is stored in the exchange attributes under
 * {@code GATEWAY_ROUTE_ATTR} and the configured web handler is returned.
 *
 * @param exchange the current request/response exchange
 * @return the web handler when a route matches; empty when no route matches or when the
 *         request arrived on a separately configured management port
 */
protected Mono<?> getHandlerInternal(ServerWebExchange exchange) {
    boolean onManagementPort = this.managementPortType == RoutePredicateHandlerMapping.ManagementPortType.DIFFERENT
            && this.managementPort != null
            && exchange.getRequest().getURI().getPort() == this.managementPort.intValue();
    if (onManagementPort) {
        return Mono.empty();
    }

    exchange.getAttributes().put(ServerWebExchangeUtils.GATEWAY_HANDLER_MAPPER_ATTR, this.getSimpleName());
    // Find the matching Route.
    return this.lookupRoute(exchange).flatMap((route) -> {
        exchange.getAttributes().remove(ServerWebExchangeUtils.GATEWAY_PREDICATE_ROUTE_ATTR);
        if (this.logger.isDebugEnabled()) {
            this.logger.debug("Mapping [" + this.getExchangeDesc(exchange) + "] to " + route);
        }
        // Store the Route in the exchange attributes; later filters read it from there.
        exchange.getAttributes().put(ServerWebExchangeUtils.GATEWAY_ROUTE_ATTR, route);
        // Return the FilteringWebHandler, which builds the filter chain and handles the request.
        return Mono.just(this.webHandler);
    }).switchIfEmpty(Mono.empty().then(Mono.fromRunnable(() -> {
        // No route matched: clear the predicate bookkeeping attribute and trace the miss.
        exchange.getAttributes().remove(ServerWebExchangeUtils.GATEWAY_PREDICATE_ROUTE_ATTR);
        if (this.logger.isTraceEnabled()) {
            this.logger.trace("No RouteDefinition found for [" + this.getExchangeDesc(exchange) + "]");
        }
    })));
}
/**
 * Finds the first route whose predicate accepts the exchange.
 *
 * <p>Routes are tested one at a time in the order the route locator emits them. A predicate
 * that raises an error is logged and treated as a non-match for that route.
 *
 * @param exchange the current request/response exchange
 * @return the first matching route after it has been passed to {@code validateRoute};
 *         empty when no route matches
 */
protected Mono<Route> lookupRoute(ServerWebExchange exchange) {
    // Iterate over all Routes.
    return this.routeLocator.getRoutes()
            .concatMap(candidate -> Mono.just(candidate)
                    .filterWhen((r) -> {
                        // Record which route is currently being tested.
                        exchange.getAttributes().put(ServerWebExchangeUtils.GATEWAY_PREDICATE_ROUTE_ATTR, r.getId());
                        // Evaluate the Route's combined predicate against the exchange.
                        return (Publisher) r.getPredicate().apply(exchange);
                    })
                    .doOnError(e -> this.logger.error("Error applying predicate for route: " + candidate.getId(), e))
                    .onErrorResume(e -> Mono.empty()))
            .next()
            .map((matched) -> {
                if (this.logger.isDebugEnabled()) {
                    this.logger.debug("Route matched: " + matched.getId());
                }

                this.validateRoute(matched, exchange);
                // Hand back the matched Route.
                return matched;
            });
}

回到DispatcherHandler的handle(ServerWebExchange exchange)方法 ,获得匹配的Route和对应的handler(FilteringWebHandler)之后,调用DispatcherHandler.invokeHandler方法,获取匹配的处理器适配器之后,调用其handle方法,即SimpleHandlerAdapter的handle方法,SimpleHandlerAdapter.handle中,将处理逻辑委托给了WebHandler.handle方法。

/**
 * Invokes the handler through the first handler adapter that supports it.
 *
 * @param exchange the current request/response exchange
 * @param handler the handler selected for the request (for the gateway, the
 *        FilteringWebHandler)
 * @return the result produced by the supporting adapter, or an error {@code Mono} carrying
 *         an {@code IllegalStateException} when no adapter supports the handler
 */
private Mono<HandlerResult> invokeHandler(ServerWebExchange exchange, Object handler) {
    if (this.handlerAdapters != null) {
        // Walk all HandlerAdapters and use the first one able to process this handler.
        for (HandlerAdapter adapter : this.handlerAdapters) {
            if (!adapter.supports(handler)) {
                continue;
            }
            // For a FilteringWebHandler this ends up in SimpleHandlerAdapter.handle.
            return adapter.handle(exchange, handler);
        }
    }

    return Mono.error(new IllegalStateException("No HandlerAdapter: " + handler));
}

SimpleHandlerAdapter.handle

/**
 * Delegates handling of the exchange to the given {@code WebHandler}.
 *
 * @param exchange the current request/response exchange
 * @param handler the handler to invoke; cast to {@code WebHandler}
 * @return a {@code Mono} that completes without a {@code HandlerResult} once the web handler
 *         has finished
 */
public Mono<HandlerResult> handle(ServerWebExchange exchange, Object handler) {
    WebHandler webHandler = (WebHandler) handler;
    // Typed instead of a raw Mono so the return below needs no unchecked conversion.
    Mono<Void> completion = webHandler.handle(exchange);
    return completion.then(Mono.empty());
}

在WebHandler.handle方法(此处即FilteringWebHandler.handle)中,首先会从exchange属性中获取保存的Route,从Route中得到其专属的GatewayFilter,再和全局的globalFilters合并并按order大小排序,最后通过DefaultGatewayFilterChain构建拦截器链,执行过滤。其中比较重要的过滤器包括RouteToRequestUrlFilter。

public Mono<Void> handle(ServerWebExchange exchange) {
    //获取保存在exchange属性中的route 
    Route route = (Route)exchange.getRequiredAttribute(ServerWebExchangeUtils.GATEWAY_ROUTE_ATTR);
    //获取Route独有的GatewayFilter
    List gatewayFilters = route.getFilters();
    //获取全局的globalFilters
    ArrayList combined = new ArrayList(this.globalFilters);
    combined.addAll(gatewayFilters);
    AnnotationAwareOrderComparator.sort(combined);//所有的Filter通过Ordered排序
    if(logger.isDebugEnabled()) {
        logger.debug("Sorted gatewayFilterFactories: " + combined);
    }
    //构建过滤器执行链
    return (new FilteringWebHandler.DefaultGatewayFilterChain(combined)).filter(exchange);
}

RouteToRequestUrlFilter.filter
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
    Route route = (Route)exchange.getAttribute(ServerWebExchangeUtils.GATEWAY_ROUTE_ATTR);
    if(route == null) {
        return chain.filter(exchange);
    } else {
        log.trace("RouteToRequestUrlFilter start");
        URI uri = exchange.getRequest().getURI();
        boolean encoded = ServerWebExchangeUtils.containsEncodedParts(uri);
        URI routeUri = route.getUri();//获取Route设置的uri
        if(hasAnotherScheme(routeUri)) {
            exchange.getAttributes().put(ServerWebExchangeUtils.GATEWAY_SCHEME_PREFIX_ATTR, routeUri.getScheme());
            routeUri = URI.create(routeUri.getSchemeSpecificPart());
        }

        if("lb".equalsIgnoreCase(routeUri.getScheme()) && routeUri.getHost() == null) {
            throw new IllegalStateException("Invalid host: " + routeUri.toString());
        } else {
            //根据请求的uri和Route设置的uri得到改写后的uri,并保存到exchange的属性中
            URI mergedUrl = UriComponentsBuilder.fromUri(uri).scheme(routeUri.getScheme()).host(routeUri.getHost()).port(routeUri.getPort()).build(encoded).toUri();
            exchange.getAttributes().put(ServerWebExchangeUtils.GATEWAY_REQUEST_URL_ATTR, mergedUrl);
            return chain.filter(exchange);
        }
    }
}

猜你喜欢

转载自blog.csdn.net/qq_33513289/article/details/109545538