• 手写RPC框架--5.Netty业务逻辑


    RPC框架-Gitee代码(麻烦点个Starred, 支持一下吧)
    RPC框架-GitHub代码(麻烦点个Starred, 支持一下吧)

    5.Netty业务逻辑

    a.加入基础的Netty代码

    1.在DcyRpcBootstrap类的start()方法中加入netty代码 (待完善)

    /**
     * 启动netty服务
     */
    public void start() {
        // 1.创建EventLoopGroup,老板只负责处理请求,之后会将请求分发给worker,1比2的比例
        NioEventLoopGroup boss = new NioEventLoopGroup(2);
        NioEventLoopGroup worker = new NioEventLoopGroup(10);
    
        try{
            // 2.服务器端启动辅助对象
            ServerBootstrap serverBootstrap = new ServerBootstrap();
    
            // 3.配置服务器
            serverBootstrap = serverBootstrap.group(boss, worker)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            // TODO 核心内容,需要添加很多入栈和出栈的handler
                            socketChannel.pipeline().addLast(null);
                        }
                    });
    
            // 4.绑定端口
            ChannelFuture channelFuture = serverBootstrap.bind(port).sync();
    
            // 5.阻塞操作
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            try {
                boss.shutdownGracefully().sync();
                worker.shutdownGracefully().sync();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39

    2.在ReferenceConfig类的get()方法中加入netty代码 (待完善)

    /**
     * 代理设计模式,生成一个API接口的代理对象
     * @return 代理对象
     */
    public T get() {
        // 使用动态代理完成工作
        ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
        Class[] classes = new Class[]{interfaceRef};
    
        // 使用动态代理生成代理对象
        Object helloProxy = Proxy.newProxyInstance(classLoader, classes, new InvocationHandler() {
            @Override
            public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                // 调用sayHi()方法,事实上会走进这个代码段当中
                // 已经知道method(具体的方法),args(参数列表)
                log.info("method-->{}", method.getName());
                log.info("args-->{}", args);
    
                // 1.发现服务,从注册中心,寻找一个可用的服务
                // 传入服务的名字,返回ip+端口 (InetSocketAddress可以封装端口/ip/host name)
                InetSocketAddress address = registry.lookup(interfaceRef.getName());
                if (log.isInfoEnabled()){
                    log.info("服务调用方,发现了服务{}的可用主机{}", interfaceRef.getName(), address);
                }
                // 2.使用netty连接服务器,发送 调用的 服务名字+方法名字+参数列表,得到结果
                // 定义线程池 EventLoopGroup
                NioEventLoopGroup group = new NioEventLoopGroup();
                // 启动一个客户端需要一个辅助类 bootstrap
                Bootstrap bootstrap = new Bootstrap();
    
                try {
                    bootstrap = bootstrap.group(group)
                            .remoteAddress(address)
                            // 选择初始化一个什么样的channel
                            .channel(NioSocketChannel.class)
                            .handler(new ChannelInitializer<SocketChannel>() {
                                @Override
                                protected void initChannel(SocketChannel socketChannel) throws Exception {
                                    socketChannel.pipeline().addLast(null);
                                }
                            });
    
    
                    // 3.连接到远程节点;等待连接完成
                    ChannelFuture channelFuture = bootstrap.connect().sync();
                    // 4.获取channel并且写数据,发送消息到服务器端
                    channelFuture.channel().writeAndFlush(Unpooled.copiedBuffer("hello netty".getBytes(StandardCharsets.UTF_8)));
                    // 5.阻塞程序,等待接收消息
                    channelFuture.channel().closeFuture().sync();
    
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                } finally {
                    try {
                        group.shutdownGracefully().sync();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
    
                return null;
            }
        });
    
        return (T) helloProxy;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66

    b.对通道channel进行缓存

    每次启动程序都会建立一个新的Netty连接,显示是对不合适的
    解决方案:缓存channel,尝试从缓存中获取channel。如果为空,则创建新的连接并进行缓存

    1.在DcyRpcBootstrap类的中添加一个全局的缓存:对通道进行缓存

    // Netty的连接缓存
    public static final Map<InetSocketAddress, Channel> CHANNEL_CACHE = new ConcurrentHashMap<>();
    
    • 1
    • 2

    2.在ReferenceConfig类的get()方法中进行修改:查询缓存是否存在通道(address),若未命中,则建立新的channel并进行缓存

    /**
     * 代理设计模式,生成一个API接口的代理对象
     * @return 代理对象
     */
    public T get() {
        // 使用动态代理完成工作
        ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
        Class[] classes = new Class[]{interfaceRef};
    
        // 使用动态代理生成代理对象
        Object helloProxy = Proxy.newProxyInstance(classLoader, classes, new InvocationHandler() {
            @Override
            public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                // 调用sayHi()方法,事实上会走进这个代码段当中
                // 已经知道method(具体的方法),args(参数列表)
                log.info("method-->{}", method.getName());
                log.info("args-->{}", args);
    
                // 1.发现服务,从注册中心,寻找一个可用的服务
                // 传入服务的名字,返回ip+端口 (InetSocketAddress可以封装端口/ip/host name)
                InetSocketAddress address = registry.lookup(interfaceRef.getName());
                if (log.isInfoEnabled()){
                    log.info("服务调用方,发现了服务{}的可用主机{}", interfaceRef.getName(), address);
                }
                // 2.使用netty连接服务器,发送 调用的 服务名字+方法名字+参数列表,得到结果
    
                // 每次在这都会建立一个新的连接,对程序不合适
                // 解决方案:缓存channel,尝试从缓存中获取channel。如果为空,则创建新的连接并进行缓存
                // 1.从全局缓存中获取一个通道
                Channel channel = DcyRpcBootstrap.CHANNEL_CACHE.get(address);
    
                if (channel == null) {
                    // 建立新的channel
                    // 定义线程池 EventLoopGroup
                    NioEventLoopGroup group = new NioEventLoopGroup();
                    // 启动一个客户端需要一个辅助类 bootstrap
                    Bootstrap bootstrap = new Bootstrap();
    
                    try {
                        bootstrap = bootstrap.group(group)
                                .remoteAddress(address)
                                // 选择初始化一个什么样的channel
                                .channel(NioSocketChannel.class)
                                .handler(new ChannelInitializer<SocketChannel>() {
                                    @Override
                                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                                        socketChannel.pipeline().addLast(null);
                                    }
                                });
    
    
                        // 3.尝试连接服务器
                        channel = bootstrap.connect().sync().channel();
                        // 缓存
                        DcyRpcBootstrap.CHANNEL_CACHE.put(address, channel);
    
                    } catch (InterruptedException e) {
                        throw new RuntimeException(e);
                    }
                }
    
                if (channel == null){
                    throw new NetworkException("获取通道channel发生了异常。");
                }
                ChannelFuture channelFuture = channel.writeAndFlush(new Object());
    
                return null;
            }
        });
    
        return (T) helloProxy;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72

    c.对代码进行重构优化

    1.在com.dcyrpc.discovery下创建NettyBootstrapInitializer类:提供Bootstrap的单例

    /**
     * 提供Bootstrap的单例
     */
    public class NettyBootstrapInitializer {
    
        private static final Bootstrap bootstrap = new Bootstrap();
        
        static {
            NioEventLoopGroup group = new NioEventLoopGroup();
            bootstrap.group(group)
                    // 选择初始化一个什么样的channel
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            socketChannel.pipeline().addLast(null);
                        }
                    });
        }
    
        private NettyBootstrapInitializer() {
        }
    
        public static Bootstrap getBootstrap() {
            return bootstrap;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27

    2.在ReferenceConfig类的get()方法中进行代码的优化

    /**
     * 代理设计模式,生成一个API接口的代理对象
     * @return 代理对象
     */
    public T get() {
        // 使用动态代理完成工作
        ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
        Class[] classes = new Class[]{interfaceRef};
    
        // 使用动态代理生成代理对象
        Object helloProxy = Proxy.newProxyInstance(classLoader, classes, new InvocationHandler() {
            @Override
            public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                // 调用sayHi()方法,事实上会走进这个代码段当中
                // 已经知道method(具体的方法),args(参数列表)
                log.info("method-->{}", method.getName());
                log.info("args-->{}", args);
    
                // 1.发现服务,从注册中心,寻找一个可用的服务
                // 传入服务的名字,返回ip+端口 (InetSocketAddress可以封装端口/ip/host name)
                InetSocketAddress address = registry.lookup(interfaceRef.getName());
                if (log.isInfoEnabled()){
                    log.info("服务调用方,发现了服务{}的可用主机{}", interfaceRef.getName(), address);
                }
                // 2.使用netty连接服务器,发送 调用的 服务名字+方法名字+参数列表,得到结果
    
                // 每次在这都会建立一个新的连接,对程序不合适
                // 解决方案:缓存channel,尝试从缓存中获取channel。如果为空,则创建新的连接并进行缓存
                // 1.从全局缓存中获取一个通道
                Channel channel = DcyRpcBootstrap.CHANNEL_CACHE.get(address);
    
                if (channel == null) {
                    // await()方法会阻塞,会等待连接成功再返回
                    // sync和await都是阻塞当前线程,获取返回值。因为连接过程和发送数据过程是异步的
                    // 如果发生了异常,sync会主动在主线程抛出异常,await不会,异常在子线程中处理,需要使用future处理
    //                    channel = NettyBootstrapInitializer.getBootstrap().connect(address).await().channel();
    
                    // 使用addListener执行异步操作
                    CompletableFuture<Channel> channelFuture = new CompletableFuture<>();
                    NettyBootstrapInitializer.getBootstrap().connect(address).addListener((ChannelFutureListener) promise -> {
                        if (promise.isDone()) {
                            // 异步的,已经完成
                            log.info("已经和【{}】成功建立连接。", address);
                            channelFuture.complete(promise.channel());
                        } else if (!promise.isSuccess()) {
                            channelFuture.completeExceptionally(promise.cause());
                        }
                    });
    
                    // 阻塞获取channel
                    channel = channelFuture.get(3, TimeUnit.SECONDS);
    
                    // 缓存channel
                    DcyRpcBootstrap.CHANNEL_CACHE.put(address, channel);
                }
                if (channel == null){
                    throw new NetworkException("获取通道channel发生了异常。");
                }
    
                /**
                 * ---------------------------同步策略---------------------------
                 */
    //                ChannelFuture channelFuture = channel.writeAndFlush(new Object()).await();
    //                // get()阻塞获取结果
    //                // getNow()获取当前的结果,如果未处理完成,返回null
    //                if (channelFuture.isDone()) {
    //                    Object object = channelFuture.getNow();
    //                } else if (!channelFuture.isSuccess()) {
    //                    // 发生问题,需要捕获异常。
    //                    // 子线程可以捕获异步任务的异常
    //                    Throwable cause = channelFuture.cause();
    //                    throw new RuntimeException(cause);
    //                }
    
                /**
                 * ---------------------------异步策略---------------------------
                 */
                CompletableFuture<Object> completableFuture = new CompletableFuture<>();
                // TODO 需要将completableFuture暴露出去
                channel.writeAndFlush(Unpooled.copiedBuffer("hello".getBytes())).addListener((ChannelFutureListener) promise -> {
                    // 当前的promise返回的结果是,writeAndFlush的返回结果
                    // 一旦数据被写出去,这个promise也就结束了
    //                    if (promise.isDone()) {
    //                        completableFuture.complete(promise.getNow());
    //                    }
    
                    // 只需要处理异常
                    if (!promise.isSuccess()) {
                        completableFuture.completeExceptionally(promise.cause());
                    }
                });
    
                return completableFuture.get(3, TimeUnit.SECONDS);
            }
        });
    
        return (T) helloProxy;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98

    d.完成基础通信

    1.在DcyRpcBootstrap类的start()方法中添加 handler:SimpleChannelInboundHandler

    /**
     * 启动netty服务
     */
    public void start() {
        // 1.创建EventLoopGroup,老板只负责处理请求,之后会将请求分发给worker,1比2的比例
        NioEventLoopGroup boss = new NioEventLoopGroup(2);
        NioEventLoopGroup worker = new NioEventLoopGroup(10);
    
        try{
            // 2.服务器端启动辅助对象
            ServerBootstrap serverBootstrap = new ServerBootstrap();
    
            // 3.配置服务器
            serverBootstrap = serverBootstrap.group(boss, worker)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            // TODO 核心内容,需要添加很多入栈和出栈的handler
                            socketChannel.pipeline().addLast(new SimpleChannelInboundHandler<Object>() {
                                @Override
                                protected void channelRead0(ChannelHandlerContext channelHandlerContext, Object msg) throws Exception {
                                    ByteBuf byteBuf = (ByteBuf) msg;
                                    log.info("byteBuf --> {}", byteBuf.toString(Charset.defaultCharset()));
    
                                    channelHandlerContext.channel().writeAndFlush(Unpooled.copiedBuffer("dcyrpc--hello".getBytes()));
                                }
                            });
                        }
                    });
    
            // 4.绑定端口
            ChannelFuture channelFuture = serverBootstrap.bind(port).sync();
    
            // 5.阻塞操作
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            try {
                boss.shutdownGracefully().sync();
                worker.shutdownGracefully().sync();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47

    2.在NettyBootstrapInitializer类的初始化Netty的静态代码块中添加 handler:SimpleChannelInboundHandler

    static {
        NioEventLoopGroup group = new NioEventLoopGroup();
        bootstrap.group(group)
                // 选择初始化一个什么样的channel
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        socketChannel.pipeline().addLast(new SimpleChannelInboundHandler<ByteBuf>() {
                            @Override
                            protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
                                log.info("msg --> {}", msg.toString(Charset.defaultCharset()));
                            }
                        });
                    }
                });
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    e.异步获取服务器的返回结果

    1.在DcyRpcBootstrap类的中添加一个全局的对外挂起的 completableFuture

    // 定义全局的对外挂起的 completableFuture
    public static final Map<Long, CompletableFuture<Object>> PENDING_REQUEST = new HashMap<>(128);
    
    • 1
    • 2

    2.在ReferenceConfig类中的get()方法完成对,completableFuture暴露出去

    /**
     * 代理设计模式,生成一个API接口的代理对象
     * @return 代理对象
     */
    public T get() {
        // 使用动态代理完成工作
        ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
        Class[] classes = new Class[]{interfaceRef};
    
        // 使用动态代理生成代理对象
        Object helloProxy = Proxy.newProxyInstance(classLoader, classes, new InvocationHandler() {
            @Override
            public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                // 调用sayHi()方法,事实上会走进这个代码段当中
                // 已经知道method(具体的方法),args(参数列表)
                log.info("method-->{}", method.getName());
                log.info("args-->{}", args);
    
                // 1.发现服务,从注册中心,寻找一个可用的服务
                // 传入服务的名字,返回ip+端口 (InetSocketAddress可以封装端口/ip/host name)
                InetSocketAddress address = registry.lookup(interfaceRef.getName());
                if (log.isInfoEnabled()){
                    log.info("服务调用方,发现了服务{}的可用主机{}", interfaceRef.getName(), address);
                }
                
                // 1.从全局缓存中获取一个通道
                Channel channel = DcyRpcBootstrap.CHANNEL_CACHE.get(address);
    
                if (channel == null) {
                    // 使用addListener执行异步操作
                    CompletableFuture<Channel> channelFuture = new CompletableFuture<>();
                    NettyBootstrapInitializer.getBootstrap().connect(address).addListener((ChannelFutureListener) promise -> {
                        if (promise.isDone()) {
                            // 异步的,已经完成
                            log.info("已经和【{}】成功建立连接。", address);
                            channelFuture.complete(promise.channel());
                        } else if (!promise.isSuccess()) {
                            channelFuture.completeExceptionally(promise.cause());
                        }
                    });
    
                    // 阻塞获取channel
                    channel = channelFuture.get(3, TimeUnit.SECONDS);
    
                    // 缓存channel
                    DcyRpcBootstrap.CHANNEL_CACHE.put(address, channel);
                }
                if (channel == null){
                    log.error("获取或建立与【{}】通道时发生了异常。", address);
                    throw new NetworkException("获取通道时发生了异常。");
                }
    
                CompletableFuture<Object> completableFuture = new CompletableFuture<>();
                // TODO 需要将completableFuture暴露出去
                DcyRpcBootstrap.PENDING_REQUEST.put(1L, completableFuture);
    
                channel.writeAndFlush(Unpooled.copiedBuffer("hello".getBytes())).addListener((ChannelFutureListener) promise -> {
    
                    // 只需要处理异常
                    if (!promise.isSuccess()) {
                        completableFuture.completeExceptionally(promise.cause());
                    }
                });
    
                // 如果没有地方处理这个completableFuture,这里会阻塞等待 complete 方法的执行
                // 在Netty的pipeline中最终的handler的处理结果 调用complete
                return completableFuture.get(10, TimeUnit.SECONDS);
            }
        });
    
        return (T) helloProxy;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72

    3.在NettyBootstrapInitializer类的初始化Netty的静态代码块中:寻找与之匹配的待处理 completeFuture

    tatic {
        NioEventLoopGroup group = new NioEventLoopGroup();
        bootstrap.group(group)
                // 选择初始化一个什么样的channel
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        socketChannel.pipeline().addLast(new SimpleChannelInboundHandler<ByteBuf>() {
                            @Override
                            protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
                                // 异步
                                // 服务提供方,给予的结果
                                String result = msg.toString(Charset.defaultCharset());
                                // 从全局的挂起的请求中,寻找与之匹配的待处理 completeFuture
                                CompletableFuture<Object> completableFuture = DcyRpcBootstrap.PENDING_REQUEST.get(1L);
                                completableFuture.complete(result);
                            }
                        });
                    }
                });
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22

    f.调整代码

    在core模块com.dcyrpc下创建proxy.handler

    在handler包下创建RpcConsumerInvocationHandler类,实现InvocationHandler接口

    • ReferenceConfig类下的InvocationHandler匿名内部类拷贝到该RpcConsumerInvocationHandler类中
    /**
     * 该类封装了客户端通信的基础逻辑,每一个代理对象的远程调用过程都封装在invoke方法中
     * 1.发现可用服务
     * 2.建立连接
     * 3.发送请求
     * 4.得到结果
     */
    @Slf4j
    public class RpcConsumerInvocationHandler implements InvocationHandler {
    
        // 接口
        private Class<?> interfaceRef;
    
        // 注册中心
        private Registry registry;
    
        public RpcConsumerInvocationHandler(Class<?> interfaceRef, Registry registry) {
            this.interfaceRef = interfaceRef;
            this.registry = registry;
        }
    
        @Override
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
    
            // 1.发现服务,从注册中心,寻找一个可用的服务
            //  - 传入服务的名字,返回ip+端口 (InetSocketAddress可以封装端口/ip/host name)
            InetSocketAddress address = registry.lookup(interfaceRef.getName());
            if (log.isInfoEnabled()){
                log.info("服务调用方,发现了服务{}的可用主机{}", interfaceRef.getName(), address);
            }
    
            // 2.尝试获取一个可用的通道
            Channel channel = getAvailableChannel(address);
            if (log.isInfoEnabled()){
                log.info("获取了和【{}】建立的连接通道,准备发送数据", address);
            }
    
            /**
             * ---------------------------封装报文---------------------------
             */
            // 3.封装报文
    
            /**
             * ---------------------------同步策略---------------------------
             */
    //                ChannelFuture channelFuture = channel.writeAndFlush(new Object()).await();
    //                // get()阻塞获取结果
    //                // getNow()获取当前的结果,如果未处理完成,返回null
    //                if (channelFuture.isDone()) {
    //                    Object object = channelFuture.getNow();
    //                } else if (!channelFuture.isSuccess()) {
    //                    // 发生问题,需要捕获异常。
    //                    // 子线程可以捕获异步任务的异常
    //                    Throwable cause = channelFuture.cause();
    //                    throw new RuntimeException(cause);
    //                }
    
            /**
             * ---------------------------异步策略---------------------------
             */
    
            // 4.写出报文
            CompletableFuture<Object> completableFuture = new CompletableFuture<>();
            // 将completableFuture暴露出去
            DcyRpcBootstrap.PENDING_REQUEST.put(1L, completableFuture);
    
            channel.writeAndFlush(Unpooled.copiedBuffer("hello".getBytes())).addListener((ChannelFutureListener) promise -> {
                // 需要处理异常
                if (!promise.isSuccess()) {
                    completableFuture.completeExceptionally(promise.cause());
                }
            });
    
            // 如果没有地方处理这个completableFuture,这里会阻塞等待 complete 方法的执行
            // 在Netty的pipeline中最终的handler的处理结果 调用complete
            // 5.获得响应的结果
            return completableFuture.get(10, TimeUnit.SECONDS);
        }
    
        /**
         * 根据地址获取一个可用的通道
         * @param address
         * @return
         */
        private Channel getAvailableChannel(InetSocketAddress address) {
            // 1.尝试从缓存中获取通道
            Channel channel = DcyRpcBootstrap.CHANNEL_CACHE.get(address);
    
            // 2.拿不到就建立新连接
            if (channel == null) {
    
                // 使用addListener执行异步操作
                CompletableFuture<Channel> channelFuture = new CompletableFuture<>();
                NettyBootstrapInitializer.getBootstrap().connect(address).addListener((ChannelFutureListener) promise -> {
                    if (promise.isDone()) {
                        // 异步的,已经完成
                        log.info("已经和【{}】成功建立连接。", address);
                        channelFuture.complete(promise.channel());
                    } else if (!promise.isSuccess()) {
                        channelFuture.completeExceptionally(promise.cause());
                    }
                });
    
                // 阻塞获取channel
                try {
                    channel = channelFuture.get(3, TimeUnit.SECONDS);
                } catch (InterruptedException | ExecutionException | TimeoutException e) {
                    log.error("获取通道时发生异常。{}", e);
                    throw new DiscoveryException(e);
                }
    
                // 缓存channel
                DcyRpcBootstrap.CHANNEL_CACHE.put(address, channel);
            }
    
            // 3.建立连接失败
            if (channel == null){
                log.error("获取或建立与【{}】通道时发生了异常。", address);
                throw new NetworkException("获取通道时发生了异常。");
            }
    
            // 4.返回通道
            return channel;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125

    ReferenceConfig类的get()方法被修改为:让整个代码可读性更高,更简洁

    /**
     * 代理设计模式,生成一个API接口的代理对象
     * @return 代理对象
     */
    public T get() {
        // 使用动态代理完成工作
        ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
        Class<T>[] classes = new Class[]{interfaceRef};
        InvocationHandler handler = new RpcConsumerInvocationHandler(interfaceRef, registry);
    
        // 使用动态代理生成代理对象
        Object helloProxy = Proxy.newProxyInstance(classLoader, classes, handler);
    
        return (T) helloProxy;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    g.处理handler (优化)

    在core模块com.dcyrpc下创建channelhandler.handler

    channelhandler.handler包下创建MySimpleChannelInboundHandler类:处理响应结果

    继承 SimpleChannelInboundHandler,重写read0方法

    拷贝NettyBootstrapInitializer静态代码块中的匿名内部类SimpleChannelInboundHandler的代码

    public class MySimpleChannelInboundHandler extends SimpleChannelInboundHandler<ByteBuf> {
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
            // 异步
            // 服务提供方,给予的结果
            String result = msg.toString(Charset.defaultCharset());
            // 从全局的挂起的请求中,寻找与之匹配的待处理 completeFuture
            CompletableFuture<Object> completableFuture = DcyRpcBootstrap.PENDING_REQUEST.get(1L);
            completableFuture.complete(result);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    channelhandler包下创建ConsumerChannelInitializer,继承 ChannelInitializer,重写initChannel方法

    拷贝NettyBootstrapInitializer静态代码块中的匿名内部类ChannelInitializer的代码

    public class ConsumerChannelInitializer extends ChannelInitializer<SocketChannel> {
        @Override
        protected void initChannel(SocketChannel socketChannel) throws Exception {
            socketChannel.pipeline().addLast(new MySimpleChannelInboundHandler());
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    NettyBootstrapInitializer类的初始化Netty的静态代码块中:优化handler的匿名内部类

    static {
        NioEventLoopGroup group = new NioEventLoopGroup();
        bootstrap.group(group)
                // 选择初始化一个什么样的channel
                .channel(NioSocketChannel.class)
                .handler(new ConsumerChannelInitializer());
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
  • 相关阅读:
    Camtasia Studio 2024软件最新版下载【安装详细图文教程】
    如何使用环境变量运行bat脚本(开启数据库db)
    安装部署KubeSphere管理kubernetes
    Python3学习
    【必读】从零开始,一步步教你安装nginx,搭建个人博客网站!
    【无标题】
    使用 Pandera 的 PySpark 应用程序的数据验证
    【送书活动】强势挑战Java,Kotlin杀回TIOBE榜单Top 20!学Kotlin看哪些书?
    jave image to ascii
    【C++笔记】AVL树的模拟实现
  • 原文地址:https://blog.csdn.net/weixin_46926189/article/details/132692361