1..本文部分内容翻译自:
[Netty] Netty's thread model and simple usage
2.netty模型是以 Reactor模式为基础的,具体的,netty使用的是 主从Reactor多线程模型;
3.先介绍了 Reactor线程模型;后介绍了 Netty 组成部分;
4.文末还po出了 netty服务器与客户端代码实现(总结自 B站《尚硅谷-netty》);
【注意】
[Netty] Netty's thread model and simple usage
2)Reactor模式的三种角色:
3)Reactor的3种线程模型
而 netty 使用的是 主从 Reactor多线程模型;
4)除建立连接外,服务器处理客户端请求涉及的操作大致有5种:
1)单Reactor单线程模型概述:
2)模型图

【图解】
【优缺点】
1)单Reactor多线程模型概述:
2)模型图

【图解】
【优缺点】
1)主从Reactor多线程模型概述:由于单Reactor线程会降低多核cpu能力(或未能发挥),所以需要建立多Reactor线程,即主从Reactor线程模型。

【图解】主从Reactor多线程模型下,服务器线程有3类:
【优缺点】

1)netty线程模型包含 ServerBootStrap,NioEventLoopGroup 及其组成部分 NioEventLoop,NioChannel, ChannelPipeline, ChannelHandler 等;
2)netty使用主从Reactor多线程模型来实现。
1)即 事件循环器;顾名思义,他实际上是一个循环的处理过程;
2)EventLoop 等同于 while(true) 死循环里的代码段;
3)EventLoop 主要包含了一个选择器多路复用器和一个任务队列
4)EventLoop并不是从程序启动时就开始运行,而是当有任务提交时,它才会开始处理任务并一直运行;

1)初始化:
2)ServerBootStrap初始化 代码实现:
- ServerBootstrap bootstrap = new ServerBootstrap();
- EventLoopGroup boss = new NioEventLoopGroup(1); // 设置boss线程组中的线程个数为1 (默认是cpu核数*2)
- EventLoopGroup work = new NioEventLoopGroup(8); // 设置worker线程组中的线程个数为8 (默认是cpu核数*2)
- bootstrap.group(boss, work).channel(NioServerSocketChannel.class); // 把参数设置到bootstrap
1)netty服务器
- /**
- * @Description 简单netty服务器
- * @author xiao tang
- * @version 1.0.0
- * @createTime 2022年08月27日
- */
- public class SimpleNettyServer44 {
- public static void main(String[] args) throws InterruptedException {
- // 创建 BossGroup 和 WorkerGroup
- // 1. 创建2个线程组 bossGroup, workerGroup
- // 2 bossGroup 仅处理连接请求; 真正的业务逻辑,交给workerGroup完成;
- // 3 两个线程组都是无限循环
- // 4 bossGroup 和 workerGroup 含有的子线程(NIOEventLoop)个数
- // 默认是 cpu核数 * 2
- EventLoopGroup boosGroup = new NioEventLoopGroup();
- EventLoopGroup workerGruop = new NioEventLoopGroup();
-
- try {
- // 创建服务器端的启动对象, 配置参数
- ServerBootstrap bootstrap = new ServerBootstrap();
- bootstrap.group(boosGroup, workerGruop) // 设置2个线程组
- .channel(NioServerSocketChannel.class) // 使用NIOSocketChannel 作为服务器的通道实现
- .option(ChannelOption.SO_BACKLOG, 128) // 设置线程队列等待连接的个数
- .childOption(ChannelOption.SO_KEEPALIVE, true) // 设置保持活动连接状态
- .childHandler(new ChannelInitializer
() { // 创建一个通道初始化对象 - // 给 pipeline 设置处理器
- @Override
- protected void initChannel(SocketChannel socketChannel) throws Exception {
- socketChannel.pipeline().addLast(new SimpleNettyServerHandler45());
- }
- }); // 给我们的workerGroup 的 EventLoop 对应的管道设置处理器
- System.out.println("... server is ready.");
-
- // 启动服务器, 绑定端口并同步处理 ,生成一个 ChannelFuture对象
- ChannelFuture channelFuture = bootstrap.bind(6668).sync();
- channelFuture.addListener((future1) -> System.out.println("Finish binding"));
- // 对关闭通道进行监听
- channelFuture.channel().closeFuture().sync();
- } finally {
- // 优雅关闭
- boosGroup.shutdownGracefully();
- workerGruop.shutdownGracefully();
- }
- }
- }
2)netty服务器中的处理器(封装到管道)
- /**
- * @Description netty服务器处理器
- * @author xiao tang
- * @version 1.0.0
- * @createTime 2022年08月27日
- */
- public class SimpleNettyServerHandler45 extends ChannelInboundHandlerAdapter {
-
- // 读写数据事件(读取客户端发送的消息)
- // 1. ChannelHandlerContext ctx: 上下文信息,包括管道pipeline,通道channel,地址
- // 2. Object msg: 客户端发送的数据,默认是 Object
- @Override
- public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
- System.out.println("server ctx = " + ctx);
-
- System.out.println("查看 channel 和 pipeline的关系 ");
- Channel channel = ctx.channel();
- ChannelPipeline channelPipeline = ctx.pipeline(); // 管道是双向链表,出栈入栈
-
-
- // 将 msg 转为 ByteBuf 字节缓冲
- // 这个 ByteBuf 是 netty提供的, 不是 nio的ByteBuffer
- ByteBuf buf = (ByteBuf) msg;
- System.out.println("客户端发送消息:" + buf.toString(StandardCharsets.UTF_8));
- System.out.println("客户端地址:" + ctx.channel().remoteAddress());
- }
-
- // 数据读取完毕,回复客户端
- @Override
- public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
- // writeAndFlush 是 write + flush ;把数据写入到缓冲,并刷新
- ChannelFuture channelFuture = ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 客户端", StandardCharsets.UTF_8));
- channelFuture.addListener(future -> System.out.println("回复成功"));
- }
-
- // 处理异常,关闭通道
- @Override
- public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
- ctx.channel().close();
- }
- }
1)netty客户端
- /**
- * @Description netty客户端
- * @author xiao tang
- * @version 1.0.0
- * @createTime 2022年08月27日
- */
- public class SimpleNettyClient46 {
-
- public static void main(String[] args) throws InterruptedException {
- // 客户端需要一个事件循环组
- EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
- try {
- // 创建客户端启动对象, 注意是 BootStrap
- Bootstrap bootstrap = new Bootstrap();
- // 设置相关参数
- bootstrap.group(eventLoopGroup) // 设置线程组
- .channel(NioSocketChannel.class) // 设置客户端通道实现类(反射)
- .handler(new ChannelInitializer
() { - @Override
- protected void initChannel(SocketChannel socketChannel) throws Exception {
- socketChannel.pipeline().addLast(new SimpleNettyClientHaandler()); // 加入自己的处理器
- }
- });
- System.out.println("client is ok");
- // 启动客户端去连接服务器
- // ChannelFuture, 设计到netty的异步模型
- ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 6668).sync();
- // 给关闭通道进行监听
- channelFuture.channel().closeFuture().sync();
- } finally {
- eventLoopGroup.shutdownGracefully();
- }
- }
- }
2)netty客户端中的处理器
- /**
- * @Description netty客户端处理器
- * @author xiao tang
- * @version 1.0.0
- * @createTime 2022年08月27日
- */
- public class SimpleNettyClientHaandler extends ChannelInboundHandlerAdapter {
-
- // 当通道就绪就会触发该方法
- @Override
- public void channelActive(ChannelHandlerContext ctx) throws Exception {
- System.out.println("client " + ctx);
- ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 服务器", StandardCharsets.UTF_8));
- }
-
- // 当通道有读取事件时,会触发
- @Override
- public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
- ByteBuf byteBuf = (ByteBuf) msg;
- System.out.println("服务器回复消息:" + byteBuf.toString(StandardCharsets.UTF_8));
- System.out.println("服务器地址:" + ctx.channel().remoteAddress());
- }
-
- // 捕获异常
- @Override
- public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
- cause.printStackTrace();
- ctx.close();
- }
- }
server端:
... server is ready.
server ctx = ChannelHandlerContext(SimpleNettyServerHandler45#0, [id: 0x2995c48c, L:/127.0.0.1:6668 - R:/127.0.0.1:49427])
查看 channel 和 pipeline的关系
客户端发送消息:hello, 服务器
客户端地址:/127.0.0.1:49427
client端:
client is ok
client ChannelHandlerContext(SimpleNettyClientHaandler#0, [id: 0xe92524d8, L:/127.0.0.1:49427 - R:/127.0.0.1:6668])
服务器回复消息:hello, 客户端
服务器地址:/127.0.0.1:6668