1)RPC定义: Remote Procedure Call, 远程过程调用协议;
远程过程调用是一种软件通讯协议,基于该协议,一个程序可以通过网络调用另一台计算机上的服务而无需关心细节,就像调用本地系统的服务一样。
【补充】RMI:Remote Method Invocation,远程方法调用
2)常见的 RPC 框架有:
1)当触发远程过程调用,调用环境会被挂起,过程参数会通过网络传送到过程执行环境(过程参数可以理解为是类名,方法名,方法参数,方法参数类型等)。
2)当服务器过程执行完成,结果会被回传到调用环境;只要过程调用返回则客户端立即恢复执行;
3)RPC调用过程中,会经过如下步骤:
4)RPC 调用流程图

【图解】 在RPC中:
1)优点:
2)缺点:
部分代码转自: 简单RPC之Socket实现_归田的博客-CSDN博客_rpc socket
0)目录结构:

【图解】RPC的实现代码包含3部分:
1)公共协议:
2)客户端(服务消费方):
3)服务端(服务提供方):
注意(报文的序列化与反序列化):
报文:
- /**
- * @Description 报文(注意:要可序列化)
- * @author xiao tang
- * @version 1.0.0
- * @createTime 2022年09月12日
- */
- public class MyRpcMessage implements Externalizable {
- /** 全限定类名 */
- private String className;
- /** 方法名 */
- private String methodName;
- /** 参数类型clazz数组 */
- private Class>[] parameterTypes;
- /** 参数值 */
- private Object[] parameterValues;
-
- /**
- * @description 私有构造器
- * @author xiao tang
- * @date 2022/9/12
- */
- public MyRpcMessage() {
- }
-
- /**
- * @description 创建生成器
- * @return 生成器
- * @author xiao tang
- * @date 2022/9/12
- */
- public static Builder builder() {
- Builder builder = new Builder();
- return builder;
- }
-
- public String getClassName() {
- return className;
- }
-
- public void setClassName(String className) {
- this.className = className;
- }
-
- public String getMethodName() {
- return methodName;
- }
-
- public void setMethodName(String methodName) {
- this.methodName = methodName;
- }
-
- public Class>[] getParameterTypes() {
- return parameterTypes;
- }
-
- public void setParameterTypes(Class>[] parameterTypes) {
- this.parameterTypes = parameterTypes;
- }
-
- public Object[] getParameterValues() {
- return parameterValues;
- }
-
- public void setParameterValues(Object[] parameterValues) {
- this.parameterValues = parameterValues;
- }
-
- @Override
- public void writeExternal(ObjectOutput out) throws IOException {
- out.writeUTF(className);
- out.writeUTF(methodName);
- out.writeObject(parameterTypes);
- out.writeObject(parameterValues);
- }
-
- @Override
- public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
- this.className = in.readUTF();
- this.methodName = in.readUTF();
- this.parameterTypes = (Class>[]) in.readObject();
- this.parameterValues = (Object[]) in.readObject();
- }
-
- /**
- * @description 生成器
- * @author xiao tang
- * @date 2022/9/12
- */
- public static class Builder {
- /** rpc报文对象 */
- private MyRpcMessage myRpcMessage;
-
- /**
- * @description 私有构造器
- * @author xiao tang
- * @date 2022/9/12
- */
- private Builder() {
- this.myRpcMessage = new MyRpcMessage();
- }
-
- /**
- * @description 设置全限定类名
- * @return 生成器
- * @author xiao tang
- * @date 2022/9/12
- */
- public Builder className(String className) {
- this.myRpcMessage.setClassName(className);
- return this;
- }
-
- /**
- * @description 设置方法名
- * @return 生成器
- * @author xiao tang
- * @date 2022/9/12
- */
- public Builder methodName(String methodName) {
- this.myRpcMessage.setMethodName(methodName);
- return this;
- }
-
- /**
- * @description 设置参数类型数组
- * @return 生成器
- * @author xiao tang
- * @date 2022/9/12
- */
- public Builder parameterTypes(Class>[] parameterTypes) {
- this.myRpcMessage.setParameterTypes(parameterTypes);
- return this;
- }
-
- /**
- * @description 设置参数值数组
- * @return 生成器
- * @author xiao tang
- * @date 2022/9/12
- */
- public Builder parameterValues(Object[] parameterValues) {
- this.myRpcMessage.setParameterValues(parameterValues);
- return this;
- }
-
- /**
- * @description 获得rpc报文对象
- * @return rpc报文对象
- * @author xiao tang
- * @date 2022/9/12
- */
- public MyRpcMessage build() {
- return myRpcMessage;
- }
- }
-
- }
方法接口定义:
- /**
- * @Description 过程调用接口
- * @author xiao tang
- * @version 1.0.0
- * @createTime 2022年09月11日
- */
- public interface IHelloRpc {
-
- String hello(String name);
- }
- /**
- * @Description rpc服务器
- * @author xiao tang
- * @version 1.0.0
- * @createTime 2022年09月11日
- */
- public class MyRpcSocketServer {
- /** 线程池 */
- private static ExecutorService THREAD_POOL = Executors.newCachedThreadPool();
-
- public static void main(String[] args) {
- try {
- startServer0(8089);
- } catch (IOException e) {
- e.printStackTrace();
- } finally {
- THREAD_POOL.shutdown();
- }
- }
-
- /**
- * @description 开启服务器
- * @param port 端口
- * @author xiao tang
- * @date 2022/9/11
- */
- private static void startServer0(int port) throws IOException {
- ServerSocket serverSocket = new ServerSocket(port);
- System.out.println("服务器启动成功.");
- while(true) {
- try {
- // 阻塞直到有客户端请求
- Socket socket = serverSocket.accept();
- // 处理器请求
- THREAD_POOL.execute(new BusiTask(socket));
- } catch (Exception e) {
- e.printStackTrace();
- System.out.println("抛出异常");
- throw e;
- }
- }
- }
-
- /**
- * @description 业务作业,Runnable子类
- * @author xiao tang
- * @date 2022/9/11
- */
- static class BusiTask implements Runnable {
- private Socket socket;
- public BusiTask(Socket socket) {
- this.socket = socket;
- }
- @Override
- public void run() {
- // 获得字节输入输出流
- try (ObjectInputStream inputStream = new ObjectInputStream(socket.getInputStream());
- ObjectOutputStream outputMsg = new ObjectOutputStream(socket.getOutputStream());) {
- // 获取过程参数(报文),包括类名,方法名,方法参数类型,方法参数列表
- MyRpcMessage inputMsg = (MyRpcMessage)inputStream.readObject();
- // 通过反射创建对象
- Class> clazz = Class.forName(inputMsg.getClassName());
- Object procedureObj = clazz.newInstance();
- // // 通过反射创建方法
- Method method = clazz.getDeclaredMethod(inputMsg.getMethodName(), inputMsg.getParameterTypes());
- // // 通过反射调用方法
- Object result = method.invoke(procedureObj, inputMsg.getParameterValues());
- // 把结果回写
- outputMsg.writeObject(result);
- } catch (Exception e) {
- e.printStackTrace();
- } finally {
- try {
- socket.close();
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- }
- }
- }
过程调用接口实现类:
- /**
- * @Description 过程调用接口实现
- * @author xiao tang
- * @version 1.0.0
- * @createTime 2022年09月11日
- */
- public class IHelloRpcServerImpl implements IHelloRpc {
- @Override
- public String hello(String name) {
- return "hello " + name + ", nice to meet you, i am a server named RPC.";
- }
- }
- /**
- * @Description RPC存根(代理)工厂
- * @author xiao tang
- * @version 1.0.0
- * @createTime 2022年09月11日
- */
- public class MyRpcProxyFactory {
-
- /**
- * @description 私有构造器
- * @author xiao tang
- * @date 2022/9/11
- */
- private MyRpcProxyFactory() {
- }
-
- /**
- * @description 创建代理对象
- * @param targetClazz 目标对象clazz
- * @param className 类名
- * @return 代理对象
- * @author xiao tang
- * @date 2022/9/11
- */
- public static
T newProxy(Class targetClazz, String className) { - return (T) Proxy.newProxyInstance(
- targetClazz.getClassLoader()
- , new Class>[]{targetClazz}
- , (Object proxy, Method method, Object[] args) ->{
- Socket socket = new Socket("localhost",8089);
- // 获取输出流
- try(ObjectOutputStream outputStream = new ObjectOutputStream(socket.getOutputStream());
- ObjectInputStream inputStream = new ObjectInputStream(socket.getInputStream())) {
- // 写出过程参数到服务器,包括类名,方法名,方法参数类型,方法参数列表
- MyRpcMessage message = MyRpcMessage.builder()
- .className(className)
- .methodName(method.getName())
- .parameterTypes(method.getParameterTypes())
- .parameterValues(args)
- .build();
- outputStream.writeObject(message);
- outputStream.flush();
-
- // 获取输入流
- Object result = inputStream.readObject();
- if (result instanceof Throwable) {
- throw (Throwable)result;
- }
- return result;
- } finally {
- socket.close();
- }
- });
- }
- }
测试用例入口:
- public class MyRpcSocketClient {
-
- public static void main(String[] args) {
- IHelloRpc helloRpc = MyRpcProxyFactory.newProxy(IHelloRpc.class, "com.my.netty.rpc.socket.server.IHelloRpcServerImpl");
- String result = helloRpc.hello("成都");
- System.out.println(result);
- }
- }
hello 成都, nice to meet you, i am a server named RPC.
1)背景: 基于socket的rpc实现时阻塞式的,吞吐量不高; 本文尝试使用netty 实现rpc ;
2)netyt优势:
3)代码目录:

【图解】 代码还是分为3部分(常量不算):
1)公共协议:
2)客户端:
3)服务端:
1)公共接口;
- /**
- * @Description 公共接口,服务提供方,服务消费方都需要
- * @author xiao tang
- * @version 1.0.0
- * @createTime 2022年09月11日
- */
- public interface HelloService {
-
- String hello(String msg);
- }
常量:
- public class MyDubboConst {
- /** 协议头 */
- public static final String PROTOCOL_HEAD = "HelloServer#hello#";
- }
1)netty客户端代理:
- /**
- * @Description 客户端-服务消费者
- * @author xiao tang
- * @version 1.0.0
- * @createTime 2022年09月11日
- */
- public class MyDubboNettyClient {
- // 创建线程池
- private static ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
- // 客户端处理器
- private static MyDubboNettyClientHandler clientHandler;
-
- private int counter = 0;
-
- // 编写方法,使用代理模式,获取代理对象
- public Object getBean(final Class> serviceClass, final String protocolHead) {
- return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(),
- new Class>[]{serviceClass}, (Object proxy, Method method, Object[] args)->{
- System.out.println("Object proxy, Method method, Object[] args 进入了,第 " + (++counter) + " 次进入");
- // 下面的代码, 客户端每调用一次hello,就会进入到该代码
- if (clientHandler == null) {
- initClientHandler(); // 初始化客户端处理器
- }
- // 设置要发送给服务器的信息
- // protocolHead 协议头部
- // args[0] 就是客户端调用 hello的参数
- System.out.println("clientHandler.setParameter");
- clientHandler.setParameter(protocolHead + args[0]);
- System.out.println("executorService.submit");
- return executorService.submit(clientHandler).get();
- });
- }
-
- // 初始化客户端处理器
- private static void initClientHandler() {
- clientHandler = new MyDubboNettyClientHandler();
- // 创建 EventLoopGroup
- NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup();
- Bootstrap bootstrap = new Bootstrap();
- bootstrap.group(eventLoopGroup)
- .channel(NioSocketChannel.class)
- .option(ChannelOption.TCP_NODELAY, true)
- .handler(new ChannelInitializer
() { - @Override
- protected void initChannel(SocketChannel socketChannel) throws Exception {
- ChannelPipeline pipeline = socketChannel.pipeline();
- pipeline.addLast(new StringDecoder());
- pipeline.addLast(new StringEncoder());
- pipeline.addLast(clientHandler);
- }
- });
- // 启动客户端
- ChannelFuture channelFuture = null;
- try {
- channelFuture = bootstrap.connect("127.0.0.1", 8089).sync();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- System.out.println("客户端启动成功");
- }
- }
2)netty客户端业务逻辑处理器
- /**
- * @Description dubbo客户端处理器
- * @author xiao tang
- * @version 1.0.0
- * @createTime 2022年09月11日
- */
- public class MyDubboNettyClientHandler extends ChannelInboundHandlerAdapter implements Callable {
-
- private ChannelHandlerContext context; // 上下文
- private String result; // 返回结果
- private String parameter; // 客户端调用方法时,传入的参数
-
- // 与服务器连接创建成功后,就会被调用 (第1个被调用 )
- @Override
- public void channelActive(ChannelHandlerContext ctx) throws Exception {
- System.out.println("MyDubboNettyClientHandler.channelActive");
- context = ctx; // 因为在其他方法也会用到这个 context
- }
-
- // 收到服务器的数据后,就会调用方法
- @Override
- public synchronized void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
- System.out.println("MyDubboNettyClientHandler.channelRead");
- result = msg.toString();
- notify(); // 唤醒等待的线程
- }
-
- @Override
- public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
- System.out.println("MyDubboNettyClientHandler.exceptionCaught");
- ctx.close();
- }
-
- // 被代理对象调用, 发送数据给服务器 -> 然后 wait -> 等待被唤醒 -> 返回结果 (第3个被调用)
- @Override
- public synchronized Object call() throws Exception {
- System.out.println("发送数据到服务器, parameter = " + parameter);
- context.writeAndFlush(parameter); // 发送数据给服务器
- System.out.println("发送数据到服务成功");
- wait(); // 等待直到服务器返回结果,服务器返回后调用 channelRead 方法来唤醒;
- System.out.println("after wait().");
- return result; // 服务提供方返回的结果
- }
- // 第2个被调用,设置参数
- public void setParameter(String parameter) {
- System.out.println("setParameter");
- this.parameter = parameter;
- }
- }
3)客户端启动引导程序:
- public class MyDubboClientBootstrap {
-
- public static void main(String[] args) {
- // 创建访问消费者-netty客户端
- MyDubboNettyClient customer = new MyDubboNettyClient();
- // 创建代理对象
- HelloService helloService = (HelloService)customer.getBean(HelloService.class, MyDubboConst.PROTOCOL_HEAD);
-
- // 通过代理对象调用服务提供者的服务
- String result = helloService.hello("hello服务器");
- System.out.println("服务器响应结果 = " + result);
- }
- }
1)过程调用(方法调用)的具体实现
- /**
- * @Description 服务提供方实现接口
- * @author xiao tang
- * @version 1.0.0
- * @createTime 2022年09月11日
- */
- public class HelloServiceImpl implements HelloService {
- private int counter;
- // 当有消费方调用该方法时,就返回一个字符串
- @Override
- public String hello(String msg) {
- System.out.println("收到客户端消息 = " + msg);
- // 根据msg 返回不同结果
- if (msg != null) {
- return "你好客户端,我收到你的消息 ["+msg+"]这是第" + (++counter) + "次收到消息";
- } else {
- return "你好客户端,我收到的消息为空" ;
- }
- }
- }
2)netty服务器
- /**
- * @Description 基于netty的 Dubbo 服务器
- * @author xiao tang
- * @version 1.0.0
- * @createTime 2022年09月11日
- */
- public class MyDubboNettyServer {
-
- public static void startServer(String hostName, int port) throws InterruptedException {
- startServer0(hostName, port);
- }
-
- private static void startServer0(String hostName, int port) throws InterruptedException {
- EventLoopGroup bossGroup = new NioEventLoopGroup();
- EventLoopGroup workerGroup = new NioEventLoopGroup();
- try {
- ServerBootstrap serverBootstrap = new ServerBootstrap();
- serverBootstrap.group(bossGroup, workerGroup)
- .channel(NioServerSocketChannel.class)
- .childHandler(new ChannelInitializer
() { - @Override
- protected void initChannel(SocketChannel socketChannel) throws Exception {
- ChannelPipeline pipeline = socketChannel.pipeline();
- pipeline.addLast(new StringDecoder());
- pipeline.addLast(new StringEncoder());
- pipeline.addLast(new MyDubboNettyServerHandler()) ; // 自定义业务处理器
- }
- });
- // 启动服务器
- ChannelFuture channelFuture = serverBootstrap.bind(hostName, port).sync();
- System.out.println("服务提供方启动成功,开始提供服务");
- // 监听关闭
- channelFuture.channel().closeFuture().sync();
- } finally {
- bossGroup.shutdownGracefully();
- workerGroup.shutdownGracefully();
- }
- }
- }
3)netty服务器业务处理器
- /**
- * @Description 基于netty的 Dubbo 服务器处理器
- * @author xiao tang
- * @version 1.0.0
- * @createTime 2022年09月11日
- */
- public class MyDubboNettyServerHandler extends ChannelInboundHandlerAdapter {
-
- @Override
- public void channelRead(ChannelHandlerContext ctx, Object msg) {
- // 获取客户端发送的消息 并 调用公共接口服务
- System.out.println("接收的msg = " + msg);
- // 客户端在调用服务器的api时,我们需要定义一个协议
- // 如 msg 以 "HelloServer#hello#" 开头,才调用api
- String msgString = msg.toString();
- if (msgString.startsWith(MyDubboConst.PROTOCOL_HEAD)) {
- String result = new HelloServiceImpl().hello(msgString.substring(msgString.lastIndexOf("#")+1));
- ctx.writeAndFlush(result);
- } else {
- ctx.writeAndFlush("协议头非法");
- }
- }
-
- @Override
- public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
- cause.printStackTrace();
- ctx.close();
- }
- }
4)服务器启动引导程序:
- public class MyDubboServerBootstrap {
- public static void main(String[] args) throws InterruptedException {
- // 代码代填
- MyDubboNettyServer.startServer("127.0.0.1", 8089);
- }
- }
1)服务器日志:
- 服务提供方启动成功,开始提供服务
- [DEBUG] Recycler - -Dio.netty.recycler.maxCapacityPerThread: 32768
- [DEBUG] Recycler - -Dio.netty.recycler.maxSharedCapacityFactor: 2
- [DEBUG] Recycler - -Dio.netty.recycler.linkCapacity: 16
- [DEBUG] Recycler - -Dio.netty.recycler.ratio: 8
- [DEBUG] AbstractByteBuf - -Dio.netty.buffer.bytebuf.checkAccessible: true
- [DEBUG] ResourceLeakDetectorFactory$DefaultResourceLeakDetectorFactory - Loaded default ResourceLeakDetector: io.netty.util.ResourceLeakDetector@e62edfd
- 接收的msg = HelloServer#hello#hello服务器
- 收到客户端消息 = hello服务器
2)客户端日志:
- 客户端启动成功
- clientHandler.setParameter
- MyDubboNettyClientHandler.channelActive
- setParameter
- executorService.submit
- 发送数据到服务器, parameter = HelloServer#hello#hello服务器
- [DEBUG] Recycler - -Dio.netty.recycler.maxCapacityPerThread: 32768
- [DEBUG] Recycler - -Dio.netty.recycler.maxSharedCapacityFactor: 2
- [DEBUG] Recycler - -Dio.netty.recycler.linkCapacity: 16
- [DEBUG] Recycler - -Dio.netty.recycler.ratio: 8
- 发送数据到服务成功
- [DEBUG] AbstractByteBuf - -Dio.netty.buffer.bytebuf.checkAccessible: true
- [DEBUG] ResourceLeakDetectorFactory$DefaultResourceLeakDetectorFactory - Loaded default ResourceLeakDetector: io.netty.util.ResourceLeakDetector@89e890f
- MyDubboNettyClientHandler.channelRead
- after wait().
- 服务器响应结果 = 你好客户端,我收到你的消息 [hello服务器]这是第1次收到消息