• Go-gRPC示例


    前言

    我们使用gRPC,一把是把它作为微服务。因为,它与语言无关,可以适配多种语言。它的底层实现,使用的是HTTP/2。

    在使用时,我们需要通过"protoc"命令,为我们生成protocol-buffers的相关代码,它还会为我们生成gRPC相关代码。然后分别在客户端、服务端分别使用相应的代码即可。

    1.安装"protocol-buffers"代码生成工具、"gRPC"代码生成工具。

    1. $ go install google.golang.org/protobuf/cmd/protoc-gen-go@v1.28
    2. $ go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@v1.2

    2.创建一个新的项目

    项目名称任意就好,在该项目的根路径下创建一个"proto"的文件夹,用来存放我们的".proto"文件。

    这个".proto"文件,其实就是protocol-buffers的文件,它可以为我们不同语言,生成符合protocol-buffers规范的代码。

    这种代码一般以"***.pb.go"结尾。protocol-buffers存储结构利于传输,比json效率要更加好。因此,它得到了广泛的使用。

    3.创建"product.proto"文件

    并把下面的内容复制到该文件中

    1. # proto 语法,有2和3之分,这里我们使用proto3的语法
    2. syntax = "proto3";
    3. # path;name path,是指proto生成代码的保存路径,name是指生成代码的文件名称
    4. option go_package = "../service";
    5. # 指定被生成代码的包名
    6. package service;
    7. # 定义对象、结构体,与我们的JavaBean很相似,这是一个请求对象
    8. message ProductRequest{
    9. int32 productId = 1;
    10. }
    11. # 定义对象、结构体,与我们的JavaBean很相似,这是一个响应对象
    12. message ProductResponse{
    13. int32 productStock = 1;
    14. }
    15. # 定义我们的服务,类似于Java的controller
    16. service ProductService{
    17. // 普通 rpc
    18. rpc GetProductStock(ProductRequest) returns(ProductResponse);
    19. // 客户端流式 rpc
    20. rpc QueryProductStockClientStream(stream ProductRequest) returns(ProductResponse);
    21. // 服务端流式 rpc
    22. rpc QueryProductStockServerStream(ProductRequest) returns(stream ProductResponse);
    23. // 双向流式 rpc
    24. rpc QueryProductStockStream(stream ProductRequest) returns(stream ProductResponse);
    25. }

    4.使用"protoc"生成代码

    在执行下面的命令生成之前,请在根目录下,先创建一个"service"的目录,用来保存我们服务的代码。

    使用命令行工具,进入到我们项目的根路径下,并执行如下命令:

    1. # protoc protocol buffers命令
    2. # go_out 参数指定pb文件生成目录
    3. # go-grpc_out参数指定服务代码生成目录
    4. # proto/product.proto 指定.proto文件
    5. $ protoc --go_out=./service/ --go-grpc_out=./service/ proto/product.proto

    效果图: 

     

    5.项目下载"gRPC"相关资源到本地

    使用命令行工具,进入到更目录下,执行如下命令:

    $ go mod tidy

    执行后,它就会自动下载相应资源

    6.分别在项目根路径创建client、server目录,并在两个目录中都新建"main.go"文件

    用来模拟客户端、服务端访问。

    客户端代码:

    1. package main
    2. import (
    3. "context"
    4. "io"
    5. "log"
    6. "math/rand"
    7. "sync"
    8. "time"
    9. "04-stream/service"
    10. "04-stream/util"
    11. "google.golang.org/grpc"
    12. "google.golang.org/grpc/credentials/insecure"
    13. )
    14. var wg = &sync.WaitGroup{}
    15. func getProductStock(client service.ProductServiceClient) {
    16. defer wg.Done()
    17. ctx, cancel := context.WithTimeout(context.TODO(), 10*time.Second)
    18. defer cancel()
    19. response, err := client.GetProductStock(ctx, &service.ProductRequest{ProductId: 100})
    20. if err != nil {
    21. log.Fatalln(err)
    22. return
    23. }
    24. log.Println(response.ProductStock)
    25. }
    26. //func queryProductStockStream(client service.ProductServiceClient) {
    27. // defer wg.Done()
    28. //
    29. // ctx, cancel := context.WithTimeout(context.TODO(), 10*time.Second)
    30. // defer cancel()
    31. //
    32. // streamClient, err := client.QueryProductStockClientStream(ctx)
    33. // if err != nil {
    34. // log.Fatalln(err)
    35. // return
    36. // }
    37. //
    38. // err = streamClient.Send(&service.ProductRequest{ProductId: 222})
    39. // if err != nil {
    40. // log.Fatalln(err)
    41. // return
    42. // }
    43. //
    44. // response, err := streamClient.CloseAndRecv()
    45. // if err != nil {
    46. // log.Fatalln(err)
    47. // return
    48. // }
    49. //
    50. // log.Println(response.ProductStock)
    51. //}
    52. func queryProductStockClientStream(client service.ProductServiceClient) {
    53. defer wg.Done()
    54. count := 0
    55. ctx := context.TODO()
    56. stream, err := client.QueryProductStockClientStream(ctx)
    57. if err != nil {
    58. log.Fatalln(err)
    59. return
    60. }
    61. for true {
    62. count++
    63. request := &service.ProductRequest{ProductId: rand.Int31()}
    64. log.Println("send ProductId:", request.ProductId, ", count:", count)
    65. err = stream.Send(request)
    66. if err != nil {
    67. log.Fatalln(err)
    68. return
    69. }
    70. time.Sleep(time.Second)
    71. if count >= 10 {
    72. break
    73. }
    74. }
    75. response, err := stream.CloseAndRecv()
    76. if err != nil {
    77. log.Fatalln(err)
    78. return
    79. }
    80. log.Println("收到服务器的数据:", response.ProductStock)
    81. }
    82. func queryProductStockServerStream(client service.ProductServiceClient) {
    83. defer wg.Done()
    84. count := 0
    85. // 客户端只发送一次
    86. serverStream, err := client.QueryProductStockServerStream(context.TODO(), &service.ProductRequest{ProductId: rand.Int31()})
    87. if err != nil {
    88. log.Fatalln(err)
    89. return
    90. }
    91. // 接收多次
    92. for {
    93. count++
    94. response, err := serverStream.Recv()
    95. if err == io.EOF {
    96. log.Println("客户端接收完毕.")
    97. return
    98. }
    99. if err != nil {
    100. log.Fatalln(err)
    101. return
    102. }
    103. log.Println("客户端收到:", response.ProductStock, count)
    104. }
    105. }
    106. func queryProductStockStream(client service.ProductServiceClient) {
    107. defer wg.Done()
    108. stream, err := client.QueryProductStockStream(context.TODO())
    109. if err != nil {
    110. log.Fatalln(err)
    111. return
    112. }
    113. for {
    114. err := stream.Send(&service.ProductRequest{ProductId: rand.Int31()})
    115. if err != nil {
    116. log.Fatalln(err)
    117. return
    118. }
    119. time.Sleep(time.Second)
    120. response, err := stream.Recv()
    121. if err != nil {
    122. log.Fatalln(err)
    123. return
    124. }
    125. log.Println("收到服务端数据:", response.ProductStock)
    126. }
    127. }
    128. type queryFn func(client service.ProductServiceClient)
    129. func main() {
    130. qs := []queryFn{
    131. //getProductStock,
    132. //queryProductStockClientStream,
    133. //queryProductStockServerStream,
    134. queryProductStockStream,
    135. }
    136. conn, err := grpc.Dial(util.Address, grpc.WithTransportCredentials(insecure.NewCredentials()))
    137. if err != nil {
    138. log.Fatalln(err)
    139. return
    140. }
    141. client := service.NewProductServiceClient(conn)
    142. for _, fn := range qs {
    143. wg.Add(1)
    144. go fn(client)
    145. }
    146. wg.Wait()
    147. }

    服务端代码:

    1. package main
    2. import (
    3. "04-stream/service"
    4. "04-stream/util"
    5. "google.golang.org/grpc"
    6. "log"
    7. "net"
    8. )
    9. func main() {
    10. server := grpc.NewServer()
    11. service.RegisterProductServiceServer(server, service.ProductService)
    12. listen, err := net.Listen("tcp", util.Address)
    13. if err != nil {
    14. log.Fatalln(err)
    15. return
    16. }
    17. if err = server.Serve(listen); err != nil {
    18. return
    19. }
    20. }

    service代码:

    1. package service
    2. import (
    3. "context"
    4. "io"
    5. "log"
    6. "math/rand"
    7. "time"
    8. )
    9. type productService struct {
    10. }
    11. func (ps *productService) mustEmbedUnimplementedProductServiceServer() {}
    12. // GetProductStock 获取库存
    13. func (ps *productService) GetProductStock(ctx context.Context, request *ProductRequest) (*ProductResponse, error) {
    14. log.Println("id:", request.ProductId)
    15. return &ProductResponse{ProductStock: request.ProductId * 123 * rand.Int31()}, nil
    16. }
    17. // QueryProductStockClientStream
    18. // 客户端请求多次,服务器只写一次
    19. func (ps *productService) QueryProductStockClientStream(stream ProductService_QueryProductStockClientStreamServer) error {
    20. //request, err := stream.Recv()
    21. //if err != nil {
    22. // log.Fatalln(err)
    23. // return err
    24. //}
    25. //log.Println("stream request, id:", request.ProductId)
    26. //return stream.SendAndClose(&ProductResponse{ProductStock: request.ProductId * 1234 * rand.Int31()})
    27. count := 0
    28. for {
    29. count++
    30. request, err := stream.Recv() // 接收多次请求
    31. if err == io.EOF {
    32. log.Println("收到EOF,结束读取")
    33. err = stream.SendAndClose(&ProductResponse{ProductStock: rand.Int31()})
    34. if err != nil {
    35. log.Fatalln(err)
    36. return err
    37. }
    38. log.Println("发送完成,结束调用")
    39. return nil
    40. }
    41. if err != nil {
    42. log.Fatalln(err)
    43. return err
    44. }
    45. log.Println("stream id:", request.ProductId, ", count :", count)
    46. }
    47. }
    48. // QueryProductStockServerStream
    49. // 客户端只请求一次,服务器写多次
    50. func (ps *productService) QueryProductStockServerStream(request *ProductRequest, stream ProductService_QueryProductStockServerStreamServer) error {
    51. count := 0
    52. for {
    53. count++
    54. pr := &ProductResponse{ProductStock: request.ProductId * rand.Int31()}
    55. log.Println("发送给客户端:", pr.ProductStock, count)
    56. err := stream.Send(pr)
    57. if err != nil {
    58. log.Fatalln(err)
    59. return err
    60. }
    61. if count >= 10 {
    62. // 发送结束
    63. log.Println("发送结束.")
    64. return nil
    65. }
    66. time.Sleep(1 * time.Second)
    67. }
    68. }
    69. // QueryProductStockStream 双向流式 rpc
    70. func (ps *productService) QueryProductStockStream(stream ProductService_QueryProductStockStreamServer) error {
    71. count := 0
    72. for {
    73. count++
    74. request, err := stream.Recv()
    75. if err != nil {
    76. log.Fatalln(err)
    77. return err
    78. }
    79. log.Println("收到客户端请求:", request.ProductId, "count:", count)
    80. time.Sleep(time.Second)
    81. err = stream.Send(&ProductResponse{ProductStock: request.ProductId})
    82. if err != nil {
    83. log.Fatalln(err)
    84. return err
    85. }
    86. }
    87. }
    88. var ProductService = &productService{}

    util代码:

    1. package util
    2. const Address = ":8803"

    7.最后,先启动服务端,在启动客户端。

    客户端可以根据需要,来选择执行需要的方法,在main函数中的qs数组中设定。

  • 相关阅读:
    Django后台管理(二)
    基于AT89C51单片机的数字电压表PROTEUS仿真设计
    Python基础(适合初学-完整教程-学习时间一周左右-节约您的时间)
    【C++学习笔记】C++20的jthread
    Java基础
    【操作系统】文件系统之文件共享与文件保护
    【深度学习】实验3布置:PyTorch实战——CIFAR图像分类
    【操作系统——内存基本分段式存储管理】
    出行生态布局初显!魅族董事长沈子瑜:Flyme系统明年“上车”
    uart驱动框架及编程方法
  • 原文地址:https://blog.csdn.net/a145127/article/details/126323526