• 基于HTTP2/3的流模式消息交换如何实现?


    我想很多人已经体验过GRPC提供的三种流式消息交换(Client Stream、Server Stream和Duplex Stream)模式,在.NET Core上构建的GRPC应用本质上是采用HTTP2/HTTP3协议的ASP.NET Core应用,我们当然也可以在一个普通的ASP.NET Core应用实现这些流模式。不仅如此,HttpClient也提供了响应的支持,这篇文章通过一个简单的实例提供了相应的实现,源代码从这里下载。

    一、双向流的效果
    二、[服务端]流式请求/响应的读写
    三、[客户端]流式响应/请求的读写

    一、双向流的效果

    在提供具体实现之前,我们不妨先来演示一下最终的效果。我们通过下面这段代码构建了一个简单的ASP.NET Core应用,如代码片段所示,在调用WebApplication的静态方法CreateBuilder将WebApplicationBuilder创建出来后,我们调用其扩展方法UseKestrel将默认终结点的监听协议设置为Http1AndHttp2AndHttp3,这样我们的应用将提供针对不同HTTP协议的全面支持。

    复制代码
    var url = "http://localhost:9999";
    var builder = WebApplication.CreateBuilder(args);
    builder.WebHost
        .UseKestrel(kestrel=> kestrel.ConfigureEndpointDefaults(listen=>listen.Protocols = HttpProtocols.Http1AndHttp2AndHttp3))
        .UseUrls(url);
    var app = builder.Build();
    app.MapPost("/", httpContext=> HandleRequestAsync(httpContext, async (request, writer) => {
        Console.WriteLine($"[Server]Receive request message: {request}");
        await writer.WriteStringAsync(request);
    }));
    await app.StartAsync();
    
    await SendStreamRequestAsync(url, ["foo", "bar", "baz", "qux"], reply => {
        Console.WriteLine($"[Client]Receive reply message: {reply}\n");
        return Task.CompletedTask;
    });
    复制代码

    我们针对根路径(/)注册了一个HTTP方法为POST的路由终结点,终结点处理器调用HanleRequestAsync来处理请求。这个方法提供一个Func类型的参数作为处理器,该委托的第一个参数表示接收到的单条请求消息,PipeWriter用来写入响应内容。在这里我们将接收到的消息进行简单格式化后将其输出到控制台上,随之将其作为响应内容进行回写。

    在应用启动之后,我们调用SendStreamRequestAsync方法以流的方式发送请求,并处理接收到的响应内容。该方法的第一个参数为请求发送的目标URL,第二个参数是一个字符串数组,我们将以流的方式逐个发送每个字符串。最后的参数是一个Func类型的委托,用来处理接收到的响应内容(字符串),在这里我们依然是将格式化的响应内容直接打印在控制台上。

    image

    程序启动后控制台上将出现如上图所示的输出,客户端/服务端接收内容的交错输出体现了我们希望的“双向流式”消息交换模式。我们将在后续介绍HanleRequestAsync和SendStreamRequestAsync方法的实现逻辑。

    二、[服务端]流式请求/响应的读写

    HanleRequestAsync方法定义如下。如代码片段所示,我们利用请求的BodyReader和响应的BodyWriter来对请求和响应内容进行读写,它们的类型分别是PipeReader和PipeWriter。在一个循环中,在利用BodyReader将请求缓冲区内容读取出来后,我们将得到的ReadOnlySequence对象作为参数调用辅助方法TryReadMessage读取单条请求消息,并调用handler参数表示的处理器进行处理。当请求内容接收完毕后,循环终止。

    复制代码
    static async Task HandleRequestAsync(HttpContext httpContext, Func<string, PipeWriter, Task> handler)
    {
        var reader = httpContext.Request.BodyReader;
        var writer = httpContext.Response.BodyWriter;
        while (true)
        {
            var result = await reader.ReadAsync();
            var buffer = result.Buffer;
            while (TryReadMessage(ref buffer, out var message))
            {
                await handler(message, writer);
            }
            reader.AdvanceTo(buffer.Start, buffer.End);
            if (result.IsCompleted)
            {
                break;
            }
        }
    }
    复制代码

    由于客户端发送的单条字符串消息长度不限,为了精准地将其读出来,我们需要在输出编码后的消息内容前添加4个字节的整数来表示消息的长度。所以在如下所示的TryReadMessage方法中,我们会先将字节长度读取出来,再据此将消息自身内容读取出来,最终通过解码得到消息字符串。

    复制代码
    static bool TryReadMessage(ref ReadOnlySequence<byte> buffer, [NotNullWhen(true)]out string? message)
    {
        var reader = new SequenceReader<byte>(buffer);
        if (!reader.TryReadLittleEndian(out int length))
        {
            message = default;
            return false;
        }
    
        message = Encoding.UTF8.GetString(buffer.Slice(4, length));
        buffer = buffer.Slice(length + 4);
        return true;
    }
    复制代码

    响应消息的写入是通过如下针对PipeWriter的WriteStringAsync扩展方法实现的,这里的PipeWriter就是响应的BodyWriter,针对“Length + Payload“的消息写入也体现在这里。

    复制代码
    public static class Extensions
    {
        public static ValueTask WriteStringAsync(this PipeWriter writer, string content)
        {
            var length = Encoding.UTF8.GetByteCount(content);
            var span = writer.GetSpan(4 + length);
            BitConverter.TryWriteBytes(span, length);
            Encoding.UTF8.GetBytes(content, span.Slice(4));
            writer.Advance(4 + length);
            return writer.FlushAsync();
        }
    }
    复制代码

    三、[客户端]流式响应/请求的读写

    客户端利用HttpClient发送请求。针对HttpClient的请求通过一个HttpRequestMessage对象表示,其主体内容体现为一个HttpContent。流式请求的发送是通过如下这个StreamContent类型实现的,它派生于HttpContent。我们重写了SerializeToStreamAsync方法,利用自定义的StreamContentWriter将内容写入请求输出流。

    复制代码
    public class StreamContent(StreamContentWriter writer) : HttpContent
    {
        private readonly StreamContentWriter _writer = writer;
        protected override Task SerializeToStreamAsync(Stream stream, TransportContext? context) 
    => _writer.SetOutputStream(stream).WaitAsync(); protected override bool TryComputeLength(out long length) => (length = -1) != -1; } public class StreamContentWriter { private readonly TaskCompletionSource _streamSetSource = new(); private readonly TaskCompletionSource _streamEndSource = new(); public StreamContentWriter SetOutputStream(Stream outputStream) { _streamSetSource.SetResult(outputStream); return this; } public async Task WriteAsync(string content) { var stream = await _streamSetSource.Task; await PipeWriter.Create(stream).WriteStringAsync(content); } public void Complete() => _streamEndSource.SetResult(); public Task WaitAsync() => _streamEndSource.Task; }
    复制代码

    StreamContentWriter提供了四个方法,SetOutputStream方法用来设置请求输出流,上面重写的SerializeToStreamAsync调用了此方法。单条字符串消息的写入实现在WriteAsync方法中,它最终调用的依然是上面提供的WriteStringAsync扩展方法。整个流式请求的过程通过一个TaskCompletionSource对象提供的Task来表示,当客户端完成所有输出后,会调用Complete方法,该方法进一步调用这个TaskCompletionSource对象的SetResult方法。由于WaitAsync方法返回TaskCompletionSource对象提供的Task,SerializeToStreamAsync方法会调用此方法等待”客户端输出流“的终结。

    如下的代码片段体现了SendStreamRequestAsync方法的实现。在这里我们创建了一个表示流式请求的HttpRequestMessage对象,我们将协议版本设置为HTTP2,作为主体内容的HttpContent正式根据StreamContentWriter对象创建的StreamContent对象。

    复制代码
    static async Task SendStreamRequestAsync(string url,string[] lines, Func<string, Task> handler)
    {
        using var httpClient = new HttpClient();
        var writer = new StreamContentWriter();
        var request = new HttpRequestMessage(HttpMethod.Post, url)
        {
            Version = HttpVersion.Version20,
            VersionPolicy = HttpVersionPolicy.RequestVersionExact,
            Content = new StreamingWeb.StreamContent(writer)
        };
        var task = httpClient.SendAsync(request, HttpCompletionOption.ResponseHeadersRead);
        _ = Task.Run(async () =>
        {
            var response = await task;
            var reader = PipeReader.Create(await response.Content.ReadAsStreamAsync());
            while (true)
            {
                var result = await reader.ReadAsync();
                var buffer = result.Buffer;
                while (TryReadMessage(ref buffer, out var message))
                {
                    await handler(message);
                }
                reader.AdvanceTo(buffer.Start, buffer.End);
                if (result.IsCompleted)
                {
                    break;
                }
            }
        });
    
        foreach (string line in lines)
        {
            await writer.WriteAsync($"{line} ({DateTimeOffset.UtcNow})");
            await Task.Delay(1000);
        }
        writer.Complete();
    }
    复制代码

    我们将这个HttpRequestMessage作为请求利用HttpClient发送出去,实际上发送的内容最终是通过调用StreamContentWriter对象的WriteAsync方法输出的,我们每隔1秒发送一条消息。HttpClient将请求发出去之后会得到一个通过HttpResponseMessage对象表示的响应,在一个异步执行的Task中,我们根据响应流创建一个PipeReader对象,并在一个循环中调用上面定义的TryReadMessage方法逐条读取接收到的单条消息进行处理。

  • 相关阅读:
    java精品旅游项目管理系统计算机毕业设计MyBatis+系统+LW文档+源码+调试部署
    基于 Kubernetes 的 DevOps
    【Linux】基础IO
    【手把手反内卷】开创全新AI多模态任务一视听分割:代码实践、优化教程(二)
    java计算机毕业设计音乐网站MyBatis+系统+LW文档+源码+调试部署
    HBuilderX连接安卓模拟器
    【Java八股文总结】之JDK常见问题排查
    Web3 入门手册:从认知到实践
    优思学院|如何领导六西格玛变革?学习哈佛商学院的八步变革模型
    Typora免费版下载【Mac、Windows】
  • 原文地址:https://www.cnblogs.com/artech/p/18021662/streaming_messaging