• 手写RPC框架(六)整合Netty


    手写RPC框架(六)整合Netty

    Netty简介:

    Netty是一个基于NIO的,提供异步,事件驱动的网络应用工具,具有高性能高可靠性等特点。

    使用传统的Socket来进行网络通信,服务端每一个连接都要新建一个线程,清楚处理完成后通过输出流返回给客户端。而Netty通过NIO的方式,服务端实现为一个请求一个线程,客户端发送的连接请求会注册到多路复用器上,多路复用器轮询到连接有I/O请求时才会启动一个线程进行处理。

    这次我们通过Netty来实现网络通信,替代Socket,提高框架性能。

    1. 引入Netty

      复制代码
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      <!-- https://mvnrepository.com/artifact/io.netty/netty-all --> <dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.1.73.Final</version> </dependency>
    2. netty服务端

      复制代码
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26
      • 27
      • 28
      • 29
      • 30
      • 31
      • 32
      • 33
      • 34
      • 35
      • 36
      • 37
      • 38
      • 39
      • 40
      • 41
      • 42
      • 43
      • 44
      • 45
      • 46
      • 47
      • 48
      • 49
      • 50
      • 51
      • 52
      • 53
      • 54
      • 55
      • 56
      public class NettyServer{ private static final Logger logger = LoggerFactory.getLogger(NettyServer.class); private String serverAddress; //启动地址 private int serverPort; //启动端口 private EventLoopGroup boss = null; private EventLoopGroup worker = null; public NettyServer(String serverAddress, int serverPort) { this.serverAddress = serverAddress; this.serverPort = serverPort; } public void startNettyServer() throws Exception { //netty调度模块,负责接收请求 NioEventLoopGroup bossGroup = new NioEventLoopGroup(); //netty调度模块,负责处理请求 NioEventLoopGroup workGroup = new NioEventLoopGroup(); //启动类 ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group(bossGroup,workGroup); bootstrap.channel(NioServerSocketChannel.class); bootstrap.childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { //传输数据的channel ChannelPipeline pipeline = ch.pipeline(); //解码器 pipeline.addLast(new StringDecoder()); //编码器 pipeline.addLast(new StringEncoder()); //业务逻辑 pipeline.addLast(new RpcServerHandler()); } }); try { //端口绑定 ChannelFuture sync = bootstrap.bind(serverAddress, serverPort).sync(); sync.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); }finally { bossGroup.shutdownGracefully(); workGroup.shutdownGracefully(); } } @PreDestroy public void destory() throws InterruptedException { boss.shutdownGracefully().sync(); worker.shutdownGracefully().sync(); logger.info("关闭Netty"); } }

      在这里通过startServer方法会启动netty服务端,当有请求时,会进入RpcServerHandler()方法中进行处理。

      复制代码
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26
      • 27
      • 28
      • 29
      • 30
      • 31
      • 32
      • 33
      • 34
      • 35
      • 36
      • 37
      • 38
      • 39
      • 40
      • 41
      • 42
      • 43
      • 44
      • 45
      • 46
      • 47
      • 48
      • 49
      • 50
      • 51
      • 52
      • 53
      • 54
      @ChannelHandler.Sharable public class RpcServerHandler extends SimpleChannelInboundHandler<String> { @Override protected void channelRead0(ChannelHandlerContext ctx, String msg) { //msg为接收到的请求信息 RpcResponse response = new RpcResponse(); //将请求信息解码 RpcRegisterEntity rpcRegisterEntity=JSON.parseObject(msg,RpcRegisterEntity.class); //通过反射得到远程调用的类并执行该方法 Object result = invoke(rpcRegisterEntity); try { //返回体 response.setResult(result); } catch (Exception exception) { exception.printStackTrace(); response.setException(exception); } //写入返回数据 ctx.writeAndFlush(JSON.toJSONString(response)); } private Object invoke(RpcRegisterEntity entity) { try { //接口名 String interfaceName = entity.getServiceImplClassFullName(); // String implClassName = RegisterCenter.getProviderData(interfaceName); //类名 String implClassName = entity.getServiceImplClassFullName(); Class<?> clazz = Class.forName(implClassName); String methodName = entity.getMethodName(); Class<?>[] parameterTypes = entity.getParameterTypes(); Object[] parameters = entity.getParameters(); Method method = clazz.getMethod(methodName, parameterTypes); //通过反射得到结果 return method.invoke(clazz.newInstance(), parameters); } catch (ClassNotFoundException | NoSuchMethodException | InvocationTargetException | IllegalAccessException | InstantiationException e) { e.printStackTrace(); return e; } } //当Channel处理于活动状态时被调用 @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { // System.out.println(ctx.channel().remoteAddress().toString()); super.channelActive(ctx); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { // System.out.println(JSON.toJSONString(cause)); super.exceptionCaught(ctx, cause); } }
    3. netty消费端

      复制代码
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26
      • 27
      • 28
      • 29
      • 30
      • 31
      • 32
      • 33
      • 34
      • 35
      • 36
      • 37
      • 38
      • 39
      • 40
      • 41
      • 42
      • 43
      • 44
      • 45
      • 46
      • 47
      • 48
      • 49
      • 50
      • 51
      • 52
      • 53
      • 54
      • 55
      • 56
      • 57
      • 58
      • 59
      • 60
      • 61
      • 62
      • 63
      • 64
      • 65
      • 66
      • 67
      • 68
      • 69
      • 70
      • 71
      • 72
      • 73
      • 74
      • 75
      • 76
      • 77
      • 78
      • 79
      • 80
      • 81
      • 82
      • 83
      • 84
      • 85
      • 86
      • 87
      • 88
      • 89
      • 90
      • 91
      • 92
      • 93
      • 94
      • 95
      • 96
      • 97
      • 98
      • 99
      • 100
      • 101
      • 102
      • 103
      • 104
      • 105
      • 106
      • 107
      • 108
      • 109
      • 110
      • 111
      • 112
      • 113
      • 114
      • 115
      • 116
      • 117
      • 118
      public class RpcClient{ private static final Logger logger = LoggerFactory.getLogger(RpcClient.class); private EventLoopGroup group; private Channel channel; private String ip; private int port; private RpcConsumerHandler rpcConsumerHandler=new RpcConsumerHandler(); private ExecutorService executorService = Executors.newCachedThreadPool(); public RpcClient(String ip, int port) { this.ip = ip; this.port = port; initClient(); } public void initClient() { try { //1.创建线程组 group = new NioEventLoopGroup(); //2.创建启动助手 Bootstrap bootstrap = new Bootstrap(); //3.设置参数 bootstrap.group(group) //传输数据用的channel .channel(NioSocketChannel.class) .option(ChannelOption.SO_KEEPALIVE, Boolean.TRUE) .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel channel) throws Exception { ChannelPipeline pipeline = channel.pipeline(); pipeline.addLast(new StringEncoder()); pipeline.addLast(new StringDecoder()); //添加客户端处理类 pipeline.addLast(rpcConsumerHandler); } }); //4.连接Netty服务端 connect(bootstrap, ip, port, 5); } catch (Exception exception) { exception.printStackTrace(); if (channel != null) { channel.close(); } if (group != null) { group.shutdownGracefully(); } } } private void connect(Bootstrap bootstrap, String host, int port, int retry) { ChannelFuture channelFuture = bootstrap.connect(host, port).addListener(future -> { if (future.isSuccess()) { logger.info("连接服务端成功"); } else if (retry == 0) { logger.error("重试次数已用完,放弃连接"); } else { //第几次重连: int order = (5 - retry) + 1; //本次重连的间隔 int delay = 1 << order; logger.error("{} : 连接失败,第 {} 重连....", new Date(), order); bootstrap.config().group().schedule(() -> connect(bootstrap, host, port, retry - 1), delay, TimeUnit.SECONDS); } }); channel = channelFuture.channel(); } /** * 提供给调用者主动关闭资源的方法 */ public void close() { if (channel != null) { channel.close(); } if (group != null) { group.shutdownGracefully(); } } /** * 提供消息发送的方法 */ public Object send(String msg) throws ExecutionException, InterruptedException { rpcConsumerHandler.setRequestMsg(msg); Future submit = executorService.submit(rpcConsumerHandler); return submit.get(); } public void destroy() throws Exception { if (channel != null) { channel.close(); } if (group != null) { group.shutdownGracefully(); } } public String getIp() { return ip; } public void setIp(String ip) { this.ip = ip; } public int getPort() { return port; } public void setPort(int port) { this.port = port; } }

      当创建好客户端时,发送请求时数据会交由rpcConsumerHandler处理,

      复制代码
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26
      • 27
      • 28
      • 29
      • 30
      • 31
      • 32
      • 33
      public class RpcConsumerHandler extends SimpleChannelInboundHandler<String> implements Callable { ChannelHandlerContext context; //发送的消息 String requestMsg; //服务端返回的消息 String responseMsg; public void setRequestMsg(String requestMsg) { this.requestMsg = requestMsg; } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { context = ctx; } //接收由服务端返回的数据 @Override protected synchronized void channelRead0(ChannelHandlerContext channelHandlerContext, String msg) throws Exception { System.out.println("客户端结果:"+msg); responseMsg = msg; //唤醒等待的线程 notify(); } //发送数据 @Override public synchronized Object call() throws Exception { //消息发送 context.writeAndFlush(requestMsg); //线程等待 wait(); return responseMsg; } }
    4. 调用

      复制代码
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      //新建连接 RpcClient rpcClient = getClient(rpcRegisterEntity.getHost(), rpcRegisterEntity.getPort()); //发送数据 Object responseMsg = rpcClient.send(JSON.toJSONString(rpcRegisterEntity)); //解析返回的数据 RpcResponse rpcResponse = JSON.parseObject(responseMsg.toString(), RpcResponse.class);
  • 相关阅读:
    光波导k域布局可视化(“神奇的圆环”)
    Swagger文档生成操作SOP
    代理模式(静态代理,动态代理,cglib代理)
    江西涉农县开展一对一指导服务 国稻种芯:重点保障水稻生长
    Python中,我们可以使用pandas和numpy库对Excel数据进行预处理,包括读取数据、数据清洗、异常值剔除等
    MySQL(进阶篇--InnoDB引擎)
    m1系列芯片aarch64架构使用docker-compose安装seata
    Docker实战:docker compose 搭建Rocketmq
    C语言-指针详解速成
    突破编程_C++_高级教程(单元测试与 Google Test 教程)
  • 原文地址:https://www.cnblogs.com/iven98/p/15910961.html