• 6.netty线程模型-Reactor


    【README】

    1..本文部分内容翻译自:

    [Netty] Netty's thread model and simple usage

    2.netty模型是以 Reactor模式为基础的,具体的,netty使用的是 主从Reactor多线程模型

    3.先介绍了 Reactor线程模型;后介绍了 Netty 组成部分;

    4.文末还po出了 netty服务器与客户端代码实现(总结自 B站《尚硅谷-netty》);

    【注意】

    • 本文所有图片均转自:  

    [Netty] Netty's thread model and simple usage


    【1】netty线程模型-Reactor

    1)Reactor模式

    • reactor模式,也叫分发器模式。当1个或多个请求同时发送到服务器时,服务器同步地把它们分发到独立的处理线程;

    2)Reactor模式的三种角色:

    • Acceptor: (接收器)处理客户端新连接,并分配客户端分发到处理线程链;
    • Reacor:(反应器)负责监听,分配事件及IO事件给对应处理器Handler;
    • Handler:(处理器)处理事件,如编码,业务逻辑处理,解码等;

    3)Reactor的3种线程模型

    • 单Reactor单线程模型;
    • 单Reactor多线程模型;
    • 主从Reactor多线程模型;

    netty 使用的是 主从 Reactor多线程模型

    4)除建立连接外,服务器处理客户端请求涉及的操作大致有5种:

    • IO读;
    • 编码;
    • 计算;
    • 解码;
    • IO写;

    【2】Reactor3种线程模型

    【2.1】单Reactor单线程模型

    1)单Reactor单线程模型概述:

    • 在该线程模型下,所有请求连接的建立,IO读写,及业务逻辑处理都全部在同一个线程;
    • 若耗时操作发生在业务逻辑处理,则所有的请求都会延迟 与阻塞,因为这个线程上的所有操作都是同步的;这一点应该比较好理解;

    2)模型图

    【图解】

    • 服务器 仅 有一个  Reactor 线程;
    •  IO读,解码,计算,编码,发送(IO写)这5个步骤都由单个Reactor 线程来完成,一旦有一个步骤耗时过长,则整体阻塞

    【优缺点】

    • 缺点:因为只有一个Reactor线程, 请求1耗时长会导致其他请求阻塞直到请求1处理完成服务器并发性非常低

    【2.2】单Reactor多线程模型

    1)单Reactor多线程模型概述:

    • 为了防止阻塞,请求连接的建立(包括认证与授权)及IO读写都在 Reactor线程里;
    • 此外,业务逻辑处理完全由异步线程池负责处理,并在处理后把结果写回;

    2)模型图

    【图解】

    • 服务器 仅 有一个  Reactor 线程;
    •  IO读与发送(IO写)由 Reactor线程负责执行;
    • 解码,计算,编码等业务逻辑处理由 工作线程池(WorkThreads)负责分配线程执行;

    【优缺点】

    • 优点:解码,计算,编码等耗时操作若有阻塞,不会导致服务器的其他请求阻塞;
    • 缺点:仅有一个 Reactor线程来处理客户端连接,若客户端IO读写数据量大,容易在IO读写时发生阻塞

     【2.3】主从Reactor多线程模型

    1)主从Reactor多线程模型概述:由于单Reactor线程会降低多核cpu能力(或未能发挥),所以需要建立多Reactor线程,即主从Reactor线程模型

    • 主Reactor线程(单个):用于建立请求的连接(包括认证与授权);
    • 从Reactor线程(多个):用于处理IO读写;
    • 补充:其他业务逻辑如解码,计算,编码等工作还是交由异步线程池(Worker线程池)处理;

     【图解】主从Reactor多线程模型下,服务器线程有3类:

    • 第1类:主Reactor线程负责客户端连接的建立(同步);
    • 第2类:子Reactor线程有多个,封装在线程池,异步处理客户端的IO读写(read 与 send);
    • 第3类:工作线程池:异步处理业务逻辑,包括 编码,计算,解码等操作;

    【优缺点】

    • 优点1:显然,IO读写若有阻塞,不会导致其他客户端请求阻塞;
    • 优点2:显然,业务逻辑若有阻塞(耗时操作),也不会导致其他客户端请求阻塞;
    • 小结:在主从Reactor多线程模型条,服务器的并发性非常高(也充分利用了多核cpu的算力);

    【3】netty线程模型

     【3.1】成员关系

    1)netty线程模型包含 ServerBootStrap,NioEventLoopGroup 及其组成部分 NioEventLoop,NioChannel, ChannelPipeline, ChannelHandler 等;

    • ServerBootStrap:服务器启动引导对象;
    • NioEventLoopGroup:Nio 事件循环组;
    • NioEventLoop:Nio事件循环;
    • NioChannel:Nio通道;
    • ChannelPipeline:通道管道(封装了多个处理器);
    • ChannelHandler:处理器;

    2)netty使用主从Reactor多线程模型来实现。

    • 主Reactor:对应 BossGroup;
    • 从(子)Reactor:对应 WorkerGroup;
    • BossGroup 用于接收连接并通过通道注册 与 WorkerGroup已建立的连接;
    • 当IO事件被触发,该事件交由管道处理;管道处理实际上是由对应的处理器Handler来处理;

    【3.2】EventLoop 事件循环

    1)即 事件循环器;顾名思义,他实际上是一个循环的处理过程;

    2)EventLoop 等同于 while(true)  死循环里的代码段;

    3)EventLoop 主要包含了一个选择器多路复用器和一个任务队列

    • 选择器多路复用器:处理IO事件的;
    • 任务队列:存储已提交任务;

    4)EventLoop并不是从程序启动时就开始运行,而是当有任务提交时,它才会开始处理任务并一直运行;


    【3.3】通道 channel

    • 1)netty封装了 java的本地Channel,并引入了 管道与通道处理器的概念。
    • 2)通道具体的IO事件处理是通过管道和通道处理器(管道中)来完成的;


    【4】netty服务器与客户端-代码实现

    1)初始化:

    • 服务器通过 ServerBootStrap,可以直接设置线程组;
    • 服务器的bossGroup(boss线程组)用于处理 NioServerSocketChannel 通道的 Accept 事件,即处理请求连接与认证;
    • 服务器的 workerGroup(工作线程组)用于处理 IO 读写事件及已提交的异步任务;

    2)ServerBootStrap初始化 代码实现:

    1. ServerBootstrap bootstrap = new ServerBootstrap();
    2. EventLoopGroup boss = new NioEventLoopGroup(1); // 设置boss线程组中的线程个数为1 (默认是cpu核数*2)
    3. EventLoopGroup work = new NioEventLoopGroup(8); // 设置worker线程组中的线程个数为8 (默认是cpu核数*2)
    4. bootstrap.group(boss, work).channel(NioServerSocketChannel.class); // 把参数设置到bootstrap

    【4.1】服务器

    1)netty服务器

    1. /**
    2. * @Description 简单netty服务器
    3. * @author xiao tang
    4. * @version 1.0.0
    5. * @createTime 2022年08月27日
    6. */
    7. public class SimpleNettyServer44 {
    8. public static void main(String[] args) throws InterruptedException {
    9. // 创建 BossGroup 和 WorkerGroup
    10. // 1. 创建2个线程组 bossGroup, workerGroup
    11. // 2 bossGroup 仅处理连接请求; 真正的业务逻辑,交给workerGroup完成;
    12. // 3 两个线程组都是无限循环
    13. // 4 bossGroup 和 workerGroup 含有的子线程(NIOEventLoop)个数
    14. // 默认是 cpu核数 * 2
    15. EventLoopGroup boosGroup = new NioEventLoopGroup();
    16. EventLoopGroup workerGruop = new NioEventLoopGroup();
    17. try {
    18. // 创建服务器端的启动对象, 配置参数
    19. ServerBootstrap bootstrap = new ServerBootstrap();
    20. bootstrap.group(boosGroup, workerGruop) // 设置2个线程组
    21. .channel(NioServerSocketChannel.class) // 使用NIOSocketChannel 作为服务器的通道实现
    22. .option(ChannelOption.SO_BACKLOG, 128) // 设置线程队列等待连接的个数
    23. .childOption(ChannelOption.SO_KEEPALIVE, true) // 设置保持活动连接状态
    24. .childHandler(new ChannelInitializer() { // 创建一个通道初始化对象
    25. // 给 pipeline 设置处理器
    26. @Override
    27. protected void initChannel(SocketChannel socketChannel) throws Exception {
    28. socketChannel.pipeline().addLast(new SimpleNettyServerHandler45());
    29. }
    30. }); // 给我们的workerGroup 的 EventLoop 对应的管道设置处理器
    31. System.out.println("... server is ready.");
    32. // 启动服务器, 绑定端口并同步处理 ,生成一个 ChannelFuture对象
    33. ChannelFuture channelFuture = bootstrap.bind(6668).sync();
    34. channelFuture.addListener((future1) -> System.out.println("Finish binding"));
    35. // 对关闭通道进行监听
    36. channelFuture.channel().closeFuture().sync();
    37. } finally {
    38. // 优雅关闭
    39. boosGroup.shutdownGracefully();
    40. workerGruop.shutdownGracefully();
    41. }
    42. }
    43. }

    2)netty服务器中的处理器(封装到管道)

    1. /**
    2. * @Description netty服务器处理器
    3. * @author xiao tang
    4. * @version 1.0.0
    5. * @createTime 2022年08月27日
    6. */
    7. public class SimpleNettyServerHandler45 extends ChannelInboundHandlerAdapter {
    8. // 读写数据事件(读取客户端发送的消息)
    9. // 1. ChannelHandlerContext ctx: 上下文信息,包括管道pipeline,通道channel,地址
    10. // 2. Object msg: 客户端发送的数据,默认是 Object
    11. @Override
    12. public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    13. System.out.println("server ctx = " + ctx);
    14. System.out.println("查看 channel 和 pipeline的关系 ");
    15. Channel channel = ctx.channel();
    16. ChannelPipeline channelPipeline = ctx.pipeline(); // 管道是双向链表,出栈入栈
    17. // 将 msg 转为 ByteBuf 字节缓冲
    18. // 这个 ByteBuf 是 netty提供的, 不是 nio的ByteBuffer
    19. ByteBuf buf = (ByteBuf) msg;
    20. System.out.println("客户端发送消息:" + buf.toString(StandardCharsets.UTF_8));
    21. System.out.println("客户端地址:" + ctx.channel().remoteAddress());
    22. }
    23. // 数据读取完毕,回复客户端
    24. @Override
    25. public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
    26. // writeAndFlush 是 write + flush ;把数据写入到缓冲,并刷新
    27. ChannelFuture channelFuture = ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 客户端", StandardCharsets.UTF_8));
    28. channelFuture.addListener(future -> System.out.println("回复成功"));
    29. }
    30. // 处理异常,关闭通道
    31. @Override
    32. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    33. ctx.channel().close();
    34. }
    35. }

    【4.2】客户端

    1)netty客户端

    1. /**
    2. * @Description netty客户端
    3. * @author xiao tang
    4. * @version 1.0.0
    5. * @createTime 2022年08月27日
    6. */
    7. public class SimpleNettyClient46 {
    8. public static void main(String[] args) throws InterruptedException {
    9. // 客户端需要一个事件循环组
    10. EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
    11. try {
    12. // 创建客户端启动对象, 注意是 BootStrap
    13. Bootstrap bootstrap = new Bootstrap();
    14. // 设置相关参数
    15. bootstrap.group(eventLoopGroup) // 设置线程组
    16. .channel(NioSocketChannel.class) // 设置客户端通道实现类(反射)
    17. .handler(new ChannelInitializer() {
    18. @Override
    19. protected void initChannel(SocketChannel socketChannel) throws Exception {
    20. socketChannel.pipeline().addLast(new SimpleNettyClientHaandler()); // 加入自己的处理器
    21. }
    22. });
    23. System.out.println("client is ok");
    24. // 启动客户端去连接服务器
    25. // ChannelFuture, 设计到netty的异步模型
    26. ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 6668).sync();
    27. // 给关闭通道进行监听
    28. channelFuture.channel().closeFuture().sync();
    29. } finally {
    30. eventLoopGroup.shutdownGracefully();
    31. }
    32. }
    33. }

    2)netty客户端中的处理器

    1. /**
    2. * @Description netty客户端处理器
    3. * @author xiao tang
    4. * @version 1.0.0
    5. * @createTime 2022年08月27日
    6. */
    7. public class SimpleNettyClientHaandler extends ChannelInboundHandlerAdapter {
    8. // 当通道就绪就会触发该方法
    9. @Override
    10. public void channelActive(ChannelHandlerContext ctx) throws Exception {
    11. System.out.println("client " + ctx);
    12. ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 服务器", StandardCharsets.UTF_8));
    13. }
    14. // 当通道有读取事件时,会触发
    15. @Override
    16. public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    17. ByteBuf byteBuf = (ByteBuf) msg;
    18. System.out.println("服务器回复消息:" + byteBuf.toString(StandardCharsets.UTF_8));
    19. System.out.println("服务器地址:" + ctx.channel().remoteAddress());
    20. }
    21. // 捕获异常
    22. @Override
    23. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    24. cause.printStackTrace();
    25. ctx.close();
    26. }
    27. }

     【4.3】演示效果

    server端:

    ... server is ready.
    server ctx = ChannelHandlerContext(SimpleNettyServerHandler45#0, [id: 0x2995c48c, L:/127.0.0.1:6668 - R:/127.0.0.1:49427])
    查看 channel 和 pipeline的关系
    客户端发送消息:hello, 服务器
    客户端地址:/127.0.0.1:49427

    client端:

    client is ok
    client ChannelHandlerContext(SimpleNettyClientHaandler#0, [id: 0xe92524d8, L:/127.0.0.1:49427 - R:/127.0.0.1:6668])
    服务器回复消息:hello, 客户端
    服务器地址:/127.0.0.1:6668


    【5】 netty小结

    • netty使用了 主从Reactor多线程模型;
    • EventLoop是netty处理请求,io事件及其他操作的检测与分配(这句翻译的不好,最好查看原文);
    • netty引入了 通道Channel,管道 Pipeline,通道处理器 ChannelHandler 以异步处理任务;
    • netty提供了 ServerBootStrap 简化了管道初始化;

  • 相关阅读:
    LSTM和双向LSTM讲解及实践
    checkbox 设置默认值
    解密Prompt系列30. LLM Agent之互联网冲浪智能体
    用HTML+CSS做一个漂亮简单的个人网页——动漫网页【火影忍者】1个页面
    easyExcel生成个性化表格(自定义行高,合并,字体,去网格线),前后端分离开发下,返回错误的JSON数据给前端
    如何通过CRM系统做好客户的分级分类
    C++位图简明介绍与实现
    12.2-12.4总结
    JavaScript【DOM概述、节点、节点树 、Node.nodeType属性 、document对象(属性、方法/获取元素、方法/创建元素)、Element对象属性】(十)
    Elasticsearch - Java API 操作 ES7.16.0+、ES8.x 索引,文档;高级搜索(八)
  • 原文地址:https://blog.csdn.net/PacosonSWJTU/article/details/126557854