• Reactive响应式WebClient负载调用微服务接口封装,自行编写类OpenFeign的Reactive实现



    经验总结,如有错误请指正!

    背景

    Spring Gateway v3.0.1为响应式框架,但是在微服务中避免不了微服务之间相互调用,但微服务大部分都是Spring Boot开发的,属于阻塞式框架,所以这种情况就属于响应式程序调用阻塞式程序提供的接口;

    OpenFeign是阻塞式的,所以没办法在Gateway中直接使用,官方文档对响应式支持这样写的;
    Spring Cloud Openfeign | Reactive Support
    在这里插入图片描述
    意思就是有个 feign-reactive 的框架可以支持,这个框架是PlaytikaOSS根据OpenFeign开发的一个框架;

    但是我没有使用,我想自己封装一下,然后就可以实现像OpenFeign那样调用就可以了,也是增长一下水平,响应式框架的Http客户端有WebClient,可以基于WebClient调用封装一个类似OpenFeign的小实现(参考Openfeign)

    开搞

    项目pom.xml

    本次介绍的封装实现主要是依赖nacos、gateway、loadbalancer,如下所示,其余的按需自行增配

    
    <dependency>
        <groupId>com.alibaba.cloudgroupId>
        <artifactId>spring-cloud-starter-alibaba-nacos-configartifactId>
    dependency>
    <dependency>
        <groupId>com.alibaba.cloudgroupId>
        <artifactId>spring-cloud-starter-alibaba-nacos-discoveryartifactId>
    dependency>
    
    <dependency>
        <groupId>org.springframework.cloudgroupId>
        <artifactId>spring-cloud-starter-gatewayartifactId>
    dependency>
    <dependency>
        <groupId>org.springframework.cloudgroupId>
        <artifactId>spring-cloud-commonsartifactId>
    dependency>
    
    <dependency>
        <groupId>org.springframework.cloudgroupId>
        <artifactId>spring-cloud-starter-loadbalancerartifactId>
    dependency>
    
    <dependency>
        <groupId>com.github.ben-manes.caffeinegroupId>
        <artifactId>caffeineartifactId>
    dependency>
    <dependency>
        <groupId>org.springframeworkgroupId>
        <artifactId>spring-context-supportartifactId>
    dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32

    调用处理方法

    先编写实际处理业务接口调用的WebClient实现方法,这里写了一个简单的工具类

    @Slf4j
    @Component
    public class SmartWebClient {
    
        private final WebClient smartWebClient;
    
    	// ReactorLoadBalancerExchangeFilterFunction是LoadBalance的一个负载
    	// 可以适配nacos的服务发现,将其注入到WebClient即可实现服务名称调用及负载
        public SmartWebClient(ReactorLoadBalancerExchangeFilterFunction lbFunction) {
            this.smartWebClient = WebClient.builder()
                    .filter(lbFunction)
                    // 默认请求头
                    .defaultHeader(WEB_CLIENT_HEADER, WEB_CLIENT_HEADER_VALUE)
                    .build();
        }
    
    	// TODO 调用方法,响应Mono类型,自己可以按照需要创建实体来封装请求信息,这里仅作为例子
        public <T> Mono<T> restMono(HttpMethod httpMethod, String url,
                                Map<String, String> params, MultiValueMap<String, String> body,
                                MediaType reqMediaType, MediaType respMediaType,
                                Class<T> resultType, Function<ClientResponse, Mono<? extends Throwable>> unSuccessHandle) {
            return smartWebClient
                    .method(httpMethod) // post get put ……
                    .uri(url, params) // 这里的params是拼接在url中的,可以用@PathVariable接收的参数
                    .accept(respMediaType) // contentType
                    .contentType(reqMediaType) // contentType
                    // 这里可以放入BodyInserters.from……多种方法,按需使用
                    .body(BodyInserters.fromFormData(body)) // 构造的参数可以用@RequestParam、@RequestBody接收
                    .retrieve() // 发送请求
                    // 第一个参数判断状态,为调用方法的条件,第二个传入lambda表达式,返回Mono<异常>
                    .onStatus(status -> !status.is2xxSuccessful(), unSuccessHandle) 
                    // 结果转换,将响应转换为什么类型的结果,结果为响应式类型Mono/Flux
                    .bodyToMono(resultType);
        }
        
    	// TODO 调用方法,响应Flux类型,自己可以按照需要创建实体来封装请求信息,这里仅作为例子
        public <T> Flux<T> restFlux(HttpMethod httpMethod, String url,
                                Map<String, String> params, MultiValueMap<String, String> body,
                                MediaType reqMediaType, MediaType respMediaType,
                                Class<T> resultType, Function<ClientResponse, Mono<? extends Throwable>> unSuccessHandle) {
            return smartWebClient
                    .method(httpMethod) // post get put ……
                    .uri(url, params) // 这里的params是拼接在url中的,可以用@PathVariable接收的参数
                    .accept(respMediaType) // contentType
                    .contentType(reqMediaType) // contentType
                    // 这里可以放入BodyInserters.from……多种方法,按需使用
                    .body(BodyInserters.fromFormData(body)) // 构造的参数可以用@RequestParam、@RequestBody接收
                    .retrieve() // 发送请求
                    // 第一个参数判断状态,为调用方法的条件,第二个传入lambda表达式,返回Mono<异常>
                    .onStatus(status -> !status.is2xxSuccessful(), unSuccessHandle) 
                    // 结果转换,将响应转换为什么类型的结果,结果为响应式类型Mono/Flux
                    .bodyToFlux(resultType);
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55

    实体

    @Data
    public class WebClientRestInfo<T> {
    
        /**
         * 请求方式
         */
        private HttpMethod httpMethod = HttpMethod.GET;
    
        /**
         * 请求地址
         */
        private String url;
    
        /**
         * 请求path参数
         */
        private Map<String, Object> pathVariable;
    
        /**
         * 请求表单/Body
         */
        private MultiValueMap<String, Object> formValues;
    
        /**
         * 编码
         */
        private MediaType reqMediaType = MediaType.APPLICATION_JSON;
        private MediaType respMediaType = MediaType.APPLICATION_JSON;
    
        /**
         * 是否为Flux
         */
        private boolean isFlux = false;
    
        /**
         * 返回值类型
         */
        private Class<T> resultType;
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40

    自定义注解(类~@FeignClient)

    @Target(ElementType.TYPE)
    @Retention(RetentionPolicy.RUNTIME)
    @Documented
    @Inherited
    public @interface ReactiveFeignClient {
    
        String name();
    
        String path();
    
        Class<?> fallback() default void.class;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    使用注解定义服务调用方法

    和Openfeign一样,从其他服务的controller参考创建服务接口即可,实体在服务内创建即可,但返回结果需要加上Mono(单个结果)或者Flux(多个结果),这是响应式编程必要的

    @ReactiveFeignClient(name = "/order-service", path = "/test")
    public interface OrderService {
    
        @PostMapping(value = "/getOrder")
        Mono<OrderInfo> getOrder(@RequestParam String orderCode);
    }	
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    编写动态代理

    根据注解所在类,使用反射获取信息,调用业务处理方法
    这里使用JDK代理方式

    创建代理接口

    public interface ReactiveFeignClientProxy {
    
    	// 因为WebClient调用需要负载,所以需要将负载通过自动注入传进来
        <T> T createProxyObject(Class<T> type, ReactorLoadBalancerExchangeFilterFunction lbFunction);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5

    编写代理实现

    @Slf4j
    public class ReactiveFeignClientProxyCreator implements ReactiveFeignClientProxy {
    
        /**
         * 代理实例化
         *
         * @param type
         * @param lbFunction 需要传入LoadBalance负载均衡实例
         * @param 
         * @return
         */
        @Override
        public <T> T createProxyObject(Class<T> type, ReactorLoadBalancerExchangeFilterFunction lbFunction) {
            log.info("ReactiveFeignClientProxyCreator.createProxy: {}", type);
            // 构建WebClient客户端
            SmartWebClient smartWebClient = new SmartWebClient(lbFunction);
    
            // 获取注解信息
            ReactiveFeignClient annotationClass = type.getAnnotation(ReactiveFeignClient.class);
            String serviceName = annotationClass.name();
            String serviceBasePath = annotationClass.path();
            Class fallbackClass = annotationClass.fallback();
    
            // 创建代理方法
            Object obj = Proxy.newProxyInstance(type.getClassLoader(), new Class<?>[]{type}, (proxy, method, args) -> {
                // 根据方法和参数得到调用服务器信息
                String url = "http://" + serviceName + serviceBasePath;
                
                // TODO 这里我是封装了对象,多个变量信息也是一样的,参考逻辑即可
                WebClientRestInfo<T> webClientRestInfo = new WebClientRestInfo<T>();
                webClientRestInfo.setHttpMethod(HttpMethod.GET);
                webClientRestInfo.setUrl(url);
                // 得到请求URL和请求方法
                contractUrlAndMethod(method, webClientRestInfo);
                // 得到调用的参数和body
                contractRequestParams(method, args, webClientRestInfo);
    
                // 返回flux还是mono
                // isAssignableFrom 判断类型是否是某个类的子类 instanceof 判断实例是否是某个类的子类
                boolean isFlux = method.getReturnType().isAssignableFrom(Flux.class);
                webClientRestInfo.setFlux(isFlux);
    
                // 得到返回对象的实际类型
                Class<T> elementType = extractElementType(method.getGenericReturnType());
                webClientRestInfo.setResultType(elementType);
    
                // 调用rest 注意根据类型分别调用Mono和Flux两种请求处理
                CorePublisher<T> webClientResult = null;
                if (isFlux) {
                	// TODO 第二个参数为判断响应状态后的处理Function
                    webClientResult = smartWebClient.restFlux(webClientRestInfo, resp -> {
                        log.error("调用{}.{}方法发生错误,调用Fallback", type.getSimpleName(), method.getName());
                        return Mono.error(new RuntimeException(type.getSimpleName() + "." + method.getName() + "Request Fail."));
                    }).onErrorResume(Exception.class, throwable -> {
                        // 失败时调用方法 onErrorResume可以使调用失败后按照自定义处理并继续返回响应式结果
                        return (Flux<T>) fallback(fallbackClass, type, method, args);
                    });
                } else {
                    // 调用rest 同上
                    webClientResult = smartWebClient.restMono(webClientRestInfo, resp -> {
                        log.error("调用{}.{}方法发生错误,调用Fallback", type.getSimpleName(), method.getName());
                        return Mono.error(new RuntimeException(type.getSimpleName() + "." + method.getName() + "Request Fail."));
                    }).onErrorResume(Exception.class, throwable -> {
                        // 失败时调用方法
                        return (Mono<T>) fallback(fallbackClass, type, method, args);
                    });
                }
    
                // 返回调用结果
                return webClientResult;
            });
    
            return (T) obj;
        }
    
        /**
         * 得到缺省类型的实际类型
         *
         * @param genericReturnType
         * @return
         */
        private <T> Class<T> extractElementType(Type genericReturnType) {
            Type[] actualTypeArguments = ((ParameterizedType) genericReturnType).getActualTypeArguments();
            for (Type t : actualTypeArguments) {
                if (t instanceof Class) {
                    return (Class<T>) t;
                } else {
                    Type[] aaa = ((ParameterizedType) t).getActualTypeArguments();
                    return (Class<T>) ((ParameterizedType) t).getRawType();
                }
            }
            return (Class<T>) actualTypeArguments[0];
        }
    
        /**
         * 得到请求URL和请求方法
         *
         * @param method
         * @param webClientRestInfo
         */
        private void contractUrlAndMethod(Method method, WebClientRestInfo webClientRestInfo) {
            String url = webClientRestInfo.getUrl();
            Annotation[] annotationsMethod = method.getAnnotations();
            for (Annotation annotation : annotationsMethod) {
                // GET
                if (annotation instanceof GetMapping) {
                    GetMapping a = (GetMapping) annotation;
                    url += a.value()[0];
                    webClientRestInfo.setHttpMethod(HttpMethod.GET);
                }
                // POST
                else if (annotation instanceof PostMapping) {
                    PostMapping a = (PostMapping) annotation;
                    url += a.value()[0];
                    webClientRestInfo.setHttpMethod(HttpMethod.POST);
                }
                // DELETE
                else if (annotation instanceof DeleteMapping) {
                    DeleteMapping a = (DeleteMapping) annotation;
                    url += a.value()[0];
                    webClientRestInfo.setHttpMethod(HttpMethod.DELETE);
                }
                // PUT
                else if (annotation instanceof PutMapping) {
                    PutMapping a = (PutMapping) annotation;
                    url += a.value()[0];
                    webClientRestInfo.setHttpMethod(HttpMethod.PUT);
                }
            }
            webClientRestInfo.setUrl(url);
        }
    
        /**
         * 得到调用的参数和body
         *
         * @param method
         * @param args
         * @param webClientRestInfo
         */
        private void contractRequestParams(Method method, Object[] args, WebClientRestInfo webClientRestInfo) {
            // 参数和值对应的map
            Map<String, Object> params = new LinkedHashMap<>();
            MultiValueMap<String, Object> formValue = new LinkedMultiValueMap<>();
    
            // 得到调用的参数和body
            Parameter[] parameters = method.getParameters();
            for (int i = 0; i < parameters.length; i++) {
                // 是否带 @PathVariable注解
                PathVariable annoPath = parameters[i].getAnnotation(PathVariable.class);
                if (annoPath != null) {
                    params.put(annoPath.value(), args[i]);
                }
    
                // 是否带了 RequestParam
                RequestParam annoParam = parameters[i].getAnnotation(RequestParam.class);
                // 是否带了 RequestBody
                RequestBody annoBody = parameters[i].getAnnotation(RequestBody.class);
                if (annoParam != null || annoBody != null) {
                    formValue.add(parameters[i].getName(), args[i]);
                }
            }
            webClientRestInfo.setPathVariable(params);
            webClientRestInfo.setFormValues(formValue);
        }
    
        /**
         * 调用fallback方法
         *
         * @param fallbackClass
         * @param proxyType
         * @param method
         * @param args
         * @param 
         * @return
         */
        private <T> Object fallback(Class fallbackClass, Class<T> proxyType,
                                    Method method, Object[] args) {
            // 失败时调用方法
            try {
                return fallbackClass.getMethod(
                        method.getName(),
                        method.getParameterTypes()
                ).invoke(fallbackClass.newInstance(), args);
            } catch (NoSuchMethodException e) {
                log.error("未找到{}.{}方法的Fallback", proxyType.getSimpleName(), method.getName());
            } catch (IllegalAccessException | InstantiationException | InvocationTargetException e) {
                log.error("实例化{}的FallbackClass失败", proxyType.getSimpleName());
            }
            return Mono.empty();
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133
    • 134
    • 135
    • 136
    • 137
    • 138
    • 139
    • 140
    • 141
    • 142
    • 143
    • 144
    • 145
    • 146
    • 147
    • 148
    • 149
    • 150
    • 151
    • 152
    • 153
    • 154
    • 155
    • 156
    • 157
    • 158
    • 159
    • 160
    • 161
    • 162
    • 163
    • 164
    • 165
    • 166
    • 167
    • 168
    • 169
    • 170
    • 171
    • 172
    • 173
    • 174
    • 175
    • 176
    • 177
    • 178
    • 179
    • 180
    • 181
    • 182
    • 183
    • 184
    • 185
    • 186
    • 187
    • 188
    • 189
    • 190
    • 191
    • 192

    测试

    可用通过自动注入,调用代理获取实例

    自动注入获取业务接口实例

    private OrderService orderService;
    
    @Resource
    public void setOrderService (ReactorLoadBalancerExchangeFilterFunction lbFunction) {
        this.orderService = new ReactiveFeignClientProxyCreator().createProxyObject(OrderService.class, lbFunction);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    方法调用

    像Openfeign一样使用,就会根据自定义注解声明的服务名称去nacos中找到服务地址,并通过LoadBalance负载调用服务,然后根据注解声明的接口信息调用的Controller接口

    // 响应式线程处理
    Mono<OrderInfo> orderInfo = orderService.getOrder("order-code");
    // 返回结果或者订阅响应处理
    orderInfo.subscribe(res -> {
    //TODO
    })
    
    // 阻塞式线程处理(不推荐)
    Mono<OrderInfo> orderInfo = orderService.getOrder("order-code");
    // block可以阻塞等待获取结果
    OrderInfo orderInfo = orderInfo.block();
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    欢迎留言指正、讨论

  • 相关阅读:
    ROS数据格式转换:LaserScan转MultiEchoLaserScan
    sequencer和sequence
    web前端之零碎知识点、字符串、数组、基础知识
    基于Java+springboot+SSM的医疗报销系统的设计与实现
    EasyAVFilter代码示例之将摄像机RTSP流录像成mp4文件分段存储
    蚂蚁集团、浙江大学联合发布开源大模型知识抽取框架OneKE
    重构:banner 中 logo 聚合分散动画
    C_指针基础5
    git删除文件提交,删除文件按后这个记录怎么提交?
    Spring Security+Spring Boot实现登录认证以及权限认证
  • 原文地址:https://blog.csdn.net/qq_33598419/article/details/126171796