• Netty02 - 回显服务器实例


    目的:

            通过案例学习Netty的NIO通信

    案例功能:

            1, 客户端发送数据到服务端, 服务端打印;

            2, 服务端回传收到的数据给客户端

    代码中写了详细注释, 博文不具体阐述, 用作入门, 后续专栏文章会详细介绍Netty的每一个组件, Netty的前两篇博文权当和Netty做个HelloWorld.

    服务端代码如下:

    1. package netty.echo;
    2. import io.netty.bootstrap.ServerBootstrap;
    3. import io.netty.buffer.ByteBuf;
    4. import io.netty.channel.*;
    5. import io.netty.channel.nio.NioEventLoopGroup;
    6. import io.netty.channel.socket.SocketChannel;
    7. import io.netty.channel.socket.nio.NioServerSocketChannel;
    8. /**
    9. * Instruction:
    10. * Author:@author MaLi
    11. */
    12. public class EchoServer {
    13. public void startServer() {
    14. //1, 创建启动器
    15. ServerBootstrap serverBootstrap = new ServerBootstrap();
    16. //2, 创建并设置事件轮询组
    17. // 用于接收新连接
    18. NioEventLoopGroup bossGroup = new NioEventLoopGroup();
    19. // 用于客户端与服务端的IO传输, 线程数默认为机器CPU核数*2
    20. NioEventLoopGroup workerGroup = new NioEventLoopGroup();
    21. // 为父子通道设置对应的轮询组 - 本质是为通道设置Reactor的选择器角色
    22. serverBootstrap.group(bossGroup, workerGroup);
    23. //3, 设置Channel通信类型 - 这里使用NIO通信类型
    24. serverBootstrap.channel(NioServerSocketChannel.class);
    25. // 设置该通信为长链接
    26. serverBootstrap.option(ChannelOption.SO_KEEPALIVE, true);
    27. //4, 为子通道设置Handler流水线
    28. EchoServerHandler echoServerHandler = new EchoServerHandler();
    29. serverBootstrap.childHandler(new ChannelInitializer() {
    30. @Override
    31. protected void initChannel(SocketChannel ch) throws Exception {
    32. ch.pipeline().addLast(echoServerHandler);
    33. }
    34. });
    35. //5, 启动并绑定Channel到服务器端口对应端口(这里是8888), sync()作用, 等待绑定完成
    36. try {
    37. ChannelFuture sync = serverBootstrap.bind(8888).sync();
    38. //6, 等待通道的关闭回调, (Netty官方案例有英文注释, 这里只是优雅的关闭方式, 实际在当前代码中并没有关闭事件)
    39. // 作用: 阻塞在这里一直等待新连接事件发生
    40. sync.channel().closeFuture().sync();
    41. } catch (InterruptedException e) {
    42. throw new RuntimeException(e);
    43. } finally {
    44. //7, 关闭事件轮询组
    45. workerGroup.shutdownGracefully();
    46. bossGroup.shutdownGracefully();
    47. }
    48. }
    49. //服务端处理IO事件的Handler处理器 - 基本Netty的开发都是开发Handler
    50. @ChannelHandler.Sharable //代表多个Channel可以共享这个Handler(只要没有并发安全就可以被共享)
    51. public static class EchoServerHandler extends ChannelInboundHandlerAdapter {
    52. @Override
    53. public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    54. //收到数据
    55. ByteBuf in = (ByteBuf) msg;
    56. //如果返回false则代表不是堆内分配的存储 - Netty4.1+使用直接内存(OS管理的物理内存)
    57. System.out.println("是否堆内存" + in.hasArray());
    58. //因为Netty4.1+默认是使用直接内存的buffer来存储Channel读到的数据, Java要进行处理这些数据, 先要拷贝到自己的堆中
    59. //所以这里先建立一个对应长度的堆内数组
    60. byte[] arr = new byte[in.readableBytes()];
    61. // 读取数据到数组arr中
    62. in.getBytes(0, arr);
    63. System.out.println("Data from Client: " + new String(arr));
    64. System.out.println("写回前,msg.refCnt:" + ((ByteBuf) msg).refCnt()); //测试一下当前缓冲区的引用数量
    65. //写数据回给客户端
    66. ChannelFuture channelFuture = ctx.writeAndFlush(msg);
    67. channelFuture.addListener((ChannelFuture futureListener) -> {
    68. System.out.println("写回后,msg.refCnt:" + ((ByteBuf) msg).refCnt());
    69. });
    70. //传递到下一个Handler - 当前案例只有一个用户自定义的Handler, 其实不写也无所谓.
    71. super.channelRead(ctx, msg);
    72. }
    73. }
    74. public static void main(String[] args) {
    75. EchoServer echoServer = new EchoServer();
    76. echoServer.startServer();
    77. }
    78. }

    客户端代码如下

    1. package netty.echo;
    2. import io.netty.bootstrap.Bootstrap;
    3. import io.netty.buffer.ByteBuf;
    4. import io.netty.buffer.PooledByteBufAllocator;
    5. import io.netty.channel.*;
    6. import io.netty.channel.nio.NioEventLoopGroup;
    7. import io.netty.channel.socket.SocketChannel;
    8. import io.netty.channel.socket.nio.NioSocketChannel;
    9. import java.util.Scanner;
    10. /**
    11. * Instruction:
    12. * Author:@author MaLi
    13. */
    14. public class EchoClient {
    15. private int serverPort;
    16. private String serverIp;
    17. public EchoClient(int serverPort, String serverIp) {
    18. this.serverPort = serverPort;
    19. this.serverIp = serverIp;
    20. }
    21. public void startClient() {
    22. //Step1: 创建组装器 - 用于配置客户端的 - 事件轮询器 - 通道 - 处理器
    23. Bootstrap bootstrap = new Bootstrap();
    24. //Step2: 创建轮询器 - 封装了Selector, 用于选择数据传输事件
    25. NioEventLoopGroup workerLoopGroup = new NioEventLoopGroup();
    26. bootstrap.group(workerLoopGroup);
    27. //Step3: 设置通道类型 - 这里使用了NIOSocket
    28. bootstrap.channel(NioSocketChannel.class);
    29. bootstrap.remoteAddress(serverIp, serverPort);
    30. bootstrap.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
    31. //Step4: 配置事件处理器 - 有数据IO的时候, 用于处理该IO的处理器
    32. bootstrap.handler(new ChannelInitializer() {
    33. @Override
    34. protected void initChannel(SocketChannel ch) throws Exception {
    35. ch.pipeline().addLast(new EchoClientHandler());
    36. }
    37. });
    38. //Step5: 连接服务器, 并配置了一个监听器, 用于在连接完成后, 回调lambda函数
    39. ChannelFuture channelFuture = bootstrap.connect().addListener((ChannelFuture futureListener) -> {
    40. if (futureListener.isSuccess()) {
    41. System.out.println("connection successful");
    42. } else {
    43. System.out.println("connection failure");
    44. }
    45. });
    46. try {
    47. //sync作用: 因为上面的连接到服务器上以及监听都是异步操作, 执行后马上返回, 可能连接还未完全建立, 所以sync在此等待一下
    48. channelFuture.sync();
    49. //StepX - 业务操作: 在连接完成之后, 获取到通道, 往通道里面写一些数据
    50. //获取通道
    51. Channel channel = channelFuture.channel();
    52. //获取标准输入
    53. Scanner scanner = new Scanner(System.in);
    54. System.out.println("请输入信息: ");
    55. while (scanner.hasNext()) {
    56. String msg = scanner.next();
    57. // 创建一个缓冲区, 用于存储待发送的信息
    58. ByteBuf buffer = channel.alloc().buffer();
    59. //保存数据到直接内存的缓冲区
    60. buffer.writeBytes(msg.getBytes());
    61. // 通过通道将数据发送出去
    62. channel.writeAndFlush(buffer);
    63. }
    64. } catch (InterruptedException e) {
    65. throw new RuntimeException(e);
    66. }finally {
    67. //由于上面有while循环, 这里不出异常或者主动杀掉进程, 就不会执行到, 但是作为一个关闭是必不可少的, 否则真正的关闭不释放文件描述符
    68. workerLoopGroup.shutdownGracefully();
    69. }
    70. }
    71. //该内部类是对通道的入站操作的处理Handler
    72. public static class EchoClientHandler extends ChannelInboundHandlerAdapter{
    73. @Override
    74. public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    75. //收到数据
    76. ByteBuf in = (ByteBuf) msg;
    77. byte[] arr = new byte[in.readableBytes()];
    78. in.getBytes(0, arr);
    79. System.out.println("Data from Server: " + new String(arr));
    80. //读取完成之后,要主动释放掉该buffer
    81. in.release();
    82. }
    83. }
    84. public static void main(String[] args) {
    85. EchoClient client = new EchoClient(8888,"localhost");
    86. client.startClient();
    87. }
    88. }

    Netty知识体系小总结: 

    1, 网络编程IO模型: BIO, NIO, Reactor模型(单线程, 多线程);

    2, 网络通信的TCP/IP通信模型: 三次握手, 四次挥手;

    3, Netty主要组件

            事件选择器: EventLoop

            通道: Channel - 这里不同于NIO的Channel, 仅仅是同名, 实则是对NIOChannel的封装

            处理器: Handler, 以及对Handler的编排PipeLine

            性能的关键: ByteBuf, 相比NIO的ByteBuffer, Netty的ByteBuf性能更好, 有自己的零拷贝, 池化技术等.

            编码解码器

            序列化操作

    以上内容在后续文章中不断分享.

  • 相关阅读:
    Python之ini配置文件详解
    类和对象(1)
    契约锁电子签助力拍卖业务网上签约,保全证据、不可抵赖,成交快
    万物皆可“云” 从杭州云栖大会看数智生活的未来
    FFmpeg获取媒体文件的音频信息
    Java 类型信息详解和反射机制
    [南京大学]-[软件分析]课程学习笔记(二)-IR
    OSCAR 分享之蚂蚁开源治理的方法和实践
    Redis入门完整教程:Python客户端redis-py
    【搜题公众号】详解搜题公众号搭建教程(附赠题库)
  • 原文地址:https://blog.csdn.net/malipku/article/details/128120686