• 网关gateway - 自定义实现动态路由信息存储记载


    RouteDefinition实体类 记录了 PredicateDefinition 和 FilterDefinition

    	@NotEmpty
    	@Valid
    	private List predicates = new ArrayList<>();
    
    	@Valid
    	private List filters = new ArrayList<>();
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    PredicateDefinition 记录 断言信息

    @Validated
    public class PredicateDefinition {
    	@NotNull
    	private String name;
    
    	private Map args = new LinkedHashMap<>();
    
    	public PredicateDefinition() {
    	}
    
    	public PredicateDefinition(String text) {
    		int eqIdx = text.indexOf('=');
    		if (eqIdx <= 0) {
    			throw new ValidationException("Unable to parse PredicateDefinition text '"
    					+ text + "'" + ", must be of the form name=value");
    		}
    		setName(text.substring(0, eqIdx));
    
    		String[] args = tokenizeToStringArray(text.substring(eqIdx + 1), ",");
    
    		for (int i = 0; i < args.length; i++) {
    			this.args.put(NameUtils.generateName(i), args[i]);
    		}
    	}
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26

    FilterDefinition 记录 过滤拦截信息

    @Validated
    public class FilterDefinition {
    
    	@NotNull
    	private String name;
    
    	private Map args = new LinkedHashMap<>();
    
    	public FilterDefinition() {
    	}
    
    	public FilterDefinition(String text) {
    		int eqIdx = text.indexOf('=');
    		if (eqIdx <= 0) {
    			setName(text);
    			return;
    		}
    		setName(text.substring(0, eqIdx));
    
    		String[] args = tokenizeToStringArray(text.substring(eqIdx + 1), ",");
    
    		for (int i = 0; i < args.length; i++) {
    			this.args.put(NameUtils.generateName(i), args[i]);
    		}
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26

    数据表创建

    -- 路由规则表
    CREATE TABLE `route_rule` (
      `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '自增主键',
      `route_id` varchar(255) DEFAULT NULL COMMENT '路由ID ',
      `uri` varchar(255) DEFAULT NULL COMMENT '目标地址',
      `ordered` int(11) DEFAULT NULL COMMENT '加载顺序',
      `creator` varchar(30) DEFAULT  NULL COMMENT  '创建者',
      `created_time` datetime DEFAULT NULL COMMENT '创建时间',
      `updator` varchar(30) DEFAULT  NULL COMMENT  '修改人',
      `updated_time datetime DEFAULT NULL COMMENT '修改时间',
      PRIMARY KEY (`id`)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
     
    -- 路由参数表 主要用来存储 断言器Predicates和过滤器Filter
    -- route_rule与route_args 存在一对多的映射关系
    CREATE TABLE `route_args` (
      `id` bigint(20) NOT NULL AUTO_INCREMENT,
      `type` int(11) DEFAULT NULL COMMENT '参数类型 0:断言器 1:过滤器',
      `name` varchar(255) DEFAULT NULL COMMENT '断言器名称 例如: Path RewritePath',
      `args_name` varchar(255) DEFAULT NULL COMMENT '参数名称',
      `args_value` varchar(255) DEFAULT NULL COMMENT '参数值',
      `route_id` varchar(255) DEFAULT NULL COMMENT '表route_id中的字段route_id',
      `creator` varchar(30) DEFAULT  NULL COMMENT  '创建者',
      `created_time` datetime DEFAULT NULL COMMENT '创建时间',
      `updator` varchar(30) DEFAULT  NULL COMMENT  '修改人',
      `updated_time datetime DEFAULT NULL COMMENT '修改时间',
      PRIMARY KEY (`id`)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28

    使用mysql存储记录路由信息,通过自定义 MysqlRouteDefinitionRepository 实现RouteDefinitionRepository 接口 完成路由信息的动态加载,如果没有自定义实现RouteDefinitionRepository 接口 ,会默认使用内存存储形式

      在GatewayAutoConfiguration 源码部分有说过,如果没有自定义注入的RouteDefinitionRepository会使用内存加载的形式
    	@Bean
    	@ConditionalOnMissingBean(RouteDefinitionRepository.class)
    	public InMemoryRouteDefinitionRepository inMemoryRouteDefinitionRepository() {
    		return new InMemoryRouteDefinitionRepository();
    	}
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    public class MySQLRouteDefinitionRepository implements RouteDefinitionRepository {
    
        @Autowired
        private ReactiveStringRedisTemplate redisTemplate;
        @Autowired
        private GatewayRouteDao gatewayRouteDao;
        @Autowired
        private GatewayRouteArgsDao gatewayRouteArgsDao;
    
        /**
         * Gateway启动时会通过RouteDefinitionRouteLocator.getRoutes方法
         * 将路由规则RouteDefinition转换为Route
         * this.routeDefinitionLocator.getRouteDefinitions().map(this::convertToRoute);
         * 加载到内存中
         **/
        @Override
        public Flux getRouteDefinitions() {
            List routePOS = gatewayRouteDao.getGatewayRoute();
            return Flux.fromIterable(GatewayRouteDefinition.toRouteDefinition(routePOS));
        }
    
        @Override
        @Transactional(rollbackFor = Exception.class)
        public Mono save(Mono route) {
            RouteDefinition definition = route.block();
            if (ObjectUtils.isEmpty(definition.getId())) {
                return Mono.error(new IllegalArgumentException("id may not be empty"));
            }
            GatewayRoutePO routePO = GatewayRoutePO.toGatewayRoute(definition);
            List filter = GatewayRouteArgsPO.toGatewayRouteFilterArgs(definition.getFilters(),definition.getId());
            List predicate = GatewayRouteArgsPO.toGatewayRoutePredictArgs(definition.getPredicates(),definition.getId());
            //1:先查询
            GatewayRoutePO gatewayRoutePOS = gatewayRouteDao.findGatewayRouteByRouteId(definition.getId());
            if (!Objects.isNull(gatewayRoutePOS)) {
                //更新
                gatewayRouteDao.updateGatewayRoute(routePO);
                gatewayRouteArgsDao.batchUpdateGatewayArgs(filter);
                gatewayRouteArgsDao.batchUpdateGatewayArgs(predicate);
            } else {
                //2: 保存
                gatewayRouteDao.saveGatewayRoute(routePO);
                gatewayRouteArgsDao.bathSaveGatewayArgs(filter);
                gatewayRouteArgsDao.bathSaveGatewayArgs(predicate);
            }
    
            return Mono.empty();
        }
    
    
        @Override
        @Transactional(rollbackFor = Exception.class)
        public Mono delete(Mono routeId) {
    //        return routeId.flatMap(r -> {
    //            if (ObjectUtils.isEmpty(r)) {
    //                return Mono.error(new IllegalArgumentException("id may not be empty"));
    //            }
    //            //删除路由以及路由 参数
    //            gatewayRouteDao.deleteRoute(r);
    //            return Mono.empty();
    //        });
    
            String routId = routeId.block();
            if (ObjectUtils.isEmpty(routId)) {
                return Mono.defer(() -> Mono.error(
                    new NotFoundException("RouteDefinition not found: " + routeId)));
            }
            gatewayRouteDao.deleteRoute(routId);
            return Mono.empty();
        }
    
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72

    在项目中添加actuator依赖,并配置可以监控gateway

            
                org.springframework.boot
                spring-boot-starter-actuator
            
    
    • 1
    • 2
    • 3
    • 4
    management:
      endpoints:
        web:
          exposure:
            include: '*'
      endpoint:
        health:
          show-details: always
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    可以通过接口查看加载的路由信息
    请求路径:http://localhost:8085/actuator/gateway/routes 对应 GatewayControllerEndpoint 的routes 方法
    在这里插入图片描述
    响应结果:

    [
    	{
    		"predicate": "Paths: [/admin/**], match trailing slash: true",
    		"route_id": "authorication-admin",
    		"filters": [],
    		"uri": "lb://authorication-admin",
    		"order": 0
    	},
    	{
    		"predicate": "Paths: [/cloud-gateway/**], match trailing slash: true",
    		"metadata": {
    			"nacos.instanceId": "192.168.234.1#8085#DEFAULT#DEFAULT_GROUP@@cloud-gateway",
    			"nacos.weight": "1.0",
    			"nacos.cluster": "DEFAULT",
    			"nacos.ephemeral": "true",
    			"nacos.healthy": "true",
    			"preserved.register.source": "SPRING_CLOUD"
    		},
    		"route_id": "ReactiveCompositeDiscoveryClient_cloud-gateway",
    		"filters": [
    			"[[RewritePath /cloud-gateway/(?.*) = '/${remaining}'], order = 1]"
    		],
    		"uri": "lb://cloud-gateway",
    		"order": 0
    	},
    	{
    		"predicate": "Paths: [/authorication-admin/**], match trailing slash: true",
    		"metadata": {
    			"nacos.instanceId": "192.168.234.1#8084#DEFAULT#DEFAULT_GROUP@@authorication-admin",
    			"nacos.weight": "1.0",
    			"nacos.cluster": "DEFAULT",
    			"nacos.ephemeral": "true",
    			"nacos.healthy": "true",
    			"preserved.register.source": "SPRING_CLOUD"
    		},
    		"route_id": "ReactiveCompositeDiscoveryClient_authorication-admin",
    		"filters": [
    			"[[RewritePath /authorication-admin/(?.*) = '/${remaining}'], order = 1]"
    		],
    		"uri": "lb://authorication-admin",
    		"order": 0
    	}
    ]
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43

    其中下面这块是我通过实现RouteDefinitionRepository接口,自定义保存的动态路由数据

    {
    “predicate”: “Paths: [/admin/**], match trailing slash: true”,
    “route_id”: “authorication-admin”,
    “filters”: [],
    “uri”: “lb://authorication-admin”,
    “order”: 0
    },

    手动刷新路由: http://localhost:8085/actuator/gateway/refresh 这是post请求 对应AbstractGatewayControllerEndpoint 类的refresh 方法
    在这里插入图片描述
    AbstractGatewayControllerEndpoint refresh方法

    @PostMapping("/refresh")
    	public Mono refresh() {
    		this.publisher.publishEvent(new RefreshRoutesEvent(this));
    		return Mono.empty();
    	}
    
    • 1
    • 2
    • 3
    • 4
    • 5

    动态路由信息加载过程如下所示:
    在上一篇文章 Gateway网关-源码讲解从GatewayAutoConfiguration开始
    已经从源码分析了gateway路由刷新事件的过程,首先会由RouteRefreshListener 监听到ContextRefreshedEvent事件

    @Override
    public void onApplicationEvent(ApplicationEvent event) {
    	if (event instanceof ContextRefreshedEvent
    			|| event instanceof RefreshScopeRefreshedEvent
    			|| event instanceof InstanceRegisteredEvent) {
    		reset();
    	}
    		
    private void reset() {
    		this.publisher.publishEvent(new RefreshRoutesEvent(this));
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    之后会发布一个路由刷新事件,也正是发布路由刷新事件,才会重新执行一遍RouteDefinitionLocator 的getRouteDefinitions 去获取我通过实现RouteDefinitionRepository接口写入的路由信息

    public interface RouteDefinitionRepository
    		extends RouteDefinitionLocator, RouteDefinitionWriter {
    
    }
    这个接集成了RouteDefinitionLocator 接口 和RouteDefinitionWriter  接口
    其中RouteDefinitionWriter  接口实现动态路由信息的save和delete操作 ,而RouteDefinitionLocator 接口可以获取刷新的路由信息
    public interface RouteDefinitionWriter {
    
    	Mono save(Mono route);
    
    	Mono delete(Mono routeId);
    
    }
    
    public interface RouteDefinitionLocator {
    
    	Flux getRouteDefinitions();
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    在CachingRouteDefinitionLocator里会去监听收到的RefreshRoutesEvent事件,会去获取RouteDefinition

    public class CachingRouteDefinitionLocator
    		implements RouteDefinitionLocator, ApplicationListener {
    		
       	@Override
    	public void onApplicationEvent(RefreshRoutesEvent event) {
    		fetch().materialize().collect(Collectors.toList())
    				.doOnNext(routes -> cache.put(CACHE_KEY, routes)).subscribe();
    	}
    	private Flux fetch() {
    		return this.delegate.getRouteDefinitions();
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    这个类重点关注对象delegate,默认是CompositeRouteDefinitionLocator 会执行 他的getRouteDefinitions方法

    	private final RouteDefinitionLocator delegate;
    
    
    • 1
    • 2
    	@Override
    	public Flux getRouteDefinitions() {
    		return this.delegates
    				.flatMapSequential(RouteDefinitionLocator::getRouteDefinitions)
    				.flatMap(routeDefinition -> {
    					if (routeDefinition.getId() == null) {
    						return randomId().map(id -> {
    							routeDefinition.setId(id);
    							if (log.isDebugEnabled()) {
    								log.debug(
    										"Id set on route definition: " + routeDefinition);
    							}
    							return routeDefinition;
    						});
    					}
    					return Mono.just(routeDefinition);
    				});
    	}
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    自已可以debug看下,CompositeRouteDefinitionLocator 在执行getRouteDefinitions的时候,会调用其他实现RouteDefinitionLocator接口的类也去调用他的getRouteDefinitions方法,这里面就包含我们自定义实现RouteDefinitionRepository接口的类

    其中 CachingRouteLocator类也监听了 RefreshRoutesEvent事件,实现的是RouteLocator 接口,这里可以对比下RouteLocator 接口和RouteDefinitionLocator接口,他们都是去获取路由信息,只不过是返回的封装对象不同,一个是Route 路由信息用于接口封装返回路由信息,一个是 RouteDefinition 用于定义获取路由信息

    public interface RouteLocator {
    	Flux getRoutes();
    }
    
    public interface RouteDefinitionLocator {
    	Flux getRouteDefinitions();
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    public class CachingRouteLocator implements Ordered, RouteLocator,
    		ApplicationListener, ApplicationEventPublisherAware {
    	
    	private final RouteLocator delegate;
    	private ApplicationEventPublisher applicationEventPublisher;
    	private Flux fetch() {
    		return this.delegate.getRoutes().sort(AnnotationAwareOrderComparator.INSTANCE);
    	}
    
       @Override
    	public void onApplicationEvent(RefreshRoutesEvent event) {
    		try {
    			fetch().collect(Collectors.toList()).subscribe(list -> Flux.fromIterable(list)
    					.materialize().collect(Collectors.toList()).subscribe(signals -> {
    					//发布RefreshRoutesResultEvent 事件,获取路由刷新的结果 是否成功,可以自定义实现监听RefreshRoutesResultEvent事件捕获路由刷新结果
    						applicationEventPublisher
    								.publishEvent(new RefreshRoutesResultEvent(this));
    						cache.put(CACHE_KEY, signals);
    					}, throwable -> handleRefreshError(throwable)));
    		}
    		catch (Throwable e) {
    			handleRefreshError(e);
    		}
    	}
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26

    分享个人一个开源的项目,里面有具体使用到网关gateway统一处理异常,动态路由加载,网关限流处理,网关统一处理请求,请求认证处理等,并且集成了日志追踪,日志分析,RPC调用,动态限流规则加载,分布式事务,高并发限流处理等
    项目地址:GITEE项目链接地址

  • 相关阅读:
    Java中的容器(二) 双例集合
    Nature文章使用认证Kamiya艾美捷抗胸腺嘧啶二聚体单抗方案
    Vue单文件组件(SFC)规范
    LLM - 绝对与相对位置编码 与 RoPE 旋转位置编码 源码
    3D封装技术发展
    docker-compose采用前后端分离部署vue和springboot项目
    shell脚本(一)环境变量
    力扣 899. 有序队列
    java.lang.OutOfMemoryError- unable to create new native thread 问题排查
    Vite HMR API
  • 原文地址:https://blog.csdn.net/huanglu0314/article/details/127556671