• Netty优化-rpc


    1.3 RPC 框架

    1)准备工作

    这些代码可以认为是现成的,无需从头编写练习

    为了简化起见,在原来聊天项目的基础上新增 Rpc 请求和响应消息

    @Data
    public abstract class Message implements Serializable {
    
        // 省略旧的代码
    
        public static final int RPC_MESSAGE_TYPE_REQUEST = 101;
        public static final int  RPC_MESSAGE_TYPE_RESPONSE = 102;
    
        static {
            // ...
            messageClasses.put(RPC_MESSAGE_TYPE_REQUEST, RpcRequestMessage.class);
            messageClasses.put(RPC_MESSAGE_TYPE_RESPONSE, RpcResponseMessage.class);
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    请求消息

    @Getter
    @ToString(callSuper = true)
    public class RpcRequestMessage extends Message {
    
        /**
         * 调用的接口全限定名,服务端根据它找到实现
         */
        private String interfaceName;
        /**
         * 调用接口中的方法名
         */
        private String methodName;
        /**
         * 方法返回类型
         */
        private Class<?> returnType;
        /**
         * 方法参数类型数组
         */
        private Class[] parameterTypes;
        /**
         * 方法参数值数组
         */
        private Object[] parameterValue;
    
        public RpcRequestMessage(int sequenceId, String interfaceName, String methodName, Class<?> returnType, Class[] parameterTypes, Object[] parameterValue) {
            super.setSequenceId(sequenceId);
            this.interfaceName = interfaceName;
            this.methodName = methodName;
            this.returnType = returnType;
            this.parameterTypes = parameterTypes;
            this.parameterValue = parameterValue;
        }
    
        @Override
        public int getMessageType() {
            return RPC_MESSAGE_TYPE_REQUEST;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39

    响应消息

    @Data
    @ToString(callSuper = true)
    public class RpcResponseMessage extends Message {
        /**
         * 返回值
         */
        private Object returnValue;
        /**
         * 异常值
         */
        private Exception exceptionValue;
    
        @Override
        public int getMessageType() {
            return RPC_MESSAGE_TYPE_RESPONSE;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    服务器架子

    @Slf4j
    public class RpcServer {
        public static void main(String[] args) {
            NioEventLoopGroup boss = new NioEventLoopGroup();
            NioEventLoopGroup worker = new NioEventLoopGroup();
            LoggingHandler LOGGING_HANDLER = new LoggingHandler(LogLevel.DEBUG);
            MessageCodecSharable MESSAGE_CODEC = new MessageCodecSharable();
            
            // rpc 请求消息处理器,待实现
            RpcRequestMessageHandler RPC_HANDLER = new RpcRequestMessageHandler();
            try {
                ServerBootstrap serverBootstrap = new ServerBootstrap();
                serverBootstrap.channel(NioServerSocketChannel.class);
                serverBootstrap.group(boss, worker);
                serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ch.pipeline().addLast(new ProcotolFrameDecoder());
                        ch.pipeline().addLast(LOGGING_HANDLER);
                        ch.pipeline().addLast(MESSAGE_CODEC);
                        ch.pipeline().addLast(RPC_HANDLER);
                    }
                });
                Channel channel = serverBootstrap.bind(8080).sync().channel();
                channel.closeFuture().sync();
            } catch (InterruptedException e) {
                log.error("server error", e);
            } finally {
                boss.shutdownGracefully();
                worker.shutdownGracefully();
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33

    服务器 handler

    @Slf4j
    @ChannelHandler.Sharable
    public class RpcRequestMessageHandler extends SimpleChannelInboundHandler<RpcRequestMessage> {
    
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, RpcRequestMessage message) {
            RpcResponseMessage response = new RpcResponseMessage();
            response.setSequenceId(message.getSequenceId());
            try {
                // 获取真正的实现对象
                HelloService service = (HelloService)
                        ServicesFactory.getService(Class.forName(message.getInterfaceName()));
    
                // 获取要调用的方法
                Method method = service.getClass().getMethod(message.getMethodName(), message.getParameterTypes());
    
                // 调用方法
                Object invoke = method.invoke(service, message.getParameterValue());
                // 调用成功
                response.setReturnValue(invoke);
            } catch (Exception e) {
                e.printStackTrace();
                // 调用异常
                response.setExceptionValue(e);
            }
            // 返回结果
            ctx.writeAndFlush(response);
        }
    
        public static void main(String[] args) throws Exception {
            RpcRequestMessage message = new RpcRequestMessage(
                    1,
                    "cn.itcast.rpc.service.HelloService",
                    "sayHello",
                    String.class,
                    new Class[]{String.class},
                    new Object[]{"张三"}
            );
            // 获取真正的实现对象
            HelloService service = (HelloService)
                    ServicesFactory.getService(Class.forName(message.getInterfaceName()));
            Method method = service.getClass().getMethod(message.getMethodName(), message.getParameterTypes());
            Object invoke = method.invoke(service, message.getParameterValue());
            System.out.println(invoke);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46

    客户端架子

    @Slf4j
    public class RpcClient {
        public static void main(String[] args) {
            NioEventLoopGroup group = new NioEventLoopGroup();
            LoggingHandler LOGGING_HANDLER = new LoggingHandler(LogLevel.DEBUG);
            MessageCodecSharable MESSAGE_CODEC = new MessageCodecSharable();
            
            // rpc 响应消息处理器,待实现
            RpcResponseMessageHandler RPC_HANDLER = new RpcResponseMessageHandler();
            try {
                Bootstrap bootstrap = new Bootstrap();
                bootstrap.channel(NioSocketChannel.class);
                bootstrap.group(group);
                bootstrap.handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ch.pipeline().addLast(new ProcotolFrameDecoder());
                        ch.pipeline().addLast(LOGGING_HANDLER);
                        ch.pipeline().addLast(MESSAGE_CODEC);
                        ch.pipeline().addLast(RPC_HANDLER);
                    }
                });
                Channel channel = bootstrap.connect("localhost", 8080).sync().channel();
    
                ChannelFuture future = channel.writeAndFlush(new RpcRequestMessage(
                        1,
                        "cn.itcast.rpc.service.HelloService",
                        "sayHello",
                        String.class,
                        new Class[]{String.class},
                        new Object[]{"张三"}
                )).addListener(promise -> {
                    if (!promise.isSuccess()) {
                        Throwable cause = promise.cause();
                        log.error("error", cause);
                    }
                });
    
                channel.closeFuture().sync();
            } catch (Exception e) {
                log.error("client error", e);
            } finally {
                group.shutdownGracefully();
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46

    客户端handler

    @Slf4j
    @ChannelHandler.Sharable
    public class RpcResponseMessageHandler extends SimpleChannelInboundHandler<RpcResponseMessage> {
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, RpcResponseMessage msg) throws Exception {
            log.debug("{}", msg);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    服务器端的 service 获取

    public class ServicesFactory {
    
        static Properties properties;
        static Map<Class<?>, Object> map = new ConcurrentHashMap<>();
    
        static {
            try (InputStream in = Config.class.getResourceAsStream("/application.properties")) {
                properties = new Properties();
                properties.load(in);
                Set<String> names = properties.stringPropertyNames();
                for (String name : names) {
                    if (name.endsWith("Service")) {
                        Class<?> interfaceClass = Class.forName(name);
                        Class<?> instanceClass = Class.forName(properties.getProperty(name));
                        map.put(interfaceClass, instanceClass.newInstance());
                    }
                }
            } catch (IOException | ClassNotFoundException | InstantiationException | IllegalAccessException e) {
                throw new ExceptionInInitializerError(e);
            }
        }
    
        public static <T> T getService(Class<T> interfaceClass) {
            return (T) map.get(interfaceClass);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26

    相关配置 application.properties

    serializer.algorithm=Json
    cn.itcast.server.service.HelloService=cn.itcast.server.service.HelloServiceImpl
    
    • 1
    • 2

    业务类

    public interface HelloService {
        String sayHello(String name);
    }
    
    public class HelloServiceImpl implements HelloService {
        @Override
        public String sayHello(String msg) {
            //int i = 1 / 0;
            return "你好, " + msg;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    计数器

    public abstract class SequenceIdGenerator {
        private static final AtomicInteger id = new AtomicInteger();
    
        public static int nextId() {
            return id.incrementAndGet();
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    客户端最终代码

    @Slf4j
    public class RpcClientManager {
    
        public static void main(String[] args) throws IOException {
            HelloService service = getProxyService(HelloService.class);
            System.out.println(service.sayHello("zhangsan"));
            System.out.println(service.sayHello("lisi"));
        }
    
        public static <T> T getProxyService(Class<T> serviceClass) {
            ClassLoader classLoader = serviceClass.getClassLoader();
            Class<?>[] interfaces = new Class[]{serviceClass};
            Object o = Proxy.newProxyInstance(classLoader, interfaces, (proxy, method, args) -> {
                // 1. 将方法调用转换为 消息对象
                int sequenceId = SequenceIdGenerator.nextId();
                RpcRequestMessage msg = new RpcRequestMessage(
                        sequenceId,
                        serviceClass.getName(),
                        method.getName(),
                        method.getReturnType(),
                        method.getParameterTypes(),
                        args
                );
                // 2. 将消息对象发送出去
                getChannel().writeAndFlush(msg);
    
                // 3. 准备一个空 Promise 对象,来接收结果             指定 promise 对象异步接收结果线程
                DefaultPromise<Object> promise = new DefaultPromise<>(channel.eventLoop());
                RpcResponseMessageHandler.PROMISES.put(sequenceId, promise);
    
                log.debug("主线程开始等待...");
                // 4. 阻塞等待 promise 结果
                promise.await();
                log.debug("主线程放行...");
                if (promise.isSuccess()) {
                    return promise.getNow();
                } else {
                    throw new RuntimeException(promise.cause());
                }
            });
            return (T) o;
        }
    
        private static Channel channel = null;
        private static final Object LOCK = new Object();
    
        public static Channel getChannel() {
            if (channel != null) {
                return channel;
            }
            synchronized (LOCK) {
                if (channel != null) {
                    return channel;
                }
                initChannel();
                return channel;
            }
        }
    
        //初始化channel
        private static void initChannel() {
            NioEventLoopGroup group = new NioEventLoopGroup();
            LoggingHandler LOGGING_HANDLER = new LoggingHandler(LogLevel.DEBUG);
            MessageCodecSharable MESSAGE_CODEC = new MessageCodecSharable();
            RpcResponseMessageHandler RPC_HANDLER = new RpcResponseMessageHandler();
    
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.channel(NioSocketChannel.class);
            bootstrap.group(group);
            bootstrap.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline().addLast(new ProcotolFrameDecoder());
                    ch.pipeline().addLast(LOGGING_HANDLER);
                    ch.pipeline().addLast(MESSAGE_CODEC);
                    ch.pipeline().addLast(RPC_HANDLER);
                }
            });
            try {
                channel = bootstrap.connect("localhost", 8080).sync().channel();
                channel.closeFuture().addListener(future -> {
                    group.shutdownGracefully();
                });
            } catch (Exception e) {
                log.error("client error", e);
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88

    服务端不变,服务端handler

    @Slf4j
    @ChannelHandler.Sharable
    public class RpcRequestMessageHandler extends SimpleChannelInboundHandler<RpcRequestMessage> {
    
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, RpcRequestMessage message) {
            RpcResponseMessage response = new RpcResponseMessage();
            response.setSequenceId(message.getSequenceId());
            try {
                HelloService service = (HelloService)
                        ServicesFactory.getService(Class.forName(message.getInterfaceName()));
                Method method = service.getClass().getMethod(message.getMethodName(), message.getParameterTypes());
                Object invoke = method.invoke(service, message.getParameterValue());
                response.setReturnValue(invoke);
            } catch (Exception e) {
                e.printStackTrace();
                String msg = e.getCause().getMessage();
                response.setExceptionValue(new Exception("远程调用出错:" + msg));
            }
            log.error(response.toString());
            ctx.writeAndFlush(response);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23

    执行结果

    16:42:29 [DEBUG] [main] c.i.r.RpcClientManager - 主线程开始等待...
    16:42:29 [DEBUG] [nioEventLoopGroup-2-1] c.i.p.MessageCodecSharable - 16909060, 1, 0, 102, 1, 228
    16:42:29 [DEBUG] [nioEventLoopGroup-2-1] c.i.p.MessageCodecSharable - RpcResponseMessage(super=Message(sequenceId=1, messageType=102), returnValue=你好, zhangsan, exceptionValue=null)
    16:42:29 [DEBUG] [main] c.i.r.RpcClientManager - 主线程放行...
    你好, zhangsan
    16:42:29 [DEBUG] [main] c.i.r.RpcClientManager - 主线程开始等待...
    16:42:29 [DEBUG] [nioEventLoopGroup-2-1] c.i.p.MessageCodecSharable - 16909060, 1, 0, 102, 2, 224
    16:42:29 [DEBUG] [nioEventLoopGroup-2-1] c.i.p.MessageCodecSharable - RpcResponseMessage(super=Message(sequenceId=2, messageType=102), returnValue=你好, lisi, exceptionValue=null)
    16:42:29 [DEBUG] [main] c.i.r.RpcClientManager - 主线程放行...
    你好, lisi
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
  • 相关阅读:
    音频频谱显示-基于fmod设计音乐播放器并动态显示频谱图(二)
    ATFX:非农就业报告来袭,美元指数或再迎剧烈波动
    神经网络方法研究及应用,神经网络研究主要内容
    Vue组件------列表组件设计
    ISO20000信息技术服务管理体系认证条件
    辽宁博学优晨教育科技有限公司视频剪辑培训靠谱吗?
    二、Redis分布式锁
    学习记录十六 ---- spring boot
    阿里云天池大赛赛题(机器学习)——工业蒸汽量预测(完整代码)
    基于红黑树对map和set容器的封装
  • 原文地址:https://blog.csdn.net/qq_42665745/article/details/134054790