• 【初识Netty&使用Netty实现简单的客户端与服务端的通信操作&Netty框架中一些重要的类以及方法的解析】


    一.Netty是什么?

    image.png

    Netty 由 Trustin Lee(韩国,Line 公司)2004 年开发

    本质:网络应用程序框架

    实现:异步、事件驱动

    特性:高性能、可维护、快速开发

    用途:开发服务器和客户端

    Netty的性能很高,按照Facebook公司开发小组的测试表明,Netty最高能达到接近百万的吞吐。

    二.Netty发展历程

    • 2004年6月Netty2发布(声称Java社区中第一个基于事件驱动的应用网络框架)
    • 2008年10月Netty3发布
    • 2013年7月Netty4 发布
    • 2013年12月发布5.0.0.Alpha1
    • 2015年11月废弃5.0.0

    三.Netty在开发领域的应用场景

    30000+项目在使用(统计方法:依赖项中声明io.netty:netty-all)

    • 数据库:Cassandra
    • 大数据处理:Spark、Hadoop
    • Message Queue: RocketMQ
    • 检索: Elasticsearch
    • 框架:gRPC、Apache Dubbo
    • 分布式协调器:ZooKeeper
    • 工具类: async-http-client

    四.第一个Netty程序

    4.1 maven中引入一个比较稳定的4.1.25.Final的版本

    <dependency>
          <groupId>io.nettygroupId>
          <artifactId>netty-allartifactId>
          <version>4.1.28.Finalversion>
    dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5

    4.2 服务端代码

    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    
    import java.net.InetSocketAddress;
    
    public class Server{
    
        private final int port;
    
        public Server  (int port) {
            this.port = port;
        }
    
        public static void main(String[] args) throws InterruptedException {
            int port = 9999;
            Server server = new Server(port);
            System.out.println("服务器启动");
            server.start();
            System.out.println("服务器关闭");
        }
    
        public void start() throws InterruptedException {
            final ServerHandler serverHandler = new ServerHandler();
            /*线程组*/
            EventLoopGroup group = new NioEventLoopGroup(1);
            try {
                /*服务端启动必须*/
                ServerBootstrap b = new ServerBootstrap();
                b.group(group)/*将线程组传入*/
                        .channel(NioServerSocketChannel.class)/*指定使用NIO进行网络传输*/
                        .localAddress(new InetSocketAddress(port))/*指定服务器监听端口*/
                        /*服务端每接收到一个连接请求,就会新启一个socket通信,也就是channel,
                        所以下面这段代码的作用就是为这个子channel增加handle*/
                        .childHandler(new ChannelInitializer<SocketChannel>() {
                            protected void initChannel(SocketChannel ch) throws Exception {
                                /*添加到该子channel的pipeline的尾部*/
                                ch.pipeline().addLast(serverHandler);
                            }
                        });
                ChannelFuture f = b.bind().sync();/*异步绑定到服务器,sync()会阻塞直到完成*/
                f.channel().closeFuture().sync();/*阻塞直到服务器的channel关闭*/
    
            } finally {
                group.shutdownGracefully().sync();/*优雅关闭线程组*/
            }
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53

    4.3 服务端业务处理

    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.ChannelFutureListener;
    import io.netty.channel.ChannelHandler;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;
    import io.netty.util.CharsetUtil;
    
    @ChannelHandler.Sharable
    /*不加这个注解那么在增加到childHandler时就必须new出来*/
    public class ServerHandler extends ChannelInboundHandlerAdapter {
    
        /*服务端读到数据以后,就会执行*/
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            ByteBuf in = (ByteBuf)msg;
            System.out.println("Server accept:"+in.toString(CharsetUtil.UTF_8));
            ctx.writeAndFlush(Unpooled.copiedBuffer("你好,我是服务端,我已经收到你发送的消息", CharsetUtil.UTF_8));
        }
    
        /*** 服务端读取完成网络数据后的处理*/
        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
    
            ctx.writeAndFlush(Unpooled.EMPTY_BUFFER)
                    .addListener(ChannelFutureListener.CLOSE);
        }
    
        /*** 发生异常后的处理*/
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
                throws Exception {
            cause.printStackTrace();
            ctx.close();
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37

    4.4 客户端实现代码

    import io.netty.bootstrap.Bootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.nio.NioSocketChannel;
    
    import java.net.InetSocketAddress;
    
    public class Client {
    
        private final int port;
        private final String host;
    
        public Client(int port, String host) {
            this.port = port;
            this.host = host;
        }
    
        public void start() throws InterruptedException {
            /*线程组*/
            EventLoopGroup group = new NioEventLoopGroup();
            try{
                /*客户端启动必备*/
                Bootstrap b = new Bootstrap();
                b.group(group)/*把线程组传入*/
                        /*指定使用NIO进行网络传输*/
                        .channel(NioSocketChannel.class)
                        .remoteAddress(new InetSocketAddress(host,port))
                        .handler(new ClientHandle());
                /*连接到远程节点,阻塞直到连接完成*/
                ChannelFuture f = b.connect().sync();
                /*阻塞程序,直到Channel发生了关闭*/
                f.channel().closeFuture().sync();
            }finally {
                group.shutdownGracefully().sync();
            }
        }
    
        public static void main(String[] args) throws InterruptedException {
            new Client(9999,"127.0.0.1").start();
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43

    4.5 客户端业务逻辑处理

    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.SimpleChannelInboundHandler;
    import io.netty.util.CharsetUtil;
    
    public class ClientHandle extends SimpleChannelInboundHandler<ByteBuf> {
    
        /*客户端读到数据以后,就会执行*/
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg)
                throws Exception {
            System.out.println("client acccept:"+msg.toString(CharsetUtil.UTF_8));
        }
    
        /*连接建立以后*/
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            ctx.writeAndFlush(Unpooled.copiedBuffer(
                    "Hello Netty",CharsetUtil.UTF_8));
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
                throws Exception {
            cause.printStackTrace();
            ctx.close();
        }
    
        @Override
        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
            super.userEventTriggered(ctx, evt);
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35

    3.6 运行结果:

    3.6.1 服务端运行结果:

    在这里插入图片描述

    3.6.2 客户端运行结果:

    在这里插入图片描述

    五.Netty框架中一些重要的类以及方法的解析

    5.1 EventLoop、ServerBootstrap、Bootstrap

    EventLoop暂时可以看成一个线程、EventLoopGroup自然就可以看成线程组。

    EventLoopGroup group = new NioEventLoopGroup(1);
    
    • 1

    网络编程里,“服务器”和“客户端”实际上表示了不同的网络行为;换句话说,是监听传入的连接还是建立到一个或者多个进程的连接。因此,有两种类型的引导:一种用于客户端(简单地称为Bootstrap),而另一种(ServerBootstrap)用于服务器。无论你的应用程序使用哪种协议或者处理哪种类型的数据,唯一决定它使用哪种引导类的是它是作为一个客户端还是作为一个服务器。

    //服务端启动创建的对象
    ServerBootstrap b = new ServerBootstrap();
    //客户端启动创建的对象
    Bootstrap b = new Bootstrap();
    
    • 1
    • 2
    • 3
    • 4

    ServerBootstrap将绑定到一个端口,因为服务器必须要监听连接,而Bootstrap 则是由想要连接到远程节点的客户端应用程序所使用的。

    引导一个客户端只需要一个EventLoopGroup,但是一个ServerBootstrap 则需要两个,因为服务器需要两组不同的Channel。第一组将只包含一个ServerChannel,代表服务器自身的已绑定到某个本地端口的正在监听的套接字。而第二组将包含所有已创建的用来处理传入客户端连接(对于每个服务器已经接受的连接都有一个)的Channel。

    image.png

    Channel 是Java NIO 的一个基本包装构造。

    它代表一个到实体(如一个硬件设备、一个文件、一个网络套接字或者一个能够执行一个或者多个不同的I/O操作的程序组件)的开放连接,如读操作和写操作。

    目前,可以把Channel 看作是传入(入站)或者传出(出站)数据的载体。因此,它可以被打开或者被关闭,连接或者断开连接。

    5.2 事件和ChannelHandler、ChannelPipeline

    image.png

    Netty 使用不同的事件来通知我们状态的改变或者是操作的状态。这使得我们能够基于已经发生的事件来触发适当的动作。

    可能由入站数据或者相关的状态更改而触发的事件包括:

    连接已被激活或者连接失活;数据读取;用户事件;错误事件。

    出站事件是未来将会触发的某个动作的操作结果,这些动作包括:

    打开或者关闭到远程节点的连接;将数据写到或者冲刷到套接字。

    image.png

    每个事件都可以被分发给ChannelHandler 类中的某个用户实现的方法。

    Netty 提供了大量预定义的可以开箱即用的ChannelHandler 实现,包括用于各种协议(如HTTP 和SSL/TLS)的ChannelHandler。

    5.3 ChannelFuture

    Netty 中所有的I/O 操作都是异步的。

    JDK 预置了interface java.util.concurrent.Future,Future 提供了一种在操作完成时通知应用程序的方式。这个对象可以看作是一个异步操作的结果的占位符;它将在未来的某个时刻完成,并提供对其结果的访问。但是其所提供的实现,只允许手动检查对应的操作是否已经完成,或者一直阻塞直到它完成。这是非常繁琐的,所以Netty提供了它自己的实现——ChannelFuture,用于在执行异步操作的时候使用。

    每个Netty 的出站I/O操作都将返回一个 ChannelFuture 。

    好了,【初识Netty&使用Netty实现简单的客户端与服务端的通信操作&Netty框架中一些重要的类以及方法的解析】就先学习到这里,更多内容不断学习更新中。

  • 相关阅读:
    【Python】爬虫-基础入门
    Vscode环境下的PyQt
    mysql之主从复制和读写分离搭建
    Node.js学习笔记_No.04
    Linux进阶-用户管理与文件权限
    System V 消息队列(一)—— 消息队列相关接口函数(msgget / msgctl)
    使用 multiprocessing 多进程处理批量数据
    一个简单的模拟实例说明Task及其调度问题
    反射中获取Class对象,成员变量,成员方法,构造方法总结
    ES, MongoDB, HBase的区别和使用场景
  • 原文地址:https://blog.csdn.net/Coder_ljw/article/details/127658338