• netty通信


    学习netty之前,要先了解操作系统中的IO零拷贝(已经附上链接了)

    一、netty的简单介绍

    • Netty 是由 JBOSS 提供的一个 Java 开源框架,现为 Github 上的独立项目。
    • Netty 是一个异步的、基于事件驱动的网络应用框架,用以快速开发高性能、高可靠性的网络 IO 程序。
    • Netty 主要针对在 TCP 协议下,面向 Client 端的高并发应用,或者 **Peer-to-Peer **场景下的大量数据持续传输的应用。
    • Netty 本质是一个 NIO 框架,适用于服务器通讯相关的多种应用场景。
    • Dubbo 协议默认使用 Netty 作为基础通信组件,用于实现各进程节点之间的内部通信

    为什么有了netty框架

    原生的NIO存在问题:

    • NIO的类库和API繁杂:需要熟练掌握Selector、ServerSocketChannel、SocketChannel、ByteBuffer
    • 要熟悉Java多线程编程,因为NIO编程涉及到Reactor模式
    • Selector可能出现空轮询,占用CPU资源

    netty是对NIO进行封装,解决了上述问题:

    • 设计优雅:
      适用于各种传输类型的统一API,阻塞和非阻塞Socket;
      基于灵活且可扩展的事件模型,可以清晰地分离关注点;
      高度可定制的线程模型
    • 高性能、吞吐量更高:
      延迟更低;
      减少资源消耗;
      最小化不必要的内存复制。
    • 安全:完整的 SSL/TLS 和 StartTLS 支持。
    • 社区活跃、不断更新

    二、线程模式

    下面讲解的是netty线程模式的由来

    现有的线程模式:

    • BIO的传统阻塞IO服务模型
    • NIO的Reactor模型
      单 Reactor 单线程;
      单 Reactor多线程;
      主从 Reactor多线程

    2.1 BIO的传统阻塞IO服务模型

    1、每个连接都需要独立的线程完成数据的输入,业务处理,数据返回。当并发数很大,就会创建大量的线程,占用很大系统资源。
    2、连接创建后,如果当前线程暂时没有数据可读,该线程会阻塞在 Handler对象中的read 操作,导致上面的处理线程资源浪费。
    image

    2.2 NIO的Reactor模型

    基于I/O多路复用模型:多个客户端进行连接,先把连接请求给Reactor,多个连接共用一个阻塞对象Reactor,由Reactor负责监听和分发,当客户端连接没有数据时不会阻塞线程。
    基于线程池复用线程资源:不必再为每个连接创建线程,将连接完成后的业务处理任务分配给线程进行处理,一个线程可以处理多个连接的业务。(解决了当并发数很大时,会创建大量线程,占用很大系统资源)

    Reactor模式中核心组成:

    • Reactor:在一个单独的线程中运行,负责监听和分发事件,分发给适当的处理线程来对IO事件做出反应。
    • Handlers:处理线程执行IO事件。

    单Reactor单线程

    特点:

    • 该模型简单没有多线程的竞争,由一个线程完成所有的操作(监听、分发、执行),没有充分利用多核CPU
    • 因为Reactor是单线程运行,因此在处理某个handler的IO事件时,其他的handler需要进行等待,等待时间长因为该线程还要处理业务;
    • 当Reactor出现问题,就会造成业务模块不可用

    image
    上图解析:

    • Reactor对象通过select监控客户端请求事件,收到事件后通过dispatch 进行分发
    • 如果是建立连接请求事件,则由Acceptor通过accept处理连接请求,然后创建一个 Handler 对象处理连接完成后的后续业务处理
    • 如果不是建立连接事件,则Reactor会分发处理线程来处理Handler的IO事件(完成 Read → 业务处理 → send 的完整业务流程)

    单Reactor多线程

    特点:

    • 充分利用多核CPU,可能出现多线程竞争
    • 因为Reactor是单线程运行,因此在处理某个handler的IO事件时,其他的handler需要进行等待,等待时间短因为处理业务交给worker线程池执行;
    • Reactor承担所有的事件的监听和响应,它是单线程运行,在高并发场景容易出现性能瓶颈
    • 当Reactor出现问题,就会造成业务模块不可用

    image

    上图解析:

    • Reactor对象通过select监控客户端请求事件,收到事件后,通过Dispatch 进行分发
    • 如果是建立连接请求事件,则由Acceptor通过accept处理连接请求,然后创建一个Handler对象处理完成连接后的各种事件
    • 如果不是连接请求事件,则Reactor会分发处理线程来处理Handler的IO事件,handler只负责响应事件(read、send),不做具体的业务处理交给worker线程池执行(这样不会使handler阻塞太久)
    • worker线程池会分配独立的worker线程完成真正的业务,并将结果返回给handler,handler收到响应后,通过send将结果返回给client

    主从Reactor多线程

    特点:

    • 主Reactor可已有多个,从Reactor也可以有多个
    • 主Reactor负责处理建立连接请求事件,从Reactor负责处理业务请求事件
    • 当从Reactor出现问题,可以调用其他的从Reactor提供服务

    image

    上图解析:

    • Reactor主线程 MainReactor对象通过select监听连接事件,收到事件后,通过Acceptor的accept处理连接事件
    • 当Acceptor处理连接事件后,MainReactor将连接分配给SubReactor,subreactor将连接加入到连接队列进行监听,并创建handler进行各种事件处理
    • 当有新事件发生时,subreactor就会分发处理线程来处理handler,handler通过read读取数据,分发给后面的worker线程池处理。
    • worker线程池分配独立的worker线程进行业务处理,并返回结果。handler 收到响应的结果后,再通过send将结果返回给client
    • Reactor主线程可以对应多个Reactor子线程,即MainRecator可以关联多个 SubReactor

    生活中的体现:
    单 Reactor 单线程:前台接待员和服务员是同一个人,全程为顾客服务
    单 Reactor 多线程:1 个前台接待员,多个服务员,接待员只负责接待
    主从 Reactor 多线程:多个前台接待员,多个服务生

    2.3 netty线程模型

    netty线程模型主要是依据主从Reactor多线程
    image

    • BossGroup中的NioEventLoop就像MainReactor可以有多个,WorkerGroup中的NioEventLoop就像SubReactor一样可以有多个。

    • Netty抽象出两组线程池,BossGroup专门负责接收客户端的连接,WorkerGroup专门负责网络的读写

    • BossGroup和WorkerGroup类型都是NioEventLoopGroup,NioEventLoopGroup相当于一个事件循环组,这个组中含有多个事件循环,每一个事件循环是NioEventLoop(NioEventLoopGroup可以有多个线程,即可以含有多个NioEventLoop)

    • NioEventLoop表示一个不断循环的执行处理任务的线程,每个 NioEventLoop都有一个Selector,用于监听绑定在其上的socket的网络通讯

    • 每个BossGroup下面的NioEventLoop循环执行的步骤有3步:
      1、轮询 accept 事件
      2、处理 accept 事件,与 client 建立连接,生成NioSocketChannel,并将其注册到某个WorkerGroup的NioEventLoop上的Selector
      3、继续处理任务队列的任务,即runAllTasks

    • 每个WorkerGroup下面的NioEventLoop循环执行的步骤有3步:
      1、轮询 read,write 事件
      2、处理 I/O 事件,即 read,write 事件,在对应NioSocketChannel处理
      3、处理任务队列的任务,即runAllTasks

    • 每个WorkerGroup的NioEventLoop处理业务时,会使用pipeline(管道),pipeline中包含了channel(通道),即通过pipeline可以获取到对应通道,每个通道中都有一个channelPipeline维护了很多的处理器channelhandler。

    netty案例-TCP服务

    服务端:
    NettyServer

    NettyServer代码
    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.*;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import io.netty.channel.socket.nio.NioSocketChannel;
    
    public class NettyServer {
        public static void main(String[] args) throws Exception {
    
    
            //创建BossGroup 和 WorkerGroup
            //说明
            //1. 创建两个线程组 bossGroup 和 workerGroup
            //2. bossGroup 只是处理连接请求 , 真正的和客户端业务处理,会交给 workerGroup完成
            //3. 两个都是无限循环
            //4. bossGroup 和 workerGroup 含有的子线程(NioEventLoop)的个数
            //   默认实际 cpu核数 * 2
            EventLoopGroup bossGroup = new NioEventLoopGroup(1);
            EventLoopGroup workerGroup = new NioEventLoopGroup(); //8
    
    
    
            try {
                //创建服务器端的启动对象,配置参数
                ServerBootstrap bootstrap = new ServerBootstrap();
    
                //使用链式编程来进行设置
                bootstrap.group(bossGroup, workerGroup) //设置两个线程组
                        .channel(NioServerSocketChannel.class) //使用NioSocketChannel 作为服务器的通道实现
                        .option(ChannelOption.SO_BACKLOG, 128) // 设置线程队列等待连接个数
                        .childOption(ChannelOption.SO_KEEPALIVE, true) //设置保持活动连接状态
    //                    .handler(null) // 该 handler对应 bossGroup , childHandler 对应 workerGroup
                        .childHandler(new ChannelInitializer<SocketChannel>() {//创建一个通道初始化对象(匿名对象)
                            //给pipeline 设置处理器
                            @Override
                            protected void initChannel(SocketChannel ch) throws Exception {
                                System.out.println("客户socketchannel hashcode=" + ch.hashCode()); //可以使用一个集合管理 SocketChannel, 再推送消息时,可以将业务加入到各个channel 对应的 NIOEventLoop 的 taskQueue 或者 scheduleTaskQueue
                                ch.pipeline().addLast(new NettyServerHandler());
                            }
                        }); // 给我们的workerGroup 的 EventLoop 对应的管道设置处理器
    
                System.out.println(".....服务器 is ready...");
    
                //绑定一个端口并且同步生成了一个 ChannelFuture 对象(也就是立马返回这样一个对象)
                //启动服务器(并绑定端口)
                ChannelFuture cf = bootstrap.bind(6668).sync();
    
                //给cf 注册监听器,监控我们关心的事件
    
                cf.addListener(new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture future) throws Exception {
                        if (cf.isSuccess()) {
                            System.out.println("监听端口 6668 成功");
                        } else {
                            System.out.println("监听端口 6668 失败");
                        }
                    }
                });
    
    
                //对关闭通道事件  进行监听
                cf.channel().closeFuture().sync();
            }finally {
                bossGroup.shutdownGracefully();
                workerGroup.shutdownGracefully();
            }
    
        }
    
    }
    
    

    NettyServerHandler

    NettyServerHandler代码
    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.Channel;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;
    import io.netty.channel.ChannelPipeline;
    import io.netty.util.CharsetUtil;
    
    import java.util.concurrent.TimeUnit;
    
    /*
    说明
    1. 我们自定义一个Handler 需要继承netty 规定好的某个HandlerAdapter(规范)
    2. 这时我们自定义一个Handler , 才能称为一个handler
     */
    public class NettyServerHandler extends ChannelInboundHandlerAdapter {
    
        //读取数据事件(这里我们可以读取客户端发送的消息)
        /*
        1. ChannelHandlerContext ctx:上下文对象, 含有 管道pipeline , 通道channel, 地址
        2. Object msg: 就是客户端发送的数据 默认Object
         */
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            System.out.println("服务器读取线程 " + Thread.currentThread().getName() + " channle =" + ctx.channel());
            System.out.println("server ctx =" + ctx);
            System.out.println("看看channel 和 pipeline的关系");
            Channel channel = ctx.channel();
            ChannelPipeline pipeline = ctx.pipeline(); //本质是一个双向链表
    
    
            //将 msg 转成一个 ByteBuf
            //ByteBuf 是 Netty 提供的,不是 NIO 的 ByteBuffer.
            ByteBuf buf = (ByteBuf) msg;
            System.out.println("客户端发送消息是:" + buf.toString(CharsetUtil.UTF_8));
            System.out.println("客户端地址:" + channel.remoteAddress());
        }
    
        //数据读取完毕
        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
    
            //writeAndFlush 是 write + flush
            //将数据写入到缓存,并刷新
            //一般讲,我们对这个发送的数据进行编码
            ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 客户端~(>^ω^<)喵1", CharsetUtil.UTF_8));
        }
    
        //发生异常后, 一般是需要关闭通道
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            ctx.close();
        }
    }
    

    客户端:
    NettyClient

    NettyClient代码
    import io.netty.bootstrap.Bootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioSocketChannel;
    
    public class NettyClient {
        public static void main(String[] args) throws Exception {
    
            //客户端需要一个事件循环组
            EventLoopGroup group = new NioEventLoopGroup();
    
    
            try {
                //创建客户端启动对象
                //注意客户端使用的不是 ServerBootstrap 而是 Bootstrap
                Bootstrap bootstrap = new Bootstrap();
    
                //设置相关参数
                bootstrap.group(group) //设置线程组
                        .channel(NioSocketChannel.class) // 设置客户端通道的实现类(反射)
                        .handler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            protected void initChannel(SocketChannel ch) throws Exception {
                                ch.pipeline().addLast(new NettyClientHandler()); //加入自己的处理器
                            }
                        });
    
                System.out.println("客户端 ok..");
    
                //启动客户端去连接服务器端
                //关于 ChannelFuture 要分析,涉及到netty的异步模型
                ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 6668).sync();
                //对关闭通道事件  进行监听
                channelFuture.channel().closeFuture().sync();
            }finally {
    
                group.shutdownGracefully();
    
            }
        }
    }
    

    NettyClientHandler

    NettyClientHandler代码
    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;
    import io.netty.util.CharsetUtil;
    
    public class NettyClientHandler extends ChannelInboundHandlerAdapter {
    
        //当通道就绪就会触发该方法
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            System.out.println("client " + ctx);
            ctx.writeAndFlush(Unpooled.copiedBuffer("hello, server: (>^ω^<)喵", CharsetUtil.UTF_8));
        }
    
        //当通道有读取事件时,会触发
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    
            ByteBuf buf = (ByteBuf) msg;
            System.out.println("服务器回复的消息:" + buf.toString(CharsetUtil.UTF_8));
            System.out.println("服务器的地址: "+ ctx.channel().remoteAddress());
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            cause.printStackTrace();
            ctx.close();
        }
    }
    
    

    netty中的参数组件

    1、Bootstrap、ServerBootstrap

    Bootstrap 意思是引导,一个 Netty 应用通常由一个 Bootstrap 开始,主要作用是配置整个 Netty 程序,串联各个组件,Netty 中 Bootstrap 类是客户端程序的启动引导类,ServerBootstrap 是服务端启动引导类。

    2、Future、ChannelFuture

    Netty 中所有的 IO 操作都是异步的,不能立刻得知消息是否被正确处理。但是可以立即返回一个ChannelFuture,它可以注册一个监听,当操作执行成功或失败时监听会自动触发注册的监听事件

    • Channel channel(),返回当前正在进行 IO 操作的通道
    • ChannelFuture sync(),等待异步操作执行完毕,同步执行注册的监听事件

    Future-Listener 机制
    当Future对象刚刚创建时,处于非完成状态,调用者可以通过返回的 ChannelFuture来获取操作执行的状态,注册监听函数来执行完成后的操作。

    • 通过 isDone 方法来判断当前操作是否完成;
    • 通过 isSuccess 方法来判断已完成的当前操作是否成功;
    • 通过 getCause 方法来获取已完成的当前操作失败的原因;
    • 通过 isCancelled 方法来判断已完成的当前操作是否被取消;
    • 通过 addListener 方法来注册监听器,当操作已完成(isDone方法返回完成),将会通知指定的监听器;

    3、Channel

    • Netty 网络通信的组件,能够用于执行网络 I/O 操作。
    • Netty 网络通信的组件,能够用于执行网络 I/O 操作。
    • Channel 提供异步的网络 I/O 操作(如建立连接,读写,绑定端口),异步调用意味着任何 I/O 调用都将立即返回一个ChannelFuture,并且不保证在调用结束时所请求的 I/O 操作已完(后期通过ChannelFuture的方法查看异步执行结果)
    • 不同协议、不同的阻塞类型的连接都有不同的 Channel 类型与之对应
      NioSocketChannel,异步的客户端 TCP Socket 连接。
      NioServerSocketChannel,异步的服务器端 TCP Socket 连接。
      NioDatagramChannel,异步的 UDP 连接。

    4、Selector

    • Netty 基于 Selector 对象实现 I/O 多路复用,通过 Selector 一个线程可以监听多个连接的 Channel 事件。
    • 当向一个 Selector 中注册 Channel 后,Selector 内部的机制就可以自动不断地查询(Select)这些注册的 Channel 是否有已就绪的 I/O 事件(例如可读,可写,网络连接完成等),这样程序就可以很简单地使用一个线程高效地管理多个 Channel

    5、ChannelHandler 及其实现类

    • ChannelHandler 是一个接口,处理 I/O 事件或拦截 I/O 操作,并将其转发到其 ChannelPipeline(业务处理链)中的下一个处理程序。
    • ChannelHandler 本身并没有提供很多方法,因为这个接口有许多的方法需要实现,方便使用期间,可以继承它的子类
    • 当自定义一个handler类处理器时,需要继承ChannelhandlerAdapter
      image

    6、Pipeline 和 ChannelPipeline

    • Pipeline中包含了多个Channel(通道)
    • 一个Channel包含了一个ChannelPipeline,而ChannePipeline中又维护了一个由ChannelHandlerContext组成的双向链表,并且每个channeHandlerContext中又关联着一个channelHandler
    • ChannelPipeline是保存ChannelHandler的List,用于处理或拦截Channel 的入站事件和出站事件操作
    • 入站事件和出站事件在一个双向链表中,入站事件会从链表head往后传递到最后一个入站的 handler,出站事件会从链表tail往前传递到最前t个出站的handler, c两种类型的handler互不干扰
    • ChannelPipeline实现了一种高级形式的拦截过滤器模式,使用户可以完全控制事件的处理方式
      image

    ChannelPipeline addFirst(ChannelHandler... handlers),把一个业务处理类(handler)添加到链中的第一个位置
    ChannelPipeline addLast(ChannelHandler... handlers),把一个业务处理类(handler)添加到链中的最后一个位置

    7、ChannelHandlerContext

    • 保存 Channel 相关的所有上下文信息,同时关联一个 ChannelHandler 对象
    • 即ChannelHandlerContext中包含一个具体的事件处理器ChannelHandler,同时ChannelHandlerContext中也绑定了对应的pipeline 和 Channel 的信息,方便对ChannelHandler进行调用。

    ChannelHandlerContext的常用方法:
    1、ChannelFuture close(),关闭通道
    2、ChannelOutboundInvoker flush(),刷新
    3、ChannelFuture writeAndFlush(Object msg),将数据写到ChannelPipeline 中当前 ChannelHandler 的下一个ChannelHandler 开始处理(出站)

    8、ChannelOption

    Netty在创建Channel实例后,一般都需要设置 ChannelOption 参数
    image

    9、EventLoopGroup 和其实现类 NioEventLoopGroup

    • EventLoopGroup 是一组 EventLoop 的抽象,Netty 为了更好的利用多核 CPU 资源,一般会有多个 EventLoop 同时工作,每个 EventLoop 维护着一个 Selector 实例。
    • EventLoopGroup 提供 next 接口,可以从组里面按照一定规则获取其中一个 EventLoop 来处理任务。在 Netty 服务器端编程中,我们一般都需要提供两个 EventLoopGroup,例如:BossEventLoopGroup 和 WorkerEventLoopGroup。
    • BossEventLoop负责接收客户端的连接并将SocketChannel注册到 WorkerEventLoopGroup的其中一个workerEventLoop的selector上并进行后续的IO事件处理

    10、Unpooled类

    • Netty提供一个专门用来操作缓冲区(即 Netty 的数据容器)的工具类

    ByteBuf buffer = Unpooled.buffer(10);
    ByteBuf byteBuf = Unpooled.copiedBuffer("hello,world!", Charset.forName("utf-8"));

    netty的案例-群聊系统

    GroupChatServer

    GroupChatServer代码
    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.*;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import io.netty.handler.codec.string.StringDecoder;
    import io.netty.handler.codec.string.StringEncoder;
    
    public class GroupChatServer {
    
        private int port; //监听端口
    
    
        public GroupChatServer(int port) {
            this.port = port;
        }
    
        //编写run方法,处理客户端的请求
        public void run() throws  Exception{
    
            //创建两个线程组
            EventLoopGroup bossGroup = new NioEventLoopGroup(1);
            EventLoopGroup workerGroup = new NioEventLoopGroup(); //8个NioEventLoop
    
            try {
                ServerBootstrap b = new ServerBootstrap();
    
                b.group(bossGroup, workerGroup)
                        .channel(NioServerSocketChannel.class)
                        .option(ChannelOption.SO_BACKLOG, 128)
                        .childOption(ChannelOption.SO_KEEPALIVE, true)
                        .childHandler(new ChannelInitializer<SocketChannel>() {
    
                            @Override
                            protected void initChannel(SocketChannel ch) throws Exception {
    
                                //获取到pipeline
                                ChannelPipeline pipeline = ch.pipeline();
                                //向pipeline加入解码器
                                pipeline.addLast("decoder", new StringDecoder());
                                //向pipeline加入编码器
                                pipeline.addLast("encoder", new StringEncoder());
                                //加入自己的业务处理handler
                                pipeline.addLast(new GroupChatServerHandler());
    
                            }
                        });
    
                System.out.println("netty 服务器启动");
                ChannelFuture channelFuture = b.bind(port).sync();
    
                //监听关闭
                channelFuture.channel().closeFuture().sync();
            }finally {
                bossGroup.shutdownGracefully();
                workerGroup.shutdownGracefully();
            }
    
        }
    
        public static void main(String[] args) throws Exception {
    
            new GroupChatServer(7000).run();
        }
    }
    

    GroupChatServerHandler

    GroupChatServerHandler代码
    import io.netty.channel.Channel;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.SimpleChannelInboundHandler;
    import io.netty.channel.group.ChannelGroup;
    import io.netty.channel.group.DefaultChannelGroup;
    import io.netty.util.concurrent.GlobalEventExecutor;
    
    import java.text.SimpleDateFormat;
    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    
    public class GroupChatServerHandler extends SimpleChannelInboundHandler<String> {
    
        //这样写还要自己遍历Channel
        //public static List<Channel> channels = new ArrayList<Channel>();
    
        //使用一个hashmap 管理私聊(私聊本案例并未实现,只是提供个思路)
        //public static Map<String, Channel> channels = new HashMap<String,Channel>();
    
        //定义一个channle 组,管理所有的channel
        //GlobalEventExecutor.INSTANCE) 是全局的事件执行器,是一个单例
        private static ChannelGroup  channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    
    
        //handlerAdded 表示连接建立,一旦连接,第一个被执行
        //将当前channel 加入到  channelGroup
        @Override
        public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
            Channel channel = ctx.channel();
            //将该客户加入聊天的信息推送给其它在线的客户端
    
            //该方法会将 channelGroup 中所有的channel 遍历,并发送消息,我们不需要自己遍历
    
            channelGroup.writeAndFlush("[客户端]" + channel.remoteAddress() + " 加入聊天" + sdf.format(new java.util.Date()) + " \n");
            channelGroup.add(channel);
    
    		//私聊如何实现
    //         channels.put("userid100",channel);
    		
    
        }
    
        //断开连接, 将xx客户离开信息推送给当前在线的客户
        @Override
        public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
    
            Channel channel = ctx.channel();
            channelGroup.writeAndFlush("[客户端]" + channel.remoteAddress() + " 离开了\n");
            System.out.println("channelGroup size" + channelGroup.size());
    
        }
    
        //表示channel 处于活动状态, 提示 xx上线
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            //这个是给服务端看的,客户端上面已经提示xxx加入群聊了
            System.out.println(ctx.channel().remoteAddress() + " 上线了~");
        }
    
        //表示channel 处于不活动状态, 提示 xx离线了
        @Override
        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
    
            System.out.println(ctx.channel().remoteAddress() + " 离线了~");
        }
    
        //读取数据,转发给在线的每一个客户端
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
    
            //获取到当前channel
            Channel channel = ctx.channel();
            //这时我们遍历channelGroup, 根据不同的情况,回送不同的消息
    
            channelGroup.forEach(ch -> {
                if(channel != ch) { //不是当前的channel,转发消息
                    ch.writeAndFlush("[客户]" + channel.remoteAddress() + " 发送了消息" + msg + "\n");
                }else {//回显自己发送的消息给自己
                    ch.writeAndFlush("[自己]发送了消息" + msg + "\n");
                }
            });
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            //关闭通道
            ctx.close();
        }
    }
    

    GroupChatClient

    GroupChatClient代码
    import io.netty.bootstrap.Bootstrap;
    import io.netty.channel.*;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioSocketChannel;
    import io.netty.handler.codec.string.StringDecoder;
    import io.netty.handler.codec.string.StringEncoder;
    
    import java.util.Scanner;
    
    
    public class GroupChatClient {
    
        //属性
        private final String host;
        private final int port;
    
        public GroupChatClient(String host, int port) {
            this.host = host;
            this.port = port;
        }
    
        public void run() throws Exception{
            EventLoopGroup group = new NioEventLoopGroup();
    
            try {
    
    
            Bootstrap bootstrap = new Bootstrap()
                    .group(group)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
    
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
    
                            //得到pipeline
                            ChannelPipeline pipeline = ch.pipeline();
                            //加入相关handler
                            pipeline.addLast("decoder", new StringDecoder());
                            pipeline.addLast("encoder", new StringEncoder());
                            //加入自定义的handler
                            pipeline.addLast(new GroupChatClientHandler());
                        }
                    });
    
            ChannelFuture channelFuture = bootstrap.connect(host, port).sync();
            //得到channel
                Channel channel = channelFuture.channel();
                System.out.println("-------" + channel.localAddress()+ "--------");
                //客户端需要输入信息,创建一个扫描器
                Scanner scanner = new Scanner(System.in);
                while (scanner.hasNextLine()) {
                    String msg = scanner.nextLine();
                    //通过channel 发送到服务器端
                    channel.writeAndFlush(msg + "\r\n");
                }
            }finally {
                group.shutdownGracefully();
            }
        }
    
        public static void main(String[] args) throws Exception {
            new GroupChatClient("127.0.0.1", 7000).run();
        }
    }
    

    GroupChatClientHandler

    GroupChatClientHandler代码
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.SimpleChannelInboundHandler;
    
    public class GroupChatClientHandler extends SimpleChannelInboundHandler<String> {
    
        //从服务器拿到的数据
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
            System.out.println(msg.trim());
        }
    }
    

    netty的心跳检测机制

    IdleStateHandler是netty 提供的处理空闲状态的处理器(放在ChannelPipeline维护的ChannelHandler的双向链表上):

    • long readerIdleTime : 表示多长时间没有读, 就会发送一个心跳检测包检测是否连接
    • long writerIdleTime : 表示多长时间没有写, 就会发送一个心跳检测包检测是否连接
    • long allIdleTime : 表示多长时间没有读写, 就会发送一个心跳检测包检测是否连接

    netty的编解码器

    编解码器放在ChannelPipeline维护的ChannelHandler的双向链表上
    服务端发数据给客户端:服务端—>出站(编码)—>Socket通道—>入站(解码)—>客户端
    客户端发数据给服务端:客户端—>出站(编码)—>Socket通道—>入站(解码)—>服务端
    image

    编解码器(自定义编解码器时需要继承下面的其中一个类):

    • ByteToMessageDecoder
    • LineBasedFrameDecoder:这个类在 Netty 内部也有使用,它使用行尾控制字符(\n或者\r\n)作为分隔符来解析数据。
    • DelimiterBasedFrameDecoder:使用自定义的特殊字符作为消息的分隔符。
    • HttpObjectDecoder:一个 HTTP 数据的解码器
    • LengthFieldBasedFrameDecoder:通过指定长度来标识整包消息,这样就可以自动的处理黏包和半包消息。

    TCP 粘包和拆包及解决方案

    TCP粘包和拆包
    使用优化方法(Nagle 算法),将多次间隔较小且数据量小的数据,合并成一个大的数据块,然后进行封包。这样做虽然提高了效率,但是接收端就难于分辨出完整的数据包了,出现了粘包和拆包的问题,因为面向流的通信是无消息保护边界的
    image

    • 服务端分两次读取到了两个独立的数据包,分别是 D1 和 D2,没有粘包和拆包
    • 服务端一次接受到了两个数据包,D1 和 D2 粘合在一起,称之为 TCP 粘包
    • 服务端分两次读取到了数据包,第一次读取到了完整的 D1 包和 D2 包的部分内容,第二次读取到了 D2 包的剩余内容,这称之为 TCP 拆包
    • 服务端分两次读取到了数据包,第一次读取到了 D1 包的部分内容 D1_1,第二次读取到了 D1 包的剩余部分内容 D1_2 和完整的 D2 包。

    解决方案
    使用自定义协议+编解码器来解决,关键就是要解决服务器端每次读取数据长度的问题,这个问题解决,就不会出现服务器多读或少读数据的问题,从而避免的 TCP 粘包、拆包。

    RPC(基于netty)

    1、RPC(Remote Procedure Call)远程过程调用,是一个计算机通信协议。该协议允许运行于一台计算机的程序调用另一台计算机的子程序,而程序员无需额外地为这个交互作用编程
    2、两个或多个应用程序都分布在不同的服务器上,它们之间的调用都像是本地方法调用一样

    RPC的调用流程图
    image

    • 服务消费方(client)以本地调用方式调用服务
    • client stub 接收到调用后负责将方法、参数等封装成能够进行网络传输的消息体
    • client stub 将消息进行编码并发送到服务端
    • server stub 收到消息后进行解码
    • server stub 根据解码结果调用本地的服务
    • 本地服务执行并将结果返回给 server stub
    • server stub 将返回导入结果进行编码并发送至消费方
    • client stub 接收到消息并进行解码
    • 服务消费方(client)得到结果

    RPC 的目标就是将 2 - 8 这些步骤都封装起来,用户无需关心这些细节,可以像调用本地方法一样即可完成远程服务调用

  • 相关阅读:
    【MySQL】索引的作用及知识储备
    React@16.x(42)路由v5.x(7)常见应用场景(4)- 路由切换动画
    设计模式之适配器模式C++实现
    交换机技术综述(第十一课)
    在 Vite项目中,使用插件 @rollup/plugin-inject 注入全局 jQuery
    海信电视携手FIRST惊喜影展·类型公开周,共筑参考级影像
    安卓开发中ViewBinding的使用
    [红蓝攻防]MDOG(全新UI重制版)为Xss跨站而生,数据共享,表单劫持,URL重定向
    SpringBoot整合JWT实现登陆验证
    星环效果css备忘
  • 原文地址:https://www.cnblogs.com/worldusemycode/p/16015272.html