目的:
通过案例学习Netty的NIO通信
案例功能:
1, 客户端发送数据到服务端, 服务端打印;
2, 服务端回传收到的数据给客户端
代码中写了详细注释, 博文不具体阐述, 用作入门, 后续专栏文章会详细介绍Netty的每一个组件, Netty的前两篇博文权当和Netty做个HelloWorld.
服务端代码如下:
- package netty.echo;
-
- import io.netty.bootstrap.ServerBootstrap;
- import io.netty.buffer.ByteBuf;
- import io.netty.channel.*;
- import io.netty.channel.nio.NioEventLoopGroup;
- import io.netty.channel.socket.SocketChannel;
- import io.netty.channel.socket.nio.NioServerSocketChannel;
-
- /**
- * Instruction:
- * Author:@author MaLi
- */
- public class EchoServer {
- public void startServer() {
- //1, 创建启动器
- ServerBootstrap serverBootstrap = new ServerBootstrap();
-
- //2, 创建并设置事件轮询组
- // 用于接收新连接
- NioEventLoopGroup bossGroup = new NioEventLoopGroup();
- // 用于客户端与服务端的IO传输, 线程数默认为机器CPU核数*2
- NioEventLoopGroup workerGroup = new NioEventLoopGroup();
- // 为父子通道设置对应的轮询组 - 本质是为通道设置Reactor的选择器角色
- serverBootstrap.group(bossGroup, workerGroup);
-
- //3, 设置Channel通信类型 - 这里使用NIO通信类型
- serverBootstrap.channel(NioServerSocketChannel.class);
- // 设置该通信为长链接
- serverBootstrap.option(ChannelOption.SO_KEEPALIVE, true);
-
- //4, 为子通道设置Handler流水线
- EchoServerHandler echoServerHandler = new EchoServerHandler();
- serverBootstrap.childHandler(new ChannelInitializer
() { - @Override
- protected void initChannel(SocketChannel ch) throws Exception {
- ch.pipeline().addLast(echoServerHandler);
- }
- });
-
- //5, 启动并绑定Channel到服务器端口对应端口(这里是8888), sync()作用, 等待绑定完成
- try {
- ChannelFuture sync = serverBootstrap.bind(8888).sync();
- //6, 等待通道的关闭回调, (Netty官方案例有英文注释, 这里只是优雅的关闭方式, 实际在当前代码中并没有关闭事件)
- // 作用: 阻塞在这里一直等待新连接事件发生
- sync.channel().closeFuture().sync();
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
- } finally {
- //7, 关闭事件轮询组
- workerGroup.shutdownGracefully();
- bossGroup.shutdownGracefully();
- }
- }
-
- //服务端处理IO事件的Handler处理器 - 基本Netty的开发都是开发Handler
- @ChannelHandler.Sharable //代表多个Channel可以共享这个Handler(只要没有并发安全就可以被共享)
- public static class EchoServerHandler extends ChannelInboundHandlerAdapter {
- @Override
- public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
- //收到数据
- ByteBuf in = (ByteBuf) msg;
- //如果返回false则代表不是堆内分配的存储 - Netty4.1+使用直接内存(OS管理的物理内存)
- System.out.println("是否堆内存" + in.hasArray());
- //因为Netty4.1+默认是使用直接内存的buffer来存储Channel读到的数据, Java要进行处理这些数据, 先要拷贝到自己的堆中
- //所以这里先建立一个对应长度的堆内数组
- byte[] arr = new byte[in.readableBytes()];
- // 读取数据到数组arr中
- in.getBytes(0, arr);
- System.out.println("Data from Client: " + new String(arr));
- System.out.println("写回前,msg.refCnt:" + ((ByteBuf) msg).refCnt()); //测试一下当前缓冲区的引用数量
- //写数据回给客户端
- ChannelFuture channelFuture = ctx.writeAndFlush(msg);
-
- channelFuture.addListener((ChannelFuture futureListener) -> {
- System.out.println("写回后,msg.refCnt:" + ((ByteBuf) msg).refCnt());
- });
- //传递到下一个Handler - 当前案例只有一个用户自定义的Handler, 其实不写也无所谓.
- super.channelRead(ctx, msg);
- }
- }
-
- public static void main(String[] args) {
- EchoServer echoServer = new EchoServer();
- echoServer.startServer();
- }
- }
客户端代码如下
- package netty.echo;
-
- import io.netty.bootstrap.Bootstrap;
- import io.netty.buffer.ByteBuf;
- import io.netty.buffer.PooledByteBufAllocator;
- import io.netty.channel.*;
- import io.netty.channel.nio.NioEventLoopGroup;
- import io.netty.channel.socket.SocketChannel;
- import io.netty.channel.socket.nio.NioSocketChannel;
-
- import java.util.Scanner;
-
- /**
- * Instruction:
- * Author:@author MaLi
- */
- public class EchoClient {
- private int serverPort;
- private String serverIp;
-
- public EchoClient(int serverPort, String serverIp) {
- this.serverPort = serverPort;
- this.serverIp = serverIp;
- }
-
- public void startClient() {
- //Step1: 创建组装器 - 用于配置客户端的 - 事件轮询器 - 通道 - 处理器
- Bootstrap bootstrap = new Bootstrap();
- //Step2: 创建轮询器 - 封装了Selector, 用于选择数据传输事件
- NioEventLoopGroup workerLoopGroup = new NioEventLoopGroup();
- bootstrap.group(workerLoopGroup);
- //Step3: 设置通道类型 - 这里使用了NIOSocket
- bootstrap.channel(NioSocketChannel.class);
- bootstrap.remoteAddress(serverIp, serverPort);
- bootstrap.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
- //Step4: 配置事件处理器 - 有数据IO的时候, 用于处理该IO的处理器
- bootstrap.handler(new ChannelInitializer
() { - @Override
- protected void initChannel(SocketChannel ch) throws Exception {
- ch.pipeline().addLast(new EchoClientHandler());
- }
- });
- //Step5: 连接服务器, 并配置了一个监听器, 用于在连接完成后, 回调lambda函数
- ChannelFuture channelFuture = bootstrap.connect().addListener((ChannelFuture futureListener) -> {
- if (futureListener.isSuccess()) {
- System.out.println("connection successful");
- } else {
- System.out.println("connection failure");
- }
- });
- try {
- //sync作用: 因为上面的连接到服务器上以及监听都是异步操作, 执行后马上返回, 可能连接还未完全建立, 所以sync在此等待一下
- channelFuture.sync();
-
- //StepX - 业务操作: 在连接完成之后, 获取到通道, 往通道里面写一些数据
- //获取通道
- Channel channel = channelFuture.channel();
- //获取标准输入
- Scanner scanner = new Scanner(System.in);
- System.out.println("请输入信息: ");
- while (scanner.hasNext()) {
- String msg = scanner.next();
- // 创建一个缓冲区, 用于存储待发送的信息
- ByteBuf buffer = channel.alloc().buffer();
- //保存数据到直接内存的缓冲区
- buffer.writeBytes(msg.getBytes());
- // 通过通道将数据发送出去
- channel.writeAndFlush(buffer);
- }
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
- }finally {
- //由于上面有while循环, 这里不出异常或者主动杀掉进程, 就不会执行到, 但是作为一个关闭是必不可少的, 否则真正的关闭不释放文件描述符
- workerLoopGroup.shutdownGracefully();
- }
- }
- //该内部类是对通道的入站操作的处理Handler
- public static class EchoClientHandler extends ChannelInboundHandlerAdapter{
- @Override
- public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
- //收到数据
- ByteBuf in = (ByteBuf) msg;
- byte[] arr = new byte[in.readableBytes()];
- in.getBytes(0, arr);
- System.out.println("Data from Server: " + new String(arr));
- //读取完成之后,要主动释放掉该buffer
- in.release();
- }
- }
-
- public static void main(String[] args) {
- EchoClient client = new EchoClient(8888,"localhost");
- client.startClient();
- }
- }
Netty知识体系小总结:
1, 网络编程IO模型: BIO, NIO, Reactor模型(单线程, 多线程);
2, 网络通信的TCP/IP通信模型: 三次握手, 四次挥手;
3, Netty主要组件
事件选择器: EventLoop
通道: Channel - 这里不同于NIO的Channel, 仅仅是同名, 实则是对NIOChannel的封装
处理器: Handler, 以及对Handler的编排PipeLine
性能的关键: ByteBuf, 相比NIO的ByteBuffer, Netty的ByteBuf性能更好, 有自己的零拷贝, 池化技术等.
编码解码器
序列化操作
以上内容在后续文章中不断分享.