• Netty(5)第一行代码Hello World


    概述

    Netty概念

    Netty 是一个异步的、基于事件驱动的网络应用框架,用于快速开发可维护、高性能的网络服务

    1. 异步:这里的异步是指Netty 使用了多线程来完成方法调用 和 处理结果相分离 ,指的是调用时的异步,并不是值异步IO,Netty的IO模型还是基于多路复用的
    2. 基于事件驱动:Netty底层实现采用的是多路复用技术(即selector),在IO事件发生时才进行处理(如可连接、可读、可写事件等)

    Netty地位

    Netty 在 Java 网络应用框架中的地位就好比:Spring 框架在 JavaEE 开发中的地位

    以下的框架都使用了 Netty,因为它们都涉及到网络通信需求

    • Cassandra - nosql 数据库
    • Spark - 大数据分布式计算框架
    • Hadoop - 大数据分布式存储框架
    • RocketMQ - ali 开源的消息队列
    • ElasticSearch - 搜索引擎
    • gRPC - rpc 框架(远程调用框架)
    • Dubbo - rpc 框架
    • Spring 5.x - flux api 完全抛弃了 tomcat ,使用 netty 作为服务器端
    • Zookeeper - 分布式协调框架

    Netty优势

    netty底层基于NIO,

    自己基于NIO去开发的话,工作量大,bug 多,需要处理下面的内容:

    • 需要自己构建协议
    • 解决 TCP 传输问题,如粘包、半包
    • epoll 空轮询导致 CPU 100%(导致selector在某些情况下阻塞不了,导致线程一直在空轮询)
    • netty对 API 进行增强,使之更易用,如 FastThreadLocal => ThreadLocal,ByteBuf => ByteBuffer

    netty完善了上面的内容

    第一行代码:Hello World

    要求:开发一个简单的服务器端和客户端

    • 客户端向服务器端发送 hello, world
    • 服务器仅接收,不返回

    加入pom坐标(依赖)

    <dependency>
        <groupId>io.nettygroupId>
        <artifactId>netty-allartifactId>
        <version>4.1.39.Finalversion>
    dependency>
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    服务器端

    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.epoll.EpollServerSocketChannel;
    import io.netty.channel.kqueue.KQueueSocketChannel;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import io.netty.channel.socket.nio.NioSocketChannel;
    import io.netty.channel.socket.oio.OioSocketChannel;
    import io.netty.handler.codec.string.StringDecoder;
    import io.netty.handler.logging.LoggingHandler;
    
    public class HelloServer {
        public static void main(String[] args) {
            // 1. 服务器端启动器,负责组装 netty 组件(协调它们的工作),启动服务器
            new ServerBootstrap()
                // 2. BossEventLoop, WorkerEventLoop(selector,thread), group 组
                /*
                    BossEventLoop:处理可连接事件
                    WorkerEventLoop:处理可读事件
                        selector:通过selector去监听事件
                        thread:线程
                    一个selector + thread 就是一个EventLoop,循环处理event事件
                    一个BossEventLoop + WorkerEventLoop 就是一个组
                    NioEventLoopGroup里面包含了一个线程和选择器
                */
                .group(new NioEventLoopGroup())
                // 3. 选择 服务器的 ServerSocketChannel 实现
                /*
                    有NIO和BIO
                        NioServerSocketChannel:基于NIO
                        EpollServerSocketChannel:基于Linux的Epoll的channel
                        KQueueSocketChannel:
                        OioSocketChannel:基于阻塞IO事件
                 */
                .channel(NioServerSocketChannel.class) // OIO BIO
                // 4. boss 负责处理连接 worker(child) 负责处理读写,决定了 worker(child) 能执行哪些操作(handler)
                /*
                    告诉将来作为worker的EventLoop,应该做什么事情
                    确定worker将来要做那些业务逻辑
                 */
                .childHandler(//将来连接建立后才会去执行这里面的内容,只有在连接建立后才开始执行
                        // 5. channel 代表和客户端进行数据读写的通道,  Initializer 初始化,负责添加别的 handler
                    new ChannelInitializer<NioSocketChannel>() {
                    @Override
                    protected void initChannel(NioSocketChannel ch) throws Exception {//初始化,责添加别的 handler
                        // 6. 添加具体 handler
                        ch.pipeline().addLast(new LoggingHandler());
                        ch.pipeline().addLast(new StringDecoder()); // 将 ByteBuf 转换为字符串
                        ch.pipeline().addLast(new ChannelInboundHandlerAdapter() { // 自定义 handler
                            @Override // 处理读事件
                            public void channelRead(ChannelHandlerContext ctx,Object msg) throws Exception {
                                System.out.println(msg); // 打印上一步转换好的字符串
                            }
                        });
                    }
                })
                // 7. 访问启动后,绑定的监听端口
                .bind(8080);
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63

    代码说明

    new ServerBootstrap()
        .group(new NioEventLoopGroup()) // 1
        .channel(NioServerSocketChannel.class) // 2
        .childHandler(new ChannelInitializer<NioSocketChannel>() { // 3
            protected void initChannel(NioSocketChannel ch) {
                ch.pipeline().addLast(new StringDecoder()); // 5
                ch.pipeline().addLast(new SimpleChannelInboundHandler<String>() { // 6
                    @Override
                    protected void channelRead0(ChannelHandlerContext ctx, String msg) {
                        System.out.println(msg);
                    }
                });
            }
        })
        .bind(8080); // 4
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    代码说明:

    • 1 处,创建 NioEventLoopGroup,可以简单理解为 线程池 + Selector 后面会详细展开

    • 2 处,选择服务 Scoket 实现类,其中 NioServerSocketChannel 表示基于 NIO 的服务器端实现,其它实现还有
      在这里插入图片描述

    • 3 处,方法叫 childHandler,是接下来添加的处理器都是给 SocketChannel 用的,而不是给 ServerSocketChannel。ChannelInitializer 处理器(仅执行一次),它的作用是待客户端 SocketChannel 建立连接后,执行 initChannel 以便添加更多的处理器

    • 4 处,ServerSocketChannel 绑定的监听端口

    • 5 处,SocketChannel 的处理器,解码 ByteBuf => String

    • 6 处,SocketChannel 的业务处理器,使用上一个处理器的处理结果

    客户端

    import io.netty.bootstrap.Bootstrap;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.nio.NioSocketChannel;
    import io.netty.handler.codec.string.StringEncoder;
    
    import java.net.InetSocketAddress;
    
    public class HelloClient {
        public static void main(String[] args) throws InterruptedException {
            // 1. 创建启动类,启动客户端
            new Bootstrap()
                // 2. 添加 EventLoop:比如服务器端发送数据过来,客户端的eventLoop就可以从选择器里触发读事件,去进行进一步的处理
                .group(new NioEventLoopGroup())
                // 3. 选择客户端 channel 实现,NioSocketChannel(封装了jdk的NioSocketChannel)
                .channel(NioSocketChannel.class)
                // 4. 添加处理器,ChannelInitializer(连接建立后会被调用,调用后便会执行initChannel方法)
                .handler(new ChannelInitializer<NioSocketChannel>() {
                    @Override // 在连接建立后被调用
                    protected void initChannel(NioSocketChannel ch) throws Exception {
                        //客户端将字符串编码成为ByteBuf,服务器端将ByteBuf解码为字符串
                        ch.pipeline().addLast(new StringEncoder());
                    }
                })
                // 5. 连接到服务器
                .connect(new InetSocketAddress("localhost", 8080))
                .sync()//
                .channel()
                // 6. 向服务器发送数据,即写数据
                .writeAndFlush("hello, world");
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33

    在这里插入图片描述

    代码说明

    new Bootstrap()
        .group(new NioEventLoopGroup()) // 1
        .channel(NioSocketChannel.class) // 2
        .handler(new ChannelInitializer<Channel>() { // 3
            @Override
            protected void initChannel(Channel ch) {
                ch.pipeline().addLast(new StringEncoder()); // 8
            }
        })
        .connect("127.0.0.1", 8080) // 4
        .sync() // 5
        .channel() // 6
        .writeAndFlush(new Date() + ": hello world!"); // 7
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    代码说明

    • 1 处,创建 NioEventLoopGroup,同 Server
    • 2 处,选择客户 Socket 实现类,NioSocketChannel 表示基于 NIO 的客户端实现,其它实现还有
      在这里插入图片描述
    • 3 处,添加 SocketChannel 的处理器,ChannelInitializer 处理器(仅执行一次),它的作用是待客户端 SocketChannel 建立连接后,执行 initChannel 以便添加更多的处理器
    • 4 处,指定要连接的服务器和端口
    • 5 处,Netty 中很多方法都是异步的,如 connect,这时需要使用 sync 方法等待 connect 建立连接完毕
    • 6 处,获取 channel 对象,它即为通道抽象,可以进行数据读写操作
    • 7 处,写入消息并清空缓冲区
    • 8 处,消息会经过通道 handler 处理,这里是将 String => ByteBuf 发出
    • 数据经过网络传输,到达服务器端,服务器端 5 和 6 处的 handler 先后被触发,走完一个流程

    上述helloworld的执行流程

    在这里插入图片描述

    执行流程:

    1. 服务器从启动器开始执行,服务器端需要监听accept事件(连接建立事件)
    2. 当客户端连接到服务器(connect()),服务器会监听到连接建立,这时候才会去执行childHandler里面的回调方法initChannel(服务器端初始化方法)
    3. 客户端在监听到连接建立后,也会去执行Handler里面的回调方法initChannel(客户端初始化方法)
    4. 客户端和服务器端二者的初始化方法(initChannel())可以理解为同时运行,只不过一个是在客户端执行,一个是在服务器端执行
    5. 客户端的initChannel里面加了一个处理器(StringEncoder)
    6. 服务器端的initChannel里面添加了两个处理器 ,一个StringDecoder处理器,一个我们自己定义的处理器(ChannelInboundHandlerAdapter)
    7. 上面的执行完后,才会去执行客户端代码的sync()方法 (这是一个同步方法或阻塞方法,直到连接建立才开始运行)
    8. 继续向下运行到channel(),这时候拿到了一个channel对象(连接建立对象),这时候便可以去使用连接对象的一些方法去读、写数据
    9. 过后,运行writeAndFlush方法去发送数据(写数据),由此可知,凡是收发数据,都要走handler
    10. 上面的运行完后,会去执行客户端的处理器方法(StringEncoder方法:把helloworld转换为了ByteBuf)
    11. 转成功后,这时候才会把数据ByteBuf发送到服务器端的group里面的EventLoopGroup方法,里面还有另外一个EventLoop接收read事件
    12. 过后,继续走到了服务器端的处理器上(StringDecoder和ChannelInboundHandlerAdapter),按照之前添加的顺序去一个一个一次去处理这个数据
    13. 首先执行StringDecoder这个处理器,将ByteBuf转为String,然后将这个String交给下一个处理器ChannelInboundHandlerAdapter,这个自定义处理器,该处理器的channelRead关注读事件,运行这个方法,将传过来的String进行处理
    14. 最后打印出helloworld

    对代码中的字段理解

    观念

    • channel 理解为数据的通道
    • msg 理解为流动的数据,最开始输入是 ByteBuf,但经过 pipeline 的加工,会变成其它类型对象,最后输出又变成 ByteBuf
    • handler 理解为数据的处理工序
      • 工序有多道,合在一起就是 pipeline,pipeline 负责发布事件(读、读取完成...)传播给每个 handler, handler 对自己感兴趣的事件进行处理(重写了相应事件处理方法)
      • handler 分 Inbound(入栈,数据输入时) 和 Outbound(出栈,数据向客户端写出时) 两类
    • eventLoop 理解为处理数据的工人
    • 工人可以管理多个 channel 的 io 操作并且一旦工人负责了某个 channel,就要负责到底(一开始建立了关系,二者就绑定上了,这个过程是为了线程安全)
    • 工人既可以执行 io 操作,也可以进行任务处理,每位工人有任务队列,队列里可以堆放多个 channel 的待处理任务,任务分为普通任务、定时任务
    • 工人按照 pipeline 顺序,依次按照 handler 的规划(代码)处理数据,可以为每道工序指定不同的工人
  • 相关阅读:
    leetcode 经典题目42.接雨水
    【超图+CESIUM】【基础API使用示例】49、超图|CESIUM -自定义按钮操作视角上下左右东西南北移动|修改覆盖罗盘的上下左右东西南北的视角移动
    SpringSecurity自定义多Provider时提示No AuthenticationProvider found for问题的解决方案与原理(四)
    力扣(LeetCode)901. 股票价格跨度(C语言)
    element表单非必填数据的验证写法(自定义验证规则)
    使用scala语言编写代码,一键把hive中的DDLsql转化成MySql中的DDLsql
    SpringBoot 集成 kaptcha 验证码
    2022华数杯A题 B题 C题 思路汇总
    《软件方法(下)》第8章2023版连载(05)关于实体类
    tomcat的安装与部署
  • 原文地址:https://blog.csdn.net/yyuggjggg/article/details/126357013