• 用netty实现简易rpc


    rpc介绍:

    RPC是远程过程调用(Remote Procedure Call)的缩写形式。SAP系统RPC调用的原理其实很简单,有一些类似于三层构架的C/S系统,第三方的客户程序通过接口调用SAP内部的标准或自定义函数,获得函数返回的数据进行处理后显示或打印。

    rpc调用流程:

    在这里插入图片描述

    代码:

    public interface HelloService {
    
        String hello(String msg);
    }
    
    • 1
    • 2
    • 3
    • 4
    public class HelloServiceImpl implements HelloService {
        @Override
        public String hello(String msg) {
            System.out.println("读取到客户端信息:" + msg);
            if (msg != null) {
                return "已收到客户端信息【" + msg + "】";
            } else {
                return "已收到客户端信息";
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    public class NettyServerHandler extends ChannelInboundHandlerAdapter {
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            // 接收客户端发送的信息,并调用服务
            // 规定每次发送信息时 都以"HelloService#hello#开头“, 其中最后一个#后面的为参数
            String message = msg.toString();
            System.out.println("最初消息:" + message);
            if (message.startsWith("HelloService#hello#")) {
    
                String arg = message.substring(19);
                System.out.println("接收的参数:" + arg);
                String result = new HelloServiceImpl().hello(arg);
                ctx.writeAndFlush(result);
            }
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            // 出现异常时关闭通道
            ctx.close();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    public class NettyServer {
    
        /**
         * 启动服务
         *
         * @param host 主机地址
         * @param port 线程端口
         */
        private static void startServer0(String host, int port) {
            EventLoopGroup bossGroup = new NioEventLoopGroup();
            EventLoopGroup workerGroup = new NioEventLoopGroup();
    
            try {
                ServerBootstrap serverBootstrap = new ServerBootstrap();
                serverBootstrap.group(bossGroup, workerGroup)
                        .channel(NioServerSocketChannel.class)
                        .childHandler(new ChannelInitializer<SocketChannel>() {  // workerGroup
                            @Override
                            protected void initChannel(SocketChannel ch) throws Exception {
                                ChannelPipeline pipeline = ch.pipeline();
                                // String的编码解码器
                                pipeline.addLast(new StringEncoder());
                                pipeline.addLast(new StringDecoder());
                                pipeline.addLast(new NettyServerHandler()); // 自定义业务处理器
                            }
                        });
                // 绑定端口并启动
                ChannelFuture channelFuture = serverBootstrap.bind(host, port).sync();
                System.out.println("服务器启动:");
                // 监听关闭
                channelFuture.channel().closeFuture().sync();
    
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            } finally {
                bossGroup.shutdownGracefully();
                workerGroup.shutdownGracefully();
            }
        }
    
        public static void startServer(String host, int port) {
            startServer0(host, port);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    public class NettyClientHandler extends ChannelInboundHandlerAdapter implements Callable {
    
        private ChannelHandlerContext channelHandlerContext;
        private String result; // 服务端返回的数据
        private String param; // 客户端调用方法时传入的参数
    
        /**
         * 与服务器建立连接时被调用
         *
         * @param ctx
         * @throws Exception
         */
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            System.out.println("channelActive 被调用");
            this.channelHandlerContext = ctx;
        }
    
        /**
         * 收到服务器数据时被调用
         *
         * @param ctx
         * @param msg
         * @throws Exception
         */
        @Override
        public synchronized void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            System.out.println(" channelRead 被调用  ");
            result = msg.toString();
            // 唤醒等待的线程。
            notifyAll();
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            ctx.close();
        }
    
        /**
         * 当某个线程执行NettyClientHandler任务时,会调用get()方法,get()方法会阻塞当前线程,
         * 直到任务执行完成并返回结果或抛出异常。
         *
         * @return
         * @throws Exception
         */
        @Override
        public synchronized Object call() throws Exception {
            System.out.println("call--1  ");
            channelHandlerContext.writeAndFlush(param);
    //        TimeUnit.MILLISECONDS.sleep(5 * 1000);
            wait(); // 等待channelRead()方法的调用
            System.out.println("call--2  ");
            return result;
        }
    
        /**
         * 设置参数
         *
         * @param param
         */
        public void setParam(String param) {
            this.param = param;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    public class NettyClient {
    
        // 设置为cpu核数个线程
        private static ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
    
        private static NettyClientHandler nettyClientHandler;
    
    
        private static void initClient() {
            nettyClientHandler = new NettyClientHandler();
            EventLoopGroup group = new NioEventLoopGroup();
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(group)
                    .channel(NioSocketChannel.class)
                    .option(ChannelOption.TCP_NODELAY, true) // tcp无延迟
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            pipeline.addLast(new StringEncoder());
                            pipeline.addLast(new StringDecoder());
                            pipeline.addLast(nettyClientHandler);
                        }
                    });
            try {
                ChannelFuture sync = bootstrap.connect("127.0.0.1", 7000).sync();
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
    
        }
    
        public Object getBean(final Class<?> serviceClass, final String providerName) {
            /**
             * newProxyInstance()方法的第三个参数为实现了java.lang.reflect.InvocationHandler接口的类,
             */
            return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), new Class<?>[]{serviceClass}, (proxy, method, args) -> {
                if (nettyClientHandler == null) {
                    System.out.println("nettyClientHandler 被初始化");
                    initClient();
                }
    
                System.out.println("进入到匿名内容类");
                nettyClientHandler.setParam(providerName + args[0]);
                return executorService.submit(nettyClientHandler).get();
            });
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    public class ServerBootStrapService {
        public static void main(String[] args) {
            NettyServer.startServer("127.0.0.1",7000);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    public class ConsumerBootStrap {
    
        public final static String ProviderName = "HelloService#hello#";
    
        public static void main(String[] args) throws InterruptedException {
            NettyClient nettyClient = new NettyClient();
    
            /**
             * helloService为代理对象
             */
            HelloService helloService = (HelloService) nettyClient.getBean(HelloService.class, ProviderName);
            for (int i = 0; ; ) {
                TimeUnit.MILLISECONDS.sleep(2000);
    
                /**
                 * 当helloService调用hello()方法时,会进入到 实现了InvocationHandler类中的invoke()方法,也就是这个匿名内部类:(proxy, method, args) -> {
                 *             if (nettyClientHandler == null) {
                 *                 initClient();
                 *             }
                 *             nettyClientHandler.setParam(providerName + args[0]);
                 *             return executorService.submit(nettyClientHandler).get();
                 */
                helloService.hello("哈喽,哈喽: " + i++);
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26

    gitee地址:https://gitee.com/okgoodfine/rpc-netty

  • 相关阅读:
    如何处理系统漏洞
    D. Grid-00100(构造一个行列均匀的方阵)
    哈希表的前置知识---二叉搜索树
    Netty网络框架学习笔记-15(ChannelPipeline 调度 handler分析)
    Linux vi/vim
    第三期:那些年,我们一起经历过的链表中的浪漫
    STC15单片机-整合代码,完成软件设计
    2022 蔚来杯 牛客多校 后缀自动机(SAM) 马拉车(Manacher)
    IDEA用maven和Spring模板分别创建SpringMVC项目
    神经网络和决策树,神经网络 选股
  • 原文地址:https://blog.csdn.net/qq_51134950/article/details/133706283