• RPC框架(一)——简易RPC


    RPC介绍

    RPC,Remote Procedure Call 即远程过程调用,远程过程调用其实对标的是本地过程调用
    在这里插入图片描述
    一个RPC框架要进行使用应该要具有如下的组件(功能)
    在这里插入图片描述
    从整体层次来看,一个RPC协议的框架应该具有三个层面:

    • 服务的注册中心
    • 请求服务的客户端
    • 提供服务的服务端。

    关于这三个层面,其实细分的话,又可以分为以下几个部分,每一部分完成各自的任务。

    1.客户端(客户端发起请求,调用远程方法)

    2.客户端存根(存放服务端地址信息,将客户端的请求参数数据信息打包成网络消息,再通过网络传输发送给服务端)作为一个代理类

    3.网络传输 通过网络传输,把我们调用的远程接口中的参数传输给服务端,这样服务端的接口实现类才能进行处理,在处理完成之后,还要通过网络传输的方式把返回的结果发送回来。网络传输一般有原生的Soket方式,还有现在常用的Netty

    4.服务端存根( 接收客户端发送过来的请求消息并进行解析,然后再调用服务端的方法进行处理)作为一个代理类

    5.服务端 (提供服务的一方,有远程接口和实现类)

    简易RPC实现

    通用接口

    Service接口:

    public interface HelloService {
        String hello(HelloObject object);
    }
    
    • 1
    • 2
    • 3

    传输数据的实体类(在网络传输的过程中,实体类都需要实现Serializable接口,代表可序列化):

    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public class HelloObject implements Serializable {
        private Integer id;
        private String message;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    Service接口的实现类:

    @Service
    public class HelloServiceImpl implements HelloService {
        private static final Logger logger = LoggerFactory.getLogger(HelloServiceImpl.class);
        @Override
        public String hello(HelloObject object) {
            logger.info("接收到消息:{}", object.getMessage());
            return "这是Impl1方法";
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    传输格式

    RpcRequest对象:

    
    @Builder
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public class RpcRequest implements Serializable {
    
        /**
         * 请求号
         */
        private String requestId;
        /**
         * 待调用接口名称
         */
        private String interfaceName;
        /**
         * 待调用方法名称
         */
        private String methodName;
        /**
         * 调用方法的参数
         */
        private Object[] parameters;
        /**
         * 调用方法的参数类型
         */
        private Class<?>[] paramTypes;
    
        /**
         * 是否是心跳包
         */
        private Boolean heartBeat;
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34

    RpcResponse对象:

    @Data
    @NoArgsConstructor
    public class RpcResponse<T> implements Serializable {
    
        /**
         * 响应对应的请求号
         */
        private String requestId;
        /**
         * 响应状态码
         */
        private Integer statusCode;
        /**
         * 响应状态补充信息
         */
        private String message;
        /**
         * 响应数据
         */
        private T data;
    
        public static <T> RpcResponse<T> success(T data, String requestId) {
            RpcResponse<T> response = new RpcResponse<>();
            response.setRequestId(requestId);
            response.setStatusCode(ResponseCode.SUCCESS.getCode());
            response.setData(data);
            return response;
        }
        public static <T> RpcResponse<T> fail(ResponseCode code, String requestId) {
            RpcResponse<T> response = new RpcResponse<>();
            response.setRequestId(requestId);
            response.setStatusCode(code.getCode());
            response.setMessage(code.getMessage());
            return response;
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37

    状态枚举类

    @AllArgsConstructor
    @Getter
    public enum ResponseCode {
        SUCCESS(200, "调用方法成功"),
        FAIL(500, "调用方法失败"),
        METHOD_NOT_FOUND(500, "未找到指定方法"),
        CLASS_NOT_FOUND(500, "未找到指定类");
        private final int code;
        private final String message;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    客户端实现(动态代理

    客户端方面,由于在客户端这一侧我们并没有接口的具体实现类,就没有办法直接生成实例对象。这时,我们可以通过动态代理的方式生成实例,并且调用方法时生成需要的RpcRequest对象并且发送给服务端

    public class RpcClientProxy implements InvocationHandler {
        private String host;
        private int port;
    
        public RpcClientProxy(String host, int port) {
            this.host = host;
            this.port = port;
        }
    
        @SuppressWarnings("unchecked")
        public <T> T getProxy(Class<T> clazz) {
            return (T) Proxy.newProxyInstance(clazz.getClassLoader(), new Class<?>[]{clazz}, this);
        }
        
        @Override
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
            RpcRequest rpcRequest = RpcRequest.builder()
                    .interfaceName(method.getDeclaringClass().getName())
                    .methodName(method.getName())
                    .parameters(args)
                    .paramTypes(method.getParameterTypes())
                    .build();
            RpcClient rpcClient = new RpcClient();
            return ((RpcResponse) rpcClient.sendRequest(rpcRequest, host, port)).getData();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26

    生成了RpcRequest对象后,我们使用一个RpcClient来发送这个请求,并且通过getData方法来获取响应的数据

    public class RpcClient {
     
        private static final Logger logger = LoggerFactory.getLogger(RpcClient.class);
     
        public Object sendRequest(RpcRequest rpcRequest, String host, int port) {
            try (Socket socket = new Socket(host, port)) {
                ObjectOutputStream objectOutputStream = new ObjectOutputStream(socket.getOutputStream());
                ObjectInputStream objectInputStream = new ObjectInputStream(socket.getInputStream());
                objectOutputStream.writeObject(rpcRequest);
                objectOutputStream.flush();
                return objectInputStream.readObject();
            } catch (IOException | ClassNotFoundException e) {
                logger.error("调用时有错误发生:", e);
                return null;
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    这里的实现方式就是直接使用Java的序列化方式(通过实现Serizlizable),创建一个Socket,利用Socket进行传输,获取ObjectOutputStream对象,然后把需要发送的对象传进去即可,接收时获取ObjectInputStream对象,readObject()方法就可以获得一个返回的对象。

    服务端实现(反射调用)

    主要流程就是使用一个ServerSocket监听某个端口,循环接收连接请求,如果发来了请求就创建一个线程,在新线程中处理调用。这里创建线程采用线程池的方式。

    public class RpcServer {
     
        private final ExecutorService threadPool;
        private static final Logger logger = LoggerFactory.getLogger(RpcServer.class);
     
        public RpcServer() {
            int corePoolSize = 5;
            int maximumPoolSize = 50;
            long keepAliveTime = 60;
            BlockingQueue<Runnable> workingQueue = new ArrayBlockingQueue<>(100);
            ThreadFactory threadFactory = Executors.defaultThreadFactory();
            threadPool = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, TimeUnit.SECONDS, workingQueue, threadFactory);
        }
        
    	 public void register(Object service, int port) {
            try (ServerSocket serverSocket = new ServerSocket(port)) {
                logger.info("服务器正在启动...");
                Socket socket;
                while((socket = serverSocket.accept()) != null) {
                    logger.info("客户端连接!Ip为:" + socket.getInetAddress());
                    threadPool.execute(new WorkerThread(socket, service));
                }
            } catch (IOException e) {
                logger.error("连接时有错误发生:", e);
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27

    这里向工作线程WorkerThread传入了socket和用于服务端实例service。WorkerThread实现了Runnable接口,用于接收RpcRequest对象,解析并且调用,生成RpcResponse对象并传输回去。run方法如下:

    @Override
        public void run() {
            try (ObjectInputStream objectInputStream = new ObjectInputStream(socket.getInputStream());
                 ObjectOutputStream objectOutputStream = new ObjectOutputStream(socket.getOutputStream())) {
                RpcRequest rpcRequest = (RpcRequest) objectInputStream.readObject();
                Method method = service.getClass().getMethod(rpcRequest.getMethodName(), rpcRequest.getParamTypes());
                Object returnObject = method.invoke(service, rpcRequest.getParameters());
                objectOutputStream.writeObject(RpcResponse.success(returnObject));
                objectOutputStream.flush();
            } catch (IOException | ClassNotFoundException | NoSuchMethodException | IllegalAccessException | InvocationTargetException e) {
                logger.error("调用或发送时有错误发生:", e);
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    测试

    上面已经实现了一个HelloService的实现类了,现在我们只需要创建一个RpcServer并且把这个实现类注册进去就行了

    public class TestServer {
        public static void main(String[] args) {
            HelloService helloService = new HelloServiceImpl();
            RpcServer rpcServer = new RpcServer();
            rpcServer.register(helloService, 9000);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    服务端开放在9000端口。

    客户端方面,我们需要通过动态代理,生成代理对象,并且调用,动态代理会自动帮我们向服务端发送请求的

    public class TestClient {
        public static void main(String[] args) {
            RpcClientProxy proxy = new RpcClientProxy("127.0.0.1", 9000);
            HelloService helloService = proxy.getProxy(HelloService.class);
            HelloObject object = new HelloObject(12, "This is a message");
            String res = helloService.hello(object);
            System.out.println(res);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
  • 相关阅读:
    小白也能通俗易懂的Mac环境变量配置教程
    别对蔚来手机抱太大希望
    【每天学习一点新知识】嘘!上课偷偷学习HTTPS加密
    nodejs毕业设计拼车租车平台
    L13.linux命令每日一练 -- 第二章 文件和目录操作命令 -- lsattr和file命令
    transformer_01
    macOs Ventura 13自动开机关机设置教程(命令行)
    单位建数字档案室的意义和作用
    猿创征文|一篇打通架构设计,Java设计模式10,建造者模式
    PerfView专题 (第一篇):如何寻找热点函数
  • 原文地址:https://blog.csdn.net/mao____mao/article/details/127991122