• gRPC Java demo, and adapting Spring Boot to speak gRPC


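    Preface

    While evaluating gRPC recently, I found that it is still HTTP/2 underneath: in essence, HTTP/2 carrying protobuf payloads. So can an existing web server support it? I tried, and it largely works, but there is one problem with returning the response that I have not been able to solve.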

    Native gRPC demo

    Plain grpc-java is simple to use. Both Google's official packages and grpc-spring-boot-starter are mature open-source options; this demo uses net.devh.

    grpc-api

    <properties>
        <protobuf.version>3.19.1</protobuf.version>
        <protobuf-plugin.version>0.6.1</protobuf-plugin.version>
        <grpc.version>1.48.1</grpc.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>io.grpc</groupId>
            <artifactId>grpc-stub</artifactId>
            <version>${grpc.version}</version>
        </dependency>
        <dependency>
            <groupId>io.grpc</groupId>
            <artifactId>grpc-protobuf</artifactId>
            <version>${grpc.version}</version>
        </dependency>
        <dependency>
            <groupId>jakarta.annotation</groupId>
            <artifactId>jakarta.annotation-api</artifactId>
            <version>1.3.5</version>
            <optional>true</optional>
        </dependency>
    </dependencies>

    <build>
        <extensions>
            <extension>
                <groupId>kr.motd.maven</groupId>
                <artifactId>os-maven-plugin</artifactId>
                <version>1.7.0</version>
            </extension>
        </extensions>
        <plugins>
            <plugin>
                <groupId>org.xolstice.maven.plugins</groupId>
                <artifactId>protobuf-maven-plugin</artifactId>
                <version>${protobuf-plugin.version}</version>
                <configuration>
                    <protocArtifact>com.google.protobuf:protoc:${protobuf.version}:exe:${os.detected.classifier}</protocArtifact>
                    <pluginId>grpc-java</pluginId>
                    <pluginArtifact>io.grpc:protoc-gen-grpc-java:${grpc.version}:exe:${os.detected.classifier}</pluginArtifact>
                </configuration>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>compile-custom</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

    Then write the proto file under src/main/proto:

    syntax = "proto3";

    package com.feng.proto.api;

    option java_multiple_files = true;
    option java_package = "com.feng.proto.api.lib";
    option java_outer_classname = "HelloWorld";

    // The greeting service definition.
    service HiService {
        // Sends a greeting
        rpc SayHello (HelloRequest) returns (HelloReply) {
        }
        rpc sayAge (HttpDemoRequest) returns (HttpDemoReply) {
        }
    }

    // The request message containing the user's name.
    message HelloRequest {
        string name = 1;
    }

    message HttpDemoRequest {
        string name = 1;
        int32 age = 2;
    }

    // The response message containing the greetings
    message HelloReply {
        string message = 1;
    }

    message HttpDemoReply {
        string message = 1;
        int32 age = 2;
    }

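    Key point: the package name, service name, and method name together identify the call. Requests and responses are not bare protobuf messages either; gRPC wraps each one in its own framing, so a plain protobuf parser cannot read the body directly.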
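    To make the framing concrete, here is a minimal sketch that builds the body for HelloRequest{name="tom"} by hand. It assumes the layout given in the gRPC over HTTP/2 specification (a 1-byte compressed flag, a 4-byte big-endian length, then the protobuf bytes). The class GrpcFrameDemo and its methods are made up for this illustration; real code would use the generated HelloRequest class instead of encoding the field manually.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hand-rolled illustration only; not part of grpc-java or protobuf-java.
final class GrpcFrameDemo {

    // Protobuf wire bytes for a message whose only field is `string name = 1`.
    // Assumes the UTF-8 value is at most 127 bytes, so its length fits in one varint byte.
    static byte[] encodeNameField(String name) {
        byte[] utf8 = name.getBytes(StandardCharsets.UTF_8);
        if (utf8.length > 127) {
            throw new IllegalArgumentException("sketch only handles short strings");
        }
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        out.write(0x0A);                 // tag: field number 1, wire type 2 (length-delimited)
        out.write(utf8.length);          // single-byte varint length
        out.write(utf8, 0, utf8.length); // the string itself
        return out.toByteArray();
    }

    // gRPC body = 1-byte compressed flag + 4-byte big-endian length + protobuf bytes.
    static byte[] frame(byte[] protobuf) {
        return ByteBuffer.allocate(5 + protobuf.length)
                .put((byte) 0)           // 0 = not compressed
                .putInt(protobuf.length) // ByteBuffer is big-endian by default
                .put(protobuf)
                .array();
    }

    static String hex(byte[] bytes) {
        StringBuilder sb = new StringBuilder();
        for (byte b : bytes) {
            sb.append(String.format("%02x", b));
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        byte[] message = encodeNameField("tom");
        System.out.println(hex(message));        // 0a03746f6d
        System.out.println(hex(frame(message))); // 00000000050a03746f6d
    }
}
```

    The first line of output is what a plain protobuf parser expects; the second is what actually travels in the HTTP/2 DATA frame, which is why the five leading bytes have to be stripped before parsing.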

     

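    Compile the module to generate the Java sources and class files.

    grpc-server

    The server depends on the grpc-api module created above (referenced below by the artifactId grpc-interface).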

    <dependency>
        <groupId>net.devh</groupId>
        <artifactId>grpc-server-spring-boot-starter</artifactId>
        <version>2.13.1.RELEASE</version>
    </dependency>
    <dependency>
        <groupId>org.example</groupId>
        <artifactId>grpc-interface</artifactId>
        <version>1.0-SNAPSHOT</version>
    </dependency>

    Write the gRPC service implementation:

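    // MyServiceImpl.java
    import com.feng.proto.api.lib.HelloReply;
    import com.feng.proto.api.lib.HelloRequest;
    import com.feng.proto.api.lib.HiServiceGrpc;
    import io.grpc.stub.StreamObserver;
    import net.devh.boot.grpc.server.service.GrpcService;

    @GrpcService
    public class MyServiceImpl extends HiServiceGrpc.HiServiceImplBase {

        // sayAge is not overridden here, so the generated base class answers it with UNIMPLEMENTED.
        @Override
        public void sayHello(HelloRequest req, StreamObserver<HelloReply> responseObserver) {
            HelloReply reply = HelloReply.newBuilder().setMessage("Hello ==> " + req.getName()).build();
            responseObserver.onNext(reply);
            responseObserver.onCompleted();
        }
    }

    // ServerMain.java
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;

    @SpringBootApplication
    public class ServerMain {
        public static void main(String[] args) {
            SpringApplication.run(ServerMain.class, args);
        }
    }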

     

    grpc-client

    Add the Spring Boot web starter yourself; the key dependencies are:

    <dependency>
        <groupId>org.example</groupId>
        <artifactId>grpc-interface</artifactId>
        <version>1.0-SNAPSHOT</version>
    </dependency>
    <dependency>
        <groupId>net.devh</groupId>
        <artifactId>grpc-client-spring-boot-starter</artifactId>
        <version>2.13.1.RELEASE</version>
    </dependency>

    Implement the call:

    // MyServiceClient.java
    import com.feng.proto.api.lib.HelloRequest;
    import com.feng.proto.api.lib.HiServiceGrpc;
    import com.feng.proto.api.lib.HttpDemoRequest;
    import net.devh.boot.grpc.client.inject.GrpcClient;
    import org.springframework.stereotype.Service;

    @Service
    public class MyServiceClient {

        // "myClient" must match the grpc.client.myClient.* properties
        @GrpcClient("myClient")
        private HiServiceGrpc.HiServiceBlockingStub stub;

        public String sayHello(String name) {
            HelloRequest request = HelloRequest.newBuilder()
                    .setName(name)
                    .build();
            return stub.sayHello(request).getMessage();
        }

        public int sayAge(int age) {
            HttpDemoRequest request = HttpDemoRequest.newBuilder()
                    .setName("tom")
                    .setAge(age)
                    .build();
            return stub.sayAge(request).getAge();
        }
    }

    // ClientMain.java
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RestController;

    @SpringBootApplication
    @RestController
    public class ClientMain {

        public static void main(String[] args) {
            SpringApplication.run(ClientMain.class, args);
        }

        @Autowired
        private MyServiceClient myServiceClient;

        @RequestMapping("/hello")
        public String call() {
            return myServiceClient.sayHello("str222");
        }

        @RequestMapping("/hello2")
        public Integer callAge() {
            return myServiceClient.sayAge(11);
        }
    }

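    Configure the HTTP port (needed because client and server run on the same machine) and the server address, since no service registry is involved. TLS for HTTP/2 adds little inside an internal network and is better suited to external-facing services, so it can be switched off in favor of h2c (cleartext HTTP/2).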

    server.port=8082
    grpc.client.myClient.address=static://localhost:9090
    grpc.client.myClient.negotiationType=PLAINTEXT

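    Visit localhost:8082/hello

    Wireshark capture

    Wireshark does not decode this traffic as HTTP/2 out of the box, so it needs some setup. On macOS, first give your user access to the capture devices with sudo chown -R xxx:admin /dev/bpf*

    Open Wireshark and go to Analyze > Decode As

    Edit the protocol and port so that the gRPC server port (9090 in this demo) is decoded as HTTP2

    Then capture on the lo loopback interface

    The request and response packets are now readable, including the URL, headers, and body

    The URL is package.Service/Method, the headers are gRPC-specific, and the body is protobuf wrapped in gRPC's own framing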
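    The path rule can be sketched in a few lines of Java. GrpcPath is a hypothetical helper written for this article, not a grpc-java class. Note that the path uses the proto package (com.feng.proto.api), not the java_package option, and that the method name keeps the exact casing from the .proto file (SayHello, sayAge).

```java
// Illustrative helper, not part of grpc-java: builds and splits the HTTP/2 :path of a gRPC call.
final class GrpcPath {

    // Joins proto package, service, and method into "/package.Service/Method".
    static String of(String protoPackage, String service, String method) {
        String fullService = protoPackage.isEmpty() ? service : protoPackage + "." + service;
        return "/" + fullService + "/" + method;
    }

    // Splits a path back into {fullServiceName, methodName}.
    static String[] split(String path) {
        int slash = path.lastIndexOf('/');
        if (!path.startsWith("/") || slash <= 0) {
            throw new IllegalArgumentException("not a gRPC path: " + path);
        }
        return new String[] {path.substring(1, slash), path.substring(slash + 1)};
    }

    public static void main(String[] args) {
        System.out.println(of("com.feng.proto.api", "HiService", "SayHello"));
        // /com.feng.proto.api.HiService/SayHello
    }
}
```

    This is the same string that any HTTP endpoint has to be mapped to if it wants to answer the call.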

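    Now look at the response

    After the response data, the server sends a second HEADERS frame. These are the HTTP/2 trailers that carry grpc-status, and producing them is the part I have not yet solved when handling the call with plain Tomcat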

     

     

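    Making Spring Boot speak gRPC

    Spring Boot already supports protobuf message bodies out of the box, regardless of whether the transport is HTTP/1.1 or HTTP/2. The idea is to write a similar converter for gRPC. Because gRPC adds its own framing on top of protobuf, a few extra packages are needed: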

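    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
        <version>2.7.5</version>
    </dependency>
    <dependency>
        <groupId>org.example</groupId>
        <artifactId>grpc-interface</artifactId>
        <version>1.0-SNAPSHOT</version>
    </dependency>
    <dependency>
        <groupId>com.google.protobuf</groupId>
        <artifactId>protobuf-java-util</artifactId>
        <version>3.21.7</version>
    </dependency>
    <dependency>
        <groupId>com.googlecode.protobuf-java-format</groupId>
        <artifactId>protobuf-java-format</artifactId>
        <version>1.4</version>
    </dependency>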

    Converter code

    package org.springframework.http.converter.protobuf;

    import com.google.protobuf.*;
    import com.google.protobuf.util.JsonFormat;
    import com.googlecode.protobuf.format.FormatFactory;
    import com.googlecode.protobuf.format.ProtobufFormatter;
    import io.grpc.protobuf.lite.ProtoLiteUtils;
    import org.springframework.http.HttpInputMessage;
    import org.springframework.http.HttpOutputMessage;
    import org.springframework.http.MediaType;
    import org.springframework.http.converter.AbstractHttpMessageConverter;
    import org.springframework.http.converter.HttpMessageConversionException;
    import org.springframework.http.converter.HttpMessageNotReadableException;
    import org.springframework.http.converter.HttpMessageNotWritableException;
    import org.springframework.lang.Nullable;
    import org.springframework.util.Assert;
    import org.springframework.util.ClassUtils;
    import org.springframework.util.ConcurrentReferenceHashMap;

    import java.io.*;
    import java.lang.reflect.InvocationTargetException;
    import java.lang.reflect.Method;
    import java.nio.charset.Charset;
    import java.nio.charset.StandardCharsets;
    import java.util.Arrays;
    import java.util.Map;

    import static org.springframework.http.MediaType.*;

    public class ProtobufGrpcFormatHttpMessageConverter extends AbstractHttpMessageConverter<Message> {

        /**
         * The default charset used by the converter.
         */
        public static final Charset DEFAULT_CHARSET = StandardCharsets.UTF_8;

        /**
         * The media-type for gRPC, {@code application/grpc}.
         */
        public static final MediaType PROTOBUF = new MediaType("application", "grpc", DEFAULT_CHARSET);

        /**
         * The HTTP header containing the protobuf schema.
         */
        public static final String X_PROTOBUF_SCHEMA_HEADER = "X-Protobuf-Schema";

        /**
         * The HTTP header containing the protobuf message.
         */
        public static final String X_PROTOBUF_MESSAGE_HEADER = "X-Protobuf-Message";

        private static final Map<Class<?>, Method> methodCache = new ConcurrentReferenceHashMap<>();

        final ExtensionRegistry extensionRegistry;

        @Nullable
        private final ProtobufFormatSupport protobufFormatSupport;

        /**
         * Construct a new {@code ProtobufGrpcFormatHttpMessageConverter}.
         */
        public ProtobufGrpcFormatHttpMessageConverter() {
            this(null, null);
        }

        /**
         * Construct a new {@code ProtobufGrpcFormatHttpMessageConverter} with an
         * initializer that allows the registration of message extensions.
         * @param registryInitializer an initializer for message extensions
         * @deprecated as of Spring Framework 5.1, use {@link #ProtobufGrpcFormatHttpMessageConverter(ExtensionRegistry)} instead
         */
        @Deprecated
        public ProtobufGrpcFormatHttpMessageConverter(@Nullable ExtensionRegistryInitializer registryInitializer) {
            this(null, null);
            if (registryInitializer != null) {
                registryInitializer.initializeExtensionRegistry(this.extensionRegistry);
            }
        }

        /**
         * Construct a new {@code ProtobufGrpcFormatHttpMessageConverter} with a registry that specifies
         * protocol message extensions.
         * @param extensionRegistry the registry to populate
         */
        public ProtobufGrpcFormatHttpMessageConverter(ExtensionRegistry extensionRegistry) {
            this(null, extensionRegistry);
        }

        ProtobufGrpcFormatHttpMessageConverter(@Nullable ProtobufFormatSupport formatSupport,
                @Nullable ExtensionRegistry extensionRegistry) {

            if (formatSupport != null) {
                this.protobufFormatSupport = formatSupport;
            }
            else if (ClassUtils.isPresent("com.googlecode.protobuf.format.FormatFactory", getClass().getClassLoader())) {
                this.protobufFormatSupport = new ProtobufJavaFormatSupport();
            }
            else if (ClassUtils.isPresent("com.google.protobuf.util.JsonFormat", getClass().getClassLoader())) {
                this.protobufFormatSupport = new ProtobufJavaUtilSupport(null, null);
            }
            else {
                this.protobufFormatSupport = null;
            }

            setSupportedMediaTypes(Arrays.asList(this.protobufFormatSupport != null ?
                    this.protobufFormatSupport.supportedMediaTypes() : new MediaType[] {PROTOBUF, TEXT_PLAIN}));

            this.extensionRegistry = (extensionRegistry == null ? ExtensionRegistry.newInstance() : extensionRegistry);
        }

        @Override
        protected boolean supports(Class<?> clazz) {
            return Message.class.isAssignableFrom(clazz);
        }

        @Override
        protected MediaType getDefaultContentType(Message message) {
            return PROTOBUF;
        }

        @Override
        protected Message readInternal(Class<? extends Message> clazz, HttpInputMessage inputMessage)
                throws IOException, HttpMessageNotReadableException {

            MediaType contentType = inputMessage.getHeaders().getContentType();
            if (contentType == null) {
                contentType = PROTOBUF;
            }
            Charset charset = contentType.getCharset();
            if (charset == null) {
                charset = DEFAULT_CHARSET;
            }

            Message.Builder builder = getMessageBuilder(clazz);
            if (PROTOBUF.isCompatibleWith(contentType)) {
                // Changed for gRPC: strip the 5-byte prefix via BufferInputStream and
                // let grpc-java's marshaller parse the remaining protobuf bytes.
                try {
                    Method method = clazz.getDeclaredMethod("getDefaultInstance");
                    MessageLite defaultInstance = (MessageLite) method.invoke(null);
                    MessageLite messageLite = ProtoLiteUtils.marshaller(defaultInstance)
                            .parse(new BufferInputStream(inputMessage.getBody()));
                    return (Message) messageLite;
                }
                catch (NoSuchMethodException | InvocationTargetException | IllegalAccessException e) {
                    throw new RuntimeException(e);
                }
                // Original Spring behavior:
                // builder.mergeFrom(inputMessage.getBody(), this.extensionRegistry);
            }
            else if (TEXT_PLAIN.isCompatibleWith(contentType)) {
                InputStreamReader reader = new InputStreamReader(inputMessage.getBody(), charset);
                TextFormat.merge(reader, this.extensionRegistry, builder);
            }
            else if (this.protobufFormatSupport != null) {
                this.protobufFormatSupport.merge(
                        inputMessage.getBody(), charset, contentType, this.extensionRegistry, builder);
            }
            return builder.build();
        }

        /**
         * Create a new {@code Message.Builder} instance for the given class.
         * <p>This method uses a ConcurrentReferenceHashMap for caching method lookups.
         */
        private Message.Builder getMessageBuilder(Class<? extends Message> clazz) {
            try {
                Method method = methodCache.get(clazz);
                if (method == null) {
                    method = clazz.getMethod("newBuilder");
                    methodCache.put(clazz, method);
                }
                return (Message.Builder) method.invoke(clazz);
            }
            catch (Exception ex) {
                throw new HttpMessageConversionException(
                        "Invalid Protobuf Message type: no invocable newBuilder() method on " + clazz, ex);
            }
        }

        @Override
        protected boolean canWrite(@Nullable MediaType mediaType) {
            return (super.canWrite(mediaType) ||
                    (this.protobufFormatSupport != null && this.protobufFormatSupport.supportsWriteOnly(mediaType)));
        }

        @SuppressWarnings("deprecation")
        @Override
        protected void writeInternal(Message message, HttpOutputMessage outputMessage)
                throws IOException, HttpMessageNotWritableException {

            MediaType contentType = outputMessage.getHeaders().getContentType();
            if (contentType == null) {
                contentType = getDefaultContentType(message);
                Assert.state(contentType != null, "No content type");
            }
            Charset charset = contentType.getCharset();
            if (charset == null) {
                charset = DEFAULT_CHARSET;
            }

            if (PROTOBUF.isCompatibleWith(contentType)) {
                // Changed for gRPC: response headers, then the 5-byte prefix, then the message.
                outputMessage.getHeaders().add("grpc-encoding", "identity");
                outputMessage.getHeaders().add("grpc-accept-encoding", "gzip");
                // outputMessage.getHeaders().add("grpc-status", "0");
                // setProtoHeader(outputMessage, message);
                CodedOutputStream codedOutputStream = CodedOutputStream.newInstance(outputMessage.getBody());
                MessageLite messageLite = message;
                int length = messageLite.getSerializedSize();
                byte[] pre = LengthBytesUtils.intToBytes(length, 5);
                MessageOutputStreamUtils.writePrefix(codedOutputStream, pre);
                message.writeTo(codedOutputStream);
                // getBody() has already been called above, so the headers may be committed by now;
                // in the capture described later this value does not reach the client as a trailer.
                outputMessage.getHeaders().add("grpc-status", "0");
                codedOutputStream.flush();
                // Failed experiment, see MessageOutputStreamUtils:
                // MessageOutputStreamUtils.writeSuffix(codedOutputStream);
                // codedOutputStream.flush();
            }
            else if (TEXT_PLAIN.isCompatibleWith(contentType)) {
                OutputStreamWriter outputStreamWriter = new OutputStreamWriter(outputMessage.getBody(), charset);
                TextFormat.print(message, outputStreamWriter);  // deprecated on Protobuf 3.9
                outputStreamWriter.flush();
                outputMessage.getBody().flush();
            }
            else if (this.protobufFormatSupport != null) {
                this.protobufFormatSupport.print(message, outputMessage.getBody(), contentType, charset);
                outputMessage.getBody().flush();
            }
        }

        /**
         * Set the "X-Protobuf-*" HTTP headers when responding with a message of
         * content type "application/x-protobuf"
         * <p><b>Note:</b> <code>outputMessage.getBody()</code> should not have been called
         * before because it writes HTTP headers (making them read only).</p>
         */
        private void setProtoHeader(HttpOutputMessage response, Message message) {
            response.getHeaders().set(X_PROTOBUF_SCHEMA_HEADER, message.getDescriptorForType().getFile().getName());
            response.getHeaders().set(X_PROTOBUF_MESSAGE_HEADER, message.getDescriptorForType().getFullName());
        }

        /**
         * Protobuf format support.
         */
        interface ProtobufFormatSupport {

            MediaType[] supportedMediaTypes();

            boolean supportsWriteOnly(@Nullable MediaType mediaType);

            void merge(InputStream input, Charset charset, MediaType contentType,
                    ExtensionRegistry extensionRegistry, Message.Builder builder)
                    throws IOException, HttpMessageConversionException;

            void print(Message message, OutputStream output, MediaType contentType, Charset charset)
                    throws IOException, HttpMessageConversionException;
        }

        /**
         * {@link ProtobufFormatSupport} implementation used when
         * {@code com.googlecode.protobuf.format.FormatFactory} is available.
         */
        static class ProtobufJavaFormatSupport implements ProtobufFormatSupport {

            private final ProtobufFormatter jsonFormatter;

            private final ProtobufFormatter xmlFormatter;

            private final ProtobufFormatter htmlFormatter;

            public ProtobufJavaFormatSupport() {
                FormatFactory formatFactory = new FormatFactory();
                this.jsonFormatter = formatFactory.createFormatter(FormatFactory.Formatter.JSON);
                this.xmlFormatter = formatFactory.createFormatter(FormatFactory.Formatter.XML);
                this.htmlFormatter = formatFactory.createFormatter(FormatFactory.Formatter.HTML);
            }

            @Override
            public MediaType[] supportedMediaTypes() {
                return new MediaType[] {PROTOBUF, TEXT_PLAIN, APPLICATION_XML, APPLICATION_JSON};
            }

            @Override
            public boolean supportsWriteOnly(@Nullable MediaType mediaType) {
                return TEXT_HTML.isCompatibleWith(mediaType);
            }

            @Override
            public void merge(InputStream input, Charset charset, MediaType contentType,
                    ExtensionRegistry extensionRegistry, Message.Builder builder)
                    throws IOException, HttpMessageConversionException {

                if (contentType.isCompatibleWith(APPLICATION_JSON)) {
                    this.jsonFormatter.merge(input, charset, extensionRegistry, builder);
                }
                else if (contentType.isCompatibleWith(APPLICATION_XML)) {
                    this.xmlFormatter.merge(input, charset, extensionRegistry, builder);
                }
                else {
                    throw new HttpMessageConversionException(
                            "protobuf-java-format does not support parsing " + contentType);
                }
            }

            @Override
            public void print(Message message, OutputStream output, MediaType contentType, Charset charset)
                    throws IOException, HttpMessageConversionException {

                if (contentType.isCompatibleWith(APPLICATION_JSON)) {
                    this.jsonFormatter.print(message, output, charset);
                }
                else if (contentType.isCompatibleWith(APPLICATION_XML)) {
                    this.xmlFormatter.print(message, output, charset);
                }
                else if (contentType.isCompatibleWith(TEXT_HTML)) {
                    this.htmlFormatter.print(message, output, charset);
                }
                else {
                    throw new HttpMessageConversionException(
                            "protobuf-java-format does not support printing " + contentType);
                }
            }
        }

        /**
         * {@link ProtobufFormatSupport} implementation used when
         * {@code com.google.protobuf.util.JsonFormat} is available.
         */
        static class ProtobufJavaUtilSupport implements ProtobufFormatSupport {

            private final JsonFormat.Parser parser;

            private final JsonFormat.Printer printer;

            public ProtobufJavaUtilSupport(@Nullable JsonFormat.Parser parser, @Nullable JsonFormat.Printer printer) {
                this.parser = (parser != null ? parser : JsonFormat.parser());
                this.printer = (printer != null ? printer : JsonFormat.printer());
            }

            @Override
            public MediaType[] supportedMediaTypes() {
                return new MediaType[] {PROTOBUF, TEXT_PLAIN, APPLICATION_JSON};
            }

            @Override
            public boolean supportsWriteOnly(@Nullable MediaType mediaType) {
                return false;
            }

            @Override
            public void merge(InputStream input, Charset charset, MediaType contentType,
                    ExtensionRegistry extensionRegistry, Message.Builder builder)
                    throws IOException, HttpMessageConversionException {

                if (contentType.isCompatibleWith(APPLICATION_JSON)) {
                    InputStreamReader reader = new InputStreamReader(input, charset);
                    this.parser.merge(reader, builder);
                }
                else {
                    throw new HttpMessageConversionException(
                            "protobuf-java-util does not support parsing " + contentType);
                }
            }

            @Override
            public void print(Message message, OutputStream output, MediaType contentType, Charset charset)
                    throws IOException, HttpMessageConversionException {

                if (contentType.isCompatibleWith(APPLICATION_JSON)) {
                    OutputStreamWriter writer = new OutputStreamWriter(output, charset);
                    this.printer.appendTo(message, writer);
                    writer.flush();
                }
                else {
                    throw new HttpMessageConversionException(
                            "protobuf-java-util does not support printing " + contentType);
                }
            }
        }
    }

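    The core change

    The core change is in the read and write methods, which the code above already contains. What remains is a controller mapped to the gRPC paths and a configuration class that registers the converter: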

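    // GrpcController.java
    import com.feng.proto.api.lib.HelloReply;
    import com.feng.proto.api.lib.HelloRequest;
    import com.feng.proto.api.lib.HttpDemoReply;
    import com.feng.proto.api.lib.HttpDemoRequest;
    import org.springframework.web.bind.annotation.RequestBody;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RequestMethod;
    import org.springframework.web.bind.annotation.RestController;

    @RestController
    public class GrpcController {

        // The mapping is the gRPC path: /<proto package>.<service>/<method>
        @RequestMapping(value = "/com.feng.proto.api.HiService/SayHello", method = RequestMethod.POST, produces = "application/grpc")
        public HelloReply sayHello(@RequestBody HelloRequest helloRequest) {
            return HelloReply.newBuilder()
                    .setMessage("hello ---- " + helloRequest.getName())
                    .build();
        }

        @RequestMapping(value = "/com.feng.proto.api.HiService/sayAge", method = RequestMethod.POST, produces = "application/grpc")
        public HttpDemoReply sayAge(@RequestBody HttpDemoRequest request) {
            return HttpDemoReply.newBuilder()
                    .setMessage("hello ---- " + request.getName())
                    .setAge(request.getAge())
                    .build();
        }
    }

    // ConfigGrpcConverter.java
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.http.converter.protobuf.ProtobufGrpcFormatHttpMessageConverter;

    @Configuration
    public class ConfigGrpcConverter {

        @Bean
        public ProtobufGrpcFormatHttpMessageConverter initProtobufGrpcFormatHttpMessageConverter() {
            return new ProtobufGrpcFormatHttpMessageConverter();
        }
    }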

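    The capture shows that, compared with the native gRPC response, the final HTTP/2 HEADERS frame is missing.

    The request packet is identical.

    The response is not: the grpc-status: 0 header is never written back as a trailing HTTP/2 header.

    As a result, the gRPC client does parse the response body successfully, but its end-of-stream check fails.

    The client has read the message and parsed it correctly.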

     

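    gRPC wire format

    The bytes gRPC sends are not a bare protobuf message; gRPC adds its own framing on top:

    5 prefix bytes + the protobuf bytes

    The first prefix byte is the compressed flag (0 here, because the message is sent uncompressed) and the other four hold the protobuf length as a big-endian unsigned integer.

    Length algorithm

    In a debugger the length can look like a custom encoding, because Java bytes are signed and the values 128 to 255 show up as -128 to -1. It is ordinary base 256, though, not base 253: a base-253 conversion only agrees with it for lengths up to 127 and produces a wrong prefix for anything longer. The helper therefore uses plain shifts.

    package org.springframework.http.converter.protobuf;

    public class LengthBytesUtils {

        /**
         * Encode {@code length} as {@code size} big-endian bytes (size up to 8).
         * With size 5 the leading byte is always 0, which doubles as gRPC's
         * "not compressed" flag.
         */
        public static byte[] intToBytes(int length, int size) {
            byte[] bytes = new byte[size];
            long value = length & 0xFFFFFFFFL; // treat the int as unsigned
            for (int i = size - 1; i > -1; i--) {
                bytes[i] = (byte) value; // keep the lowest 8 bits
                value >>>= 8;
            }
            return bytes;
        }

        /**
         * Decode big-endian bytes into an int. For a 5-byte gRPC prefix the first
         * byte (the compressed flag) is dropped when narrowing to int, so this
         * assumes the caller does not need the flag.
         */
        public static int bytesToInt(byte[] bytes) {
            long length = 0;
            for (byte b : bytes) {
                length = (length << 8) | (b & 0xFF);
            }
            return (int) length;
        }
    }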
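    As a cross-check on the prefix layout: the gRPC over HTTP/2 specification describes it as a 1-byte compressed flag followed by a 4-byte big-endian length, and java.nio.ByteBuffer produces exactly that. The sketch below is only an illustration (GrpcPrefixDemo and its method names are made up); it also shows why a length such as 200 appears as a negative byte in Java.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

// Illustration only: encodes and decodes the 5-byte gRPC message prefix with the JDK.
final class GrpcPrefixDemo {

    // Compressed flag 0, then the length as a 4-byte big-endian integer.
    static byte[] encodePrefix(int messageLength) {
        return ByteBuffer.allocate(5).put((byte) 0).putInt(messageLength).array();
    }

    // Reads the 4 length bytes that follow the flag byte.
    static int decodeLength(byte[] prefix) {
        return ByteBuffer.wrap(prefix, 1, 4).getInt();
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(encodePrefix(300))); // [0, 0, 0, 1, 44]
        System.out.println(Arrays.toString(encodePrefix(200))); // [0, 0, 0, 0, -56]
        System.out.println(decodeLength(encodePrefix(200)));    // 200
    }
}
```

    300 is 0x012C, hence the bytes 1 and 44; 200 is 0xC8, which Java prints as -56 because byte is signed.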

    The input and output streams get special handling for this:

    // BufferInputStream.java
    package org.springframework.http.converter.protobuf;

    import com.google.common.base.Preconditions;
    import io.grpc.Detachable;
    import io.grpc.HasByteBuffer;
    import io.grpc.KnownLength;

    import javax.annotation.Nullable;
    import java.io.IOException;
    import java.io.InputStream;
    import java.nio.ByteBuffer;

    /**
     * Wraps the request body. available() consumes the 5-byte gRPC prefix and
     * reports the length of the protobuf message that follows, so it has to be
     * called before the message bytes are read.
     */
    public class BufferInputStream extends InputStream
            implements KnownLength, HasByteBuffer, Detachable {

        private static final int PREFIX_LENGTH = 5;

        private InputStream buffer;

        /** Message length taken from the prefix; -1 until the prefix has been read. */
        private int messageLength = -1;

        public BufferInputStream(InputStream buffer) {
            this.buffer = Preconditions.checkNotNull(buffer, "buffer");
        }

        @Override
        public int available() throws IOException {
            if (messageLength < 0) {
                byte[] lengthBytes = new byte[PREFIX_LENGTH];
                int offset = 0;
                // A single read() may return fewer bytes than requested, so loop.
                while (offset < PREFIX_LENGTH) {
                    int n = buffer.read(lengthBytes, offset, PREFIX_LENGTH - offset);
                    if (n == -1) {
                        break;
                    }
                    offset += n;
                }
                messageLength = LengthBytesUtils.bytesToInt(lengthBytes);
            }
            return messageLength;
        }

        @Override
        public int read() throws IOException {
            return buffer.read();
        }

        @Override
        public int read(byte[] dest, int destOffset, int length) throws IOException {
            // Return the real count so callers can detect short reads and EOF.
            return buffer.read(dest, destOffset, length);
        }

        @Override
        public long skip(long n) throws IOException {
            return buffer.skip(n);
        }

        @Override
        public void mark(int readlimit) {
            buffer.mark(readlimit);
        }

        @Override
        public void reset() throws IOException {
            buffer.reset();
        }

        @Override
        public boolean markSupported() {
            return buffer.markSupported();
        }

        @Override
        public boolean byteBufferSupported() {
            return false;
        }

        @Nullable
        @Override
        public ByteBuffer getByteBuffer() {
            return null;
        }

        @Override
        public InputStream detach() {
            InputStream detachedBuffer = buffer;
            try {
                // Only works when the wrapped stream supports mark/reset.
                buffer.reset();
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
            return new BufferInputStream(detachedBuffer);
        }

        @Override
        public void close() throws IOException {
            buffer.close();
        }
    }

    // MessageOutputStreamUtils.java
    package org.springframework.http.converter.protobuf;

    import com.google.protobuf.ByteOutput;
    import org.springframework.http.HttpHeaders;

    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.io.ObjectOutputStream;

    public class MessageOutputStreamUtils {

        /** Write the 5-byte gRPC prefix ahead of the protobuf message. */
        public static void writePrefix(ByteOutput outputStream, byte[] bytes) throws IOException {
            outputStream.write(bytes, 0, bytes.length);
        }

        /**
         * Failed experiment: serializes a grpc-status header into the stream.
         * The bytes end up in the response body, not in an HTTP/2 trailer.
         */
        public static void writeSuffix(ByteOutput outputStream) throws IOException {
            HttpHeaders headers = new HttpHeaders();
            headers.add("grpc-status", "0");
            try (ByteArrayOutputStream bo = new ByteArrayOutputStream();
                    ObjectOutputStream oo = new ObjectOutputStream(bo)) {
                oo.writeObject(headers);
                oo.flush(); // make sure everything reaches the byte array
                byte[] bytes = bo.toByteArray();
                outputStream.write(bytes, 0, bytes.length);
            }
        }
    }

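    I tried writing the extra header into the output stream, but it ended up inside the body instead of being sent as a second HEADERS frame in the same response.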

     

    Open problem

    Because this final header is missing, the native gRPC client cannot detect the end of the response correctly.
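    One direction that might be worth trying, untested here: Servlet 4.0 adds HttpServletResponse.setTrailerFields(Supplier<Map<String, String>>), which is meant for sending trailer fields after the body. The sketch below only shows the shape of such a call. TrailerCapableResponse is a stand-in interface invented so that the example compiles without the servlet API, and whether Tomcat would then emit the final HEADERS frame in the form the grpc-java client expects is an assumption that still has to be verified with a capture.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

// Sketch only; GrpcTrailerSketch and TrailerCapableResponse are hypothetical names.
final class GrpcTrailerSketch {

    // Stand-in mirroring the single Servlet 4.0 method used here.
    interface TrailerCapableResponse {
        void setTrailerFields(Supplier<Map<String, String>> supplier);
    }

    // Registers grpc-status as a trailer field; intended to be called before the body is written.
    static void registerOkStatus(TrailerCapableResponse response) {
        response.setTrailerFields(() -> {
            Map<String, String> trailers = new LinkedHashMap<>();
            trailers.put("grpc-status", "0"); // 0 = OK
            return trailers;
        });
    }

    public static void main(String[] args) {
        // Fake response that just captures the supplier, to show what would be sent.
        AtomicReference<Supplier<Map<String, String>>> captured = new AtomicReference<>();
        registerOkStatus(captured::set);
        System.out.println(captured.get().get()); // {grpc-status=0}
    }
}
```

    In a real controller the argument would be the HttpServletResponse of the current request rather than the stand-in, and the call would have to happen before the converter starts writing the body.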

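    Summary

    gRPC is in essence HTTP/2 plus Google's framing around protobuf, presented as RPC calls. The dependencies get heavier, and with SSL/TLS you also need certificates for encrypted transport, which is unnecessary in an internal environment. It suits external-facing interfaces and non-browser clients, and it can do server push (an HTTP/2 capability that is no longer recommended). It can also talk to a traditional Tomcat: I got the call itself working, and only the end-of-response handling remains unsolved.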

  • Original article: https://blog.csdn.net/fenglllle/article/details/127829481