• Netty入门——组件(EventLoop)


    一、EventLoop(事件循环对象)

    1.1、EventLoop(事件循环对象)的概述

    • EventLoop 本质是一个单线程执行器(同时维护了一个 Selector),里面有 run 方法处理 Channel 上源源不断的 io 事件。

    1.2、EventLoop(事件循环对象)的继承关系

    • 一条线是继承自 j.u.c.ScheduledExecutorService 因此包含了线程池中所有的方法
      在这里插入图片描述
    • 另一条线是继承自 netty 自己的 OrderedEventExecutor
      在这里插入图片描述

    二、EventLoopGroup (事件循环组)

    2.1、EventLoopGroup (事件循环组)的概述

    • EventLoopGroup 是一组 EventLoop,Channel 一般会调用 EventLoopGroup 的 register 方法来绑定其中一个 EventLoop,后续这个 Channel 上的 io 事件都由此 EventLoop 来处理(保证了 io 事件处理时的线程安全)

    2.2、EventLoopGroup (事件循环组)的继承关系

    • 继承自 netty 自己的 EventExecutorGroup
      (1)、实现了 Iterable 接口提供遍历 EventLoop 的能力;
      (2)、另有 next 方法获取集合中下一个 EventLoop;
      在这里插入图片描述

    三、EventLoopGroup (事件循环组)代码示例

    3.1、获取事件循环对象代码示例

    • 引入pom依赖

       <dependency>
           <groupId>io.netty</groupId>
           <artifactId>netty-all</artifactId>
           <version>4.1.39.Final</version>
       </dependency>
      
      • 1
      • 2
      • 3
      • 4
      • 5
    • 代码示例

      import io.netty.channel.nio.NioEventLoopGroup;
      import io.netty.util.concurrent.EventExecutor;
      public class EventLoopTest {
          public static void main(String[] args) {
              // 1. 创建事件循环组;NioEventLoopGroup即可处理io事件,又能提交普通任务和定时任务
              NioEventLoopGroup group = new NioEventLoopGroup(2);
              // 1. 创建事件循环组;DefaultEventLoopGroup只能提交普通任务和定时任务
              //DefaultEventLoopGroup group = new DefaultEventLoopGroup(2);
              // 2. 第一种方式:获取下一个事件循环对象
              System.out.println("第一种方式:获取下一个事件循环对象===="+group.next());
              System.out.println("第一种方式:获取下一个事件循环对象===="+group.next());
              System.out.println("第一种方式:获取下一个事件循环对象===="+group.next());
              System.out.println("第一种方式:获取下一个事件循环对象===="+group.next());
              // 2. 第二种方式:通过遍历获取下一个事件循环对象
              for(EventExecutor eventLoopGroup :group){
                  System.out.println("第二种方式:通过遍历获取下一个事件循环对象=="+eventLoopGroup);
              }
          }
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
    • 输出结果
      在这里插入图片描述

    3.2、执行普通任务代码示例

    • 引入pom依赖

       <dependency>
           <groupId>io.netty</groupId>
           <artifactId>netty-all</artifactId>
           <version>4.1.39.Final</version>
       </dependency>
      
      • 1
      • 2
      • 3
      • 4
      • 5
    • 代码示例

      import io.netty.channel.nio.NioEventLoopGroup;
      import io.netty.util.concurrent.EventExecutor;
      public class EventLoopTest {
          public static void main(String[] args) {
              // 1. 创建事件循环组;NioEventLoopGroup即可处理io事件,又能提交普通任务和定时任务
              NioEventLoopGroup group = new NioEventLoopGroup(2);
              // 2. 执行普通任务
              group.next().execute(()->{
                  try {
                      Thread.sleep(1000);
                  } catch (InterruptedException e) {
                      e.printStackTrace();
                  }
                  System.out.println("=====普通任务=====");
              });
              System.out.println("主线程");
          }
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
    • 输出结果
      在这里插入图片描述

    3.3、执行定时任务代码示例

    • 引入pom依赖

       <dependency>
           <groupId>io.netty</groupId>
           <artifactId>netty-all</artifactId>
           <version>4.1.39.Final</version>
       </dependency>
      
      • 1
      • 2
      • 3
      • 4
      • 5
    • 代码示例

      import io.netty.channel.nio.NioEventLoopGroup;
      import java.util.concurrent.TimeUnit;
      public class EventLoopTest {
          public static void main(String[] args) {
              // 1. 创建事件循环组;NioEventLoopGroup即可处理io事件,又能提交普通任务和定时任务
              NioEventLoopGroup group = new NioEventLoopGroup(2);
              // 4. 执行定时任务
              group.next().scheduleAtFixedRate(()->{
                  System.out.println("定时任务");
              },0,1, TimeUnit.SECONDS);
              System.out.println("主线程");
          }
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
    • 输出结果
      在这里插入图片描述

    3.4、处理 io 事件

    3.4.1、引入pom依赖
    • 引入pom依赖

       <dependency>
           <groupId>io.netty</groupId>
           <artifactId>netty-all</artifactId>
           <version>4.1.39.Final</version>
       </dependency>
      
      • 1
      • 2
      • 3
      • 4
      • 5
    3.4.2、服务端代码示例
    • 服务端代码示例

      import io.netty.bootstrap.ServerBootstrap;
      import io.netty.buffer.ByteBuf;
      import io.netty.channel.ChannelHandlerContext;
      import io.netty.channel.ChannelInboundHandlerAdapter;
      import io.netty.channel.ChannelInitializer;
      import io.netty.channel.nio.NioEventLoopGroup;
      import io.netty.channel.socket.nio.NioServerSocketChannel;
      import io.netty.channel.socket.nio.NioSocketChannel;
      import lombok.extern.slf4j.Slf4j;
      import java.nio.charset.Charset;
      /**
       * @description: 处理io任务 服务端
       * @author: xz
       */
      @Slf4j
      public class EventLoopServer {
          public static void main(String[] args) {
              //1、服务端启动器:负责组装netty组件
              new ServerBootstrap()
                      //2、将EventLoop分为boss和worker(即将EventLoop分工细化)
                      // boss即第1个参数,只负责accept事件; 
                      // worker即第2个参数,只负责socketChannel上的读写
                      .group(new NioEventLoopGroup(1), new NioEventLoopGroup(2))
                      //3、选择服务器的 ServerSocketChannel 实现
                      .channel(NioServerSocketChannel.class)
                      //4、添加服务端处理器
                      .childHandler(
                          // 5. channel 代表和客户端进行数据读写的通道 Initializer 初始化,负责添加别的 handler
                          new ChannelInitializer<NioSocketChannel>() {
                          @Override
                          protected void initChannel(NioSocketChannel ch) throws Exception {
                              //6、添加具体 handler
                              ch.pipeline().addLast("handler1", new ChannelInboundHandlerAdapter() {
                                  @Override
                                  public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                      //msg转ByteBuf
                                      ByteBuf buf = (ByteBuf) msg;
                                      //ByteBuf转字符串
                                      log.debug(buf.toString(Charset.defaultCharset()));
                                      //让消息传递给下一个handler
                                      ctx.fireChannelRead(msg);
                                  }
                              });
                          }
                      })
                      //7、绑定监听端口
                      .bind(8080);
          }
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26
      • 27
      • 28
      • 29
      • 30
      • 31
      • 32
      • 33
      • 34
      • 35
      • 36
      • 37
      • 38
      • 39
      • 40
      • 41
      • 42
      • 43
      • 44
      • 45
      • 46
      • 47
      • 48
      • 49
    3.4.3、客户端代码示例
    • 客户端代码示例

      import io.netty.bootstrap.Bootstrap;
      import io.netty.buffer.ByteBufAllocator;
      import io.netty.channel.Channel;
      import io.netty.channel.ChannelInitializer;
      import io.netty.channel.nio.NioEventLoopGroup;
      import io.netty.channel.socket.nio.NioSocketChannel;
      import io.netty.handler.codec.string.StringEncoder;
      import lombok.extern.slf4j.Slf4j;
      import java.net.InetSocketAddress;
      /**
       * @description: 处理io任务 客户端
       * @author: xz
       */
      @Slf4j
      public class EventLoopClient {
          public static void main(String[] args) throws InterruptedException {
              // 1. 客户端启动器
              Channel channel = new Bootstrap()
                      // 2. 添加 EventLoop(事件循环)
                      .group(new NioEventLoopGroup(1))
                      // 3. 选择客户端的 SocketChannel 实现
                      .channel(NioSocketChannel.class)
                      // 4. 添加客户端处理器
                      .handler(new ChannelInitializer<NioSocketChannel>() {
                          // 在连接建立后被调用
                          @Override
                          protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
                              //9. 消息会经过通道 handler 处理,这里是将 String => ByteBuf 发出
                              nioSocketChannel.pipeline().addLast(new StringEncoder());
                          }
                      })
                      //5. 连接到服务器
                      .connect(new InetSocketAddress("localhost", 8080))
                      //6. 等待 connect 建立连接完毕
                      .sync()
                      //7. 连接对象
                      .channel();
              System.out.println("打印channel对象==="+channel);
              //8. 发送数据
              channel.writeAndFlush(ByteBufAllocator.DEFAULT.buffer().writeBytes("aaaaaa".getBytes()));
              Thread.sleep(2000);
              channel.writeAndFlush(ByteBufAllocator.DEFAULT.buffer().writeBytes("aaaaaa".getBytes()));
          }
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26
      • 27
      • 28
      • 29
      • 30
      • 31
      • 32
      • 33
      • 34
      • 35
      • 36
      • 37
      • 38
      • 39
      • 40
      • 41
      • 42
      • 43
      • 44
    3.4.4、分别启动3次客户端
    • 第一次启动:先启动服务端,再启动客户端,输出结果如下:

      注:第一次启动使用线程名称为nioEventLoopGroup-3-1的进行处理在这里插入图片描述

    • 第二次启动:重新启动客户端,输出结果如下:

      注:第二次启动使用线程名称为nioEventLoopGroup-3-2的进行处理
      在这里插入图片描述

    • 第三次启动:重新启动客户端,输出结果如下:
      注:第三次启动仍然使用线程名称为nioEventLoopGroup-3-1的进行处理
      在这里插入图片描述

    3.4.5、分别启动3次客户端,查看处理结果
    • 由3.4.4步骤可知,可以看到两个线程(即工人)轮流处理 channel,但线程(即工人)与 channel 之间进行了绑定;
      在这里插入图片描述

    3.5、增加两个非 nio 工人处理 io 事件

    3.5.1、引入pom依赖
    • 引入pom依赖

       <dependency>
           <groupId>io.netty</groupId>
           <artifactId>netty-all</artifactId>
           <version>4.1.39.Final</version>
       </dependency>
      
      • 1
      • 2
      • 3
      • 4
      • 5
    3.5.2、服务端代码示例
    • 服务端代码示例

      import io.netty.bootstrap.ServerBootstrap;
      import io.netty.buffer.ByteBuf;
      import io.netty.channel.ChannelHandlerContext;
      import io.netty.channel.ChannelInboundHandlerAdapter;
      import io.netty.channel.ChannelInitializer;
      import io.netty.channel.DefaultEventLoopGroup;
      import io.netty.channel.nio.NioEventLoopGroup;
      import io.netty.channel.socket.nio.NioServerSocketChannel;
      import io.netty.channel.socket.nio.NioSocketChannel;
      import lombok.extern.slf4j.Slf4j;
      
      import java.nio.charset.Charset;
      
      /**
       * @description: EventLoop处理io任务 服务端
       * @author: xz
       */
      @Slf4j
      public class EventLoopServer {
          public static void main(String[] args) {
              //创建一个独立的EventLoopGroup
              DefaultEventLoopGroup normalWorkers = new DefaultEventLoopGroup(2);
              //1、服务端启动器:负责组装netty组件
              new ServerBootstrap()
                      //2、将EventLoop分为boss和worker(即将EventLoop分工细化)
                      // boss即第1个参数,只负责accept事件; worker即第2个参数,只负责socketChannel上的读写
                      .group(new NioEventLoopGroup(1), new NioEventLoopGroup(2))
                      //3、选择服务器的 ServerSocketChannel 实现
                      .channel(NioServerSocketChannel.class)
                      //4、添加服务端处理器
                      .childHandler(
                          // 5. channel 代表和客户端进行数据读写的通道 Initializer 初始化,负责添加别的 handler
                          new ChannelInitializer<NioSocketChannel>() {
                          @Override
                          protected void initChannel(NioSocketChannel ch) throws Exception {
                              //6、添加具体 handler
                              ch.pipeline().addLast(normalWorkers,"myhandler", new ChannelInboundHandlerAdapter() {
                                  @Override
                                  public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                      //msg转ByteBuf
                                      ByteBuf buf = (ByteBuf) msg;
                                      //ByteBuf转字符串
                                      log.debug(buf.toString(Charset.defaultCharset()));
                                      //让消息传递给下一个handler
                                      ctx.fireChannelRead(msg);
                                  }
                              });
                          }
                      })
                      //7、绑定监听端口
                      .bind(8080);
          }
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26
      • 27
      • 28
      • 29
      • 30
      • 31
      • 32
      • 33
      • 34
      • 35
      • 36
      • 37
      • 38
      • 39
      • 40
      • 41
      • 42
      • 43
      • 44
      • 45
      • 46
      • 47
      • 48
      • 49
      • 50
      • 51
      • 52
      • 53
    3.5.3、客户端代码示例
    • 客户端代码与3.4.3步骤中代码无变化
    3.5.4、分别启动3次客户端
    • 第一次启动:先启动服务端,再启动客户端,输出结果如下:

      注:第一次启动使用线程名称为defaultEventLoopGroup-2-1的进行处理在这里插入图片描述

    • 第二次启动:重新启动客户端,输出结果如下:

      注:第二次启动使用线程名称为defaultEventLoopGroup-2-2的进行处理
      在这里插入图片描述

    • 第三次启动:重新启动客户端,输出结果如下:
      注:第三次启动仍然使用线程名称为defaultEventLoopGroup-2-1的进行处理
      在这里插入图片描述

    3.5.5、分别启动3次客户端,查看处理结果
    • 由3.5.4步骤可知,可以看到非 nio 工人也分别绑定了 channel,我们自己的 myhandler 由非 nio 工人执行)
      在这里插入图片描述
  • 相关阅读:
    P8193 [USACO22FEB] 高维前缀和
    go语言window|mac|linux下交叉编译其他平台的软件包
    [极致用户体验] 我又来帮掘金修专栏bug了,顺便教你个超牛逼的分割线CSS!
    一起Talk Android吧(第四百零一回:如何使用TableLayout布局)
    [尚硅谷React笔记]——第3章 React应用(基于React脚手架)
    循序渐进介绍基于CommunityToolkit.Mvvm 和HandyControl的WPF应用端开发(5) -- 树列表TreeView的使用
    力扣(leetcode)第485题最大连续1的个数(Python)
    20_Vue如何监测数组类型数据发生改变的?
    Spring5 自定义标签开发
    前后端分离使用RSA加密
  • 原文地址:https://blog.csdn.net/li1325169021/article/details/127605452