所谓RPC,指的是远程服务调用。抽离所有技术细节,其大致流程不外乎如下几个部分:
今天我们会开一个新坑,手写一个RPC框架。这个坑会分成好几季,这是第一季。本文会协助你开始一个最简单版本的RPC框架,特点有以下几个:
学习本文你应当有的前置知识:
本系列绝对不是入门RPC的文章,还是有那么点,不太好懂的,如果是刚开始学习Java或者springboot的同学请先去看我写的Springboot入门系列和Thrift、Dubbo等现成Rpc框架的使用,有了初步的概念再来看这一篇会好很多。
一个最简单RPC框架应当包含哪些部分呢?
我们先来看一下经典的RPC框架都有什么:服务接口定义,服务提供者,序列化,stub代理,传输协议,注册中心,服务消费者,反序列化。
如果说其中哪些部分可以暂时放一放,或者用简化方案代替,那必然是序列化、反序列化、传输协议,不是说这些不需要,而是可以用现成的方案,不需要自己造轮子(当然如果你要实现商用闭源的RPC框架那肯定要自己设计协议)。其实服务接口定义也可以不要,如果你只是做一个测试方法比如sayHello,那要不要接口真的不重要。
所以,最终我们知道,服务提供方和服务消费方以及实现socket传输的部分是绝对必要的。那么我们可以这样简化:
Service Api -> Consumer Proxy <---------socket传输---------> Provider Reflect -> Service Impl
<dependencies>
<!-- https://mvnrepository.com/artifact/org.apache.commons/commons-lang3 -->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.12.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.google.guava/guava -->
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>28.0-jre</version>
</dependency>
</dependencies>
package service;
public interface HelloService {
/**
* 服务接口
*
* @param content
* @return
*/
public String sayHello(String content);
}
package service;
public class HelloServiceImpl implements HelloService {
@Override
public String sayHello(String content) {
return String.format("hello, %s", content);
}
}
package framework;
/**
* @Desc
**/
public class ProviderReflect {
private static final ExecutorService EXECUTOR_SERVICE = new ThreadPoolExecutor(
5,
200,
0L,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(1024),
new ThreadFactoryBuilder().setNameFormat("test-pool-%d").build(),
new ThreadPoolExecutor.AbortPolicy());
public static void provider(final Object service, int port) throws Exception {
ServerSocket serverSocket = new ServerSocket(port);
while (true) {
final Socket socket = serverSocket.accept();
EXECUTOR_SERVICE.execute(() -> {
try {
ObjectInputStream input = new ObjectInputStream(socket.getInputStream());
try {
try {
String methodName = input.readUTF();
Object[] arguments = (Object[]) input.readObject();
ObjectOutputStream output = new ObjectOutputStream(socket.getOutputStream());
try {
Object result = MethodUtils.invokeExactMethod(service, methodName, arguments);
output.writeObject(result);
} catch (Throwable t) {
output.writeObject(t);
} finally {
output.close();
}
} catch (Exception e) {
e.printStackTrace();
} finally {
input.close();
}
} finally {
socket.close();
}
} catch (Exception e) {
e.printStackTrace();
}
});
}
}
}
public class RpcProviderMain {
public static void main(String[] args) throws Exception {
HelloService service = new HelloServiceImpl();
ProviderReflect.provider(service, 10020);
}
}
package framework;
/**
* @Desc
**/
public class ConsumerProxy {
@SuppressWarnings("unchecked")
public static <T> T consume(final Class<T> interfaceClass, final String host, final int port) {
return (T) Proxy.newProxyInstance(interfaceClass.getClassLoader(), new Class<?>[]{interfaceClass},
(proxy, method, args) -> {
try (Socket socket = new Socket(host, port)) {
try (ObjectOutputStream output = new ObjectOutputStream(socket.getOutputStream())) {
output.writeUTF(method.getName());
output.writeObject(args);
try (ObjectInputStream input = new ObjectInputStream(socket.getInputStream())) {
Object result = input.readObject();
if (result instanceof Throwable) {
throw (Throwable) result;
}
return result;
}
}
}
});
}
}
package invoke;
/**
* @Desc
**/
public class RpcConsumerMain {
public static void main(String[] args) throws Exception {
HelloService service = ConsumerProxy.consume(HelloService.class, "127.0.0.1", 10020);
for(int i = 0; i < 1000; ++i) {
String hello = service.sayHello(String.format("test_%d", i));
System.out.println(hello);
Thread.sleep(1000);
}
}
}
虽然本例中借口定义、数据传输都非常简陋,但实际上已经具备了RPC的基本要素,服务提供,服务消费,和远程调用,这个例子将是我们这个系列的起点