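gRPC basics:
- gRPC is a high-performance, cross-language RPC framework open-sourced by Google. It runs over HTTP/2 and uses Protocol Buffers (protobuf 3.x) as its default interface definition and serialization format. The Java implementation (grpc-java) uses Netty 4.x as its default transport.
Protocol Buffers:
- A language-neutral, platform-neutral, extensible mechanism for serializing structured data. For example, an object serialized in Python on Ubuntu can be sent over HTTP to an Android client written in Java, which deserializes it with the corresponding generated code and gets back an equivalent object.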
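To make the cross-language claim concrete, here is a minimal sketch of how a single string field ends up on the wire. It hand-encodes the bytes using only the JDK; `ProtoWireSketch` and its methods are hypothetical names for illustration, and real code would use the classes generated by `protoc` instead.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

final class ProtoWireSketch {
    // Wire type 2 = length-delimited (strings, bytes, nested messages).
    static final int WIRE_TYPE_LEN = 2;

    // Writes a non-negative int as a base-128 varint, low 7 bits first.
    static void writeVarint(ByteArrayOutputStream out, int value) {
        while ((value & ~0x7F) != 0) {
            out.write((value & 0x7F) | 0x80);
            value >>>= 7;
        }
        out.write(value);
    }

    // Encodes one string field: tag, then byte length, then UTF-8 payload.
    static byte[] encodeStringField(int fieldNumber, String value) {
        byte[] utf8 = value.getBytes(StandardCharsets.UTF_8);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        writeVarint(out, (fieldNumber << 3) | WIRE_TYPE_LEN);
        writeVarint(out, utf8.length);
        out.write(utf8, 0, utf8.length);
        return out.toByteArray();
    }

    public static void main(String[] args) {
        StringBuilder hex = new StringBuilder();
        for (byte b : encodeStringField(1, "hi")) {
            hex.append(String.format("%02x ", b));
        }
        System.out.println(hex.toString().trim()); // prints "0a 02 68 69"
    }
}
```

Because the byte layout depends only on the field number and type, any language with a protobuf runtime can decode these four bytes into the same field value.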
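Basic proto syntax:
// Use proto3 syntax
syntax = "proto3";
// Package name, used as a namespace
package xxx;
// option java_multiple_files = true; makes the Java generator emit a separate .java file for each top-level message, enum, and service instead of nesting them all inside one outer class
// Declare the RPC service interface
// Keyword service: declares the service "class" to be generated
service Greeter {
  // Keyword rpc: declares a service method with its name, request message, and response message
  rpc SayHello(HelloRequest) returns (HelloResponse);
}
// Declare the request and response messages
// Keyword message: declares the request and response bodies
message HelloRequest {
  // Field number 1
  // Field numbers range from 1 to 536,870,911 (2^29 - 1). Numbers 19000 to 19999 cannot be used because they are reserved for the Protocol Buffers implementation
  string name = 1;
  // repeated: one person can have several phone numbers
  repeated string phone = 3;
}
message HelloResponse {
  string message = 1;
}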
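The field-number rule in the comments above can be written down as a small check. This is a sketch using only the JDK; `FieldNumberCheck` is a hypothetical helper, not part of protobuf. It also computes how many bytes the field tag occupies, which is why small field numbers are usually given to frequently used fields.

```java
final class FieldNumberCheck {
    static final int MAX_FIELD_NUMBER = (1 << 29) - 1; // 536,870,911
    static final int RESERVED_START = 19000;
    static final int RESERVED_END = 19999;

    // True if the number is in range and outside the reserved block.
    static boolean isUsable(int fieldNumber) {
        if (fieldNumber < 1 || fieldNumber > MAX_FIELD_NUMBER) {
            return false;
        }
        return fieldNumber < RESERVED_START || fieldNumber > RESERVED_END;
    }

    // Number of bytes the varint-encoded tag (field number plus 3-bit wire type) needs.
    static int tagBytes(int fieldNumber) {
        long tag = ((long) fieldNumber) << 3; // low 3 bits hold the wire type
        int bytes = 1;
        while ((tag >>>= 7) != 0) {
            bytes++;
        }
        return bytes;
    }

    public static void main(String[] args) {
        System.out.println(isUsable(3));     // true
        System.out.println(isUsable(19500)); // false
        System.out.println(tagBytes(15));    // 1
        System.out.println(tagBytes(16));    // 2
    }
}
```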
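Call types:
Unary RPC, also called simple RPC: the client sends one request and receives one response.
rpc SayHello(HelloRequest) returns (HelloResponse);
Server streaming RPC: the client sends a single request and the server returns a stream of messages. The client reads from the stream until there are no more messages. gRPC guarantees message ordering within a single call.
rpc LotsOfReplies(HelloRequest) returns (stream HelloResponse);
Client streaming RPC: the client writes a sequence of messages to the server. Once it has finished writing, it waits for the server to read them all and return its response. gRPC again guarantees message ordering within a single call.
rpc LotsOfGreetings(stream HelloRequest) returns (HelloResponse);
Bidirectional streaming RPC: both the client and the server read and write messages as streams. The two streams are completely independent, so each side can read and write in whatever order it likes. For example, the server can wait until all client messages have arrived before processing them, or it can alternate between reading a request and writing a response. The order of messages within each stream is preserved.
rpc BidiHello(stream HelloRequest) returns (stream HelloResponse);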
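The bidirectional case is the easiest one to get wrong, so here is an in-memory sketch of its call pattern. It uses no gRPC classes: `Observer` is a simplified, hypothetical stand-in for `io.grpc.stub.StreamObserver` (without the error callback), and `bidiHello` plays the role of the server method. The point to notice is that the caller completes the request observer it was handed, not its own response observer.

```java
import java.util.ArrayList;
import java.util.List;

final class BidiStreamSketch {
    /** Simplified stand-in for a stream observer (no error path). */
    interface Observer<T> {
        void onNext(T value);
        void onCompleted();
    }

    /** "Server" side: receives the response observer, returns the request observer. */
    static Observer<String> bidiHello(final Observer<String> responses) {
        return new Observer<String>() {
            @Override
            public void onNext(String name) {
                responses.onNext("Hello, " + name); // reply as each request arrives
            }

            @Override
            public void onCompleted() {
                responses.onCompleted(); // client is done, so close the response stream
            }
        };
    }

    /** "Client" side: sends every name, then half-closes the request stream. */
    static List<String> run(List<String> names) {
        final List<String> events = new ArrayList<>();
        Observer<String> responses = new Observer<String>() {
            @Override
            public void onNext(String value) {
                events.add(value);
            }

            @Override
            public void onCompleted() {
                events.add("<completed>");
            }
        };
        Observer<String> requests = bidiHello(responses);
        for (String name : names) {
            requests.onNext(name);
        }
        requests.onCompleted();
        return events;
    }
}
```

Calling `BidiStreamSketch.run(Arrays.asList("a", "b"))` records `Hello, a`, `Hello, b`, and then `<completed>`, in that order.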
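Channels
A gRPC channel provides a connection to a gRPC server on a specified host and port. It is used when creating a client stub. The client can set channel arguments to change gRPC's default behavior, for example to switch message compression on or off. A channel has state, including connected and idle.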
Maven plugin setup: compile the .proto files and generate code
<build>
  <plugins>
    <plugin>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-maven-plugin</artifactId>
      <configuration>
        <excludes>
          <exclude>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
          </exclude>
        </excludes>
      </configuration>
    </plugin>
    <plugin>
      <groupId>org.xolstice.maven.plugins</groupId>
      <artifactId>protobuf-maven-plugin</artifactId>
      <version>0.6.1</version>
      <configuration>
        <protocArtifact>com.google.protobuf:protoc:3.5.1:exe:${os.detected.classifier}</protocArtifact>
        <pluginId>grpc-java</pluginId>
        <pluginArtifact>io.grpc:protoc-gen-grpc-java:1.11.0:exe:${os.detected.classifier}</pluginArtifact>
        <protoSourceRoot>${project.basedir}/src/main/proto</protoSourceRoot>
        <outputDirectory>${project.basedir}/src/main/java</outputDirectory>
        <clearOutputDirectory>false</clearOutputDirectory>
      </configuration>
      <executions>
        <execution>
          <goals>
            <goal>compile</goal>
            <goal>compile-custom</goal>
          </goals>
        </execution>
      </executions>
    </plugin>
  </plugins>
  <extensions>
    <extension>
      <groupId>kr.motd.maven</groupId>
      <artifactId>os-maven-plugin</artifactId>
      <version>1.6.2</version>
    </extension>
  </extensions>
</build>
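Spring Boot integration with gRPC
Overall flow: define the service in a .proto file, generate the Java code from it, implement the service on the server, and call it from the client through a stub.
Project structure: a parent project, grpc-demo, with three modules: grpc-proto, grpc-server, and grpc-client.
Integration code:
Parent pom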
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.7.17</version>
    <relativePath/>
  </parent>
  <groupId>com.xiaoshu</groupId>
  <artifactId>grpc-demo</artifactId>
  <version>1.0-SNAPSHOT</version>
  <packaging>pom</packaging>
  <name>grpc-demo</name>
  <modules>
    <module>grpc-server</module>
    <module>grpc-client</module>
    <module>grpc-proto</module>
  </modules>
  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    <java.version>1.8</java.version>
    <grpc.version>2.15.0.RELEASE</grpc.version>
  </properties>
  <dependencies>
    <dependency>
      <groupId>org.projectlombok</groupId>
      <artifactId>lombok</artifactId>
      <version>1.18.10</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
  </dependencies>
  <dependencyManagement>
    <dependencies>
      <dependency>
        <groupId>net.devh</groupId>
        <artifactId>grpc-spring-boot-starter</artifactId>
        <version>${grpc.version}</version>
        <type>pom</type>
        <scope>import</scope>
      </dependency>
      <dependency>
        <groupId>net.devh</groupId>
        <artifactId>grpc-server-spring-boot-starter</artifactId>
        <version>${grpc.version}</version>
        <type>pom</type>
        <scope>import</scope>
      </dependency>
      <dependency>
        <groupId>net.devh</groupId>
        <artifactId>grpc-client-spring-boot-starter</artifactId>
        <version>${grpc.version}</version>
        <type>pom</type>
        <scope>import</scope>
      </dependency>
    </dependencies>
  </dependencyManagement>
  <build>
    <plugins>
      <plugin>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-maven-plugin</artifactId>
      </plugin>
    </plugins>
  </build>
</project>
proto module
syntax = "proto3";
package com.ht.meta;
// Generate a separate class file for each message, enum, and service instead of nesting them as inner classes
option java_multiple_files = true;
// Java package of the generated classes
option java_package = "com.meta";
// Name of the generated outer class
option java_outer_classname = "HtMetaInfoSyncProto";
service HtMetaInfoSyncService {
  rpc syncMeta (HtMetaSyncRequest) returns (HtMetaSyncResponse) {}
}
// Sync type
enum SyncType {
  ADD = 0;  // add
  DEL = 1;  // delete
  EDIT = 2; // edit
}
// Sync request
message HtMetaSyncRequest {
  // JSON content
  string syncJson = 1;
  // Sync type
  SyncType syncType = 2;
}
// Sync response
message HtMetaSyncResponse {
  // Response code
  string code = 1;
  // Message
  string msg = 2;
}
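One consequence of the enum above is worth spelling out: in proto3 the first enum value must be 0 and acts as the default, so a request whose syncType was never set reads back as ADD. The sketch below models that with a hand-written enum; `EnumDefaultSketch` and `decode` are hypothetical, and the nested `SyncType` only imitates the generated class, using the same names and numbers as the .proto file.

```java
final class EnumDefaultSketch {
    /** Hand-written stand-in for the generated SyncType enum. */
    enum SyncType {
        ADD(0), DEL(1), EDIT(2);

        private final int number;

        SyncType(int number) {
            this.number = number;
        }

        int getNumber() {
            return number;
        }

        // Returns the constant with this number, or null if there is none.
        static SyncType forNumber(int number) {
            for (SyncType type : values()) {
                if (type.number == number) {
                    return type;
                }
            }
            return null;
        }
    }

    /** A field that is absent from the wire (null here) decodes as number 0. */
    static SyncType decode(Integer wireValue) {
        int number = (wireValue == null) ? 0 : wireValue;
        return SyncType.forNumber(number);
    }

    public static void main(String[] args) {
        System.out.println(decode(null)); // ADD
        System.out.println(decode(2));    // EDIT
    }
}
```

If "not set" needs to be told apart from a real ADD, one common option is to reserve 0 for an explicit unspecified value, though that would change the numbering used in this example.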
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <parent>
    <groupId>com.xiaoshu</groupId>
    <artifactId>grpc-demo</artifactId>
    <version>1.0-SNAPSHOT</version>
  </parent>
  <artifactId>grpc-proto</artifactId>
  <packaging>jar</packaging>
  <name>grpc-proto</name>
  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  </properties>
  <dependencies>
    <dependency>
      <groupId>net.devh</groupId>
      <artifactId>grpc-spring-boot-starter</artifactId>
      <version>${grpc.version}</version>
    </dependency>
  </dependencies>
  <build>
    <plugins>
      <plugin>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-maven-plugin</artifactId>
        <configuration>
          <excludes>
            <exclude>
              <groupId>org.projectlombok</groupId>
              <artifactId>lombok</artifactId>
            </exclude>
          </excludes>
        </configuration>
      </plugin>
      <plugin>
        <groupId>org.xolstice.maven.plugins</groupId>
        <artifactId>protobuf-maven-plugin</artifactId>
        <version>0.6.1</version>
        <configuration>
          <protocArtifact>com.google.protobuf:protoc:3.5.1:exe:${os.detected.classifier}</protocArtifact>
          <pluginId>grpc-java</pluginId>
          <pluginArtifact>io.grpc:protoc-gen-grpc-java:1.11.0:exe:${os.detected.classifier}</pluginArtifact>
          <protoSourceRoot>${project.basedir}/src/main/proto</protoSourceRoot>
          <outputDirectory>${project.basedir}/src/main/java</outputDirectory>
          <clearOutputDirectory>false</clearOutputDirectory>
        </configuration>
        <executions>
          <execution>
            <goals>
              <goal>compile</goal>
              <goal>compile-custom</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
    </plugins>
    <extensions>
      <extension>
        <groupId>kr.motd.maven</groupId>
        <artifactId>os-maven-plugin</artifactId>
        <version>1.6.2</version>
      </extension>
    </extensions>
  </build>
</project>
Server:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <parent>
    <groupId>com.xiaoshu</groupId>
    <artifactId>grpc-demo</artifactId>
    <version>1.0-SNAPSHOT</version>
  </parent>
  <artifactId>grpc-server</artifactId>
  <packaging>jar</packaging>
  <name>grpc-server</name>
  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  </properties>
  <dependencies>
    <dependency>
      <groupId>com.xiaoshu</groupId>
      <artifactId>grpc-proto</artifactId>
      <version>1.0-SNAPSHOT</version>
    </dependency>
    <dependency>
      <groupId>net.devh</groupId>
      <artifactId>grpc-server-spring-boot-starter</artifactId>
      <version>${grpc.version}</version>
    </dependency>
    <dependency>
      <groupId>mysql</groupId>
      <artifactId>mysql-connector-java</artifactId>
      <version>8.0.28</version>
    </dependency>
    <dependency>
      <groupId>com.alibaba</groupId>
      <artifactId>fastjson</artifactId>
      <version>1.2.83</version>
    </dependency>
  </dependencies>
</project>
server:
  port: 8080
spring:
  application:
    name: spring-boot-grpc-server
# gRPC settings; only the server port needs to be configured here
grpc:
  server:
    port: 9898
Server code:
import com.meta.HtMetaInfoSyncServiceGrpc;
import com.meta.HtMetaSyncRequest;
import com.meta.HtMetaSyncResponse;
import com.meta.SyncType;
import com.util.JsonUtil;
import com.vo.HtMetaClusterInfoVo;
import io.grpc.stub.StreamObserver;
import lombok.extern.slf4j.Slf4j;
import net.devh.boot.grpc.server.service.GrpcService;
import java.util.List;
/**
 * @author 何永豪
 * @className HtMetaSyncService
 * @description TODO
 * @date 2023/11/6 15:25
 */
@Slf4j
@GrpcService
public class HtMetaSyncService extends HtMetaInfoSyncServiceGrpc.HtMetaInfoSyncServiceImplBase {
    @Override
    public void syncMeta(HtMetaSyncRequest request, StreamObserver<HtMetaSyncResponse> responseObserver) {
        String syncJson = request.getSyncJson();
        log.info("Received JSON: {}", syncJson);
        // Parse the payload (JsonUtil and HtMetaClusterInfoVo are project classes)
        List<HtMetaClusterInfoVo> list = JsonUtil.toList(syncJson, HtMetaClusterInfoVo.class);
        SyncType syncType = request.getSyncType();
        int number = syncType.getNumber();
        log.info("Sync type: {}", number);
        HtMetaSyncResponse syncResponse = HtMetaSyncResponse.newBuilder()
                .setCode("1000")
                .setMsg("Sync succeeded").build();
        // Send the single response, then complete the call
        responseObserver.onNext(syncResponse);
        responseObserver.onCompleted();
    }
}
Client:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <parent>
    <groupId>com.xiaoshu</groupId>
    <artifactId>grpc-demo</artifactId>
    <version>1.0-SNAPSHOT</version>
  </parent>
  <artifactId>grpc-client</artifactId>
  <packaging>jar</packaging>
  <name>grpc-client</name>
  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  </properties>
  <dependencies>
    <dependency>
      <groupId>com.xiaoshu</groupId>
      <artifactId>grpc-proto</artifactId>
      <version>1.0-SNAPSHOT</version>
    </dependency>
    <dependency>
      <groupId>net.devh</groupId>
      <artifactId>grpc-client-spring-boot-starter</artifactId>
      <version>${grpc.version}</version>
    </dependency>
  </dependencies>
</project>
yml
server:
  port: 8088
spring:
  application:
    name: local-client
grpc:
  client:
    # Name of this gRPC client configuration; the @GrpcClient annotation refers to it
    local-grpc-server:
      # gRPC server address
      address: 'static://127.0.0.1:9898'
      enableKeepAlive: true
      keepAliveWithoutCalls: true
      # Negotiation type: plaintext, no encryption
      negotiationType: plaintext
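The address value above has the shape of a URI: a scheme (static), then the host and port of the server. The sketch below only illustrates that structure with `java.net.URI`; `GrpcAddressSketch` is a hypothetical helper and is not how the starter itself processes the setting.

```java
import java.net.URI;

final class GrpcAddressSketch {
    // Splits an address such as static://127.0.0.1:9898 into scheme, host, and port.
    static String describe(String address) {
        URI uri = URI.create(address);
        return uri.getScheme() + " " + uri.getHost() + " " + uri.getPort();
    }

    public static void main(String[] args) {
        System.out.println(describe("static://127.0.0.1:9898")); // static 127.0.0.1 9898
    }
}
```

The port here must match grpc.server.port on the server side (9898 in this example), not the HTTP port.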
Client stub
import com.meta.HtMetaInfoSyncServiceGrpc;
import com.meta.HtMetaSyncRequest;
import com.meta.HtMetaSyncResponse;
import com.meta.SyncType;
import io.grpc.StatusRuntimeException;
import net.devh.boot.grpc.client.inject.GrpcClient;
import org.springframework.stereotype.Service;
/**
 * @author xiaoshu
 */
@Service
public class HtMetaInfoSyncClient {
    // The name must match the client entry configured under grpc.client in the yml
    @GrpcClient("local-grpc-server")
    private HtMetaInfoSyncServiceGrpc.HtMetaInfoSyncServiceBlockingStub stub;

    public String syncMeta(final String json, final SyncType syncType) {
        try {
            // Blocking unary call: returns once the server has responded
            HtMetaSyncResponse htMetaSyncResponse = stub.syncMeta(
                    HtMetaSyncRequest.newBuilder()
                            .setSyncJson(json)
                            .setSyncType(syncType)
                            .build());
            String code = htMetaSyncResponse.getCode();
            String msg = htMetaSyncResponse.getMsg();
            return code + ":" + msg;
        } catch (final StatusRuntimeException e) {
            return "FAILED with " + e.getStatus().getCode().name();
        }
    }
}
Calling the client:
@Autowired
private HtMetaInfoSyncClient htMetaInfoSyncClient;

@RequestMapping("/htMetaSync")
public String htMetaSync() {
    String json = "";
    return htMetaInfoSyncClient.syncMeta(json, SyncType.ADD);
}
Integration using the native gRPC API:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.7.14</version>
    <relativePath/>
  </parent>
  <groupId>com.xiaoshuzxc</groupId>
  <artifactId>grpc-native-api</artifactId>
  <packaging>jar</packaging>
  <name>grpc-native-api</name>
  <description>Integration using the native gRPC API</description>
  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <grpc.version>1.29.0</grpc.version>
    <proto.version>3.12.0</proto.version>
    <netty.tcnative.version>2.0.30.Final</netty.tcnative.version>
  </properties>
  <dependencyManagement>
    <dependencies>
      <dependency>
        <groupId>io.grpc</groupId>
        <artifactId>grpc-bom</artifactId>
        <version>${grpc.version}</version>
        <type>pom</type>
        <scope>import</scope>
      </dependency>
    </dependencies>
  </dependencyManagement>
  <dependencies>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter</artifactId>
    </dependency>
    <dependency>
      <groupId>org.projectlombok</groupId>
      <artifactId>lombok</artifactId>
      <optional>true</optional>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-configuration-processor</artifactId>
      <optional>true</optional>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
      <groupId>com.xiaoshu</groupId>
      <artifactId>grpc-proto</artifactId>
      <version>1.0-SNAPSHOT</version>
    </dependency>
    <dependency>
      <groupId>io.grpc</groupId>
      <artifactId>grpc-protobuf</artifactId>
    </dependency>
    <dependency>
      <groupId>io.grpc</groupId>
      <artifactId>grpc-stub</artifactId>
    </dependency>
    <dependency>
      <groupId>io.grpc</groupId>
      <artifactId>grpc-netty</artifactId>
    </dependency>
    <dependency>
      <groupId>io.netty</groupId>
      <artifactId>netty-tcnative-boringssl-static</artifactId>
      <version>${netty.tcnative.version}</version>
      <scope>runtime</scope>
    </dependency>
  </dependencies>
</project>
import org.springframework.stereotype.Service;
import java.lang.annotation.*;
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Service
public @interface GrpcService {
}
Starting the gRPC server listener:
import com.annotation.GrpcService;
import io.grpc.BindableService;
import io.grpc.Server;
import io.grpc.netty.GrpcSslContexts;
import io.grpc.netty.NettyServerBuilder;
import io.netty.handler.ssl.ClientAuth;
import io.netty.handler.ssl.SslContextBuilder;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.config.BeanDefinition;
import org.springframework.boot.CommandLineRunner;
import org.springframework.context.support.AbstractApplicationContext;
import org.springframework.core.type.AnnotatedTypeMetadata;
import javax.annotation.Resource;
import java.io.File;
import java.io.IOException;
import java.lang.annotation.Annotation;
import java.util.Map;
import java.util.stream.Stream;
/**
 * @author 何永豪
 * @className GrpcServer
 * @description TODO
 * @date 2023/11/7 15:56
 */
@Slf4j
public class GrpcServer implements CommandLineRunner, DisposableBean {
    private Server server;
    @Resource
    private AbstractApplicationContext applicationContext;
    @Resource
    private GrpcProperties grpcProperties;

    @Override
    public void destroy() {
        stop();
    }

    @Override
    public void run(String... args) throws Exception {
        start();
    }

    // Builds a mutual-TLS context: server certificate and key, plus the CA used to verify clients
    private SslContextBuilder getSslContextBuilder() {
        SslContextBuilder sslContextBuilder = SslContextBuilder.forServer(
                new File(grpcProperties.getServerCertPath()),
                new File(grpcProperties.getServerPrivateKeyPath()));
        sslContextBuilder.trustManager(new File(grpcProperties.getServerTrustCertPath()));
        sslContextBuilder.clientAuth(ClientAuth.REQUIRE);
        return GrpcSslContexts.configure(sslContextBuilder);
    }

    private void start() throws IOException {
        NettyServerBuilder nettyServerBuilder = NettyServerBuilder
                .forPort(grpcProperties.getPort());
        // To enable TLS, add: .sslContext(getSslContextBuilder().build())
        // Register every BindableService bean annotated with @GrpcService
        scanBeanWithAnnotation(GrpcService.class, BindableService.class)
                .forEach(e -> {
                    BindableService bindableService = applicationContext.getBeanFactory().getBean(e, BindableService.class);
                    nettyServerBuilder.addService(bindableService.bindService());
                });
        server = nettyServerBuilder.build().start();
        log.info("grpc start listen {}", grpcProperties.getPort());
        // Non-daemon thread that blocks until the gRPC server terminates
        Thread thread = new Thread(() -> {
            try {
                GrpcServer.this.blockUntilShutdown();
            } catch (InterruptedException e) {
                log.error("grpc server stopped", e);
                throw new RuntimeException(e);
            }
        });
        thread.setDaemon(false);
        thread.start();
    }

    private void stop() {
        if (server != null) {
            server.shutdown();
        }
    }

    private void blockUntilShutdown() throws InterruptedException {
        if (server != null) {
            server.awaitTermination();
        }
    }

    // Returns the names of beans of the given type that carry the given annotation
    private <T> Stream<String> scanBeanWithAnnotation(Class<? extends Annotation> annotationType, Class<T> beanType) {
        String[] beanNamesForType = applicationContext.getBeanNamesForType(beanType);
        // Look the annotated beans up once instead of once per candidate
        Map<String, Object> beansWithAnnotation = applicationContext.getBeansWithAnnotation(annotationType);
        return Stream.of(beanNamesForType).filter(e -> {
            BeanDefinition beanDefinition = applicationContext.getBeanFactory().getBeanDefinition(e);
            if (beansWithAnnotation.containsKey(e)) {
                return true;
            } else if (beanDefinition.getSource() instanceof AnnotatedTypeMetadata) {
                return AnnotatedTypeMetadata.class.cast(beanDefinition.getSource()).isAnnotated(annotationType.getName());
            }
            return false;
        });
    }
}
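The core idea of scanBeanWithAnnotation is a two-part filter: keep only candidates that are of the service type and that also carry the marker annotation. Below is a minimal sketch of that filter without Spring, using plain reflection. All names in it (`AnnotationScanSketch`, `Marker`, `Bindable`, and the three sample classes) are hypothetical stand-ins, with `Marker` playing the role of @GrpcService and `Bindable` the role of BindableService.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.ArrayList;
import java.util.List;

final class AnnotationScanSketch {
    // RUNTIME retention is required, otherwise reflection cannot see the annotation.
    @Target(ElementType.TYPE)
    @Retention(RetentionPolicy.RUNTIME)
    @interface Marker {
    }

    interface Bindable {
    }

    @Marker
    static class SyncService implements Bindable {
    }

    static class PlainService implements Bindable {
    }

    @Marker
    static class NotAService {
    }

    // Keeps classes that implement Bindable and are annotated with @Marker.
    static List<String> select(List<Class<?>> candidates) {
        List<String> names = new ArrayList<>();
        for (Class<?> candidate : candidates) {
            if (Bindable.class.isAssignableFrom(candidate)
                    && candidate.isAnnotationPresent(Marker.class)) {
                names.add(candidate.getSimpleName());
            }
        }
        return names;
    }

    static List<String> demo() {
        List<Class<?>> candidates = new ArrayList<>();
        candidates.add(SyncService.class);
        candidates.add(PlainService.class);
        candidates.add(NotAService.class);
        return select(candidates);
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [SyncService]
    }
}
```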
Server:
@Slf4j
@GrpcService
public class HtMetaSyncService extends HtMetaInfoSyncServiceGrpc.HtMetaInfoSyncServiceImplBase {
    @Override
    public void syncMeta(HtMetaSyncRequest request, StreamObserver<HtMetaSyncResponse> responseObserver) {
        String syncJson = request.getSyncJson();
        log.info("Received JSON: {}", syncJson);
        SyncType syncType = request.getSyncType();
        int number = syncType.getNumber();
        log.info("Sync type: {}", number);
        HtMetaSyncResponse syncResponse = HtMetaSyncResponse.newBuilder()
                .setCode("1000")
                .setMsg("Sync succeeded").build();
        responseObserver.onNext(syncResponse);
        responseObserver.onCompleted();
    }
}
Client
import com.config.GrpcProperties;
import com.meta.HtMetaInfoSyncServiceGrpc;
import com.meta.HtMetaSyncRequest;
import com.meta.HtMetaSyncResponse;
import com.meta.SyncType;
import io.grpc.ManagedChannel;
import io.grpc.StatusRuntimeException;
import io.grpc.netty.NettyChannelBuilder;
import lombok.SneakyThrows;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.Objects;
/**
 * @author xiaoshu
 */
@Component
public class HtMetaInfoSyncClient {
    @Resource
    private GrpcProperties grpcProperties;

    @SneakyThrows
    public String syncMeta(final String json, final SyncType syncType) {
        ManagedChannel channel = null;
        try {
            channel = NettyChannelBuilder
                    .forAddress(grpcProperties.getServerIp(), grpcProperties.getPort())
                    // Unencrypted connection
                    .usePlaintext()
                    // Encrypted connection
                    //.sslContext(grpcProperties.buildClentSslContext())
                    .build();
            HtMetaInfoSyncServiceGrpc.HtMetaInfoSyncServiceBlockingStub stub = HtMetaInfoSyncServiceGrpc.newBlockingStub(channel);
            HtMetaSyncResponse htMetaSyncResponse = stub.syncMeta(
                    HtMetaSyncRequest.newBuilder()
                            .setSyncJson(json)
                            .setSyncType(syncType)
                            .build());
            String code = htMetaSyncResponse.getCode();
            String msg = htMetaSyncResponse.getMsg();
            return code + ":" + msg;
        } catch (final StatusRuntimeException e) {
            return "FAILED with " + e.getStatus().getCode().name();
        } finally {
            // A channel is created per call here, so release it when the call ends
            if (Objects.nonNull(channel)) {
                channel.shutdown();
            }
        }
    }
}
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * @author 何永豪
 * @className GrpcConfig
 * @description TODO
 * @date 2023/11/7 16:58
 */
@Configuration
public class GrpcConfig {
    @Bean
    @ConditionalOnProperty(value = "grpc.enable", havingValue = "true", matchIfMissing = true)
    public GrpcServer grpcServer() {
        return new GrpcServer();
    }
}
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;

@Component
@ConfigurationProperties(prefix = "grpc")
@Data
public class GrpcProperties {
    private Integer port;
    private String serverIp;
    private String serverCertPath;
    private String serverPrivateKeyPath;
    private String serverTrustCertPath;
    private String clientCertPath;
    private String clientCertChainPath;
    private String clientPrivateKeyPath;
    /*public SslContext buildClentSslContext() throws SSLException {
        SslContextBuilder sslContextBuilder = GrpcSslContexts.forClient();
        sslContextBuilder.trustManager(new File(clientCertPath));
        sslContextBuilder.keyManager(new File(clientCertChainPath),new File(clientPrivateKeyPath));
        return sslContextBuilder.build();
    }*/
}
server:
  port: 8077
spring:
  application:
    name: grpc-api
grpc:
  # Listening port
  port: 6100
  # Target IP
  server-ip: 127.0.0.1
  # SSL configuration
  server-cert-path: /ca/server/server.crt
  server-private-key-path: /ca/server/server.pem
  server-trust-cert-path: /ca/server/ca.crt
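Bidirectional streaming:
Client -> -> -> server (the client sends a stream of requests)
Server -> -> client (the server sends back a stream of responses)
service HtMetaInfoSyncService {
  rpc syncMeta (stream HtMetaSyncRequest) returns (stream HtMetaSyncResponse) {}
}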
Client:
import com.meta.HtMetaInfoSyncServiceGrpc;
import com.meta.HtMetaSyncRequest;
import com.meta.HtMetaSyncResponse;
import com.meta.SyncType;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.StreamObserver;
import lombok.extern.slf4j.Slf4j;
import net.devh.boot.grpc.client.inject.GrpcClient;
import org.springframework.stereotype.Service;
/**
 * @author xiaoshu
 */
@Slf4j
@Service
public class HtMetaInfoSyncClient {
    // Streaming calls need the asynchronous stub, not the blocking one
    @GrpcClient("local-grpc-server")
    private HtMetaInfoSyncServiceGrpc.HtMetaInfoSyncServiceStub stub;

    public void syncMeta(final String json, final SyncType syncType) {
        try {
            StreamObserver<HtMetaSyncResponse> responseObserver = new StreamObserver<HtMetaSyncResponse>() {
                @Override
                public void onNext(HtMetaSyncResponse response) {
                    // Handle one response from the server
                    log.info("Handling response");
                    String msg = response.getMsg();
                    String code = response.getCode();
                    log.info("msg: {}", msg);
                    log.info("code: {}", code);
                }

                @Override
                public void onError(Throwable t) {
                    // Handle a failed stream
                    log.error("Stream failed", t);
                }

                @Override
                public void onCompleted() {
                    // The server has finished sending responses
                    log.info("Stream completed");
                }
            };
            // Open the stream once; the returned observer is used to send requests
            StreamObserver<HtMetaSyncRequest> streamRequest = stub.syncMeta(responseObserver);
            // Send the first request
            streamRequest.onNext(
                    HtMetaSyncRequest.newBuilder()
                            .setSyncType(syncType)
                            .setSyncJson(json)
                            .build());
            // Send the second request
            streamRequest.onNext(
                    HtMetaSyncRequest.newBuilder()
                            .setSyncType(syncType)
                            .setSyncJson(json + " request 2")
                            .build());
            // Finish sending: complete the request stream, not the response observer
            streamRequest.onCompleted();
        } catch (final StatusRuntimeException e) {
            Status status = e.getStatus();
            log.error("Streaming call failed: {}", status);
        }
    }
}
Server:
import com.meta.HtMetaInfoSyncServiceGrpc;
import com.meta.HtMetaSyncRequest;
import com.meta.HtMetaSyncResponse;
import com.meta.SyncType;
import io.grpc.stub.StreamObserver;
import lombok.extern.slf4j.Slf4j;
import net.devh.boot.grpc.server.service.GrpcService;
/**
 * @author 何永豪
 * @className HtMetaSyncService
 * @description TODO
 * @date 2023/11/6 15:25
 */
@Slf4j
@GrpcService
public class HtMetaSyncService extends HtMetaInfoSyncServiceGrpc.HtMetaInfoSyncServiceImplBase {
    @Override
    public StreamObserver<HtMetaSyncRequest> syncMeta(StreamObserver<HtMetaSyncResponse> responseObserver) {
        // The returned observer receives the client's request stream
        return new StreamObserver<HtMetaSyncRequest>() {
            @Override
            public void onNext(HtMetaSyncRequest request) {
                String syncJson = request.getSyncJson();
                log.info("syncJson: {}", syncJson);
                SyncType syncType = request.getSyncType();
                log.info("syncType: {}", syncType);
                // Process the request and send one response for it
                HtMetaSyncResponse response = HtMetaSyncResponse.newBuilder()
                        .setCode("1000")
                        .setMsg("ok")
                        .build();
                responseObserver.onNext(response);
            }

            @Override
            public void onError(Throwable t) {
                // The client stream failed or was cancelled
                log.error("Request stream failed", t);
            }

            @Override
            public void onCompleted() {
                // The client has finished sending; close the response stream
                responseObserver.onCompleted();
            }
        };
    }
}