• 文盘Rust -- tonic-Rust grpc初体验 | 京东云技术团队


    gRPC 是开发中常用的开源高性能远程过程调用(RPC)框架,tonic 是基于 HTTP/2 的 gRPC 实现,专注于高性能、互操作性和灵活性。该库的创建是为了对 async/await 提供一流的支持,并充当用 Rust 编写的生产系统的核心构建块。今天我们聊聊通过使用 tonic 调用 grpc 的具体过程。

    工程规划

    rpc程序一般包含server端和client端,为了方便我们把两个程序打包到一个工程里面 新建tonic_sample工程

    cargo new tonic_sample
    
    
    
    
    
    • 1
    • 2
    • 3
    • 4
    • 5

    Cargo.toml 如下

    [package]
    name = "tonic_sample"
    version = "0.1.0"
    edition = "2021"
    
    [[bin]] # Bin to run the gRPC server
    name = "stream-server"
    path = "src/stream_server.rs"
    
    [[bin]] # Bin to run the gRPC client
    name = "stream-client"
    path = "src/stream_client.rs"
    
    
    [dependencies]
    # NOTE(review): `tokio.workspace = true` inherits the tokio version/features from
    # `[workspace.dependencies]` in a workspace root Cargo.toml. In a standalone crate
    # created with `cargo new` this line fails to resolve; declare tokio directly
    # instead, e.g. `tokio = { version = "1", features = ["full"] }` — confirm the
    # features actually needed.
    tokio.workspace = true
    tonic = "0.9"
    tonic-reflection = "0.9.2"
    prost = "0.11"
    tokio-stream = "0.1"
    async-stream = "0.2"
    serde = { version = "1.0", features = ["derive"] }
    serde_json = "1.0"
    rand = "0.7"
    h2 = { version = "0.3" }
    anyhow = "1.0.75"
    futures-util = "0.3.28"
    
    [build-dependencies]
    tonic-build = "0.9"
    
    
    
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34

    tonic 的示例代码还是比较齐全的,本次我们参考 tonic 的 streaming example

    首先编写 proto 文件,用来描述报文。 proto/echo.proto

    syntax = "proto3";
    
    package stream;
    
    // EchoRequest is the request for echo.
    message EchoRequest { string message = 1; }
    
    // EchoResponse is the response for echo.
    message EchoResponse { string message = 1; }
    
    // Echo is the echo service.
    service Echo {
      // UnaryEcho: one EchoRequest in, one EchoResponse out.
      rpc UnaryEcho(EchoRequest) returns (EchoResponse) {}
      // ServerStreamingEcho: one EchoRequest in, a stream of EchoResponse out.
      rpc ServerStreamingEcho(EchoRequest) returns (stream EchoResponse) {}
      // ClientStreamingEcho: a stream of EchoRequest in, one EchoResponse out.
      rpc ClientStreamingEcho(stream EchoRequest) returns (EchoResponse) {}
      // BidirectionalStreamingEcho: a stream of EchoRequest in, a stream of
      // EchoResponse out.
      rpc BidirectionalStreamingEcho(stream EchoRequest)
          returns (stream EchoResponse) {}
    }
    
    
    
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26

    文件并不复杂,只有两个 message 一个请求一个返回,之所以选择这个示例是因为该示例包含了rpc中的流式处理,包括了server 流、client 流以及双向流的操作。 编辑build.rs 文件

    use std::{env, path::PathBuf};
    
    /// Build script entry point: compiles the Echo protobuf definition with
    /// `tonic_build` so the generated Rust code is available to the binaries.
    ///
    /// # Errors
    /// Propagates any error reported by `tonic_build::compile_protos`.
    fn main() -> Result<(), Box<dyn std::error::Error>> {
        let proto_file = "proto/echo.proto";
        tonic_build::compile_protos(proto_file)?;
        Ok(())
    }
    
    
    
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    该文件用来通过 tonic-build 生成 grpc 的 rust 基础代码

    完成上述工作后就可以构建 server 和 client 代码了

    stream_server.rs

    /// Generated gRPC code for the `stream` protobuf package (the package name
    /// declared in `proto/echo.proto`), pulled in via `tonic::include_proto!`.
    pub mod pb {
        tonic::include_proto!("stream");
    }
    
    use anyhow::Result;
    use futures_util::FutureExt;
    use pb::{EchoRequest, EchoResponse};
    use std::{
        error::Error,
        io::ErrorKind,
        net::{SocketAddr, ToSocketAddrs},
        pin::Pin,
        thread,
        time::Duration,
    };
    use tokio::{
        net::TcpListener,
        sync::{
            mpsc,
            oneshot::{self, Receiver, Sender},
            Mutex,
        },
        task::{self, JoinHandle},
    };
    use tokio_stream::{
        wrappers::{ReceiverStream, TcpListenerStream},
        Stream, StreamExt,
    };
    use tonic::{transport::Server, Request, Response, Status, Streaming};
    /// Result type returned by every Echo RPC handler: a tonic `Response` on
    /// success or a gRPC `Status` on failure.
    type EchoResult<T> = Result<Response<T>, Status>;
    /// Boxed, pinned, sendable stream of `EchoResponse` results used as the
    /// response type of the streaming RPCs.
    type ResponseStream = Pin<Box<dyn Stream<Item = Result<EchoResponse, Status>> + Send>>;
    
    fn match_for_io_error(err_status: &Status) -> Option<&std::io::Error> {
        let mut err: &(dyn Error + 'static) = err_status;
    
        loop {
            if let Some(io_err) = err.downcast_ref::<std::io::Error>() {
                return Some(io_err);
            }
    
            // h2::Error do not expose std::io::Error with `source()`
            // https://github.com/hyperium/h2/pull/462
            if let Some(h2_err) = err.downcast_ref::<h2::Error>() {
                if let Some(io_err) = h2_err.get_io() {
                    return Some(io_err);
                }
            }
    
            err = match err.source() {
                Some(err) => err,
                None => return None,
            };
        }
    }
    
    /// Stateless handler type on which the `pb::echo_server::Echo` service
    /// trait is implemented.
    #[derive(Debug)]
    pub struct EchoServer {}
    
    // gRPC handlers for the `Echo` service generated from `proto/echo.proto`.
    #[tonic::async_trait]
    impl pb::echo_server::Echo for EchoServer {
        /// Unary RPC: replies with the message carried by the request.
        async fn unary_echo(&self, req: Request<EchoRequest>) -> EchoResult<EchoResponse> {
            let req_str = req.into_inner().message;

            let response = EchoResponse { message: req_str };
            Ok(Response::new(response))
        }

        type ServerStreamingEchoStream = ResponseStream;

        /// Server-streaming RPC: repeats the request message every 200 ms until
        /// the client drops the response stream.
        async fn server_streaming_echo(
            &self,
            req: Request<EchoRequest>,
        ) -> EchoResult<Self::ServerStreamingEchoStream> {
            println!("EchoServer::server_streaming_echo");
            println!("\tclient connected from: {:?}", req.remote_addr());

            // Infinite stream that yields the requested message, throttled to
            // one item per 200 ms.
            let repeat = std::iter::repeat(EchoResponse {
                message: req.into_inner().message,
            });
            let mut stream = Box::pin(tokio_stream::iter(repeat).throttle(Duration::from_millis(200)));

            // The spawned task feeds `tx`; `rx` is handed to tonic as the
            // response stream.
            let (tx, rx) = mpsc::channel(128);
            tokio::spawn(async move {
                while let Some(item) = stream.next().await {
                    // A send error means `rx` (and the output stream built from
                    // it) was dropped, i.e. the client is gone.
                    if tx.send(Result::<_, Status>::Ok(item)).await.is_err() {
                        break;
                    }
                }
                println!("\tclient disconnected");
            });

            let output_stream = ReceiverStream::new(rx);
            Ok(Response::new(
                Box::pin(output_stream) as Self::ServerStreamingEchoStream
            ))
        }

        /// Client-streaming RPC: not implemented in this sample; always returns
        /// `Status::unimplemented`.
        async fn client_streaming_echo(
            &self,
            _: Request<Streaming<EchoRequest>>,
        ) -> EchoResult<EchoResponse> {
            Err(Status::unimplemented("not implemented"))
        }

        type BidirectionalStreamingEchoStream = ResponseStream;

        /// Bidirectional-streaming RPC: echoes every incoming message back on
        /// the response stream.
        ///
        /// Incoming errors are forwarded to the client, except for a broken
        /// pipe, which ends the echo loop.
        async fn bidirectional_streaming_echo(
            &self,
            req: Request<Streaming<EchoRequest>>,
        ) -> EchoResult<Self::BidirectionalStreamingEchoStream> {
            println!("EchoServer::bidirectional_streaming_echo");

            let mut in_stream = req.into_inner();
            let (tx, rx) = mpsc::channel(128);

            tokio::spawn(async move {
                while let Some(result) = in_stream.next().await {
                    match result {
                        Ok(v) => {
                            // If the response stream was dropped the send fails;
                            // stop echoing instead of panicking inside the task.
                            let echoed = Ok(EchoResponse { message: v.message });
                            if tx.send(echoed).await.is_err() {
                                break;
                            }
                        }
                        Err(err) => {
                            if let Some(io_err) = match_for_io_error(&err) {
                                if io_err.kind() == ErrorKind::BrokenPipe {
                                    eprintln!("\tclient disconnected: broken pipe");
                                    break;
                                }
                            }

                            // Forward any other error to the client; stop if
                            // the response stream was dropped.
                            if tx.send(Err(err)).await.is_err() {
                                break;
                            }
                        }
                    }
                }
                println!("\tstream ended");
            });

            // Echo: the response stream carries exactly what the task forwarded.
            let out_stream = ReceiverStream::new(rx);

            Ok(Response::new(
                Box::pin(out_stream) as Self::BidirectionalStreamingEchoStream
            ))
        }
    }
    
    /// Starts the basic Echo gRPC server on `0.0.0.0:50051`.
    ///
    /// # Errors
    /// Returns an error if the listen address cannot be resolved or if the
    /// server fails while serving, instead of panicking.
    #[tokio::main]
    async fn main() -> Result<(), Box<dyn std::error::Error>> {
        // Basic server
        let addr = "0.0.0.0:50051"
            .to_socket_addrs()?
            .next()
            .ok_or("0.0.0.0:50051 did not resolve to a socket address")?;

        let server = EchoServer {};
        Server::builder()
            .add_service(pb::echo_server::EchoServer::new(server))
            .serve(addr)
            .await?;
        Ok(())
    }
    
    
    
    
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133
    • 134
    • 135
    • 136
    • 137
    • 138
    • 139
    • 140
    • 141
    • 142
    • 143
    • 144
    • 145
    • 146
    • 147
    • 148
    • 149
    • 150
    • 151
    • 152
    • 153
    • 154
    • 155
    • 156
    • 157
    • 158
    • 159
    • 160
    • 161
    • 162
    • 163
    • 164
    • 165
    • 166
    • 167
    • 168
    • 169
    • 170
    • 171
    • 172

    server 端的代码还是比较清晰的,首先通过 tonic::include_proto! 宏引入grpc定义,参数是 proto 文件中定义的 package 。我们重点说说 server_streaming_echo function 。这个function 的处理流程明白了,其他的流式处理大同小异。首先 通过std::iter::repeat function 定义一个迭代器;然后构建 tokio_stream 在本示例中 每 200毫秒产生一个 repeat;最后构建一个 channel ,tx 用来发送从stream中获取的内容,rx 封装到response 中返回。 最后 main 函数 拉起服务。

    client 代码如下

    /// Generated gRPC client/server code for the `stream` protobuf package,
    /// pulled in via `tonic::include_proto!`.
    pub mod pb {
        tonic::include_proto!("stream");
    }
    
    use std::time::Duration;
    use tokio_stream::{Stream, StreamExt};
    use tonic::transport::Channel;
    
    use pb::{echo_client::EchoClient, EchoRequest};
    
    /// Builds a stream of numbered `EchoRequest`s ("msg 01", "msg 02", ...)
    /// covering the range `1..usize::MAX`.
    fn echo_requests_iter() -> impl Stream<Item = EchoRequest> {
        let numbered = (1..usize::MAX).map(|seq| EchoRequest {
            message: format!("msg {:02}", seq),
        });
        tokio_stream::iter(numbered)
    }
    
    /// Sends `num` unary echo requests ("msg0", "msg1", ...) one after another
    /// and prints each reply.
    ///
    /// # Panics
    /// Panics if any call returns an error status.
    async fn unary_echo(client: &mut EchoClient<Channel>, num: usize) {
        for seq in 0..num {
            let payload = EchoRequest {
                message: format!("msg{}", seq),
            };
            let reply = client
                .unary_echo(tonic::Request::new(payload))
                .await
                .unwrap()
                .into_inner();
            println!("resp:{}", reply.message);
        }
    }
    
    /// Calls the server-streaming echo with the message "foo" and prints the
    /// first `num` responses.
    ///
    /// # Panics
    /// Panics if the call or any received item is an error.
    async fn streaming_echo(client: &mut EchoClient<Channel>, num: usize) {
        let request = EchoRequest {
            message: "foo".into(),
        };

        // The server stream is infinite: read only `num` items, then disconnect.
        let mut limited = client
            .server_streaming_echo(request)
            .await
            .unwrap()
            .into_inner()
            .take(num);

        while let Some(item) = limited.next().await {
            let reply = item.unwrap();
            println!("\treceived: {}", reply.message);
        }
        // `limited` is dropped here, which signals the disconnect to the server.
    }
    
    /// Sends `num` numbered requests over a bidirectional stream and prints
    /// every echoed message until the server closes the response stream.
    ///
    /// # Panics
    /// Panics if the call or any received item is an error.
    async fn bidirectional_streaming_echo(client: &mut EchoClient<Channel>, num: usize) {
        let outgoing = echo_requests_iter().take(num);

        let mut replies = client
            .bidirectional_streaming_echo(outgoing)
            .await
            .unwrap()
            .into_inner();

        while let Some(item) = replies.next().await {
            let reply = item.unwrap();
            println!("\treceived message: `{}`", reply.message);
        }
    }
    
    /// Sends numbered requests over a bidirectional stream, throttled to one
    /// request per `dur`, and prints every echoed message.
    ///
    /// # Panics
    /// Panics if the call or any received item is an error.
    async fn bidirectional_streaming_echo_throttle(client: &mut EchoClient<Channel>, dur: Duration) {
        let outgoing = echo_requests_iter().throttle(dur);

        let mut replies = client
            .bidirectional_streaming_echo(outgoing)
            .await
            .unwrap()
            .into_inner();

        while let Some(item) = replies.next().await {
            let reply = item.unwrap();
            println!("\treceived message: `{}`", reply.message);
        }
    }
    
    /// Client entry point: connects to the Echo server at
    /// `http://127.0.0.1:50051` and runs the unary, server-streaming and
    /// bidirectional-streaming demos in turn.
    ///
    /// # Errors
    /// Returns the connection error if the server cannot be reached, instead
    /// of panicking.
    #[tokio::main]
    async fn main() -> Result<(), Box<dyn std::error::Error>> {
        let mut client = EchoClient::connect("http://127.0.0.1:50051").await?;
        println!("Unary echo:");
        unary_echo(&mut client, 10).await;
        tokio::time::sleep(Duration::from_secs(1)).await;

        println!("Streaming echo:");
        streaming_echo(&mut client, 5).await;
        // Pause so the client output does not interleave with the server's println output.
        tokio::time::sleep(Duration::from_secs(1)).await;

        // Echo stream that sends 17 requests and then ends the stream gracefully.
        println!("\r\nBidirectional stream echo:");
        bidirectional_streaming_echo(&mut client, 17).await;

        // Echo stream that sends up to `usize::MAX` requests, one request every 2s.
        // Killing the client with CTRL+C demonstrates how the server side can
        // distinguish a broken pipe from a graceful client disconnection
        // (the example above).
        println!("\r\nBidirectional stream echo (kill client with CTLR+C):");
        bidirectional_streaming_echo_throttle(&mut client, Duration::from_secs(2)).await;

        Ok(())
    }
    
    
    
    
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103

    测试一下,分别运行 server 和 client

    cargo run --bin stream-server
    cargo run --bin stream-client
    
    
    
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    在开发中,我们通常不会在 client 和 server 都开发好的情况下才开始测试。通常在开发 server 端的时候采用 grpcurl 工具进行测试工作

    grpcurl -import-path ./proto -proto echo.proto list
    grpcurl -import-path ./proto -proto  echo.proto describe stream.Echo
    grpcurl -plaintext -import-path ./proto -proto  echo.proto -d '{"message":"1234"}' 127.0.0.1:50051 stream.Echo/UnaryEcho
    
    
    
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    此时,如果我们不指定 -import-path 参数,执行如下命令

    grpcurl -plaintext 127.0.0.1:50051 list
    
    
    
    
    
    • 1
    • 2
    • 3
    • 4
    • 5

    会出现如下报错信息

    Failed to list services: server does not support the reflection API
    
    
    
    
    
    • 1
    • 2
    • 3
    • 4
    • 5

    让服务端程序支持 reflection API

    首先改造build.rs

    use std::{env, path::PathBuf};
    
    /// Build script entry point: compiles `proto/echo.proto` and also writes an
    /// encoded file descriptor set to `$OUT_DIR/stream_descriptor.bin`, which
    /// the server embeds for gRPC reflection.
    ///
    /// # Errors
    /// Returns an error if `OUT_DIR` is not set or if proto compilation fails,
    /// instead of panicking.
    fn main() -> Result<(), Box<dyn std::error::Error>> {
        let out_dir = PathBuf::from(env::var("OUT_DIR")?);
        tonic_build::configure()
            .file_descriptor_set_path(out_dir.join("stream_descriptor.bin"))
            .compile(&["proto/echo.proto"], &["proto"])?;
        Ok(())
    }
    
    
    
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    file_descriptor_set_path 生成一个文件,其中包含为协议缓冲模块编码的 prost_types::FileDescriptorSet 文件。这是实现 gRPC 服务器反射所必需的。

    接下来改造一下 stream_server.rs,涉及两处更改。

    新增 STREAM_DESCRIPTOR_SET 常量

    /// Generated gRPC code for the `stream` protobuf package, plus the encoded
    /// file descriptor set used to register the service with gRPC reflection.
    pub mod pb {
        tonic::include_proto!("stream");
        /// Encoded file descriptor set included via
        /// `tonic::include_file_descriptor_set!("stream_descriptor")`.
        pub const STREAM_DESCRIPTOR_SET: &[u8] =
            tonic::include_file_descriptor_set!("stream_descriptor");
    }
    
    
    
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    修改main函数

    /// Starts the Echo gRPC server on `0.0.0.0:50051` together with the
    /// tonic reflection service.
    ///
    /// # Errors
    /// Returns an error if the reflection service cannot be built, the listen
    /// address cannot be parsed, or the server fails while serving.
    #[tokio::main]
    async fn main() -> Result<(), Box<dyn std::error::Error>> {
        // Basic server without reflection, kept for comparison:
        // let server = EchoServer {};
        // Server::builder()
        //     .add_service(pb::echo_server::EchoServer::new(server))
        //     .serve("0.0.0.0:50051".to_socket_addrs().unwrap().next().unwrap())
        //     .await
        //     .unwrap();

        // Reflection service built from the embedded file descriptor set.
        let service = tonic_reflection::server::Builder::configure()
            .register_encoded_file_descriptor_set(pb::STREAM_DESCRIPTOR_SET)
            .with_service_name("stream.Echo")
            .build()?;

        let addr: std::net::SocketAddr = "0.0.0.0:50051".parse()?;

        let server = EchoServer {};

        Server::builder()
            .add_service(service)
            .add_service(pb::echo_server::EchoServer::new(server))
            .serve(addr)
            .await?;
        Ok(())
    }
    
    
    
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32

    register_encoded_file_descriptor_set 将包含编码的 prost_types::FileDescriptorSet 的 byte slice 注册到 gRPC Reflection 服务生成器。

    再次测试

    grpcurl -plaintext 127.0.0.1:50051 list
    grpcurl -plaintext 127.0.0.1:50051 describe stream.Echo
    
    
    
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    返回正确结果。

    以上完整代码地址

    作者:京东科技 贾世闻

    来源:京东云开发者社区 转载请注明来源

  • 相关阅读:
    树的应用 —— 二叉树:二叉树的性质
    Node.js 未来发展趋势
    Victor CMS SQL注入漏洞( CVE-2022-26201)
    JavaScript笔记(本文中将JavaScript简写为JS)
    微信小程序进阶(1)--自定义组件
    印尼全面禁止直播带货
    高频数据分析:使用数据透视
    uni.getLocation() 微信小程序 线上获取失败
    [SQL开发笔记]SELECT DISTINCT语句:返回唯一不同的值
    ZCMU--1515: ZDD vs. TIANKENG(C语言)
  • 原文地址:https://blog.csdn.net/JDDTechTalk/article/details/132977232