• 【RSocket】使用 RSocket(三)——服务端主动调用客户端方法


    1. 编写客户端接收请求的逻辑

    我们可以在初始化 Rsocket 实例的时候指定客户端可以被调用的方法,使用 acceptor() 指定可被调用的方法和方法使用的通信模型类型:

    • 通信类型为 RequestResponse 时:
      .acceptor(SocketAcceptor.forRequestResponse(payload -> {}))
      
    • 通信类型为 FireAndForget
      .acceptor(SocketAcceptor.forFireAndForget(payload -> {}))
      
    • 通信类型为 RequestStream
      .acceptor(SocketAcceptor.forRequestStream(payload -> {}))
      
    • 通信类型为 RequestStream
      .acceptor(SocketAcceptor.forRequestChannel(
                payloads ->
                    Flux.from(payloads)...));
      

    接下来编写客户端方法的处理逻辑,以 RequestResponse 为例

    https://github.com/joexu01/rsocket-demo/blob/master/rsocket-client-raw/src/main/java/org/example/CallingTheClientSide.java

    public static void main(String[] args) {
        final Logger logger = LoggerFactory.getLogger(RSocketClientRaw.class);
    
        // 随机生成 UUID 标识客户端
        UUID uuid = UUID.randomUUID();
        logger.info("My UUID is {}", uuid);
        // 生成 SETUP 阶段(建立连接时) Payload 使用的 route 信息
        ByteBuf setupRouteMetadata = encodeRoute("connect.setup");
    
        RSocket socket = RSocketConnector.create()
                // 设置 metadata MIME Type,方便服务端根据 MIME 类型确定 metadata 内容
                .metadataMimeType(WellKnownMimeType.MESSAGE_RSOCKET_ROUTING.getString())
                // SETUP 阶段的 Payload,data 里面存放 UUID
                .setupPayload(ByteBufPayload.create(
                        ByteBufUtil.writeUtf8(ByteBufAllocator.DEFAULT, uuid.toString()),
                        setupRouteMetadata))
    
                // 编写 Request&Response Acceptor
                .acceptor(SocketAcceptor.forRequestResponse(
                        payload -> {
                            String route = decodeRoute(payload.sliceMetadata());
                            logger.info("[Client Acceptor] Received RequestResponse[route={}]", route);
    
                            String metadataUtf8 = payload.getMetadataUtf8();
                            String dataUtf8 = payload.getDataUtf8();
                            logger.info("[Client Acceptor] This Req&Resp contains data: {}, metadata: {}", dataUtf8, metadataUtf8);
    
                            payload.release();
    
                            if ("request.status.callback".equals(route)) {
                                return Mono.just(ByteBufPayload.create("Thanks for handling my task!"));
                            } else if ("request.server.call".equals(route)) {
                                return Mono.just(ByteBufPayload.create("You called my handler actively from server!"));
                            }
    
                            byte[] respBytes = String
                                    .format("Client received your message, but no handler matched. Your meta is %s and data is %s",
                                            metadataUtf8, dataUtf8).getBytes();
                            return Mono.just(DefaultPayload.create(respBytes));
                        }
                ))
    
                // 设置重连策略
                .reconnect(Retry.backoff(2, Duration.ofMillis(500)))
                .connect(
                        TcpClientTransport.create(
                                TcpClient.create()
                                        .host("127.0.0.1")
                                        .port(8099)))
                .block();
    

    在这里我们设置客户端能够接收 RequestResponse 类型的服务端请求,仔细观察可以看到,服务端发送的请求也是可以携带包含路由信息的 metadata 的,在客户端,我们也可以根据 Payload 中的路由信息将请求分发到不同方法中处理。

    为了方便演示,如果服务端调用时指定的路由信息是 request.status.callback,那么服务端就是在完成一个由客户端发起的,异步执行的任务后调用客户端的回调函数返回任务执行结果。

    如果服务端调用时指定的路由信息是 request.server.call,那么服务端就是在主动调用客户端以获取一些状态信息。

    当然,使用上面的代码设置客户端可被调用的 RSocket 方法有一个局限性,那就是我们只能设置 RequestResponse FireAndForget RequestStream Channel 这四种通信模式的其中一种。也就是说,用这种方法,服务端无法同时向客户端发出 RequestResponse FireAndForget RequestStream Channel 请求。本文会在第四部分展示如何让客户端支持同时响应这四种通信模式。

    2. 场景:客户端提交一个耗时任务,服务端完成任务后使用回调函数返回结果

    image

    如果客户端提交一个耗时任务,服务端可以接受这个任务然后立刻返回响应:“任务提交成功”,然后执行任务。当任务执行完,服务端再使用回调函数将结果返回给客户端。

    我们不妨将执行任务的模块封装成一个 Spring Service:

    @Service
    public class RequestProcessor {
    
        private static final Logger logger = LoggerFactory.getLogger(RequestProcessor.class);
    
        public void processRequests(RSocketRequester rSocketRequester, UUID uuid) {
            logger.info("[RequestProcessor.processRequests]I'm handling this!");
            ByteBuf routeMetadata = TaggingMetadataCodec.createTaggingContent(ByteBufAllocator.DEFAULT, Collections.singletonList("request.status.callback"));
    
            Mono.just("Your request " + uuid + "  is completed")
                    .delayElement(Duration.ofSeconds(ThreadLocalRandom.current().nextInt(10, 15)))
                    .flatMap(
                            m -> rSocketRequester.rsocketClient()
                                    .requestResponse(
                                            Mono.just(ByteBufPayload.create(
                                                    ByteBufUtil.writeUtf8(ByteBufAllocator.DEFAULT,
                                                            String.format("[TASK %s]This is a task result from server using spring.", uuid)),
                                                    routeMetadata
                                            )))
                                    .doOnSuccess(p -> logger.info("[RequestProcessor.processRequests]Received from client: {}", p.getDataUtf8()))
                    )
                    .subscribe();
        }
    }
    

    这个 Service 中的方法接收一个 RSocketRequester 和一个 任务的 UUID,当任务完成时,这个方法会生成一个 Payload 存放任务结果,指定 metadata 中的路由信息为 request.status.callback。这样客户端在收到这个 RequestResponse 时就能知道这是一个已经提交任务的回调。在这里我们使用 delayElement 模拟处理任务时耗时的操作。

    值得注意的是,RSocketRequester 参数的来源,我们在编写服务端接收任务提交的方法时可以将其作为参数,这是 Spring RSocket 的固定用法,这样就可以拿到服务端-客户端连接的 RSocketRequester 实例,然后就可以在 Service 中通过 RSocketRequester 实例调用客户端的回调函数:

    @MessageMapping("handler.task")
    public Mono task(String request, RSocketRequester rSocketRequester) {
       logger.info("[handler.request]Client request: {}", request);
        UUID uuid = UUID.randomUUID();
        this.requestProcessor.processRequests(rSocketRequester, uuid);
        return Mono.just(uuid.toString());
    }
    

    3. 场景:服务端主动调用客户端获取信息

    我们在【RSocket】使用 RSocket (一)——建立连接一文中已经在连接建立的时刻将客户端-服务端连接的 RSocketRequester 实例保存在一个 ConcurrentHashMap 中了。我们可以通过一些机制,比如定时任务,或者使用 REST API 向服务端下命令的方式,让服务端主动调用已经建立连接的客户端的 RSocket 方法。

    在这个示例里,我们编写两个 REST API,一个 API 返回所有已连接到服务端的客户端信息,包括客户端 UUID、连接建立的时间等:

    @ResponseBody
    @GetMapping("/client/list")
    public List clientsInfo() {
        List info = new ArrayList<>();
        RSocketController.clientsManager.clients.forEach((key, value) -> {
            info.add(new ConnectedClientDto(key, value.connectedTime));
        });
    
        return info;
    }
    

    另一个 API 用于触发服务端向客户端发送请求:

    @GetMapping("/client/call")
    public ServerResponse callFromServer(String clientRoute, String clientUUID) {
        RSocketRequester requester = RSocketController.clientsManager.getClientRequester(clientUUID);
        if (requester == null) {
            return new ServerResponse("failed: client rSocket has closed.");
        }
        ByteBuf routeMetadata = TaggingMetadataCodec
                .createTaggingContent(ByteBufAllocator.DEFAULT, Collections.singletonList(clientRoute));
    
        Mono.just("Server is calling you.")
    //                .delayElement(Duration.ofSeconds(ThreadLocalRandom.current().nextInt(5, 10)))
                .flatMap(m -> requester.rsocketClient().requestResponse(
                                Mono.just(
                                        ByteBufPayload.create(
                                                ByteBufUtil.writeUtf8(
                                                        ByteBufAllocator.DEFAULT,
                                                        "This is a message from server using spring-stack."),
                                                routeMetadata)))
                        .doOnSubscribe(subscription -> logger.info("subscribed."))
                        .doOnError(throwable -> logger.error("Error when calling client: {}", throwable.toString()))
                        .doOnSuccess(p -> logger.info("[test.connect.requester]Received from client: {}.", p.getDataUtf8()))
                )
                .subscribe();
        return new ServerResponse(String.format("request from server has sent to the client %s.", clientUUID));
    }
    

    我们首先启动服务端再启动客户端,然后测试上述两个 API:

    • 启动两个客户端和服务端后查看连接信息

      image

    • 向其中一个客户端发送一个请求

      image

      可以从客户端的输出看到客户端接收到了这次请求

      image

    4. 让客户端同时接收不同类型的请求

    前面我们提到如果使用 .acceptor(SocketAcceptor.for...) 来添加客户端可以被调用的方法时,只能指定四种通信模式中的一种。

    这时候,我们可以实现 io.rsocket.SocketAcceptor 接口,重写 accept 方法,accept 方法的返回值是 Mono ,我们可以实现 RSocket 接口并重写其中 fireAndForget requestResponse requestStream requestChannel 四个方法来达到让客户端同时接收四种通信模式的目的。

    首先实现 RSocket 接口,并重写其中的方法:

    // https://github.com/joexu01/rsocket-demo/blob/master/rsocket-client-raw/src/main/java/org/example/service/ClientService.java
    public class ClientService implements RSocket {
    
        Logger logger = LoggerFactory.getLogger(ClientService.class);
    
        static String decodeRoute(ByteBuf metadata) {
            final RoutingMetadata routingMetadata = new RoutingMetadata(metadata);
            return routingMetadata.iterator().next();
        }
    
        @Override
        public Mono fireAndForget(Payload payload) {
            logger.info("Receiving: " + payload.getDataUtf8());
            return Mono.empty();
        }
    
        @Override
        public Mono requestResponse(Payload payload) {
            logger.info("Receiving: " + payload.getDataUtf8());
            return Mono.just(DefaultPayload.create("Client received your RequestResponse"));
        }
    
        @Override
        public Flux requestStream(Payload payload) {
            return Flux.range(-5, 10)
                    .delayElements(Duration.ofMillis(500))
                    .map(obj ->
                            ByteBufPayload.create(
                                    ByteBufUtil.writeUtf8(ByteBufAllocator.DEFAULT, obj.toString())));
        }
    
        @Override
        public Flux requestChannel(Publisher payloads) {
            return Flux.range(-5, 10)
                    .delayElements(Duration.ofMillis(500))
                    .map(obj ->
                            ByteBufPayload.create(
                                    ByteBufUtil.writeUtf8(ByteBufAllocator.DEFAULT, obj.toString())));
        }
    }
    

    这只是一个示例,如果业务需要也可以解析 Payload 中的 metadata 来实现路由。

    接下来我们实现 RSocketAcceptor 接口:

    // https://github.com/joexu01/rsocket-demo/blob/master/rsocket-client-raw/src/main/java/org/example/SocketAcceptorImpl.java
    public class SocketAcceptorImpl implements SocketAcceptor {
        @Override
        public Mono accept(ConnectionSetupPayload connectionSetupPayload, RSocket rSocket) {
            return Mono.just(new ClientService());
        }
    }
    

    然后我们在初始化客户端的时候这样设定 Acceptor 即可:

    RSocket socket = RSocketConnector.create().acceptor(new SocketAcceptorImpl())
    

    下一篇聊聊如何启用 TLS 或者 wss 来保证连接安全,顺便谈谈部署和 nginx 代理 RSocket over WebSocket。

  • 相关阅读:
    Nginx proxy_pass 详解
    云原生基础知识:容器技术的历史
    LabVIEW中使用Get LV Class Default Value 出现错误1498
    【四:Spring整合Junit】
    并行计算技术与SIMD、SIMT
    gen_arrow_contour_xld
    QT 中charts各种图表的综合应用
    网工内推 | 上市公司运维专员,NA/NP认证优先,享有股票期权
    MySQL8实现主从备份
    【git】github 如何同步别人的仓库
  • 原文地址:https://www.cnblogs.com/joexu01/p/rsocket-03-calling-the-client-side.html