RPC,Remote Procedure Call 即远程过程调用,远程过程调用其实对标的是本地过程调用
一个RPC框架要进行使用应该要具有如下的组件(功能)
从整体层次来看,一个RPC协议的框架应该具有三个层面:
关于这三个层面,其实细分的话,又可以分为以下几个部分,每一部分完成各自的任务。
1.客户端(客户端发起请求,调用远程方法)
2.客户端存根(存放服务端地址信息,将客户端的请求参数数据信息打包成网络消息,再通过网络传输发送给服务端)作为一个代理类
3.网络传输 通过网络传输,把我们调用的远程接口中的参数传输给服务端,这样服务端的接口实现类才能进行处理,在处理完成之后,还要通过网络传输的方式把返回的结果发送回来。网络传输一般有原生的Soket方式,还有现在常用的Netty
4.服务端存根( 接收客户端发送过来的请求消息并进行解析,然后再调用服务端的方法进行处理)作为一个代理类
5.服务端 (提供服务的一方,有远程接口和实现类)
Service接口:
public interface HelloService {
String hello(HelloObject object);
}
传输数据的实体类(在网络传输的过程中,实体类都需要实现Serializable接口,代表可序列化):
@Data
@NoArgsConstructor
@AllArgsConstructor
public class HelloObject implements Serializable {
private Integer id;
private String message;
}
Service接口的实现类:
@Service
public class HelloServiceImpl implements HelloService {
private static final Logger logger = LoggerFactory.getLogger(HelloServiceImpl.class);
@Override
public String hello(HelloObject object) {
logger.info("接收到消息:{}", object.getMessage());
return "这是Impl1方法";
}
}
RpcRequest对象:
@Builder
@Data
@NoArgsConstructor
@AllArgsConstructor
public class RpcRequest implements Serializable {
/**
* 请求号
*/
private String requestId;
/**
* 待调用接口名称
*/
private String interfaceName;
/**
* 待调用方法名称
*/
private String methodName;
/**
* 调用方法的参数
*/
private Object[] parameters;
/**
* 调用方法的参数类型
*/
private Class<?>[] paramTypes;
/**
* 是否是心跳包
*/
private Boolean heartBeat;
}
RpcResponse对象:
@Data
@NoArgsConstructor
public class RpcResponse<T> implements Serializable {
/**
* 响应对应的请求号
*/
private String requestId;
/**
* 响应状态码
*/
private Integer statusCode;
/**
* 响应状态补充信息
*/
private String message;
/**
* 响应数据
*/
private T data;
public static <T> RpcResponse<T> success(T data, String requestId) {
RpcResponse<T> response = new RpcResponse<>();
response.setRequestId(requestId);
response.setStatusCode(ResponseCode.SUCCESS.getCode());
response.setData(data);
return response;
}
public static <T> RpcResponse<T> fail(ResponseCode code, String requestId) {
RpcResponse<T> response = new RpcResponse<>();
response.setRequestId(requestId);
response.setStatusCode(code.getCode());
response.setMessage(code.getMessage());
return response;
}
}
状态枚举类:
@AllArgsConstructor
@Getter
public enum ResponseCode {
SUCCESS(200, "调用方法成功"),
FAIL(500, "调用方法失败"),
METHOD_NOT_FOUND(500, "未找到指定方法"),
CLASS_NOT_FOUND(500, "未找到指定类");
private final int code;
private final String message;
}
客户端方面,由于在客户端这一侧我们并没有接口的具体实现类,就没有办法直接生成实例对象。这时,我们可以通过动态代理的方式生成实例,并且调用方法时生成需要的RpcRequest对象并且发送给服务端
public class RpcClientProxy implements InvocationHandler {
private String host;
private int port;
public RpcClientProxy(String host, int port) {
this.host = host;
this.port = port;
}
@SuppressWarnings("unchecked")
public <T> T getProxy(Class<T> clazz) {
return (T) Proxy.newProxyInstance(clazz.getClassLoader(), new Class<?>[]{clazz}, this);
}
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
RpcRequest rpcRequest = RpcRequest.builder()
.interfaceName(method.getDeclaringClass().getName())
.methodName(method.getName())
.parameters(args)
.paramTypes(method.getParameterTypes())
.build();
RpcClient rpcClient = new RpcClient();
return ((RpcResponse) rpcClient.sendRequest(rpcRequest, host, port)).getData();
}
}
生成了RpcRequest对象后,我们使用一个RpcClient来发送这个请求,并且通过getData方法来获取响应的数据
public class RpcClient {
private static final Logger logger = LoggerFactory.getLogger(RpcClient.class);
public Object sendRequest(RpcRequest rpcRequest, String host, int port) {
try (Socket socket = new Socket(host, port)) {
ObjectOutputStream objectOutputStream = new ObjectOutputStream(socket.getOutputStream());
ObjectInputStream objectInputStream = new ObjectInputStream(socket.getInputStream());
objectOutputStream.writeObject(rpcRequest);
objectOutputStream.flush();
return objectInputStream.readObject();
} catch (IOException | ClassNotFoundException e) {
logger.error("调用时有错误发生:", e);
return null;
}
}
}
这里的实现方式就是直接使用Java的序列化方式(通过实现Serizlizable),创建一个Socket,利用Socket进行传输,获取ObjectOutputStream对象,然后把需要发送的对象传进去即可,接收时获取ObjectInputStream对象,readObject()方法就可以获得一个返回的对象。
主要流程就是使用一个ServerSocket监听某个端口,循环接收连接请求,如果发来了请求就创建一个线程,在新线程中处理调用。这里创建线程采用线程池的方式。
public class RpcServer {
private final ExecutorService threadPool;
private static final Logger logger = LoggerFactory.getLogger(RpcServer.class);
public RpcServer() {
int corePoolSize = 5;
int maximumPoolSize = 50;
long keepAliveTime = 60;
BlockingQueue<Runnable> workingQueue = new ArrayBlockingQueue<>(100);
ThreadFactory threadFactory = Executors.defaultThreadFactory();
threadPool = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, TimeUnit.SECONDS, workingQueue, threadFactory);
}
public void register(Object service, int port) {
try (ServerSocket serverSocket = new ServerSocket(port)) {
logger.info("服务器正在启动...");
Socket socket;
while((socket = serverSocket.accept()) != null) {
logger.info("客户端连接!Ip为:" + socket.getInetAddress());
threadPool.execute(new WorkerThread(socket, service));
}
} catch (IOException e) {
logger.error("连接时有错误发生:", e);
}
}
}
这里向工作线程WorkerThread传入了socket和用于服务端实例service。WorkerThread实现了Runnable接口,用于接收RpcRequest对象,解析并且调用,生成RpcResponse对象并传输回去。run方法如下:
@Override
public void run() {
try (ObjectInputStream objectInputStream = new ObjectInputStream(socket.getInputStream());
ObjectOutputStream objectOutputStream = new ObjectOutputStream(socket.getOutputStream())) {
RpcRequest rpcRequest = (RpcRequest) objectInputStream.readObject();
Method method = service.getClass().getMethod(rpcRequest.getMethodName(), rpcRequest.getParamTypes());
Object returnObject = method.invoke(service, rpcRequest.getParameters());
objectOutputStream.writeObject(RpcResponse.success(returnObject));
objectOutputStream.flush();
} catch (IOException | ClassNotFoundException | NoSuchMethodException | IllegalAccessException | InvocationTargetException e) {
logger.error("调用或发送时有错误发生:", e);
}
}
上面已经实现了一个HelloService的实现类了,现在我们只需要创建一个RpcServer并且把这个实现类注册进去就行了
public class TestServer {
public static void main(String[] args) {
HelloService helloService = new HelloServiceImpl();
RpcServer rpcServer = new RpcServer();
rpcServer.register(helloService, 9000);
}
}
服务端开放在9000端口。
客户端方面,我们需要通过动态代理,生成代理对象,并且调用,动态代理会自动帮我们向服务端发送请求的
public class TestClient {
public static void main(String[] args) {
RpcClientProxy proxy = new RpcClientProxy("127.0.0.1", 9000);
HelloService helloService = proxy.getProxy(HelloService.class);
HelloObject object = new HelloObject(12, "This is a message");
String res = helloService.hello(object);
System.out.println(res);
}
}