• echoServer回显服务器


    NettyEchoServer回显服务器的服务器端

    前面实现过Java NIO版本的EchoServer回显服务器,在学习了Netty后,这里为大家设计和实现一个Netty版本的EchoServer回显服务器。功能很简单:从服务器端读取客户端输入的数据,然后将数据直接回显到Console控制台。

    首先是服务器端的实践案例,目标为掌握以下知识:

    • 服务器端ServerBootstrap的装配和使用。
    • 服务器端NettyEchoServerHandler入站处理器的channelRead入站处理方法的编写。
    • Netty的ByteBuf缓冲区的读取、写入,以及ByteBuf的引用计数的查看。

    服务器端的ServerBootstrap装配和启动过程,它的代码如下:

            package com.crazymakercircle.netty.echoServer;
            //...
            public class NettyEchoServer {
                //....
                public void runServer() {
                    //创建反应器线程组
                    EventLoopGroupbossLoopGroup = new NioEventLoopGroup(1);
                    EventLoopGroupworkerLoopGroup = new NioEventLoopGroup();
                    //....省略设置: 1 反应器线程组/2 通道类型/4 通道选项等
                    //5 装配子通道流水线
                    b.childHandler(new ChannelInitializer<SocketChannel>() {
                        //有连接到达时会创建一个通道
                        protected void initChannel(SocketChannelch) throws Exception {
                            // 流水线管理子通道中的Handler业务处理器
                            // 向子通道流水线添加一个Handler业务处理器
                            ch.pipeline().addLast(NettyEchoServerHandler.INSTANCE);
                        }
                    });
                    //.... 省略启动、等待、从容关闭(或称为优雅关闭)等
                }
            //…省略main方法
            }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23

    共享NettyEchoServerHandler处理器

    Netty版本的EchoServerHandler回显服务器处理器,继承自ChannelInboundHandlerAdapter,然后覆盖了channelRead方法,这个方法在可读IO事件到来时,被流水线回调。这个回显服务器处理器的逻辑分为两步:

    第一步,从channelRead方法的msg参数。

    第二步,调用ctx.channel().writeAndFlush() 把数据写回客户端。

    先看第一步,读取从对端输入的数据。channelRead方法的msg参数的形参类型不是ByteBuf,而是Object,为什么呢?实际上,msg的形参类型是由流水线的上一站决定的。大家知道,入站处理的流程是:Netty读取底层的二进制数据,填充到msg时,msg是ByteBuf类型,然后经过流水线,传入到第一个入站处理器;每一个节点处理完后,将自己的处理结果(类型不一定是ByteBuf)作为msg参数,不断向后传递。因此,msg参数的形参类型,必须是Object类型。不过,可以肯定的是,第一个入站处理器的channelRead方法的msg实参类型,绝对是ByteBuf类型,因为它是Netty读取到的ByteBuf数据包。在本实例中,NettyEchoServerHandler就是第一个业务处理器,虽然msg的实参类型是Object,但是实际类型就是ByteBuf,所以可以强制转成ByteBuf类型。

    另外,从Netty 4.1开始,ByteBuf的默认类型是Direct ByteBuf直接内存。大家知道,Java不能直接访问Direct ByteBuf内部的数据,必须先通过getBytes、readBytes等方法,将数据读入Java数组中,然后才能继续在数组中进行处理。

    第二步将数据写回客户端。这一步很简单,直接复用前面的msg实例即可。不过要注意,如果上一步使用的readBytes,那么这一步就不能直接将msg写回了,因为数据已经被readBytes读完了。幸好,上一步调用的读数据方法是getBytes,它不影响ByteBuf的数据指针,因此可以继续使用。这一步调用了ctx.writeAndFlush,把msg数据写回客户端。也可调用ctx.channel().writeAndFlush()方法。这两个方法在这里的效果是一样的,因为这个流水线上没有任何的出站处理器。

    服务器端的入站处理器NettyEchoServerHandler的代码如下:

            package com.crazymakercircle.netty.echoServer;
            //...
            @ChannelHandler.Sharable
            public class NettyEchoServerHandler extends ChannelInboundHandlerAdapter {
                public static final NettyEchoServerHandler INSTANCE
                              = new NettyEchoServerHandler();
                @Override
                public void channelRead(ChannelHandlerContext ctx, Object msg) throws
        Exception {
                  ByteBuf in = (ByteBuf) msg;
                  Logger.info("msg type: " + (in.hasArray()? "堆内存":"直接内存"));
                  int len = in.readableBytes();
                  byte[] arr = new byte[len];
                  in.getBytes(0, arr);
                  Logger.info("server received: " + new String(arr, "UTF-8"));
    
                  Logger.info("写回前,msg.refCnt:" + ((ByteBuf) msg).refCnt());
                  //写回数据,异步任务
                  ChannelFuture f = ctx.writeAndFlush(msg);
                  f.addListener((ChannelFuturefutureListener) -> {
                      Logger.info("写回后,msg.refCnt:" + ((ByteBuf) msg).refCnt());
                  });
                }
            }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25

    这里的NettyEchoServerHandler在前面加了一个特殊的Netty注解:@ChannelHandler.Sharable。这个注解的作用是标注一个Handler实例可以被多个通道安全地共享。什么叫作Handler共享呢?就是多个通道的流水线可以加入同一个Handler业务处理器实例。而这种操作,Netty默认是不允许的。但是,很多应用场景需要Handler业务处理器实例能共享。例如,一个服务器处理十万以上的通道,如果一个通道都新建很多重复的Handler实例,就需要上十万以上重复的Handler实例,这就会浪费很多宝贵的空间,降低了服务器的性能。所以,如果在Handler实例中,没有与特定通道强相关的数据或者状态,建议设计成共享的模式:在前面加了一个Netty注解:@ChannelHandler.Sharable。反过来,如果没有加@ChannelHandler.Sharable注解,试图将同一个Handler实例添加到多个ChannelPipeline通道流水线时,Netty将会抛出异常。

    还有一个隐藏比较深的重点:同一个通道上的所有业务处理器,只能被同一个线程处理。所以,不是@Sharable共享类型的业务处理器,在线程的层面是安全的,不需要进行线程的同步控制。而不同的通道,可能绑定到多个不同的EventLoop反应器线程。因此,加上了@ChannelHandler.Sharable注解后的共享业务处理器的实例,可能被多个线程并发执行。这样,就会导致一个结果:@Sharable共享实例不是线程层面安全的。显而易见,@Sharable共享的业务处理器,如果需要操作的数据不仅仅是局部变量,则需要进行线程的同步控制,以保证操作是线程层面安全的。

    如何判断一个Handler是否为@Sharable共享呢?ChannelHandlerAdapter提供了实用方法——isSharable()。如果其对应的实现加上了@Sharable注解,那么这个方法将返回true,表示它可以被添加到多个ChannelPipeline通道流水线中。

    NettyEchoServerHandler回显服务器处理器没有保存与任何通道连接相关的数据,也没有内部的其他数据需要保存。所以,它不光是可以用来共享,而且不需要做任何的同步控制。在这里,为它加上了@Sharable注解表示可以共享,更进一步,这里还设计了一个通用的INSTANCE静态实例,所有的通道直接使用这个INSTANCE实例即可。

    最后,揭示一个比较奇怪的问题。

    运行程序,大家会看到在写入客户端的工作完成后,ByteBuf的引用计数的值变成为0。在上面的代码中,既没有自动释放的代码,也没有手动释放的代码,为什么,引用计数没有了呢?这个问题,比较有意思,留给大家自行思考。答案,就藏在上文之中,如果确实想不出来也没有找到,可以来疯狂创客圈社群,和大家一起交流,探讨最佳答案。

    NettyEchoClient客户端代码

    其次是客户端的实践案例,目标为掌握以下知识:

    • 客户端Bootstrap的装配和使用。
    • 客户端NettyEchoClientHandler入站处理器中,接受回写的数据,并且释放内存。
    • 有多种方式用于释放ByteBuf,包括:自动释放、手动释放。

    客户端Bootstrap的装配和使用,代码如下:

            package com.crazymakercircle.netty.echoServer;
            //...
            public class NettyEchoClient {
    
                private int serverPort;
                private String serverIp;
                Bootstrap b = new Bootstrap();
    
                public NettyEchoClient(String ip, int port) {
                  this.serverPort = port;
                  this.serverIp = ip;
                }
    
                public void runClient() {
                  //创建反应器线程组
                  EventLoopGroupworkerLoopGroup = new NioEventLoopGroup();
    
                  try {
                      //1 设置反应器 线程组
                      b.group(workerLoopGroup);
                      //2 设置nio类型的通道
                      b.channel(NioSocketChannel.class);
                      //3 设置监听端口
                      b.remoteAddress(serverIp, serverPort);
                      //4 设置通道的参数
                      b.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
    
                      //5 装配子通道流水线
                      b.handler(new ChannelInitializer<SocketChannel>() {
                          //有连接到达时会创建一个通道
                          protected void initChannel(SocketChannelch) throws Exception {
                            // 流水线管理子通道中的Handler业务处理器
                            // 向子通道流水线添加一个Handler业务处理器
                            ch.pipeline().addLast(NettyEchoClientHandler.INSTANCE);
                          }
                      });
                      ChannelFuture f = b.connect();
                      f.addListener((ChannelFuturefutureListener) ->
                      {
                          if (futureListener.isSuccess()) {
                            Logger.info("EchoClient客户端连接成功!");
                          } else {
                            Logger.info("EchoClient客户端连接失败!");
                          }
                      });
    
                      // 阻塞,直到连接成功
                      f.sync();
                      Channel channel = f.channel();
                      Scanner scanner = new Scanner(System.in);
                      Print.tcfo("请输入发送内容:");
                      while (scanner.hasNext()) {
                          //获取输入的内容
                          String next = scanner.next();
                          byte[] bytes = (Dateutil.getNow() + " >>"
                                        + next).getBytes("UTF-8");
                          //发送ByteBuf
                          ByteBuf buffer = channel.alloc().buffer();
                          buffer.writeBytes(bytes);
                          channel.writeAndFlush(buffer);
                          Print.tcfo("请输入发送内容:");
                      }
                    } catch (Exception e) {
                       e.printStackTrace();
                    } finally {
                       // 从容关闭EventLoopGroup,
                       // 释放掉所有资源,包括创建的线程
                       workerLoopGroup.shutdownGracefully();
                    }
                }
                //…省略main方法
            }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73

    NettyEchoClientHandler处理器

    客户端的流水线不是空的,还需要装配一个回显处理器,功能很简单,就是接收服务器写过来的数据包,显示在Console控制台上。代码如下:

            package com.crazymakercircle.netty.echoServer;
            import com.crazymakercircle.util.Logger;
            import io.netty.buffer.ByteBuf;
            import io.netty.channel.ChannelHandler;
            import io.netty.channel.ChannelHandlerContext;
            import io.netty.channel.ChannelInboundHandlerAdapter;
    
            /**
             * create by尼恩 @疯狂创客圈
             **/
            @ChannelHandler.Sharable
            public class NettyEchoClientHandler extends ChannelInboundHandlerAdapter {
                public static final NettyEchoClientHandler INSTANCE
                              = new NettyEchoClientHandler();
                /**
                * 出站处理方法
                *
                * @param ctx上下文
                * @param msg入站数据包
                * @throws Exception可能抛出的异常
                */
                @Override
                public void channelRead(ChannelHandlerContext ctx, Object msg) throws
        Exception {
                    ByteBuf byteBuf = (ByteBuf) msg;
                    int len = byteBuf.readableBytes();
                    byte[] arr = new byte[len];
                    byteBuf.getBytes(0, arr);
                    Logger.info("client received: " + new String(arr, "UTF-8"));
    
                    // 释放ByteBuf的两种方法
                    // 方法一:手动释放ByteBuf
                    byteBuf.release();
    
                    //方法二:调用父类的入站方法,将msg向后传递
                    // super.channelRead(ctx, msg);
                }
            }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39

    通过代码可以看到,从服务器端发送过来的ByteBuf,被手动方式强制释放掉了。当然,也可以使用前面介绍的自动释放方式来释放ByteBuf。

  • 相关阅读:
    Xshell的下载与安装
    10分钟Window本地部署stable diffusion AI绘图【入门教程】
    DeepSpeed
    @AspectJ注解配置切面编程(注解方式)
    python神经网络编程 豆瓣,python神经网络库 keras
    驱动开发:PE导出函数与RVA转换
    在微信公众号怎么实现全民经纪人功能
    C#教程9:C#方法(Methods)
    如何在用pip配置文件设置HTTP爬虫IP
    部署LVS-DR群集 待续。。
  • 原文地址:https://blog.csdn.net/yitian881112/article/details/127656940