• 7.提交任务到 NioEventLoop(Nio事件循环执行线程)


    【README】

    1.本文总结自 B站 《尚硅谷-netty》;

    2.NioEventLoop实际上是一个提交到线程池的Runnable任务,在while无限循环中运行 taskQueue中的任务(串行);


    【1】提交任务到NioEventLoop

    1)NioEventLoop:

    • 表示一个不断循环执行的Runnable任务,每个NioEventLoop都有一个selector,用于监听绑定在其上的 socket 网络通道;

    2)NioEventLoop 内部采用串行化设计:

    • 从消息的读取->解码->处理->编码->发送,始终由IO 线程 NioEventLoop 负责;

    3)NioEventLoopGroup 下包含多个 NioEventLoop ;

    • 每个 NioEventLoop 中包含有一个 Selector,一个taskQueue ;
    • 每个 NioEventLoop 的 Selector 上可以注册监听多个NioChannel ;
    • 每个 NioChannel 只会绑定在唯一的 NioEventLoop 上;
    • 每个 NioChannel 都绑定有一个自己的 ChannelPipeline ;

    4)任务提交到 NioEventLoop 后,实际会添加到 taskQueue ; 

    • taskQueue的访问路径如下:
      • ChannelHandlerContext ->
      • DefaultChannelPipeline  ->
      • NioSocketChannel ->
      • NioEventLoop ->
      • taskQueue (任务队列);

    【1.1】场景1-在 netty server的handler中提交普通任务

    1. /**
    2. * @Description netty服务器处理器
    3. * @author xiao tang
    4. * @version 1.0.0
    5. * @createTime 2022年08月27日
    6. */
    7. public class SimpleNettyServerHandler45 extends ChannelInboundHandlerAdapter {
    8. // 读写数据事件(读取客户端发送的消息)
    9. // 1. ChannelHandlerContext ctx: 上下文信息,包括管道pipeline,通道channel,地址
    10. // 2. Object msg: 客户端发送的数据,默认是 Object
    11. @Override
    12. public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    13. System.out.println("server ctx = " + ctx);
    14. System.out.println("查看 channel 和 pipeline的关系 ");
    15. Channel channel = ctx.channel();
    16. ChannelPipeline channelPipeline = ctx.pipeline(); // 管道是双向链表,出栈入栈
    17. // 将 msg 转为 ByteBuf 字节缓冲
    18. // 这个 ByteBuf 是 netty提供的, 不是 nio的ByteBuffer
    19. ByteBuf buf = (ByteBuf) msg;
    20. System.out.println("客户端发送消息:" + buf.toString(StandardCharsets.UTF_8));
    21. System.out.println("客户端地址:" + ctx.channel().remoteAddress());
    22. // 同步任务1
    23. // 业务场景: 有一个耗时长的业务 -> 异步执行 -> 提交该 channel对应的 NIOEventLoop 的 taskQueue 中;
    24. // Thread.sleep(10 * 1000);
    25. // ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 客户端,我是同步任务1", StandardCharsets.UTF_8));
    26. // System.out.println("go on.....");
    27. // 以上耗时操作的解决方案1:用户程序自定义的普通任务
    28. // 异步任务2
    29. ctx.channel().eventLoop().execute(new Runnable() {
    30. @Override
    31. public void run() {
    32. try {
    33. Thread.sleep(10 * 1000);
    34. ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 客户端,我是异步任务2-休眠10s", StandardCharsets.UTF_8));
    35. } catch (Exception e) {
    36. System.out.println("发生异常, " + e.getMessage());
    37. }
    38. }
    39. });
    40. // 异步任务3
    41. ctx.channel().eventLoop().execute(new Runnable() {
    42. @Override
    43. public void run() {
    44. try {
    45. Thread.sleep(20 * 1000);
    46. ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 客户端,我是异步任务3-休眠20s", StandardCharsets.UTF_8));
    47. } catch (Exception e) {
    48. System.out.println("发生异常, " + e.getMessage());
    49. }
    50. }
    51. });

    【1.2】场景2-在 netty server的handler中提交定时任务

    1. // 异步任务2和异步任务3添加到同一个任务队列,由同一个线程来运行,所以异步任务2阻塞10s,而异步任务3会阻塞30s(10+20)
    2. System.out.println("异步任务 go on.....");
    3. // 用户自定义定时任务 -》 定时任务提交到 scheduledTaskQueue 中
    4. ctx.channel().eventLoop().schedule(new Runnable() {
    5. @Override
    6. public void run() {
    7. try {
    8. Thread.sleep(20 * 1000);
    9. ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 客户端,我是定时任务1-休眠5s", StandardCharsets.UTF_8));
    10. } catch (Exception e) {
    11. System.out.println("发生异常, " + e.getMessage());
    12. }
    13. }
    14. }, 5, TimeUnit.SECONDS);
    15. System.out.println("定时任务 go on.....");

    【代码解说】

    • 以上任务提交后,实际会添加到 taskQueue ; 
    • taskQueue的访问路径如下:
      • ChannelHandlerContext ->
      • DefaultChannelPipeline  ->
      • NioSocketChannel ->
      • NioEventLoop ->
      • taskQueue (任务队列);
    • 每个NioEventLoop只能使用1个独立线程运行任务队列中的任务,即多个任务串行执行

    【补充】taskQueue目录树:

  • 相关阅读:
    redis学习(008 实战:黑马点评:缓存介绍)
    PostGIS导入shp文件报错:dbf file (.dbf) can not be opened.
    Flask-[实现websocket]-(2): flask-socketio文档学习
    好书分享:《精装版|VirtualLab Fusion高速物理光学软件中文手册》
    ES 8.x 向量检索性能测试 & 把向量检索性能提升100倍!
    3.17 haas506 2.0开发教程-example - 低功耗模式 (2.2版本接口有更新)
    数据库变更时,OceanBase如何自动生成回滚 SQL
    ArcGIS Maps SDK for JS:监听按钮点击事件控制图层的visible属性
    计算机毕业设计node+vue基于微信小程序的西餐外卖系统
    nvidia nx onnx转trt模型报错
  • 原文地址:https://blog.csdn.net/PacosonSWJTU/article/details/126574602