前言
我们使用gRPC,一把是把它作为微服务。因为,它与语言无关,可以适配多种语言。它的底层实现,使用的是HTTP/2。
在使用时,我们需要通过"protoc"命令,为我们生成protocol-buffers的相关代码,它还会为我们生成gRPC相关代码。然后分别在客户端、服务端分别使用相应的代码即可。
1.安装"protocol-buffers"代码生成工具、"gRPC"代码生成工具。
- $ go install google.golang.org/protobuf/cmd/protoc-gen-go@v1.28
- $ go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@v1.2
2.创建一个新的项目
项目名称任意就好,在该项目的根路径下创建一个"proto"的文件夹,用来存放我们的".proto"文件。
这个".proto"文件,其实就是protocol-buffers的文件,它可以为我们不同语言,生成符合protocol-buffers规范的代码。
这种代码一般以"***.pb.go"结尾。protocol-buffers存储结构利于传输,比json效率要更加好。因此,它得到了广泛的使用。
3.创建"product.proto"文件
并把下面的内容复制到该文件中
- # proto 语法,有2和3之分,这里我们使用proto3的语法
- syntax = "proto3";
-
-
- # path;name path,是指proto生成代码的保存路径,name是指生成代码的文件名称
- option go_package = "../service";
-
- # 指定被生成代码的包名
- package service;
-
- # 定义对象、结构体,与我们的JavaBean很相似,这是一个请求对象
- message ProductRequest{
- int32 productId = 1;
- }
-
- # 定义对象、结构体,与我们的JavaBean很相似,这是一个响应对象
- message ProductResponse{
- int32 productStock = 1;
- }
-
-
- # 定义我们的服务,类似于Java的controller
- service ProductService{
- // 普通 rpc
- rpc GetProductStock(ProductRequest) returns(ProductResponse);
- // 客户端流式 rpc
- rpc QueryProductStockClientStream(stream ProductRequest) returns(ProductResponse);
- // 服务端流式 rpc
- rpc QueryProductStockServerStream(ProductRequest) returns(stream ProductResponse);
- // 双向流式 rpc
- rpc QueryProductStockStream(stream ProductRequest) returns(stream ProductResponse);
- }
4.使用"protoc"生成代码
在执行下面的命令生成之前,请在根目录下,先创建一个"service"的目录,用来保存我们服务的代码。
使用命令行工具,进入到我们项目的根路径下,并执行如下命令:
- # protoc protocol buffers命令
- # go_out 参数指定pb文件生成目录
- # go-grpc_out参数指定服务代码生成目录
- # proto/product.proto 指定.proto文件
- $ protoc --go_out=./service/ --go-grpc_out=./service/ proto/product.proto
效果图:

5.项目下载"gRPC"相关资源到本地
使用命令行工具,进入到更目录下,执行如下命令:
$ go mod tidy
执行后,它就会自动下载相应资源
6.分别在项目根路径创建client、server目录,并在两个目录中都新建"main.go"文件
用来模拟客户端、服务端访问。
客户端代码:
- package main
-
- import (
- "context"
- "io"
- "log"
- "math/rand"
- "sync"
- "time"
-
-
- "04-stream/service"
- "04-stream/util"
-
-
-
- "google.golang.org/grpc"
- "google.golang.org/grpc/credentials/insecure"
- )
-
- var wg = &sync.WaitGroup{}
-
- func getProductStock(client service.ProductServiceClient) {
- defer wg.Done()
-
- ctx, cancel := context.WithTimeout(context.TODO(), 10*time.Second)
- defer cancel()
-
- response, err := client.GetProductStock(ctx, &service.ProductRequest{ProductId: 100})
- if err != nil {
- log.Fatalln(err)
- return
- }
-
- log.Println(response.ProductStock)
- }
-
- //func queryProductStockStream(client service.ProductServiceClient) {
- // defer wg.Done()
- //
- // ctx, cancel := context.WithTimeout(context.TODO(), 10*time.Second)
- // defer cancel()
- //
- // streamClient, err := client.QueryProductStockClientStream(ctx)
- // if err != nil {
- // log.Fatalln(err)
- // return
- // }
- //
- // err = streamClient.Send(&service.ProductRequest{ProductId: 222})
- // if err != nil {
- // log.Fatalln(err)
- // return
- // }
- //
- // response, err := streamClient.CloseAndRecv()
- // if err != nil {
- // log.Fatalln(err)
- // return
- // }
- //
- // log.Println(response.ProductStock)
- //}
-
- func queryProductStockClientStream(client service.ProductServiceClient) {
- defer wg.Done()
-
- count := 0
-
- ctx := context.TODO()
-
- stream, err := client.QueryProductStockClientStream(ctx)
- if err != nil {
- log.Fatalln(err)
- return
- }
-
- for true {
- count++
-
- request := &service.ProductRequest{ProductId: rand.Int31()}
- log.Println("send ProductId:", request.ProductId, ", count:", count)
-
- err = stream.Send(request)
- if err != nil {
- log.Fatalln(err)
- return
- }
-
- time.Sleep(time.Second)
- if count >= 10 {
- break
- }
- }
-
- response, err := stream.CloseAndRecv()
- if err != nil {
- log.Fatalln(err)
- return
- }
- log.Println("收到服务器的数据:", response.ProductStock)
- }
-
- func queryProductStockServerStream(client service.ProductServiceClient) {
- defer wg.Done()
-
- count := 0
-
- // 客户端只发送一次
- serverStream, err := client.QueryProductStockServerStream(context.TODO(), &service.ProductRequest{ProductId: rand.Int31()})
- if err != nil {
- log.Fatalln(err)
- return
- }
-
- // 接收多次
- for {
- count++
-
- response, err := serverStream.Recv()
- if err == io.EOF {
- log.Println("客户端接收完毕.")
- return
- }
- if err != nil {
- log.Fatalln(err)
- return
- }
-
- log.Println("客户端收到:", response.ProductStock, count)
- }
-
- }
-
- func queryProductStockStream(client service.ProductServiceClient) {
- defer wg.Done()
-
- stream, err := client.QueryProductStockStream(context.TODO())
- if err != nil {
- log.Fatalln(err)
- return
- }
-
- for {
- err := stream.Send(&service.ProductRequest{ProductId: rand.Int31()})
- if err != nil {
- log.Fatalln(err)
- return
- }
- time.Sleep(time.Second)
-
- response, err := stream.Recv()
- if err != nil {
- log.Fatalln(err)
- return
- }
- log.Println("收到服务端数据:", response.ProductStock)
- }
-
- }
-
- type queryFn func(client service.ProductServiceClient)
-
- func main() {
- qs := []queryFn{
- //getProductStock,
- //queryProductStockClientStream,
- //queryProductStockServerStream,
- queryProductStockStream,
- }
-
- conn, err := grpc.Dial(util.Address, grpc.WithTransportCredentials(insecure.NewCredentials()))
- if err != nil {
- log.Fatalln(err)
- return
- }
-
- client := service.NewProductServiceClient(conn)
-
- for _, fn := range qs {
- wg.Add(1)
- go fn(client)
- }
-
- wg.Wait()
- }
服务端代码:
- package main
-
- import (
- "04-stream/service"
- "04-stream/util"
- "google.golang.org/grpc"
- "log"
- "net"
- )
-
- func main() {
- server := grpc.NewServer()
-
- service.RegisterProductServiceServer(server, service.ProductService)
-
- listen, err := net.Listen("tcp", util.Address)
- if err != nil {
- log.Fatalln(err)
- return
- }
-
- if err = server.Serve(listen); err != nil {
- return
- }
-
- }
service代码:
- package service
-
- import (
- "context"
- "io"
- "log"
- "math/rand"
- "time"
- )
-
- type productService struct {
- }
-
- func (ps *productService) mustEmbedUnimplementedProductServiceServer() {}
-
- // GetProductStock 获取库存
- func (ps *productService) GetProductStock(ctx context.Context, request *ProductRequest) (*ProductResponse, error) {
- log.Println("id:", request.ProductId)
- return &ProductResponse{ProductStock: request.ProductId * 123 * rand.Int31()}, nil
- }
-
- // QueryProductStockClientStream
- // 客户端请求多次,服务器只写一次
- func (ps *productService) QueryProductStockClientStream(stream ProductService_QueryProductStockClientStreamServer) error {
- //request, err := stream.Recv()
- //if err != nil {
- // log.Fatalln(err)
- // return err
- //}
- //log.Println("stream request, id:", request.ProductId)
- //return stream.SendAndClose(&ProductResponse{ProductStock: request.ProductId * 1234 * rand.Int31()})
-
- count := 0
- for {
- count++
-
- request, err := stream.Recv() // 接收多次请求
- if err == io.EOF {
- log.Println("收到EOF,结束读取")
-
- err = stream.SendAndClose(&ProductResponse{ProductStock: rand.Int31()})
- if err != nil {
- log.Fatalln(err)
- return err
- }
-
- log.Println("发送完成,结束调用")
- return nil
- }
- if err != nil {
- log.Fatalln(err)
- return err
- }
- log.Println("stream id:", request.ProductId, ", count :", count)
- }
- }
-
- // QueryProductStockServerStream
- // 客户端只请求一次,服务器写多次
- func (ps *productService) QueryProductStockServerStream(request *ProductRequest, stream ProductService_QueryProductStockServerStreamServer) error {
- count := 0
-
- for {
- count++
-
- pr := &ProductResponse{ProductStock: request.ProductId * rand.Int31()}
- log.Println("发送给客户端:", pr.ProductStock, count)
-
- err := stream.Send(pr)
- if err != nil {
- log.Fatalln(err)
- return err
- }
-
- if count >= 10 {
- // 发送结束
- log.Println("发送结束.")
- return nil
- }
-
- time.Sleep(1 * time.Second)
- }
- }
-
- // QueryProductStockStream 双向流式 rpc
- func (ps *productService) QueryProductStockStream(stream ProductService_QueryProductStockStreamServer) error {
- count := 0
- for {
- count++
-
- request, err := stream.Recv()
- if err != nil {
- log.Fatalln(err)
- return err
- }
- log.Println("收到客户端请求:", request.ProductId, "count:", count)
-
- time.Sleep(time.Second)
- err = stream.Send(&ProductResponse{ProductStock: request.ProductId})
- if err != nil {
- log.Fatalln(err)
- return err
- }
- }
- }
-
- var ProductService = &productService{}
util代码:
- package util
-
- const Address = ":8803"
7.最后,先启动服务端,在启动客户端。
客户端可以根据需要,来选择执行需要的方法,在main函数中的qs数组中设定。