• Netty网络框架学习笔记-19(实现一个简单RPC-1)


    Netty网络框架学习笔记-19(实现一个简单RPC-1_2020.06.28)

    RPC介绍

    RPC(RemoteProcedureCall)—远程过程调用,是一个计算机通信协议。该协议允许运行于一台计算机的程序调用另一台计算机的子程序,而程序员无需额外地为这个交互作用编程

    两个或多个应用程序都分布在不同的服务器上,它们之间的调用都像是本地方法调用一样

    HTTP方式直接调用不叫RPC, RPC调用就好像类似调用本地方法一样

    jeBpKH.png

    常见的 RPC 框架有: 比较知名的如阿里的Dubbo、google的gRPC、Go语言的rpcx、Apache的thrift,Spring 旗下的 Spring Cloud。

    在RPC中, 调用者称为消费者, 被调用者称为服务提供者。 RPC框架就是将上面时序图的步骤封装起来, 让远程调用, 像在本地调用方法一样。

    1.0 基于Netty简单实现一个RPC (模拟dubbo)

    1.1.1 设计说明

    • 创建一个注册中心 (提供有多少服务提供者的IP和端口)
    • 创建一个服务提供者 (监听消费者请求, 按照约定格式, 返回数据)
    • 创建一个服务消费者 (发送请求, 按照约定格式, 发送请求数据)

    jnnZ79.png

    技术选型: 注册中心: netty实现, 客户端远程调用: netty实现, 服务提供: netty实现

    都是最最简单实现版本, 用于学习!

    2.0 消息通信协议实体

    2.1 MessageProtocol (消息流转协议实体)

    @Data
    public class MessageProtocol {
        /**
         * 调用的类名称
         */
        private String className;
    
        /**
         * 内容
         */
        private String methodName;
    
        /**
         * 方法参数类型
         */
        private Object[] methodParameterTypes;
    
        /**
         * 方法参数
         */
        private Object[] methodParameter;
    
        /**
         * 每次请求唯一ID
         */
        private String requestId;
    
        /**
         * 调用结果
         */
        private InvokeResult invokeResult;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32

    2.2 InvokeResult (调用结果)

    @Data
    public class InvokeResult {
    
        /**
         * 0=成功, 1=失败
         */
        private Integer resultCode;
    
        /**
         * 失败原因
         */
        private String failMessage;
    
        /**
         * 调用后的结果
         */
        private Object invokeResult;
    
        public InvokeResult(Integer resultCode, String failMessage) {
            this.resultCode = resultCode;
            this.failMessage = failMessage;
        }
    
        public InvokeResult(Integer resultCode, Object invokeResult) {
            this.resultCode = resultCode;
            this.invokeResult = invokeResult;
        }
    
        public InvokeResult() {
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31

    2.3 RegistrationInfo (注册信息实体)

    @Data
    @EqualsAndHashCode
    public class RegistrationInfo {
    
        /**
         *  0=注册, 1=拉取
         */
        private Integer type;
    
        /**
         * 服务名称
         */
        private String serviceName;
    
        /**
         * 地址
         */
        private String ip;
        /**
         * 端口号
         */
        private Integer port;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23

    3.0 创建注册中心

    3.1.1 MyRegistrationCenter

    /**
     * @Author: ZhiHao
     * @Date: 2022/6/29 16:17
     * @Description: 注册中心 (简单提供的功能是, 接受服务提供者注册服务, 接受消费者拉取对应需要的服务提供者元数据)
     * @Versions 1.0
     **/
    @Slf4j
    public class MyRegistrationCenter {
    
        public static void main(String[] args) {
            NioEventLoopGroup bossGroup = new NioEventLoopGroup(2);
            NioEventLoopGroup workerGroup = new NioEventLoopGroup();
    
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap = serverBootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .handler(new LoggingHandler(LogLevel.INFO))
                    .option(ChannelOption.SO_BACKLOG, 1024)
                    .childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
                    .childOption(ChannelOption.SO_KEEPALIVE, Boolean.TRUE)
                    .childHandler(new MyChannelInitializer());
    
            ChannelFuture channelFuture = serverBootstrap.bind(new InetSocketAddress("127.0.0.1", 8888));
            channelFuture.addListener(el -> {
                if (el.isSuccess()) {
                    log.info("MyRegistrationCenter===注册中心启动成功, 等待服务提供者或者消费者访问!");
                }
            });
            try {
                channelFuture.sync().channel().closeFuture().sync();
            } catch (Exception e) {
                log.info("MyRegistrationCenter===注册中心发生异常, 消息:{}", e);
            } finally {
                bossGroup.shutdownGracefully();
                workerGroup.shutdownGracefully();
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38

    3.1.2 MyRegisteredChannelInitializer

    public class MyChannelInitializer extends ChannelInitializer<SocketChannel> {
    
        private static final DefaultEventLoopGroup DEFAULT_EVENT_LOOP_GROUP = new DefaultEventLoopGroup(16);
    
        @Override
        protected void initChannel(SocketChannel ch) throws Exception {
            ChannelPipeline pipeline = ch.pipeline();
            pipeline.addLast(new StringEncoder());
            // 将ByteBuf转成为 json内容的ByteBuf
            pipeline.addLast(new JsonObjectDecoder());
            // 在转字符串
            pipeline.addLast(new StringDecoder());
            // 自定义业务处理器
            pipeline.addLast(DEFAULT_EVENT_LOOP_GROUP,new MyChannelHandler());
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    3.1.3 MyRegisteredChannelHandler

    @Slf4j
    public class MyRegisteredChannelHandler extends SimpleChannelInboundHandler<String> {
    
        private static final ConcurrentMap<String, RegistrationInfo> CONCURRENT_MAP = new ConcurrentHashMap<>();
        
        private static final int REGISTERED = 0;
        private static final int PULL_REGISTERED_INFO = 1;
    
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
            RegistrationInfo registrationInfo = JSONUtil.toBean(msg, RegistrationInfo.class);
            if (Objects.nonNull(registrationInfo)) {
                Integer type = Optional.ofNullable(registrationInfo.getType()).orElse(-1);
                switch (type) {
                    case REGISTERED:
                        CONCURRENT_MAP.put(registrationInfo.getServiceName(), registrationInfo);
                        log.info("MyChannelHandler===注册成功一个服务提供者:{}",registrationInfo);
                        JSONObject jsonObject = new JSONObject();
                        jsonObject.putOnce("result", "信息注册成功!");
                        String jsonStr1 = jsonObject.toString();
                        ctx.writeAndFlush(Unpooled.wrappedBuffer(jsonStr1.getBytes(StandardCharsets.UTF_8)));
                        break;
                    case PULL_REGISTERED_INFO:
                        registrationInfo = CONCURRENT_MAP.get(registrationInfo.getServiceName());
                        if (Objects.isNull(registrationInfo)){
                            JSONObject jsonObject2 = new JSONObject();
                            jsonObject2.putOnce("result", "服务对应的注册信息不存在!");
                            ctx.writeAndFlush(Unpooled.wrappedBuffer(jsonObject2.toString().getBytes(StandardCharsets.UTF_8)));
                            return;
                        }
                        String jsonStr2 = JSONUtil.toJsonStr(registrationInfo);
                        log.info("MyChannelHandler===成功拉取服务提供者地址返回给客户端:{}",jsonStr2);
                        ctx.writeAndFlush(Unpooled.wrappedBuffer(jsonStr2.getBytes(StandardCharsets.UTF_8)));
                        break;
                    default:
                        JSONObject jsonObject3 = new JSONObject();
                        jsonObject3.putOnce("result", "type类型正确!");
                        ctx.writeAndFlush(Unpooled.wrappedBuffer(jsonObject3.toString().getBytes(StandardCharsets.UTF_8)));
                }
            }
            ctx.fireChannelRead(msg);
        }
    
        @Override
        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
            ctx.close();
            super.channelInactive(ctx);
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            log.error("MyChannelHandler===发生异常, 信息:{}", cause);
            ctx.close();
            super.exceptionCaught(ctx, cause);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56

    3.1.4 注册中心启动:

    18:02:44.914 [nioEventLoopGroup-2-1] INFO com.zhihao.netty.rpc.registrationcenter.MyRegistrationCenter - MyRegistrationCenter===注册中心启动成功, 等待服务提供者或者消费者访问!
    
    • 1

    扩展:

    Java RMI 远程调用介绍

    RPC(Feign VS Dubbo)

    1

  • 相关阅读:
    爬虫一般采用什么代理IP,Python爬虫代理IP使用方法详解
    argparse的用法
    [附源码]Python计算机毕业设计Django酒店物联网平台系统
    迅为RK3568开发板QT学习手册
    【Docker】入门指南(基础篇)
    2-数据保存-XML
    C#——文件读取Directory类详情
    快来试试!免费用上GPT-4 !!!
    基于51单片机的多种音乐盒播放器Proteus仿真系统
    华为OD机试真题【服务器能耗统计】
  • 原文地址:https://blog.csdn.net/weixin_44600430/article/details/125563713