RouteDefinition实体类 记录了 PredicateDefinition 和 FilterDefinition
/** Predicate definitions of the route; validation requires at least one entry. */
@NotEmpty
@Valid
private List<PredicateDefinition> predicates = new ArrayList<>();

/** Filter definitions of the route; each element is validated, the list itself may be empty. */
@Valid
private List<FilterDefinition> filters = new ArrayList<>();
PredicateDefinition 记录 断言信息
/**
 * Describes one route predicate: a name plus its arguments.
 *
 * <p>Excerpt only: members referenced below but not declared here (for example
 * {@code setName}) are omitted from this listing.
 */
@Validated
public class PredicateDefinition {

    /** Predicate name; must not be null. */
    @NotNull
    private String name;

    /** Arguments keyed by generated name, kept in insertion order. */
    private Map<String, String> args = new LinkedHashMap<>();

    public PredicateDefinition() {
    }

    /**
     * Parses the shortcut form {@code name=value1,value2,...}.
     *
     * @param text shortcut text; everything before the first '=' is the name and the
     *             remainder is split on ',' into positional arguments
     * @throws ValidationException if {@code text} contains no '=' or starts with '='
     */
    public PredicateDefinition(String text) {
        int eqIdx = text.indexOf('=');
        if (eqIdx <= 0) {
            throw new ValidationException("Unable to parse PredicateDefinition text '"
                    + text + "'" + ", must be of the form name=value");
        }
        setName(text.substring(0, eqIdx));

        String[] args = tokenizeToStringArray(text.substring(eqIdx + 1), ",");
        // Positional arguments get their keys from NameUtils.generateName(index).
        for (int i = 0; i < args.length; i++) {
            this.args.put(NameUtils.generateName(i), args[i]);
        }
    }
}
FilterDefinition 记录 过滤拦截信息
/**
 * Describes one route filter: a name plus its arguments.
 *
 * <p>Excerpt only: members referenced below but not declared here (for example
 * {@code setName}) are omitted from this listing.
 */
@Validated
public class FilterDefinition {

    /** Filter name; must not be null. */
    @NotNull
    private String name;

    /** Arguments keyed by generated name, kept in insertion order. */
    private Map<String, String> args = new LinkedHashMap<>();

    public FilterDefinition() {
    }

    /**
     * Parses the shortcut form {@code name} or {@code name=value1,value2,...}.
     *
     * <p>Unlike a predicate, a filter may have no arguments: when {@code text} has no '='
     * (or starts with '='), the whole text becomes the name and no arguments are stored.
     *
     * @param text shortcut text to parse
     */
    public FilterDefinition(String text) {
        int eqIdx = text.indexOf('=');
        if (eqIdx <= 0) {
            setName(text);
            return;
        }
        setName(text.substring(0, eqIdx));

        String[] args = tokenizeToStringArray(text.substring(eqIdx + 1), ",");
        // Positional arguments get their keys from NameUtils.generateName(index).
        for (int i = 0; i < args.length; i++) {
            this.args.put(NameUtils.generateName(i), args[i]);
        }
    }
}
数据表创建
-- Route rule table: one row per gateway route.
CREATE TABLE `route_rule` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '自增主键',
  `route_id` varchar(255) DEFAULT NULL COMMENT '路由ID ',
  `uri` varchar(255) DEFAULT NULL COMMENT '目标地址',
  `ordered` int(11) DEFAULT NULL COMMENT '加载顺序',
  `creator` varchar(30) DEFAULT NULL COMMENT '创建者',
  `created_time` datetime DEFAULT NULL COMMENT '创建时间',
  `updator` varchar(30) DEFAULT NULL COMMENT '修改人',
  -- The closing backtick after updated_time was missing, which made the statement invalid.
  `updated_time` datetime DEFAULT NULL COMMENT '修改时间',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
-- Route argument table: stores the predicates and filters of each route.
-- route_rule to route_args is a one-to-many relationship, joined on route_id.
CREATE TABLE `route_args` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `type` int(11) DEFAULT NULL COMMENT '参数类型 0:断言器 1:过滤器',
  `name` varchar(255) DEFAULT NULL COMMENT '断言器名称 例如: Path RewritePath',
  `args_name` varchar(255) DEFAULT NULL COMMENT '参数名称',
  `args_value` varchar(255) DEFAULT NULL COMMENT '参数值',
  -- The column comment previously named a non-existent table "route_id"; the parent table is route_rule.
  `route_id` varchar(255) DEFAULT NULL COMMENT '表route_rule中的字段route_id',
  `creator` varchar(30) DEFAULT NULL COMMENT '创建者',
  `created_time` datetime DEFAULT NULL COMMENT '创建时间',
  `updator` varchar(30) DEFAULT NULL COMMENT '修改人',
  -- The closing backtick after updated_time was missing, which made the statement invalid.
  `updated_time` datetime DEFAULT NULL COMMENT '修改时间',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
使用 MySQL 存储路由信息:通过自定义 MySQLRouteDefinitionRepository 实现 RouteDefinitionRepository 接口,完成路由信息的动态加载。如果没有自定义实现 RouteDefinitionRepository 接口,会默认使用内存存储形式。
在GatewayAutoConfiguration 源码部分有说过,如果没有自定义注入的RouteDefinitionRepository会使用内存加载的形式
/**
 * Fallback repository bean: registered only when the context contains no other
 * {@link RouteDefinitionRepository} bean.
 *
 * @return a new in-memory route definition repository
 */
@Bean
@ConditionalOnMissingBean(RouteDefinitionRepository.class)
public InMemoryRouteDefinitionRepository inMemoryRouteDefinitionRepository() {
    final InMemoryRouteDefinitionRepository fallbackRepository =
            new InMemoryRouteDefinitionRepository();
    return fallbackRepository;
}
/**
 * {@link RouteDefinitionRepository} that reads and writes route definitions through
 * {@code GatewayRouteDao} and {@code GatewayRouteArgsDao} instead of keeping them in memory.
 *
 * <p>NOTE(review): {@code save} and {@code delete} resolve their input with {@code block()}
 * and call the DAOs synchronously, so all work is done before the returned Mono is handed
 * back — presumably so that it stays inside the {@code @Transactional} method call. Reactor
 * rejects {@code block()} on non-blocking threads; confirm these methods are never invoked
 * from an event-loop thread.
 */
public class MySQLRouteDefinitionRepository implements RouteDefinitionRepository {

    // NOTE(review): not referenced anywhere in this class; kept so bean wiring is unchanged.
    @Autowired
    private ReactiveStringRedisTemplate redisTemplate;

    @Autowired
    private GatewayRouteDao gatewayRouteDao;

    @Autowired
    private GatewayRouteArgsDao gatewayRouteArgsDao;

    /**
     * Loads every stored route and exposes it as a stream of route definitions.
     *
     * <p>On startup, Gateway's {@code RouteDefinitionRouteLocator.getRoutes} converts each
     * {@code RouteDefinition} into a {@code Route} via
     * {@code this.routeDefinitionLocator.getRouteDefinitions().map(this::convertToRoute)}
     * and keeps the result in memory.
     *
     * <p>The DAO query runs when this method is called, not when the Flux is subscribed.
     *
     * @return the stored route definitions
     */
    @Override
    public Flux<RouteDefinition> getRouteDefinitions() {
        return Flux.fromIterable(
                GatewayRouteDefinition.toRouteDefinition(gatewayRouteDao.getGatewayRoute()));
    }

    /**
     * Inserts the route, or updates it when a row with the same route id already exists.
     *
     * @param route emits the definition to persist
     * @return an empty Mono on success, or an error Mono carrying
     *         {@link IllegalArgumentException} when no definition or no id is supplied
     */
    @Override
    @Transactional(rollbackFor = Exception.class)
    public Mono<Void> save(Mono<RouteDefinition> route) {
        RouteDefinition definition = route.block();
        // block() returns null for an empty Mono; report that the same way as a missing id
        // instead of failing with a NullPointerException.
        if (definition == null || ObjectUtils.isEmpty(definition.getId())) {
            return Mono.error(new IllegalArgumentException("id may not be empty"));
        }
        String id = definition.getId();

        GatewayRoutePO routePO = GatewayRoutePO.toGatewayRoute(definition);
        List<GatewayRouteArgsPO> filter =
                GatewayRouteArgsPO.toGatewayRouteFilterArgs(definition.getFilters(), id);
        List<GatewayRouteArgsPO> predicate =
                GatewayRouteArgsPO.toGatewayRoutePredictArgs(definition.getPredicates(), id);

        // Look the route up first to choose between update and insert.
        GatewayRoutePO existing = gatewayRouteDao.findGatewayRouteByRouteId(id);
        if (existing != null) {
            gatewayRouteDao.updateGatewayRoute(routePO);
            gatewayRouteArgsDao.batchUpdateGatewayArgs(filter);
            gatewayRouteArgsDao.batchUpdateGatewayArgs(predicate);
        } else {
            gatewayRouteDao.saveGatewayRoute(routePO);
            gatewayRouteArgsDao.bathSaveGatewayArgs(filter);
            gatewayRouteArgsDao.bathSaveGatewayArgs(predicate);
        }
        return Mono.empty();
    }

    /**
     * Deletes the route with the given id through {@code GatewayRouteDao.deleteRoute}.
     *
     * @param routeId emits the id of the route to delete
     * @return an empty Mono on success, or an error Mono carrying {@code NotFoundException}
     *         when the id is null or empty
     */
    @Override
    @Transactional(rollbackFor = Exception.class)
    public Mono<Void> delete(Mono<String> routeId) {
        String id = routeId.block();
        if (ObjectUtils.isEmpty(id)) {
            // Report the resolved id; concatenating the Mono itself would only print the
            // publisher's toString().
            return Mono.defer(() -> Mono.error(
                    new NotFoundException("RouteDefinition not found: " + id)));
        }
        gatewayRouteDao.deleteRoute(id);
        return Mono.empty();
    }
}
在项目中添加actuator依赖,并配置可以监控gateway
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
management:
  endpoints:
    web:
      exposure:
        include: '*'
  endpoint:
    health:
      show-details: always
可以通过接口查看加载的路由信息
请求路径:http://localhost:8085/actuator/gateway/routes 对应 GatewayControllerEndpoint 的routes 方法
响应结果:
[
{
"predicate": "Paths: [/admin/**], match trailing slash: true",
"route_id": "authorication-admin",
"filters": [],
"uri": "lb://authorication-admin",
"order": 0
},
{
"predicate": "Paths: [/cloud-gateway/**], match trailing slash: true",
"metadata": {
"nacos.instanceId": "192.168.234.1#8085#DEFAULT#DEFAULT_GROUP@@cloud-gateway",
"nacos.weight": "1.0",
"nacos.cluster": "DEFAULT",
"nacos.ephemeral": "true",
"nacos.healthy": "true",
"preserved.register.source": "SPRING_CLOUD"
},
"route_id": "ReactiveCompositeDiscoveryClient_cloud-gateway",
"filters": [
"[[RewritePath /cloud-gateway/(?<remaining>.*) = '/${remaining}'], order = 1]"
],
"uri": "lb://cloud-gateway",
"order": 0
},
{
"predicate": "Paths: [/authorication-admin/**], match trailing slash: true",
"metadata": {
"nacos.instanceId": "192.168.234.1#8084#DEFAULT#DEFAULT_GROUP@@authorication-admin",
"nacos.weight": "1.0",
"nacos.cluster": "DEFAULT",
"nacos.ephemeral": "true",
"nacos.healthy": "true",
"preserved.register.source": "SPRING_CLOUD"
},
"route_id": "ReactiveCompositeDiscoveryClient_authorication-admin",
"filters": [
"[[RewritePath /authorication-admin/(?<remaining>.*) = '/${remaining}'], order = 1]"
],
"uri": "lb://authorication-admin",
"order": 0
}
]
其中下面这块是我通过实现RouteDefinitionRepository接口,自定义保存的动态路由数据
{
"predicate": "Paths: [/admin/**], match trailing slash: true",
"route_id": "authorication-admin",
"filters": [],
"uri": "lb://authorication-admin",
"order": 0
},
手动刷新路由: http://localhost:8085/actuator/gateway/refresh 这是post请求 对应AbstractGatewayControllerEndpoint 类的refresh 方法
AbstractGatewayControllerEndpoint refresh方法
/**
 * Handles POST {@code /refresh} by publishing a {@link RefreshRoutesEvent}.
 *
 * @return an empty Mono; the event has already been published when it is returned
 */
@PostMapping("/refresh")
public Mono<Void> refresh() {
    this.publisher.publishEvent(new RefreshRoutesEvent(this));
    return Mono.empty();
}
动态路由信息加载过程如下所示:
在上一篇文章 Gateway网关-源码讲解从GatewayAutoConfiguration开始
已经从源码分析了gateway路由刷新事件的过程,首先会由RouteRefreshListener 监听到ContextRefreshedEvent事件
@Override
public void onApplicationEvent(ApplicationEvent event) {
if (event instanceof ContextRefreshedEvent
|| event instanceof RefreshScopeRefreshedEvent
|| event instanceof InstanceRegisteredEvent) {
reset();
}
private void reset() {
this.publisher.publishEvent(new RefreshRoutesEvent(this));
}
之后会发布一个路由刷新事件,也正是发布路由刷新事件,才会重新执行一遍RouteDefinitionLocator 的getRouteDefinitions 去获取我通过实现RouteDefinitionRepository接口写入的路由信息
/**
 * Combines the read side ({@link RouteDefinitionLocator}) and the write side
 * ({@link RouteDefinitionWriter}) of route definitions in one contract; it declares
 * no methods of its own.
 */
public interface RouteDefinitionRepository
extends RouteDefinitionLocator, RouteDefinitionWriter {
}
这个接口继承了 RouteDefinitionLocator 接口和 RouteDefinitionWriter 接口
其中RouteDefinitionWriter 接口实现动态路由信息的save和delete操作 ,而RouteDefinitionLocator 接口可以获取刷新的路由信息
/** Write-side contract for route definitions: save and delete. */
public interface RouteDefinitionWriter {

    /**
     * Saves the route definition emitted by {@code route}.
     *
     * @param route emits the definition to save
     * @return a Mono that signals completion of the save
     */
    Mono<Void> save(Mono<RouteDefinition> route);

    /**
     * Deletes the route definition whose id is emitted by {@code routeId}.
     *
     * @param routeId emits the id of the route to delete
     * @return a Mono that signals completion of the delete
     */
    Mono<Void> delete(Mono<String> routeId);
}
/** Read-side contract for route definitions. */
public interface RouteDefinitionLocator {

    /**
     * @return the route definitions known to this locator
     */
    Flux<RouteDefinition> getRouteDefinitions();
}
在CachingRouteDefinitionLocator里会去监听收到的RefreshRoutesEvent事件,会去获取RouteDefinition
/**
 * Route definition locator that re-reads its delegate and stores the result in a cache
 * each time a {@link RefreshRoutesEvent} arrives.
 *
 * <p>Excerpt only: the {@code delegate}, {@code cache} and {@code CACHE_KEY} members used
 * below, and the {@code getRouteDefinitions} implementation, are omitted from this listing.
 */
public class CachingRouteDefinitionLocator
        implements RouteDefinitionLocator, ApplicationListener<RefreshRoutesEvent> {

    /**
     * Fetches the definitions from the delegate, materializes them into signals, and puts
     * the collected list into the cache under {@code CACHE_KEY}.
     */
    @Override
    public void onApplicationEvent(RefreshRoutesEvent event) {
        fetch().materialize().collect(Collectors.toList())
                .doOnNext(routes -> cache.put(CACHE_KEY, routes)).subscribe();
    }

    /** Reads the current route definitions from the delegate locator. */
    private Flux<RouteDefinition> fetch() {
        return this.delegate.getRouteDefinitions();
    }
}
这个类重点关注对象delegate,默认是CompositeRouteDefinitionLocator 会执行 他的getRouteDefinitions方法
private final RouteDefinitionLocator delegate;
/**
 * Concatenates the route definitions of every delegate locator, in delegate order, and
 * assigns an id from {@code randomId()} to any definition that has none.
 *
 * @return the combined route definitions, each with a non-null id
 */
@Override
public Flux<RouteDefinition> getRouteDefinitions() {
    return this.delegates
            .flatMapSequential(RouteDefinitionLocator::getRouteDefinitions)
            .flatMap(routeDefinition -> {
                if (routeDefinition.getId() == null) {
                    // No id supplied: fill one in before passing the definition on.
                    return randomId().map(id -> {
                        routeDefinition.setId(id);
                        if (log.isDebugEnabled()) {
                            log.debug(
                                    "Id set on route definition: " + routeDefinition);
                        }
                        return routeDefinition;
                    });
                }
                return Mono.just(routeDefinition);
            });
}
自己可以 debug 看一下:CompositeRouteDefinitionLocator 在执行 getRouteDefinitions 的时候,会依次调用其他实现了 RouteDefinitionLocator 接口的类的 getRouteDefinitions 方法,其中就包含我们自定义实现 RouteDefinitionRepository 接口的类
其中 CachingRouteLocator类也监听了 RefreshRoutesEvent事件,实现的是RouteLocator 接口,这里可以对比下RouteLocator 接口和RouteDefinitionLocator接口,他们都是去获取路由信息,只不过是返回的封装对象不同,一个是Route 路由信息用于接口封装返回路由信息,一个是 RouteDefinition 用于定义获取路由信息
/** Supplies fully built {@code Route} objects. */
public interface RouteLocator {

    /**
     * @return the routes known to this locator
     */
    Flux<Route> getRoutes();
}
/** Supplies {@code RouteDefinition} objects, i.e. the declarative description of routes. */
public interface RouteDefinitionLocator {

    /**
     * @return the route definitions known to this locator
     */
    Flux<RouteDefinition> getRouteDefinitions();
}
/**
 * Route locator that re-reads its delegate and refreshes a cache each time a
 * {@link RefreshRoutesEvent} arrives.
 *
 * <p>Excerpt only: the {@code cache}, {@code CACHE_KEY} and {@code handleRefreshError}
 * members used below, and the remaining interface methods, are omitted from this listing.
 */
public class CachingRouteLocator implements Ordered, RouteLocator,
        ApplicationListener<RefreshRoutesEvent>, ApplicationEventPublisherAware {

    private final RouteLocator delegate;

    private ApplicationEventPublisher applicationEventPublisher;

    /** Reads the delegate's routes, sorted with {@code AnnotationAwareOrderComparator}. */
    private Flux<Route> fetch() {
        return this.delegate.getRoutes().sort(AnnotationAwareOrderComparator.INSTANCE);
    }

    /**
     * Reloads the routes, publishes a {@code RefreshRoutesResultEvent}, then stores the
     * materialized signals in the cache. Failures are passed to {@code handleRefreshError}.
     */
    @Override
    public void onApplicationEvent(RefreshRoutesEvent event) {
        try {
            fetch().collect(Collectors.toList()).subscribe(list -> Flux.fromIterable(list)
                    .materialize().collect(Collectors.toList()).subscribe(signals -> {
                        // Publish a RefreshRoutesResultEvent so the outcome of the refresh
                        // can be observed; a custom listener for this event can capture
                        // the refresh result.
                        applicationEventPublisher
                                .publishEvent(new RefreshRoutesResultEvent(this));
                        cache.put(CACHE_KEY, signals);
                    }, throwable -> handleRefreshError(throwable)));
        }
        catch (Throwable e) {
            handleRefreshError(e);
        }
    }
}
分享一个个人的开源项目,里面具体用到了网关 gateway 的统一异常处理、动态路由加载、网关限流、统一请求处理、请求认证等功能,并且集成了日志追踪、日志分析、RPC 调用、动态限流规则加载、分布式事务、高并发限流处理等
项目地址:GITEE项目链接地址