• Go语言操作grpc详细使用


    零、参考链接

    视频:90分钟搞懂分布式RPC开源框架

    一、protobuf的详细使用

    protobuf的单独使用

    二、grpc与protobuf的go文件的生成

    1.安装两个插件

    go install google.golang.org/protobuf/cmd/protoc-gen-go@latest
    go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@latest
    
    • 1
    • 2

    2.写proto文件

    syntax = "proto3";
    
    option go_package = "../proto_go;protos";
    
    
    message Person{
      string name = 1;
      int32 id = 2;
    }
    
    message IPhoneInfo {
      string name = 1;
      int32 id = 2;
      string email = 3;
    
      enum phoneType{
        MOBILE = 0;
        HOME = 1;
        WORK = 2;
      }
    
      message phoneNumber{
        string number = 1;
        phoneType type = 2;
      }
    
      phoneNumber phones = 4;
    }
    
    service ProdService{    //grpc服务,这是最简单的一元grpc模式
      rpc GetProdStock(Person) returns (IPhoneInfo);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32

    3.编译proto文件,生成go文件

    在pbfiles的文件目录下,运行下面的命令

    protoc --go_out=../proto_go --go_opt=paths=source_relative --go-grpc_out=../proto_go --go-grpc_opt=paths=source_relative echo.proto
    
    • 1

    会在proto_go下生成两个文件
    1111

    注意:每次修改了pbfiles中的.proto文件之后,都要重新编译生成两个文件

    三、grpc的详细使用

    可以直接看这个:gRPC开发: gRPC的四种通信模式

    1.一元RPC模式

    proto文件,最后的服务函数写这个,然后编译文件。

    service ProdService{
      rpc GetProdStock(Person) returns (IPhoneInfo);
    }
    
    • 1
    • 2
    • 3

    客户端

    package main
    
    import (
    	"context"
    	"fmt"
    	"google.golang.org/grpc"
    	"google.golang.org/grpc/credentials/insecure"
    	protos "grpc/proto_go"
    	"log"
    	"time"
    )
    
    var person = []string{"cjs", "dh", "jay", "tom"}
    
    func main() {
    	conn, err := grpc.Dial("localhost:8080", grpc.WithTransportCredentials(insecure.NewCredentials()))
    	if err != nil {
    		log.Fatalf("Did not connect:%v", err)
    	}
    	defer conn.Close()
    	c := protos.NewProdServiceClient(conn)
    
    	for _, name := range person {
    		p := &protos.Person{
    			Name: name,
    			Id:   1,  //无用瞎写的
    		}
    		ack, err := c.GetProdStock(context.Background(), p)
    		if err != nil {
    			panic(err)
    		}
    		fmt.Println(ack)
    		time.Sleep(time.Second)
    	}
    //并发协程测试,也是没问题的
    	var wg sync.WaitGroup
    	wg.Add(len(person))
    	for _, name := range person {
    		p := &protos.Person{
    			Name: name,
    			Id:   1,
    		}
    
    		go func(*protos.Person) {
    			ack, err := c.GetProdStock(context.Background(), p)
    			if err != nil {
    				panic(err)
    			}
    			fmt.Println(ack)
    			wg.Done()
    		}(p)
    
    	}
    
    	wg.Wait()
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56

    服务端

    package main
    
    import (
    	"context"
    	"fmt"
    	"google.golang.org/grpc"
    	"google.golang.org/grpc/codes"
    	"google.golang.org/grpc/status"
    	protos "grpc/proto_go"
    	"net"
    )
    
    type server struct {
    	protos.UnimplementedProdServiceServer
    	//或者在这里面添加你需要的字段
    }
    
    func (s *server) GetProdStock(ctx context.Context, req *protos.Person) (*protos.IPhoneInfo, error) {
    	name := req.GetName()
    	fmt.Println(name)
    	if info, ok := AddressBook[name]; ok {
    		return info, nil
    	}
    	return nil, status.Errorf(codes.NotFound, "Order does not exist.:", req.Name, req.Id)
    }
    
    var AddressBook map[string]*protos.IPhoneInfo
    
    func main() {
    	//0.创建db
    	DBAddressBook()
    	//1.创建rpcserver
    	rpcs := grpc.NewServer()
    	//2.将上一步创建的rpc服务器,和对应的服务,进行注册
    	protos.RegisterProdServiceServer(rpcs, &server{})
    	//3.创建一个监听服务
    	listen, err := net.Listen("tcp", ":8080")
    	if err != nil {
    		panic(err)
    	}
    	defer listen.Close()
    	//4.启动rpc服务
    	if err = rpcs.Serve(listen); err != nil {
    		panic(err)
    	}
    }
    
    func DBAddressBook() {
    	AddressBook = make(map[string]*protos.IPhoneInfo)
    	AddressBook["cjs"] = &protos.IPhoneInfo{
    		Name:  "cjs",
    		Id:    1,
    		Email: "cjs_svip@163.com",
    		Phones: &protos.IPhoneInfoPhoneNumber{
    			Number: "18131371662",
    			Type:   1,
    		},
    	}
    	AddressBook["dh"] = &protos.IPhoneInfo{
    		Name:  "dh",
    		Id:    2,
    		Email: "dh_svip@163.com",
    		Phones: &protos.IPhoneInfoPhoneNumber{
    			Number: "18131371663",
    			Type:   2,
    		},
    	}
    	AddressBook["jay"] = &protos.IPhoneInfo{
    		Name:  "jay",
    		Id:    3,
    		Email: "jay_svip@163.com",
    		Phones: &protos.IPhoneInfoPhoneNumber{
    			Number: "18131371664",
    			Type:   protos.IPhoneInfo_MOBILE, //eunm的坑,注意这里无论是写0还是写这个,都解析不出来
    		},
    	}
    	AddressBook["tom"] = &protos.IPhoneInfo{
    		Name:  "tom",
    		Id:    4,
    		Email: "tom_svip@163.com",
    		Phones: &protos.IPhoneInfoPhoneNumber{
    			Number: "18131371665",
    			Type:   1,
    		},
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86

    2.客户端流RPC模式

    proto文件,最后的服务函数写这个,然后编译文件。

    message AddressBook{
      repeated IPhoneInfo people = 1;
    }
    
    service ProdService{
      rpc GetProdStock(Person) returns (IPhoneInfo);  //上一步中的,留着也没问题
      rpc GetCStreamStock(stream Person) returns (AddressBook);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    客户端

    package main
    
    import (
    	"context"
    	"fmt"
    	"google.golang.org/grpc"
    	"google.golang.org/grpc/credentials/insecure"
    	protos "grpc/proto_go"
    	"log"
    )
    
    var person = []string{"cjs", "dh", "jay", "tom"}
    
    func main() {
    	conn, err := grpc.Dial("localhost:8080", grpc.WithTransportCredentials(insecure.NewCredentials()))
    	if err != nil {
    		log.Fatalf("Did not connect:%v", err)
    	}
    	defer conn.Close()
    	//1.依托链接,创建rpc客户端
    	c := protos.NewProdServiceClient(conn)
    	//2.依托链接,获取客户端流传输器
    	stream, err := c.GetCStreamStock(context.Background())
    	if err != nil {
    		panic(err)
    	}
    
    	for _, name := range person {
    		p := &protos.Person{
    			Name: name,
    			Id:   1,
    		}
    		//3.利用stream的send实现流传输
    		err := stream.Send(p)
    		if err != nil {
    			panic(err)
    		}
    	}
    
    	recv, err := stream.CloseAndRecv() //4.结束客户端流,接收数据
    	if err != nil {
    		panic(err)
    	}
    	fmt.Println(recv)
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46

    服务端(把这个函数添加到上一步骤的服务端文件中即可)

    func (s *server) GetCStreamStock(stream protos.ProdService_GetCStreamStockServer) error {
    	book := protos.AddressBook{People: make([]*protos.IPhoneInfo, 0)}
    	for {
    		recv, err := stream.Recv()
    		if err == io.EOF {
    			return stream.SendAndClose(&book)
    		}
    		name := recv.GetName()
    		fmt.Println(name)
    		if info, ok := AddressBook[name]; ok {
    			book.People = append(book.People, info)
    		}
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    3.服务器端流RPC模式

    proto文件,最后的服务函数写这个,然后编译文件。

    message SearchPerson{
      repeated Person per = 1;
    }
    service ProdService{
      rpc GetProdStock(Person) returns (IPhoneInfo);  //上一步中的,留着也没问题
      rpc GetCStreamStock(stream Person) returns (AddressBook);  //上一步中的,留着也没问题
      rpc GetSStreamStock(SearchPerson) returns (stream IPhoneInfo);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    客户端

    package main
    
    import (
    	"context"
    	"fmt"
    	"google.golang.org/grpc"
    	"google.golang.org/grpc/credentials/insecure"
    	protos "grpc/proto_go"
    	"io"
    	"log"
    )
    
    var person = []string{"cjs", "dh", "jay", "tom"}
    
    func main() {
    	conn, err := grpc.Dial("localhost:8080", grpc.WithTransportCredentials(insecure.NewCredentials()))
    	if err != nil {
    		log.Fatalf("Did not connect:%v", err)
    	}
    	defer conn.Close()
    	c := protos.NewProdServiceClient(conn)
    	//1.因为是服务端流模式,客户端可以把数据一次性发送过去
    	lists := make([]*protos.Person, 0)
    	for _, name := range person {
    		p := &protos.Person{
    			Name: name,
    			Id:   1,
    		}
    		lists = append(lists, p)
    	}
    	//2.以服务端流接收方式,发送数据
    	ackStream, err := c.GetSStreamStock(context.Background(), &protos.SearchPerson{Per: lists})
    	if err != nil {
    		panic(err)
    	}
    	//3.循环接收服务端的流数据
    	for {
    		recv, err := ackStream.Recv()
    		if err != nil && err != io.EOF {
    			panic(err)
    		} else if err == io.EOF { //4.当EOF的时候代表没数据了
    			break
    		}
    
    		fmt.Println(recv)
    	}
    
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49

    服务端(把这个函数添加到上一步骤的服务端文件中即可)

    func (s *server) GetSStreamStock(sp *protos.SearchPerson, stream protos.ProdService_GetSStreamStockServer) error {
    	for _, person := range sp.Per {
    		if info, ok := AddressBook[person.Name]; ok {
    			err := stream.Send(info) //每次发送一部分数据
    			if err != nil {
    				return fmt.Errorf("error sending message to stream:%v", err)
    			}
    		}
    	}
    	return nil
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    4.双向流RPC模式

    proto文件,最后的服务函数写这个,然后编译文件。

    service ProdService{ 
      rpc GetProdStock(Person) returns (IPhoneInfo);//上一步中的,留着也没问题
      rpc GetCStreamStock(stream Person) returns (AddressBook);//上一步中的,留着也没问题
      rpc GetSStreamStock(SearchPerson) returns (stream IPhoneInfo);//上一步中的,留着也没问题
      rpc GetCSStreamStock(stream Person) returns (stream IPhoneInfo);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    客户端

    package main
    
    import (
    	"context"
    	"fmt"
    	"google.golang.org/grpc"
    	"google.golang.org/grpc/credentials/insecure"
    	protos "grpc/proto_go"
    	"io"
    	"log"
    	"sync"
    	"time"
    )
    
    var person = []string{"cjs", "dh", "jay", "tom"}
    
    func main() {
    	conn, err := grpc.Dial("localhost:8080", grpc.WithTransportCredentials(insecure.NewCredentials()))
    	if err != nil {
    		log.Fatalf("Did not connect:%v", err)
    	}
    	defer conn.Close()
    	c := protos.NewProdServiceClient(conn)
    	stream, err := c.GetCSStreamStock(context.Background())
    	if err != nil {
    		panic(err)
    	}
    
    	var wg sync.WaitGroup
    	wg.Add(1)
    	go func() { //启动流接收协程
    		for {
    			recv, err := stream.Recv()
    			if err != nil && err != io.EOF {
    				panic(err)
    			} else if err == io.EOF {
    				wg.Done()
    				return
    			}
    			fmt.Println(recv)
    		}
    	}()
    
    	go func() { //启动流发送协程
    		for _, name := range person {
    			p := &protos.Person{
    				Name: name,
    				Id:   1,
    			}
    
    			err := stream.Send(p)
    			if err != nil {
    				panic(err)
    			}
    
    			time.Sleep(time.Second)
    		}
    		err := stream.CloseSend()
    		if err != nil {
    			panic(err)
    		}
    	}()
    
    	wg.Wait()
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66

    服务端(把这个函数添加到上一步骤的服务端文件中即可)

    func (s *server) GetCSStreamStock(stream protos.ProdService_GetCSStreamStockServer) error {
    	for {
    		recv, err := stream.Recv()
    		if err != nil && err != io.EOF {
    			panic(err)
    		} else if err == io.EOF {
    			return nil
    		}
    		if info, ok := AddressBook[recv.Name]; ok {
    			fmt.Println(recv.Name)
    			err := stream.Send(info)
    			if err != nil {
    				panic(err)
    			}
    		}
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
  • 相关阅读:
    CentOS 7 使用pugixml 库
    Elasticsearch学习
    视频号视频如何下载(WeChatVideoDownloader)
    【机器学习-西瓜书】-第3章-线性回归-学习笔记-下
    算法与数据结构【30天】集训营——图的遍历之深度优先搜索、广度优先搜索(28)
    关系数据库是个啥
    写给儿子的一封信
    让你随时随地访问金蝶云星空企业版v8.0,内网穿透轻松实现远程办公!
    接口压力测试 jmeter--增强篇(二)
    记一个带批注、表头样式的导入导出excel方法(基于easyexcel)
  • 原文地址:https://blog.csdn.net/qq_26039331/article/details/126918192