最近调研grpc的情况,发现grpc实际上还是HTTP2协议,实际上就是http2+proto传输。那么是否可以在现有的server支持呢,试了下,还真可以,但是笔者在返回数据时有个问题一直没有思路。
原生的grpc-java很简单,实际上开源的google原生包和grpc-Spring-boot-starter都有成熟的开源方案,以net.devh为例
- <properties>
- <protobuf.version>3.19.1protobuf.version>
- <protobuf-plugin.version>0.6.1protobuf-plugin.version>
- <grpc.version>1.48.1grpc.version>
- properties>
-
- <dependencies>
- <dependency>
- <groupId>io.grpcgroupId>
- <artifactId>grpc-stubartifactId>
- <version>${grpc.version}version>
- dependency>
- <dependency>
- <groupId>io.grpcgroupId>
- <artifactId>grpc-protobufartifactId>
- <version>${grpc.version}version>
- dependency>
- <dependency>
-
- <groupId>jakarta.annotationgroupId>
- <artifactId>jakarta.annotation-apiartifactId>
- <version>1.3.5version>
- <optional>trueoptional>
- dependency>
- dependencies>
-
- <build>
- <extensions>
- <extension>
- <groupId>kr.motd.mavengroupId>
- <artifactId>os-maven-pluginartifactId>
- <version>1.7.0version>
- extension>
- extensions>
-
- <plugins>
- <plugin>
- <groupId>org.xolstice.maven.pluginsgroupId>
- <artifactId>protobuf-maven-pluginartifactId>
- <version>${protobuf-plugin.version}version>
- <configuration>
- <protocArtifact>com.google.protobuf:protoc:${protobuf.version}:exe:${os.detected.classifier}protocArtifact>
- <pluginId>grpc-javapluginId>
- <pluginArtifact>io.grpc:protoc-gen-grpc-java:${grpc.version}:exe:${os.detected.classifier}pluginArtifact>
- configuration>
- <executions>
- <execution>
- <goals>
- <goal>compilegoal>
- <goal>compile-customgoal>
- goals>
- execution>
- executions>
- plugin>
- plugins>
- build>
然后编写proto文件,在src/main/proto下编写
- syntax = "proto3";
-
- package com.feng.proto.api;
-
- option java_multiple_files = true;
- option java_package = "com.feng.proto.api.lib";
- option java_outer_classname = "HelloWorld";
-
- // The greeting service definition.
- service HiService {
- // Sends a greeting
- rpc SayHello (HelloRequest) returns (HelloReply) {
- }
-
- rpc sayAge(HttpDemoRequest) returns (HttpDemoReply) {
-
- }
- }
-
- // The request message containing the user's name.
- message HelloRequest {
- string name = 1;
- }
- message HttpDemoRequest {
- string name = 1;
- int32 age = 2;
- }
- // The response message containing the greetings
- message HelloReply {
- string message = 1;
- }
- message HttpDemoReply {
- string message = 1;
- int32 age = 2;
- }
关键点,包名+服务名+方法名是调用的关键,参数与返回值是grpc特定的proto协议,普通的proto是无法处理的
编译成java源码和class文件
依赖刚刚创建的grpc-api的module
- <dependency>
- <groupId>net.devhgroupId>
- <artifactId>grpc-server-spring-boot-starterartifactId>
- <version>2.13.1.RELEASEversion>
- dependency>
- <dependency>
- <groupId>org.examplegroupId>
- <artifactId>grpc-interfaceartifactId>
- <version>1.0-SNAPSHOTversion>
- dependency>
-
编写grpc实现逻辑
- @GrpcService
- public class MyServiceImpl extends HiServiceGrpc.HiServiceImplBase {
- @Override
- public void sayHello(HelloRequest req, StreamObserver
responseObserver) { - HelloReply reply = HelloReply.newBuilder().setMessage("Hello ==> " + req.getName()).build();
- responseObserver.onNext(reply);
- responseObserver.onCompleted();
- }
-
- }
-
- @SpringBootApplication
- public class ServerMain {
- public static void main(String[] args) {
- SpringApplication.run(ServerMain.class, args);
- }
- }
boot web自行依赖,关键依赖如下:
- <dependency>
- <groupId>org.examplegroupId>
- <artifactId>grpc-interfaceartifactId>
- <version>1.0-SNAPSHOTversion>
- dependency>
- <dependency>
- <groupId>net.devhgroupId>
- <artifactId>grpc-client-spring-boot-starterartifactId>
- <version>2.13.1.RELEASEversion>
- dependency>
实现调用
- @Service
- public class MyServiceClient {
-
- @GrpcClient("myClient")
- private HiServiceGrpc.HiServiceBlockingStub stub;
-
- public String sayHello(String name) {
- HelloRequest request = HelloRequest.newBuilder()
- .setName(name)
- .build();
- return stub.sayHello(request).getMessage();
- }
-
- public int sayAge(int age){
- HttpDemoRequest request = HttpDemoRequest.newBuilder()
- .setName("tom")
- .setAge(age)
- .build();
- return stub.sayAge(request).getAge();
- }
-
- }
-
- @SpringBootApplication
- @RestController
- public class ClientMain {
- public static void main(String[] args) {
- SpringApplication.run(ClientMain.class, args);
- }
-
- @Autowired
- private MyServiceClient myServiceClient;
-
- @RequestMapping("/hello")
- public String call(){
-
- String res = myServiceClient.sayHello("str222");
- return res;
- }
-
- @RequestMapping("/hello2")
- public Integer callAge(){
- int res = myServiceClient.sayAge(11);
- return res;
- }
- }
配置端口(本机需要),server的url,毕竟没接入注册中心,而且H2的SSL/TLS实际上,内部环境用途不大,适合对外使用,可以关闭,使用H2C。
- server.port = 8082
- grpc.client.myClient.address=static://localhost:9090
- grpc.client.myClient.negotiationType=PLAINTEXT
wireshark默认不识别http2,需要配置,以macOS为例,使用sudo chown -R xxx:admin /dev/bpf*
打开wireshark,Analyze Decode As
编辑协议,端口,
然后对lo环回网卡抓包
可以读取到请求和返回包,可以看到url header body
可以看到url就是包名.服务名/方法名,header是特定义的,body实际上也是特殊的proto协议
再看看返回结果
返回居然在Response后再次发送了Header,这个是笔者在使用普通Tomcat处理时尚未解决的地方
实际上Springboot原生支持protobuf协议,跟http 1.1还是2.0无关,仿造写一个支持grpc的,因为grpc的协议是定制的,所以需要一些包
-
-
org.springframework.boot -
spring-boot-starter-web -
2.7.5 -
-
-
org.example -
grpc-interface -
1.0-SNAPSHOT -
-
-
com.google.protobuf -
protobuf-java-util -
3.21.7 -
-
-
com.googlecode.protobuf-java-format -
protobuf-java-format -
1.4 -
-
-
converter代码
- package org.springframework.http.converter.protobuf;
-
- import com.google.protobuf.*;
- import com.google.protobuf.util.JsonFormat;
- import com.googlecode.protobuf.format.FormatFactory;
- import com.googlecode.protobuf.format.ProtobufFormatter;
- import io.grpc.protobuf.lite.ProtoLiteUtils;
- import org.springframework.http.HttpHeaders;
- import org.springframework.http.HttpInputMessage;
- import org.springframework.http.HttpOutputMessage;
- import org.springframework.http.MediaType;
- import org.springframework.http.converter.AbstractHttpMessageConverter;
- import org.springframework.http.converter.HttpMessageConversionException;
- import org.springframework.http.converter.HttpMessageNotReadableException;
- import org.springframework.http.converter.HttpMessageNotWritableException;
- import org.springframework.lang.Nullable;
- import org.springframework.util.Assert;
- import org.springframework.util.ClassUtils;
- import org.springframework.util.ConcurrentReferenceHashMap;
-
- import java.io.*;
- import java.lang.reflect.InvocationTargetException;
- import java.lang.reflect.Method;
- import java.nio.charset.Charset;
- import java.nio.charset.StandardCharsets;
- import java.util.Arrays;
- import java.util.Map;
-
- import static org.springframework.http.MediaType.*;
- import static org.springframework.http.MediaType.APPLICATION_JSON;
-
- public class ProtobufGrpcFormatHttpMessageConverter extends AbstractHttpMessageConverter
{ -
- /**
- * The default charset used by the converter.
- */
- public static final Charset DEFAULT_CHARSET = StandardCharsets.UTF_8;
-
- /**
- * The media-type for protobuf {@code application/x-protobuf}.
- */
- public static final MediaType PROTOBUF = new MediaType("application", "grpc", DEFAULT_CHARSET);
-
- /**
- * The HTTP header containing the protobuf schema.
- */
- public static final String X_PROTOBUF_SCHEMA_HEADER = "X-Protobuf-Schema";
-
- /**
- * The HTTP header containing the protobuf message.
- */
- public static final String X_PROTOBUF_MESSAGE_HEADER = "X-Protobuf-Message";
-
-
- private static final Map
, Method> methodCache = new ConcurrentReferenceHashMap<>(); -
- final ExtensionRegistry extensionRegistry;
-
- @Nullable
- private final ProtobufFormatSupport protobufFormatSupport;
-
-
- /**
- * Construct a new {@code ProtobufGrpcFormatHttpMessageConverter}.
- */
- public ProtobufGrpcFormatHttpMessageConverter() {
- this(null, null);
- }
-
- /**
- * Construct a new {@code ProtobufGrpcFormatHttpMessageConverter} with an
- * initializer that allows the registration of message extensions.
- * @param registryInitializer an initializer for message extensions
- * @deprecated as of Spring Framework 5.1, use {@link #ProtobufGrpcFormatHttpMessageConverter(ExtensionRegistry)} instead
- */
- @Deprecated
- public ProtobufGrpcFormatHttpMessageConverter(@Nullable ExtensionRegistryInitializer registryInitializer) {
- this(null, null);
- if (registryInitializer != null) {
- registryInitializer.initializeExtensionRegistry(this.extensionRegistry);
- }
- }
-
- /**
- * Construct a new {@code ProtobufGrpcFormatHttpMessageConverter} with a registry that specifies
- * protocol message extensions.
- * @param extensionRegistry the registry to populate
- */
- public ProtobufGrpcFormatHttpMessageConverter(ExtensionRegistry extensionRegistry) {
- this(null, extensionRegistry);
- }
-
- ProtobufGrpcFormatHttpMessageConverter(@Nullable ProtobufFormatSupport formatSupport,
- @Nullable ExtensionRegistry extensionRegistry) {
-
- if (formatSupport != null) {
- this.protobufFormatSupport = formatSupport;
- }
- else if (ClassUtils.isPresent("com.googlecode.protobuf.format.FormatFactory", getClass().getClassLoader())) {
- this.protobufFormatSupport = new ProtobufJavaFormatSupport();
- }
- else if (ClassUtils.isPresent("com.google.protobuf.util.JsonFormat", getClass().getClassLoader())) {
- this.protobufFormatSupport = new ProtobufJavaUtilSupport(null, null);
- }
- else {
- this.protobufFormatSupport = null;
- }
-
- setSupportedMediaTypes(Arrays.asList(this.protobufFormatSupport != null ?
- this.protobufFormatSupport.supportedMediaTypes() : new MediaType[] {PROTOBUF, TEXT_PLAIN}));
-
- this.extensionRegistry = (extensionRegistry == null ? ExtensionRegistry.newInstance() : extensionRegistry);
- }
-
-
- @Override
- protected boolean supports(Class> clazz) {
- return Message.class.isAssignableFrom(clazz);
- }
-
- @Override
- protected MediaType getDefaultContentType(Message message) {
- return PROTOBUF;
- }
-
- @Override
- protected Message readInternal(Class extends Message> clazz, HttpInputMessage inputMessage)
- throws IOException, HttpMessageNotReadableException {
-
- MediaType contentType = inputMessage.getHeaders().getContentType();
- if (contentType == null) {
- contentType = PROTOBUF;
- }
- Charset charset = contentType.getCharset();
- if (charset == null) {
- charset = DEFAULT_CHARSET;
- }
-
- Message.Builder builder = getMessageBuilder(clazz);
- if (PROTOBUF.isCompatibleWith(contentType)) {
- try {
- Method method = clazz.getDeclaredMethod("getDefaultInstance");
- MessageLite defaultInstance = (MessageLite) method.invoke(null);
- MessageLite messageLite = ProtoLiteUtils.marshaller(defaultInstance).parse(new BufferInputStream(inputMessage.getBody()));
- return (Message) messageLite;
- } catch (NoSuchMethodException e) {
- throw new RuntimeException(e);
- } catch (InvocationTargetException e) {
- throw new RuntimeException(e);
- } catch (IllegalAccessException e) {
- throw new RuntimeException(e);
- }
-
- //builder.mergeFrom(inputMessage.getBody(), this.extensionRegistry);
- }
- else if (TEXT_PLAIN.isCompatibleWith(contentType)) {
- InputStreamReader reader = new InputStreamReader(inputMessage.getBody(), charset);
- TextFormat.merge(reader, this.extensionRegistry, builder);
- }
- else if (this.protobufFormatSupport != null) {
- this.protobufFormatSupport.merge(
- inputMessage.getBody(), charset, contentType, this.extensionRegistry, builder);
- }
- return builder.build();
- }
-
- /**
- * Create a new {@code Message.Builder} instance for the given class.
- *
This method uses a ConcurrentReferenceHashMap for caching method lookups.
- */
- private Message.Builder getMessageBuilder(Class extends Message> clazz) {
- try {
- Method method = methodCache.get(clazz);
- if (method == null) {
- method = clazz.getMethod("newBuilder");
- methodCache.put(clazz, method);
- }
- return (Message.Builder) method.invoke(clazz);
- }
- catch (Exception ex) {
- throw new HttpMessageConversionException(
- "Invalid Protobuf Message type: no invocable newBuilder() method on " + clazz, ex);
- }
- }
-
-
- @Override
- protected boolean canWrite(@Nullable MediaType mediaType) {
- return (super.canWrite(mediaType) ||
- (this.protobufFormatSupport != null && this.protobufFormatSupport.supportsWriteOnly(mediaType)));
- }
-
- @SuppressWarnings("deprecation")
- @Override
- protected void writeInternal(Message message, HttpOutputMessage outputMessage)
- throws IOException, HttpMessageNotWritableException {
-
- MediaType contentType = outputMessage.getHeaders().getContentType();
- if (contentType == null) {
- contentType = getDefaultContentType(message);
- Assert.state(contentType != null, "No content type");
- }
- Charset charset = contentType.getCharset();
- if (charset == null) {
- charset = DEFAULT_CHARSET;
- }
-
- if (PROTOBUF.isCompatibleWith(contentType)) {
- outputMessage.getHeaders().add("grpc-encoding", "identity");
- outputMessage.getHeaders().add("grpc-accept-encoding", "gzip");
- // outputMessage.getHeaders().add("grpc-status", "0");
- //setProtoHeader(outputMessage, message);
- CodedOutputStream codedOutputStream = CodedOutputStream.newInstance(outputMessage.getBody());
- MessageLite messageLite = message;
- int length = messageLite.getSerializedSize();
- byte[] pre = LengthBytesUtils.intToBytes(length, 5);
- MessageOutputStreamUtils.writePrefix(codedOutputStream, pre);
- message.writeTo(codedOutputStream);
-
- // HttpHeaders headers = new HttpHeaders();
- // headers.add("grpc-status", "0");
- outputMessage.getHeaders().add("grpc-status", "0");
- // headers.get
- codedOutputStream.flush();
- // MessageOutputStreamUtils.writeSuffix(codedOutputStream);
- // codedOutputStream.flush();
- }
- else if (TEXT_PLAIN.isCompatibleWith(contentType)) {
- OutputStreamWriter outputStreamWriter = new OutputStreamWriter(outputMessage.getBody(), charset);
- TextFormat.print(message, outputStreamWriter); // deprecated on Protobuf 3.9
- outputStreamWriter.flush();
- outputMessage.getBody().flush();
- }
- else if (this.protobufFormatSupport != null) {
- this.protobufFormatSupport.print(message, outputMessage.getBody(), contentType, charset);
- outputMessage.getBody().flush();
- }
- }
-
- /**
- * Set the "X-Protobuf-*" HTTP headers when responding with a message of
- * content type "application/x-protobuf"
- *
Note: outputMessage.getBody()
should not have been called
- * before because it writes HTTP headers (making them read only).
- */
- private void setProtoHeader(HttpOutputMessage response, Message message) {
- response.getHeaders().set(X_PROTOBUF_SCHEMA_HEADER, message.getDescriptorForType().getFile().getName());
- response.getHeaders().set(X_PROTOBUF_MESSAGE_HEADER, message.getDescriptorForType().getFullName());
- }
-
-
- /**
- * Protobuf format support.
- */
- interface ProtobufFormatSupport {
-
- MediaType[] supportedMediaTypes();
-
- boolean supportsWriteOnly(@Nullable MediaType mediaType);
-
- void merge(InputStream input, Charset charset, MediaType contentType,
- ExtensionRegistry extensionRegistry, Message.Builder builder)
- throws IOException, HttpMessageConversionException;
-
- void print(Message message, OutputStream output, MediaType contentType, Charset charset)
- throws IOException, HttpMessageConversionException;
- }
-
-
- /**
- * {@link ProtobufFormatSupport} implementation used when
- * {@code com.googlecode.protobuf.format.FormatFactory} is available.
- */
- static class ProtobufJavaFormatSupport implements ProtobufFormatSupport {
-
- private final ProtobufFormatter jsonFormatter;
-
- private final ProtobufFormatter xmlFormatter;
-
- private final ProtobufFormatter htmlFormatter;
-
- public ProtobufJavaFormatSupport() {
- FormatFactory formatFactory = new FormatFactory();
- this.jsonFormatter = formatFactory.createFormatter(FormatFactory.Formatter.JSON);
- this.xmlFormatter = formatFactory.createFormatter(FormatFactory.Formatter.XML);
- this.htmlFormatter = formatFactory.createFormatter(FormatFactory.Formatter.HTML);
- }
-
- @Override
- public MediaType[] supportedMediaTypes() {
- return new MediaType[] {PROTOBUF, TEXT_PLAIN, APPLICATION_XML, APPLICATION_JSON};
- }
-
- @Override
- public boolean supportsWriteOnly(@Nullable MediaType mediaType) {
- return TEXT_HTML.isCompatibleWith(mediaType);
- }
-
- @Override
- public void merge(InputStream input, Charset charset, MediaType contentType,
- ExtensionRegistry extensionRegistry, Message.Builder builder)
- throws IOException, HttpMessageConversionException {
-
- if (contentType.isCompatibleWith(APPLICATION_JSON)) {
- this.jsonFormatter.merge(input, charset, extensionRegistry, builder);
- }
- else if (contentType.isCompatibleWith(APPLICATION_XML)) {
- this.xmlFormatter.merge(input, charset, extensionRegistry, builder);
- }
- else {
- throw new HttpMessageConversionException(
- "protobuf-java-format does not support parsing " + contentType);
- }
- }
-
- @Override
- public void print(Message message, OutputStream output, MediaType contentType, Charset charset)
- throws IOException, HttpMessageConversionException {
-
- if (contentType.isCompatibleWith(APPLICATION_JSON)) {
- this.jsonFormatter.print(message, output, charset);
- }
- else if (contentType.isCompatibleWith(APPLICATION_XML)) {
- this.xmlFormatter.print(message, output, charset);
- }
- else if (contentType.isCompatibleWith(TEXT_HTML)) {
- this.htmlFormatter.print(message, output, charset);
- }
- else {
- throw new HttpMessageConversionException(
- "protobuf-java-format does not support printing " + contentType);
- }
- }
- }
-
-
- /**
- * {@link ProtobufFormatSupport} implementation used when
- * {@code com.google.protobuf.util.JsonFormat} is available.
- */
- static class ProtobufJavaUtilSupport implements ProtobufFormatSupport {
-
- private final JsonFormat.Parser parser;
-
- private final JsonFormat.Printer printer;
-
- public ProtobufJavaUtilSupport(@Nullable JsonFormat.Parser parser, @Nullable JsonFormat.Printer printer) {
- this.parser = (parser != null ? parser : JsonFormat.parser());
- this.printer = (printer != null ? printer : JsonFormat.printer());
- }
-
- @Override
- public MediaType[] supportedMediaTypes() {
- return new MediaType[] {PROTOBUF, TEXT_PLAIN, APPLICATION_JSON};
- }
-
- @Override
- public boolean supportsWriteOnly(@Nullable MediaType mediaType) {
- return false;
- }
-
- @Override
- public void merge(InputStream input, Charset charset, MediaType contentType,
- ExtensionRegistry extensionRegistry, Message.Builder builder)
- throws IOException, HttpMessageConversionException {
-
- if (contentType.isCompatibleWith(APPLICATION_JSON)) {
- InputStreamReader reader = new InputStreamReader(input, charset);
- this.parser.merge(reader, builder);
- }
- else {
- throw new HttpMessageConversionException(
- "protobuf-java-util does not support parsing " + contentType);
- }
- }
-
- @Override
- public void print(Message message, OutputStream output, MediaType contentType, Charset charset)
- throws IOException, HttpMessageConversionException {
-
- if (contentType.isCompatibleWith(APPLICATION_JSON)) {
- OutputStreamWriter writer = new OutputStreamWriter(output, charset);
- this.printer.appendTo(message, writer);
- writer.flush();
- }
- else {
- throw new HttpMessageConversionException(
- "protobuf-java-util does not support printing " + contentType);
- }
- }
- }
-
- }
核心关键点
并改造write和read,上面的代码已经改造了
- @RestController
- public class GrpcController {
-
- @RequestMapping(value = "/com.feng.proto.api.HiService/SayHello", method = RequestMethod.POST, produces = "application/grpc")
- public HelloReply sayHello(@RequestBody HelloRequest helloRequest) {
- HelloReply helloReply = HelloReply.newBuilder()
- .setMessage("hello ---- " + helloRequest.getName())
- .build();
- return helloReply;
- }
-
- @RequestMapping(value = "/com.feng.proto.api.HiService/sayAge", method = RequestMethod.POST, produces = "application/grpc")
- public HttpDemoReply sayAge(@RequestBody HttpDemoRequest helloRequest) {
- HttpDemoReply helloReply = HttpDemoReply.newBuilder()
- .setMessage("hello ---- " + helloRequest.getName())
- .setAge(helloRequest.getAge())
- .build();
- return helloReply;
- }
- }
-
- @Configuration
- public class ConfigGrpcConverter {
- @Bean
- public ProtobufGrpcFormatHttpMessageConverter initProtobufGrpcFormatHttpMessageConverter(){
- return new ProtobufGrpcFormatHttpMessageConverter();
- }
- }
抓包可以发现返回跟原生的grpc返回少了最后的http2发送header的逻辑
请求包是一样的
返回就不对了,缺少了grpc-status 0的header通过http2写回来。
导致了,grpc的client端,实际上Response的body已经解析成功了,只不过结束符判断不对
client端已经读取到消息了,并正常解析
grpc的传输二进制不是传统的protobuf,在这个基础上定制了
是:5个字节数组+protobuf字节数组
这5个字节数组是protobuf的length int转为byte[]
而且谷歌的长度转换逻辑是定制的,从0~126再到-127~-1,共254个,那么进制就是253.
- package org.springframework.http.converter.protobuf;
-
- public class LengthBytesUtils {
- public static byte[] intToBytes(int length, int size) {
- byte[] bytes = new byte[size];
- int temp;
- for (int i = size - 1; i > -1; i--) {
- temp = (int) ((length / (Math.pow(253, (size - 1 - i)))) % 253);
- if (temp > 127) {
- temp = -128 + 1 + temp - 127 + 1;
- }
- bytes[i] = (byte) temp;
- }
- return bytes;
- }
-
- public static int bytesToInt(byte[] bytes) {
- int length = 0;
- int size = bytes.length;
- for (int i = 0; i < size; i++) {
- if (bytes[i] == 0){
- continue;
- }
- if (bytes[i] > 0) {
- length += bytes[i] * Math.pow(253, size-1-i);
- } else {
- length += (127-1 + bytes[i] + 128-1) * Math.pow(253, size-1-i);
- }
- }
- return length;
- }
- }
为此特意处理输入输出流
- package org.springframework.http.converter.protobuf;
-
- import com.google.common.base.Preconditions;
- import io.grpc.Detachable;
- import io.grpc.HasByteBuffer;
- import io.grpc.KnownLength;
-
- import javax.annotation.Nullable;
- import java.io.IOException;
- import java.io.InputStream;
- import java.nio.ByteBuffer;
-
- public class BufferInputStream extends InputStream
- implements KnownLength, HasByteBuffer, Detachable {
- private InputStream buffer;
-
- public BufferInputStream(InputStream buffer) {
- this.buffer = Preconditions.checkNotNull(buffer, "buffer");
- }
-
- @Override
- public int available() throws IOException {
- byte[] lengthBytes = new byte[5];
- buffer.read(lengthBytes, 0, 5);
- int length = LengthBytesUtils.bytesToInt(lengthBytes);
- byte[] valid = LengthBytesUtils.intToBytes(length, 5);
- // buffer.skip(2);
- return length;
- }
-
- @Override
- public int read() throws IOException {
- if (buffer.read() == 0) {
- // EOF.
- return -1;
- }
- return buffer.read();
- }
-
- @Override
- public int read(byte[] dest, int destOffset, int length) throws IOException {
- buffer.read(dest, destOffset, length);
- return length;
- }
-
- @Override
- public long skip(long n) throws IOException {
- buffer.skip(n);
- return n;
- }
-
- @Override
- public void mark(int readlimit) {
- buffer.mark(readlimit);
- }
-
- @Override
- public void reset() throws IOException {
- buffer.reset();
- }
-
- @Override
- public boolean markSupported() {
- return buffer.markSupported();
- }
-
- @Override
- public boolean byteBufferSupported() {
- return false;
- }
-
- @Nullable
- @Override
- public ByteBuffer getByteBuffer() {
- return null;
- }
-
- @Override
- public InputStream detach() {
- InputStream detachedBuffer = buffer;
- try {
- buffer.reset();
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- return new BufferInputStream(detachedBuffer);
- }
-
- @Override
- public void close() throws IOException {
- buffer.close();
- }
- }
-
- package org.springframework.http.converter.protobuf;
-
- import com.google.protobuf.ByteOutput;
-
- import java.io.ByteArrayOutputStream;
- import java.io.IOException;
- import org.springframework.http.HttpHeaders;
-
- import java.io.ObjectOutputStream;
- import java.nio.charset.StandardCharsets;
-
- public class MessageOutputStreamUtils {
-
- public static void writePrefix(ByteOutput outputStream, byte[] bytes) throws IOException {
- outputStream.write(bytes, 0, bytes.length);
- }
-
- public static void writeSuffix(ByteOutput outputStream) throws IOException {
- HttpHeaders headers = new HttpHeaders();
- headers.add("grpc-status", "0");
- byte[] bytes;
- try (ByteArrayOutputStream bo = new ByteArrayOutputStream();
- ObjectOutputStream oo = new ObjectOutputStream(bo);) {
- oo.writeObject(headers);
- bytes = bo.toByteArray();
- outputStream.write(bytes, 0, bytes.length);
- }
- }
- }
笔者试着在输出流多输出header,但是传到body里面了,而不是一个返回发送2次header
因为缺少最后这个Header,所以grpc原生client端解析结束不正确
grpc实际上本质还是Http2.0+谷歌定制的protobuf,表现形式为rpc调用,依赖变重,如果还需要ssl/tls就会需要证书加密传输,在内部环境实际上是没必要的,适合对外接口和非浏览器模式,可以实现推送(HTTP2.0的能力,现在不推荐用这个能力了),实际上也可以跟传统的Tomcat通信,笔者已经实现调用通过,只有最后的传输结束还没处理好。