1.本文总结自B站《netty-尚硅谷》,很不错;
2.本文po出了 Unpooled创建缓冲区的 代码示例;
3.本文示例代码基于netty实现以下功能:
Unpooled定义:
1)Unpooled.buffer(capacity) 定义:
- public static ByteBuf buffer(int initialCapacity) {
- return ALLOC.heapBuffer(initialCapacity);
- }
代码示例 :
- public class NettyByteBuf61 {
- public static void main(String[] args) {
- // 创建一个对象,该对象包含一个数组 byte[10]
- // 在netty buf中,不需要像nio那样 执行flip 切换读写模式
- // 因为 netty buf,维护了一个 readerIndex 和 writerIndex,分别表示下一次要读入和写出的位置
- ByteBuf byteBuf = Unpooled.buffer(10);
- for (int i = 0; i < 10; i++) {
- byteBuf.writeByte(i); // writerIndex 自增
- }
- // 输出
- for (int i = 0; i < byteBuf.capacity(); i++) {
- System.out.printf(byteBuf.readByte() + " "); // readerIndex 自增
- // System.out.println(byteBuf.getByte(i));
- }
- // 查看 byteBuf 的类型-UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf(ridx: 10, widx: 10, cap: 10)
- System.out.println(byteBuf);
- }
- }
【代码解说】
运行结果:
0 1 2 3 4 5 6 7 8 9
UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf(ridx: 10, widx: 10, cap: 10)
copiedBuffer(CharSequence string, Charset charset) 定义:
- public static ByteBuf copiedBuffer(CharSequence string, Charset charset) {
- if (string == null) {
- throw new NullPointerException("string");
- }
-
- if (string instanceof CharBuffer) {
- return copiedBuffer((CharBuffer) string, charset);
- }
-
- return copiedBuffer(CharBuffer.wrap(string), charset);
- }
代码示例
- public class NettyByteBuf62 {
- public static void main(String[] args) {
- // 通过 Unpooled.copiedBuffer 创建 buf缓冲区
- ByteBuf byteBuf = Unpooled.copiedBuffer("hello world", StandardCharsets.UTF_8);
- // 1 使用相关方法-byteBuf.hasArray()
- if (byteBuf.hasArray()) {
- String content = new String(byteBuf.array(), StandardCharsets.UTF_8);
- System.out.println(content);
- // 查看ByteBuf的类型-UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf(ridx: 0, widx: 12, cap: 24)
- System.out.println("bytebuf = " + byteBuf);
- // 查看偏移量
- System.out.println("byteBuf.arrayOffset() = " + byteBuf.arrayOffset()); // 0
- // 查看 readerIndex
- System.out.println("byteBuf.readerIndex() = " + byteBuf.readerIndex()); // 0
- // 查看 writerIndex
- System.out.println("byteBuf.writerIndex() = " + byteBuf.writerIndex()); // 12
- // 查看 capacity
- System.out.println("byteBuf.capacity() = " + byteBuf.capacity());
- // 查看可读取的字节数量 12
- System.out.println("byteBuf.readableBytes() = " + byteBuf.readableBytes());
- // 使用for循环读取byteBuf
- for (int i = 0; i < byteBuf.readableBytes(); i++) {
- System.out.print((char)byteBuf.getByte(i));
- }
- System.out.println();
- // 读取 byteBuf 其中某一段,从下标4开始,读取6个字节
- CharSequence charSequence = byteBuf.getCharSequence(4, 6, StandardCharsets.UTF_8);
- System.out.println(charSequence);
- }
- }
- }
运行结果:
hello world
bytebuf = UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf(ridx: 0, widx: 11, cap: 33)
byteBuf.arrayOffset() = 0
byteBuf.readerIndex() = 0
byteBuf.writerIndex() = 11
byteBuf.capacity() = 33
byteBuf.readableBytes() = 11
hello world
o worl
需求描述:
1)群聊服务器代码
- /**
- * @Description netty群聊服务器
- * @author xiao tang
- * @version 1.0.0
- * @createTime 2022年09月03日
- */
- public class NettyGroupChatServer63 {
-
- private int port;
-
- public NettyGroupChatServer63(int port) {
- this.port = port;
- }
-
- public static void main(String[] args) {
- try {
- new NettyGroupChatServer63(8089).run();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
-
- public void run() throws InterruptedException {
- // 创建两个线程组
- EventLoopGroup bossGroup = new NioEventLoopGroup(1);
- EventLoopGroup workerGroup = new NioEventLoopGroup();
- try {
- // 服务器启动引导对象
- ServerBootstrap serverBootstrap = new ServerBootstrap();
- serverBootstrap.group(bossGroup, workerGroup)
- .channel(NioServerSocketChannel.class)
- .option(ChannelOption.SO_BACKLOG, 128)
- .childOption(ChannelOption.SO_KEEPALIVE, true)
- .childHandler(new ChannelInitializer
() { - @Override
- protected void initChannel(SocketChannel socketChannel) throws Exception {
- // 获取pipeline
- ChannelPipeline pipeline = socketChannel.pipeline();
- // 添加解码处理器 编码器
- pipeline.addLast("decoder", new StringDecoder());
- pipeline.addLast("encoder", new StringEncoder());
- // 添加业务处理handler
- pipeline.addLast(new NettyGroupChatServerHandler());
- }
- });
- ChannelFuture channelFuture = serverBootstrap.bind(this.port).sync();
- System.out.println("netty服务器启动成功");
- // 监听关闭
- channelFuture.channel().closeFuture().sync();
- } finally {
- // 优雅关闭线程
- bossGroup.shutdownGracefully().sync();
- workerGroup.shutdownGracefully().sync();
- }
-
- }
- }
2)群聊服务器处理器
- /**
- * @Description netty服务器处理器
- * @author xiao tang
- * @version 1.0.0
- * @createTime 2022年09月03日
- */
- public class NettyGroupChatServerHandler extends SimpleChannelInboundHandler
{ -
- // 定义一个 channel 组,用于管理channel
- // GlobalEventExecutor.INSTANCE 是全局事件执行器,是一个单例
- private static final ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
-
- // 读取数据并转发
- @Override
- protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
- // 获取当前channel
- Channel channel = ctx.channel();
- // 遍历 channelGroup, 根据不同情况 回送不同消息
- channelGroup.forEach(otherChannel-> {
- if (channel != otherChannel) { // 非当前channel, 直接转发
- otherChannel.writeAndFlush("["+ DateUtils.getNowTimestamp() +"]"+ "客户 " + channel.remoteAddress() + "说:" + msg + "\n");
- } else { // 回显自己发送的消息给自己
- channel.writeAndFlush("["+ DateUtils.getNowTimestamp() +"]"+ "自己说:" + msg + "\n");
- }
- });
- }
-
- // 一旦连接建立,第一个被执行
- // 将当前channel 添加到channelGroup
- @Override
- public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
- Channel channel = ctx.channel();
- // 把客户端加入群组的信息发送到其他客户端
- channelGroup.writeAndFlush("["+ DateUtils.getNowTimestamp() +"]"+ "客户端 " + channel.remoteAddress() + " 加入聊天");
- // 把当前channel 添加到 channel 组
- channelGroup.add(channel);
- }
-
- // 表示 channel 处于活动状态, 提示 xx 上线
- @Override
- public void channelActive(ChannelHandlerContext ctx) throws Exception {
- System.out.println("["+ DateUtils.getNowTimestamp() +"]"+ "客户端 " + ctx.channel().remoteAddress() + " 上线了");
- }
-
- // 表示 channel 处于离线状态, 提示 xx 离线
- @Override
- public void channelInactive(ChannelHandlerContext ctx) throws Exception {
- System.out.println("["+ DateUtils.getNowTimestamp() +"]"+ "客户端 " + ctx.channel().remoteAddress() + " 离开了");
- }
-
- // 断开连接,把xx客户离开的信息推送给其他在线客户
- @Override
- public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
- Channel channel = ctx.channel();
- channelGroup.writeAndFlush("["+ DateUtils.getNowTimestamp() +"]"+ "客户端 " + channel.remoteAddress() + " 离开了");
- System.out.println("channelGroup.size() = " + channelGroup.size());
- }
-
- // 发送异常如何处理
- @Override
- public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
- // 关闭通道
- ctx.close();
- }
- }
1)群聊客户端代码:
- /**
- * @Description netty群聊客户端
- * @author xiao tang
- * @version 1.0.0
- * @createTime 2022年09月03日
- */
- public class NettyGroupChatClient64 {
-
- /** 主机和端口 */
- private final String host;
- private final int port;
-
- /**
- * @description 构造器
- * @author xiao tang
- * @date 2022/9/3
- */
- public NettyGroupChatClient64(String host, int port) {
- this.host = host;
- this.port = port;
- }
-
- public static void main(String[] args) {
- try {
- new NettyGroupChatClient64("127.0.0.1", 8089).run();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
-
- public void run() throws InterruptedException {
- // 事件运行的线程池
- EventLoopGroup eventExecutors = new NioEventLoopGroup();
- try {
- // 客户端启动引导对象
- Bootstrap bootstrap = new Bootstrap();
- bootstrap.group(eventExecutors)
- .channel(NioSocketChannel.class)
- .handler(new ChannelInitializer
() { - @Override
- protected void initChannel(SocketChannel socketChannel) throws Exception {
- ChannelPipeline pipeline = socketChannel.pipeline();
- // 添加解码器 编码器
- pipeline.addLast("decoder", new StringDecoder());
- pipeline.addLast("encoder", new StringEncoder());
- // 添加业务逻辑的 handler
- pipeline.addLast(new NettyGroupChatClietnHandler());
- }
- });
- // 连接给定主机的端口,阻塞直到连接成功
- ChannelFuture channelFuture = bootstrap.connect(host, port).sync();
- // 得到 channel
- Channel channel = channelFuture.channel();
- System.out.println("----------" + channel.localAddress() + "----------");
- // 客户端需要输入信息,创建一个扫描器
- Scanner scanner = new Scanner(System.in);
- while (scanner.hasNextLine()) {
- String msg = scanner.nextLine();
- // 通过channel 发送到服务器
- channel.writeAndFlush(msg);
- }
- } finally {
- // 关闭线程池,释放所有资源,阻塞直到关闭成功
- eventExecutors.shutdownGracefully().sync();
- }
-
- }
- }
2)群聊客户端处理器代码:
- /**
- * @Description netty群聊客户端处理器
- * @author xiao tang
- * @version 1.0.0
- * @createTime 2022年09月03日
- */
- public class NettyGroupChatClietnHandler extends SimpleChannelInboundHandler
{ - @Override
- protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
- System.out.println(msg.trim());
- }
- }
1)服务器与客户端: 服务器1个,客户端3个;

2)客户端离线:

1)netty定义的空闲状态事件:
Triggers an {@link IdleStateEvent} when a {@link Channel} has not performed * read, write, or both operation for a while.当一个通道一段时间内没有执行 读,写,或读写操作时,就会触发 IdleStateEvent事件;
2)需求描述:
1)netty心跳检测服务器
- /**
- * @Description netty心跳检测服务器
- * @author xiao tang
- * @version 1.0.0
- * @createTime 2022年09月03日
- */
- public class NettyHeartbeatCheckServer66 {
- public static void main(String[] args) {
- try {
- new NettyHeartbeatCheckServer66().run();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
-
- public void run() throws InterruptedException {
- // 创建线程池执行器
- EventLoopGroup bossGroup = new NioEventLoopGroup(1);
- EventLoopGroup workerGroup = new NioEventLoopGroup(8);
- try {
- // 服务器启动引导对象
- ServerBootstrap serverBootstrap = new ServerBootstrap();
- serverBootstrap.group(bossGroup, workerGroup)
- .channel(NioServerSocketChannel.class)
- .option(ChannelOption.SO_BACKLOG, 128)
- .option(ChannelOption.SO_KEEPALIVE, true)
- .handler(new LoggingHandler(LogLevel.INFO)) // 为 bossGroup 添加 日志处理器
- .childHandler(new ChannelInitializer
() { - @Override
- protected void initChannel(SocketChannel socketChannel) throws Exception {
- // 添加处理器
- ChannelPipeline pipeline = socketChannel.pipeline();
- // 1. 添加空闲状态处理器 :
- // readerIdleTime: 表示多长时间没有读入io事件,就会发送一个心跳检测包,检测是否连接状态
- // writerIdleTime: 表示多长时间没有写出io事件,就会发送一个心跳检测包,检测是否连接状态
- // allIdleTime: 表示多长时间没有读入和写出io事件,就会发送一个心跳检测包,检测是否连接状态
- // 2. 文档说明
- // Triggers an {@link IdleStateEvent } when a {@link Channel} has not performed
- // * read, write, or both operation for a while.
- // 3. 当 IdleStateEvent 事件触发后, 就会传递给管道的 下一个处理器 去处理
- // 通过调用下一个handler的 userEventTriggered 方法,即在该方法中处理IdleStateEvent 事件;
- pipeline.addLast(new IdleStateHandler(4, 5, 7, TimeUnit.SECONDS));
- // 添加一个对空闲检测 进一步处理的handler(自定义 )
- pipeline.addLast(new NettyHeartbeatCheckServerHandler());
- }
- });
- // 启动服务器,监听端口,阻塞直到启动成功
- ChannelFuture channelFuture = serverBootstrap.bind(8089).sync();
- // 阻塞直到channel关闭
- channelFuture.channel().closeFuture().sync();
- } finally {
- bossGroup.shutdownGracefully().sync();
- workerGroup.shutdownGracefully().sync();
- }
- }
- }
2)netty心跳检测服务器处理器
- /**
- * @Description netty心跳检测服务器处理器
- * @author xiao tang
- * @version 1.0.0
- * @createTime 2022年09月04日
- */
- public class NettyHeartbeatCheckServerHandler extends ChannelInboundHandlerAdapter {
- @Override
- public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
- if (evt instanceof IdleStateEvent) {
- IdleStateEvent event2 = (IdleStateEvent) evt;
- String eventType = ""; // 事件类型
- switch (event2.state()) {
- case READER_IDLE: eventType = "读空闲"; break;
- case WRITER_IDLE: eventType = "写空闲"; break;
- case ALL_IDLE: eventType = "读写空闲"; break;
- }
- System.out.println("客户端" + ctx.channel().remoteAddress() + "--超时事件--" + eventType);
- System.out.println("服务器做相应处理");
- // 如果发生空闲,马上关闭通道
- // System.out.println("一旦发生超时事件,则关闭 channel");
- // ctx.channel().close();
- }
- }
- }
1)以 NettyGroupChatClient64 作为客户端连接到 服务器 NettyHeartbeatCheckServer66;
2)打印结果如下:
- // 控制台打印结果
- 客户端/127.0.0.1:61278--超时事件--读空闲
- 服务器做相应处理
- 客户端/127.0.0.1:61278--超时事件--写空闲
- 服务器做相应处理
- 客户端/127.0.0.1:61278--超时事件--读写空闲
- 服务器做相应处理
- 客户端/127.0.0.1:61278--超时事件--读空闲
- 服务器做相应处理
- 客户端/127.0.0.1:61278--超时事件--写空闲
- 服务器做相应处理
- 客户端/127.0.0.1:61278--超时事件--读空闲
- 服务器做相应处理