• 【RSocket】使用 RSocket(二)——四种通信模式实践


    Source Code: https://github.com/joexu01/rsocket-demo

    0. 四种通信模式

    让我们来简单复习一下 RSocket 的四种通信模式:

    • 即发即忘 - FireAndForget:立即发送一个请求,无需为这个请求发送响应报文。适用于监控埋点,日志上报等,这种场景下无需回执,丢失几个请求无伤大雅

    • 请求响应 - RequestResponse:请求方发送一条请求消息,响应方收到请求后并返回一条响应消息。传统的HTTP是典型的Request-Response

    • 流式响应 - RequestStream:请求方发送一个请求报文,响应方发回N个响应报文。传统的MQ是典型的RequestStream

    • 双向通道 - Channel:创建一个通道上下文,双方可以互相发送消息。IM是个典型的RequestChannel通讯场景

    1. 客户端生成和解析路由信息

    *本篇文章的客户端示例文件在 rsocket-client-raw/src/main/java/org/example/FourCommunicationScheme.java

    我们使用 decodeRouteencodeRoute 函数来解码和编码路由信息。

    static String decodeRoute(ByteBuf metadata) {
            final RoutingMetadata routingMetadata = new RoutingMetadata(metadata);
    
            return routingMetadata.iterator().next();
    }
    
    static ByteBuf encodeRoute(String route) {
            return TaggingMetadataCodec.createTaggingContent(
                    ByteBufAllocator.DEFAULT,
                    Collections.singletonList(route));
    }
    

    2. RequestResponse

    服务端处理函数

    在这里我们编写一个简单的 Handler,它的 Route 是 test.echo,它接收一个请求并返回请求 Payload 的 data 中的字符串。

    @MessageMapping("test.echo")
    public Mono simplyEcho(String data) throws InterruptedException {
        Thread.sleep(1500);
        logger.info("[test.echo]Received echo string from client: {}", data);
        return Mono.just(String.format("[test.echo]I received your string: %s. Thank you.", data));
    }
    

    注意,这里的参数也可以是 Mono ,然后对 Mono 进行操作并返回。事实上,如果严格按照响应式编程的策略,这里应该直接对 Mono 进行操作。

    客户端发送请求

    1. 生成 metadata 的 route 信息,然后将字符串和 metadata 放入 Payload
    ByteBuf routeMetadata = encodeRoute("test.echo");
    Payload echoPayload = ByteBufPayload.create(
            ByteBufUtil.writeUtf8(ByteBufAllocator.DEFAULT, "This is a message from client using rsocket-java library."),
            routeMetadata);
    
    1. 新建一个 RequestResponse,并对这次请求做一些设置;可以看到 RequestResponse 方法返回的数据类型是 Mono。然后我们对这个 Mono 设定一些操作(具体操作请看代码注释):
    Mono requestResponse = socket.requestResponse(echoPayload);
    requestResponse
            // 当 subscribe() 操作开始执行时打印一下日志
            .doOnSubscribe(subscription -> logger.info("Test1 subscribed to {}", subscription.toString()))
            // 当携带的请求成功后要做的事情
            .doOnSuccess(payload -> {
                logger.info("Test1 - Successfully returned: {}", payload.getDataUtf8());
                payload.release();
            })
            .doOnError(throwable -> logger.info("Test1 doOnError: {}", throwable.toString()))
            // 可以使用 timeout 丢弃等待超时的 Mono
            //.timeout(Duration.ofSeconds(1))
            // 可以使用 doOnTerminate 在请求结束后做一些工作
            // .doOnTerminate(() -> {})
            // 但是一定要设置 doOnError
            //.doOnError(TimeoutException.class, e -> logger.info("Test1 doOnError: {}", e.toString()))
            // .onErrorReturn(TimeoutException.class, DefaultPayload.create("Payload: Test1 - timeout"))
            // 可以使用 log() 来观察数据的状态
            //.log()
            // 客户端在执行 subscribe() 操作时才会开始从服务端接收数据流
            // 在响应式编程中使用 subscribe 操作符是订阅一个数据流并处理发布的数据、错误和完成信号的核心方式之一
            .subscribe();
    

    请求发出后主线程不会阻塞,所以我们需要使用 socket.onClose().block(); 保持连接。

    然后我们尝试运行服务端和客户端,看看一看客户端的输出:

    [main] INFO org.example.RSocketClientRaw - My UUID is 0718ef3b-9ee0-42f1-9003-700a8aa9a98d
    [main] INFO org.example.RSocketClientRaw - Test1 subscribed to RequestResponseRequesterMono
    [reactor-tcp-epoll-2] INFO org.example.RSocketClientRaw - Test1 - Successfully returned: [test.echo]I received your string: This is a message from client using rsocket-java library.. Thank you.
    

    服务端日志:

    2023-03-12 21:47:29.291  INFO 32099 --- [or-http-epoll-2] o.example.controller.RSocketController   : [connect.setup]Client connection: 0718ef3b-9ee0-42f1-9003-700a8aa9a98d
    2023-03-12 21:47:32.304  INFO 32099 --- [or-http-epoll-2] o.example.controller.RSocketController   : [test.echo]Received echo string from client: This is a message from client using rsocket-java library.
    

    客户端成功地发出请求并收到来自服务端的回复。

    3. FireAndForget

    服务端

    @MessageMapping("upload.log")
    public void fireAndForgetHandler(@Headers Map header, RSocketRequester requester, String data) {
        header.forEach((k, v) -> System.out.printf("[upload.log]header key: %s, val: %s\n", k, v));
        System.out.printf("[upload.log]UploadEventLogs: Received log string from client: %s\n", data);
    }
    

    服务端接受一个请求,不返回任何结果(Fire'n'Forget),只在服务端打印 Header 的内容。

    客户端

    // 测试 FnF
    routeMetadata = TaggingMetadataCodec.createTaggingContent(ByteBufAllocator.DEFAULT, Collections.singletonList("upload.log"));
    socket.fireAndForget(
        ByteBufPayload.create(
        ByteBufUtil.writeUtf8(ByteBufAllocator.DEFAULT, "This is a log from client using rsocket-java library."),
            routeMetadata))
        .doOnSubscribe(subscription -> logger.info("Test2 - Fire And Forget onSubscribe: {}", subscription.toString()))
        .subscribe();
    

    客户端输出:

    [main] INFO org.example.RSocketClientRaw - Test2 - Fire And Forget onSubscribe: FireAndForgetRequesterMono
    

    服务端输出:

    2023-03-10 15:10:25.675  INFO 5318 --- [or-http-epoll-4] o.example.controller.RSocketController   : [test.echo]Received echo string from client: This is a message from client using rsocket-java library.
    [upload.log]header key: dataBufferFactory, val: NettyDataBufferFactory (PooledByteBufAllocator(directByDefault: true))
    [upload.log]header key: rsocketRequester, val: org.springframework.messaging.rsocket.DefaultRSocketRequester@607cc59
    [upload.log]header key: lookupDestination, val: upload.log
    [upload.log]header key: contentType, val: application/binary
    [upload.log]header key: rsocketFrameType, val: REQUEST_FNF
    [upload.log]UploadEventLogs: Received log string from client: This is a log from client using rsocket-java library.
    

    4. RequestStream

    服务端

    服务端接收一个 Mono 然后返回给客户端包含 10 个 StringFlux

    事实上,严格按照响应式编程的策略,这里应该直接对 Mono 进行操作,可以使用 flatMapMany() 把生成的数据流通过异步方式处理,扩展出新的数据流。下面是扩展新数据流的简单示例:

    Mono.just(3)
        .flatMapMany(i -> Flux.range(0, i))
        .subscribe(System.out::println);
    

    在这里为了演示方便就先打印 Mono 然后新生成一个 Flux

    @MessageMapping("handler.request.stream")
    public Flux responseStreaming(Mono request) {
        request
                .doOnNext(s -> logger.info("[handler.request.stream]: {}", s))
                // 可以使用 then() 结束操作链
                .then()
                .subscribe();
    
        return Flux
                .range(1, 10)
                .map(idx -> String.format("Resp from Server: %s, Thank you!", idx));
    }
    

    客户端

    请看代码注释来理解对数据流 Flux 的各种操作:

    // 测试 RequestStream
    routeMetadata = encodeRoute("handler.request.stream");
    Flux requestStream = socket.requestStream(
            ByteBufPayload.create(
                    ByteBufUtil.writeUtf8(ByteBufAllocator.DEFAULT, "TEST3 - Request&Stream"),
                    routeMetadata));
    
    requestStream
            // 当然可以使用 map 对每个 Payload 进行操作,这会改变数据
            // .map(payload -> System.out.printf("%s\n", payload.getDataUtf8()))
            .doOnSubscribe(subscription -> logger.info("Test3 subscribed to {}", subscription.toString()))
            // 使用 doOnNext 不会对流的数据进行改变
            // doOnNext()是一个 Reactor 式流操作符,它允许编写者注册一个在每次出现新元素时执行的回调函数
            .doOnNext(nextPayload -> System.out.println("Test3 Received payload: " + nextPayload.getDataUtf8()))
            // 当需要从流中选择一些特定的元素时,可以使用 Flux.take(long n) 操作符
            // 该操作符将创建一个新的 Flux,该 Flux 包含原始 Flux 的前 n 个元素
            // take 操作符发出了指定数量的元素之后,就不再接收任何元素,并且将取消其上游发布者的订阅
            // 在这里服务端使用 Flux.range 来限定 Flux 流中的元素个数
            // 如果服务端使用 Flux.interval 生成一个无限长度的流,客户端使用 take 接收限定个数的元素
            // 便会取消发布者的订阅
            .take(5)
            .subscribe();
    

    客户端输出结果:

    [main] INFO org.example.RSocketClientRaw - My UUID is 28afc749-75e1-4289-8607-14810103de6c
    [main] INFO org.example.RSocketClientRaw - Test3 subscribed to RequestStreamRequesterFlux
    Test3 Received payload: Resp from Server: 1, Thank you!
    Test3 Received payload: Resp from Server: 2, Thank you!
    Test3 Received payload: Resp from Server: 3, Thank you!
    Test3 Received payload: Resp from Server: 4, Thank you!
    Test3 Received payload: Resp from Server: 5, Thank you!
    

    服务端接收到了请求:

    2023-03-12 22:01:33.520  INFO 32099 --- [or-http-epoll-3] o.example.controller.RSocketController   : [handler.request.stream]: TEST3 - Request&Stream
    

    5. Channel

    服务端

    服务端接收来自客户端的整数字符串,将它们乘以2以后发送回去。我们不妨把处理客户端请求流的函数封装为一个 Spring Service:

    @Service
    public class MathService {
        public Flux doubleInteger(Flux request) {
            return request
                    .map(s -> {
                        System.out.println("received " + s);
                        int i = Integer.parseInt(s);
                        return String.valueOf(i * 2);
                    });
        }
    
    }
    

    编写处理函数:

    @Autowired
    private MathService mathService;
    
    @MessageMapping("handler.request.channel")
    public Flux responseChannel(Flux payloads) {
        return this.mathService.doubleInteger(payloads);
    }
    

    客户端

    Flux payloadFlux = Flux.range(-5, 10)
            .delayElements(Duration.ofMillis(500))
            .map(obj ->
            {
                ByteBuf metadata = encodeRoute("handler.request.channel");
                return ByteBufPayload.create(
                        ByteBufUtil.writeUtf8(ByteBufAllocator.DEFAULT, obj.toString()), metadata);
            });
    
    Flux channelResp = socket.requestChannel(payloadFlux);
    channelResp
            .doOnSubscribe(subscription -> logger.info("Test4 subscribed to {}", subscription.toString()))
            .doOnError(throwable -> logger.info(throwable.toString()))
            .doOnNext(nextPayload -> System.out.println("Test4 Received payload: " + nextPayload.getDataUtf8()))
            .subscribe();
    

    客户端输出:

    [main] INFO org.example.RSocketClientRaw - My UUID is 96ff8fe7-416c-4607-9518-463114725a7a
    [main] INFO org.example.RSocketClientRaw - Test4 subscribed to RequestChannelRequesterFlux
    Test4 Received payload: -10
    Test4 Received payload: -8
    Test4 Received payload: -6
    Test4 Received payload: -4
    Test4 Received payload: -2
    Test4 Received payload: 0
    Test4 Received payload: 2
    Test4 Received payload: 4
    Test4 Received payload: 6
    Test4 Received payload: 8
    

    服务端输出:

    2023-03-12 22:07:05.542  INFO 33083 --- [or-http-epoll-2] o.example.controller.RSocketController   : [connect.setup]Client connection: 96ff8fe7-416c-4607-9518-463114725a7a
    
    received -5
    received -4
    received -3
    received -2
    received -1
    received 0
    received 1
    received 2
    received 3
    received 4
    

    下一篇文章会展示服务端如何主动调用客户端的函数。如有错误欢迎在评论区批评指正!

  • 相关阅读:
    第4周学习:MobileNetV1, V2, V3
    走进Redis:哨兵集群
    【蓝桥备战】DFS、BFS
    C++11—线程库
    分享的ise文件synthesize出错,如何解决?
    Ubuntu 安装 tbb 步骤详解
    数据结构——二叉树(堆的实现)
    求遥感影像利用arcgis或envi进行景观布局优化分析思路操作,进行什么操作或可使用什么工具
    技术学习:Python(18)|爬虫篇|解析器BeautifulSoup4(一)
    Scrum 敏捷管理流程图及敏捷管理工具
  • 原文地址:https://www.cnblogs.com/joexu01/p/rsocket-02-four-types-of-communication.html