• gRPC四种通信模式


    gRPC四种通信模式

    gRPC有四种通信⽅式,分别是:简单 RPC(Unary RPC)、服务端流式 RPC (Server streaming RPC)、客户端流式 RPC (Clientstreaming RPC)、双向流式 RPC(Bi-directional streaming RPC)。它们主要有以下特点:

    服务类型特点
    简单 RPC⼀般的rpc调⽤,传⼊⼀个请求对象,返回⼀个返回对象
    服务端流式 RPC传⼊⼀个请求对象,服务端可以返回多个结果对象
    客户端流式 RPC客户端传⼊多个请求对象,服务端返回⼀个结果对象
    双向流式 RPC结合客户端流式RPC和服务端流式RPC,可以传⼊多个请求对象,返回多个结果对

    RPC(Unary RPC)

    简单rpc 这就是⼀般的rpc调⽤,⼀个请求对象对应⼀个返回对象

    客户端发起⼀次请求,服务端响应⼀个数据,即标准RPC通信。

    这种模式,⼀个每⼀次都是发起⼀个独⽴的tcp连接,⾛⼀次三次握⼿和四次挥⼿!

    这个就是我们基础案例的示例,模式图如下

    在这里插入图片描述

    服务端流RPC

    服务端流式rpc ⼀个请求对象,服务端可以传回多个结果对象

    服务端流 RPC 下,客户端发出⼀个请求,但不会⽴即得到⼀个响应,⽽是在服务端与客户端之间建⽴⼀个单向的流,服务端可以随时向流
    中写⼊多个响应消息,最后主动关闭流,⽽客户端需要监听这个流,不断获取响应直到流关闭
    应⽤场景举例:

    典型的例⼦是客户端向服务端发送⼀个股票代码,服务端就把该股票的实时数据源源不断的返回给客户端

    在这里插入图片描述

    客户端流RPC

    客户端流式rpc 客户端传⼊多个请求对象,服务端返回⼀个响应结果

    应用场景如:物联⽹终端向服务器报送数据
    在这里插入图片描述

    双向流RPC

    双向流式rpc 结合客户端流式rpc和服务端流式rpc,可以传⼊多个对象,返回多个响应对象

    应⽤场景:聊天应⽤

    在这里插入图片描述

    案例

    新建 streamdemo.proto

    syntax = "proto3";
    option go_package = ".;proto";
    // 定义一个服务,gRPC自有的,它需要用grpc插件生成,也就是咱们安装的那个插件
    service Greeter{
      // 服务端流模式
      rpc GetStream(StreamRequestData) returns(stream StreamResponseData);
      // 客户端流模式
      rpc PutStream(stream StreamRequestData) returns(StreamResponseData);
      // 双向流模式
      rpc AllStream(stream StreamRequestData) returns(stream StreamResponseData);
    }
    
    // 类似于go的结构体,可以定义属性
    message StreamRequestData {
      string data = 1;
    }
    // 定义一个响应的类型
    message StreamResponseData {
      string data = 1;
    }
    

    执行命令,生成go文件

    protoc --go_out=. --go_opt=paths=source_relative ./streamdemo.proto
    
    protoc --go-grpc_out=. --go-grpc_opt=require_unimplemented_servers=false --go-grpc_opt=paths=source_relative ./streamdemo.proto
    

    客户端

    package main
    
    import (
    	"context"
    	"fmt"
    	"go_test_learn/grpc_stream/proto"
    	"google.golang.org/grpc"
    	"google.golang.org/grpc/credentials/insecure"
    	"sync"
    	"time"
    )
    
    func main() {
    	conn, err := grpc.Dial("127.0.0.1:50052", grpc.WithTransportCredentials(insecure.NewCredentials()))
    	if err != nil {
    		panic(err)
    	}
    	defer conn.Close()
    	client := proto.NewGreeterClient(conn)
    	// ******服务端流模式示例***start******
    	//res,err:=client.GetStream(context.Background(),&proto.StreamRequestData{Data: "开始要时间"})
    	//if err!=nil{
    	//	panic(err)
    	//}
    	//for  { // 开启死循环,不停接收数据
    	//	r,err:=res.Recv() // 本质就是socket接收数据
    	//	if err != nil {
    	//		fmt.Println("接收出错")
    	//		break
    	//	}
    	//	fmt.Println(r.Data)
    	//}
    	// ******服务端流模式示例***end******
    
    	// ******客户端端流模式示例***start******
    	//res, err := client.PutStream(context.Background())
    	//if err != nil {
    	//	fmt.Println(err)
    	//	panic(err)
    	//}
    	//var i = 0
    	//for {
    	//	res.Send(&proto.StreamRequestData{
    	//		Data: fmt.Sprintf("客户端当前时间为:%v", time.Now().Unix()),
    	//	})
    	//	time.Sleep(time.Second)
    	//	i++
    	//	if i == 10 {
    	//		break
    	//	}
    	//}
    	// ******客户端流模式示例***end******
    
    	// ******双向流模式示例***start******
    	req, err := client.AllStream(context.Background())
    	wg := sync.WaitGroup{}
    	wg.Add(2)
    
    	// 开协程发送数据,发送10次
    	go func() {
    		for i := 0; i < 10; i++ {
    			req.Send(&proto.StreamRequestData{
    				Data: "客户端发送的数据",
    			})
    			time.Sleep(time.Second)
    		}
    		wg.Done()
    	}()
    	// 开协程接收数据,接收10次
    	stream, err := client.AllStream(context.Background())
    	if err != nil {
    		panic(err)
    	}
    	wg := sync.WaitGroup{}
    	wg.Add(2)
    
    	// 开 goroutine 发送数据
    	go func() {
    		for i := 0; i < 10; i++ {
    			stream.Send(&proto.StreamRequestData{
    				Data: "客户端发送的数据",
    			})
    			time.Sleep(time.Second)
    		}
    		wg.Done()
    	}()
    
    	// 开goroutine 接收数据
    	go func() {
    		for i := 0; i < 10; i++ {
    			recv, err := stream.Recv()
    			if err != nil {
    				panic(err)
    			}
    			fmt.Println(recv.Data)
    		}
    		wg.Done()
    	}()
    	wg.Wait()
    	// ******双向流模式示例***end******
    
    }
    

    服务端

    package main
    
    import (
    	"fmt"
    	"go_test_learn/grpc_stream/proto"
    	"google.golang.org/grpc"
    	"net"
    	"sync"
    	"time"
    )
    
    type GreeterServer struct {
    }
    
    // 服务端流模式
    func (ser *GreeterServer) GetStream(request *proto.StreamRequestData, response proto.Greeter_GetStreamServer) error {
    	fmt.Println(request.Data) // 打印了一下从客户端传入的数据
    	var i = 0                 // 定义一个数字,就发送10次
    	for {                     // 死循环源源不断向客户端发送
    		// 以流的方式向客户端发送数据
    		response.Send(&proto.StreamResponseData{
    			Data: fmt.Sprintf("当前服务器时间为:%v", time.Now().Unix()),
    		})
    		time.Sleep(time.Second) // 睡1s钟再发
    		i++                     //自增1
    		if i == 10 {            // i为10的时候结束
    			break
    		}
    	}
    	return nil
    }
    
    // 客户端流模式
    func (ser *GreeterServer) PutStream(request proto.Greeter_PutStreamServer) error {
    	for {
    		// 不断的接收并打印客户端发送过来的数据
    		res, err := request.Recv()
    		if err != nil {
    			return err
    		}
    		fmt.Println(res.Data)
    	}
    	return nil
    }
    
    // 双向流模式
    func (ser *GreeterServer) AllStream(request proto.Greeter_AllStreamServer) error {
    	// 开启两个协程,一个接收数据,一个发送数据
    	var wg=sync.WaitGroup{}
    	wg.Add(2)
    	go func() { // 发送数据
    		for {
    			err := request.Send(&proto.StreamResponseData{
    				Data: "服务端给你的数据",
    			})
    			if err != nil {
    				fmt.Println(err)
    				break
    			}
    			time.Sleep(time.Second) // 睡1s发送一次
    		}
    		wg.Done()
    
    	}()
    	go func() { // 不停接收数据
    		for {
    			res, err := request.Recv()
    			if err != nil {
    				fmt.Println(err)
    				break
    			}
    			fmt.Println(res.Data)
    		}
    		wg.Done()
    
    	}()
    
    	wg.Wait()
    	return nil
    }
    
    func main() {
    	// 创建一个监听tcp,50052端口
    	lis, err := net.Listen("tcp", ":50052")
    	if err != nil {
    		panic(err)
    	}
    	// new一个grpc的server
    	ser := grpc.NewServer()
    	// 注册服务
    	proto.RegisterGreeterServer(ser, &GreeterServer{})
    	// 让grpc对外提供服务
    	err = ser.Serve(lis)
    	if err != nil {
    		panic(err)
    	}
    
    }
    

    总结

    gRPC设计为低延迟和⾼吞吐量通信。gRPC⾮常适⽤于效率⾄关重要的轻型微服务。点对点实时通信 - gRPC对双向流媒体提供出⾊的⽀持。

    gRPC服务可以实时推送消息⽽⽆需轮询。多语⾔混合开发环境 - gRPC⼯具⽀持所有流⾏的开发语⾔,使gRPC成为多语⾔开发环境的理想选择。

    ⽹络受限环境 - 使⽤Protobuf(⼀种轻量级消息格式)序列化gRPC消息。gRPC消息始终⼩于等效的JSON消息

  • 相关阅读:
    快速部署 微软开源的 Garnet 键值数据库
    docker---dockerfile相关知识
    人工智能GPT-4o?
    SpringBoot
    Windows定时截屏、后台自动截屏工具,带有密码保护功能 —— 定时执行专家
    编码器如何控制单相霍尔电机。只有一路霍尔信号,电机只能正转不能反转。能移植野火Pid控制吗
    竞赛选题 目标检测-行人车辆检测流量计数
    剑指 Offer 06. 从尾到头打印链表
    java毕业设计项目源代码S2SH健身俱乐部会员系统|健身房
    AS模拟器不能启动出现Could not start AVD
  • 原文地址:https://blog.csdn.net/qq_55752792/article/details/127068092