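An EventLoop is essentially a single-threaded executor that also maintains a Selector; its run method processes the continuous stream of I/O events arriving on one or more Channels.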
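Its inheritance hierarchy is as follows: EventLoop extends OrderedEventExecutor and EventLoopGroup, and through EventExecutorGroup it ultimately extends java.util.concurrent.ScheduledExecutorService, so it can also run ordinary and scheduled tasks.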
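An EventLoopGroup is a set of EventLoops. A Channel normally calls the group's register method to bind itself to one of those EventLoops, and from then on every I/O event on that Channel is handled by that EventLoop, which keeps I/O event handling thread-safe.
Code: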
package com.xlg.component.netty.eventLoop;
import java.util.concurrent.TimeUnit;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
/**
* @author wangqingwei
* Created on 2022-06-08
*/
public class TestEventLoop {
public static void main(String[] args) {
// 1. Create the event loop group
/*
nthreads == 0 ? Math.max(1, SystemPropertyUtil.getInt(
"io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2)) : nthreads;
*/
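EventLoopGroup group = new NioEventLoopGroup(2); // handles I/O events, ordinary tasks, and scheduled tasks
// EventLoopGroup group = new DefaultEventLoopGroup(); // handles ordinary and scheduled tasks only
// 2. Get the next event loop
// The 1st and 3rd calls return the same object because next() cycles through the loops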
System.out.println(group.next());
System.out.println(group.next());
System.out.println(group.next());
System.out.println(group.next());
// 3. Submit an ordinary task
// group.next().submit(() -> System.out.println("ok"));
// 4. Scheduled task
// Runs immediately, then once every second
group.next().scheduleAtFixedRate(() -> System.out.println("ok"), 0, 1, TimeUnit.SECONDS);
}
}
Output:
io.netty.channel.nio.NioEventLoop@3d8c7aca
io.netty.channel.nio.NioEventLoop@5ebec15
io.netty.channel.nio.NioEventLoop@3d8c7aca
io.netty.channel.nio.NioEventLoop@5ebec15
ok
ok
ok
ok
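The repeating object identities in the output follow from round-robin selection. Below is a minimal JDK-only sketch of that idea, assuming a plain modulo counter. `RoundRobinChooser` is a hypothetical class for illustration and is not Netty's actual chooser implementation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration: hands out elements in a fixed cyclic order,
// the way group.next() cycles through its EventLoops.
class RoundRobinChooser<T> {
    private final List<T> items;
    private final AtomicInteger counter = new AtomicInteger();

    RoundRobinChooser(List<T> items) {
        this.items = new ArrayList<>(items);
    }

    T next() {
        // floorMod keeps the index non-negative even after the counter overflows.
        int index = Math.floorMod(counter.getAndIncrement(), items.size());
        return items.get(index);
    }
}

class RoundRobinDemo {
    public static void main(String[] args) {
        RoundRobinChooser<String> chooser =
                new RoundRobinChooser<>(Arrays.asList("loop-1", "loop-2"));
        // With two elements, calls 1 and 3 return the same element, as do calls 2 and 4.
        for (int i = 0; i < 4; i++) {
            System.out.println(chooser.next());
        }
    }
}
```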
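Shutting down an EventLoopGroup
Use the shutdownGracefully method for a graceful shutdown. It first switches the EventLoopGroup into the shutting-down state so that new tasks are rejected, then stops the threads once the tasks already in the queue have been processed. This ensures the application exits in an orderly state.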
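The same two phases (refuse new work, then drain the queue) can be observed with a plain JDK ExecutorService. The sketch below is only an analogy: `GracefulShutdownSketch` is a hypothetical class, and Netty's shutdownGracefully has additional behavior, such as a quiet period and a timeout, that is not modelled here.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class GracefulShutdownSketch {
    // Queues `tasks` tasks, starts the shutdown, and returns how many tasks ran.
    static int runAndShutdown(int tasks) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        AtomicInteger completed = new AtomicInteger();
        for (int i = 0; i < tasks; i++) {
            executor.execute(completed::incrementAndGet);
        }
        executor.shutdown(); // phase 1: stop accepting new tasks
        boolean rejected = false;
        try {
            executor.execute(completed::incrementAndGet);
        } catch (RejectedExecutionException e) {
            rejected = true; // new work is refused once shutdown has started
        }
        if (!rejected) {
            throw new IllegalStateException("task accepted after shutdown");
        }
        try {
            // phase 2: wait until the already queued tasks have finished
            executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return completed.get();
    }

    public static void main(String[] args) {
        System.out.println(runAndShutdown(3));
    }
}
```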
package com.xlg.component.netty.eventLoop;
import static com.xlg.component.nio.TestCommon.NETTY_SERVER_PORT;
import java.nio.charset.StandardCharsets;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
/**
* @author wangqingwei
* Created on 2022-06-08
*/
public class EventLoopServer {
private static final Logger logger = LoggerFactory.getLogger(EventLoopServer.class);
public static void main(String[] args) {
new ServerBootstrap()
.group(new NioEventLoopGroup())
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
System.out.println(buf.toString(StandardCharsets.UTF_8));
}
});
}
}).bind(NETTY_SERVER_PORT);
}
}
package com.xlg.component.netty.eventLoop;
import static com.xlg.component.nio.TestCommon.LOCAL_HOST;
import static com.xlg.component.nio.TestCommon.NETTY_SERVER_PORT;
import java.net.InetSocketAddress;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringEncoder;
/**
* @author wangqingwei
* Created on 2022-06-08
*/
public class EventLoopClient {
public static void main(String[] args) throws InterruptedException {
Channel channel = new Bootstrap()
.group(new NioEventLoopGroup())
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline().addLast(new StringEncoder());
}
})
.connect(new InetSocketAddress(LOCAL_HOST, NETTY_SERVER_PORT))
.sync()
.channel();
System.out.println(channel);
System.out.println("--");
channel.writeAndFlush("asdfasf");
}
}
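The group method of ServerBootstrap accepts two EventLoopGroup arguments: a boss group that handles accept events and a worker group that handles reads and writes.
new ServerBootstrap()
// Two groups: the boss handles accept events, the worker handles read/write events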
.group(new NioEventLoopGroup(1), new NioEventLoopGroup(2))
...
Output when several clients each send a hello message:
nioEventLoopGroup-3-1 hello1
nioEventLoopGroup-3-2 hello2
nioEventLoopGroup-3-1 hello3
nioEventLoopGroup-3-2 hello4
nioEventLoopGroup-3-2 hello4
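As the output shows, one EventLoop can serve multiple Channels, and once an EventLoop is bound to a Channel it handles that Channel's events from then on.
Adding a custom EventLoopGroup
Problem:
When a task takes a long time to process, it can be handed to a non-NIO EventLoopGroup so that the other Channels on the same NioEventLoop are not left unserved for a long time.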
package com.xlg.component.netty.eventLoop;
import static com.xlg.component.nio.TestCommon.NETTY_SERVER_PORT;
import java.nio.charset.StandardCharsets;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.DefaultEventLoopGroup;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
/**
* @author wangqingwei
* Created on 2022-06-09
*/
public class MyServer {
public static void main(String[] args) {
// Add a custom, non-NIO EventLoopGroup
EventLoopGroup group = new DefaultEventLoopGroup();
new ServerBootstrap()
.group(new NioEventLoopGroup(1), new NioEventLoopGroup(2))
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
// Add two handlers: the first runs on the NioEventLoopGroup, the second on the custom EventLoopGroup
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
System.out.println(
Thread.currentThread().getName() + ": " + buf.toString(StandardCharsets.UTF_8));
// Pass the message on to the next handler
ctx.fireChannelRead(msg);
}
})
// This handler is bound to the custom group
.addLast(group, "myHandler", new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
System.out.println(Thread.currentThread().getName() + ": " + buf
.toString(StandardCharsets.UTF_8));
}
});
}
})
.bind(NETTY_SERVER_PORT);
}
}
Start three clients and send data:
nioEventLoopGroup-4-1: asdf
defaultEventLoopGroup-2-1: asdf
nioEventLoopGroup-4-2: hel2
defaultEventLoopGroup-2-2: hel2
nioEventLoopGroup-4-1: adf333
defaultEventLoopGroup-2-3: adf333
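As the output shows, the events between each client and the server are handled first by the nioEventLoopGroup and then by the defaultEventLoopGroup.
How the switch is implemented
**The switch between different EventLoopGroups works as follows:**
As the diagram above shows, when consecutive handlers are bound to different groups, execution has to switch to the other group to run the next handler.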
io.netty.channel.AbstractChannelHandlerContext
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
    final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
    // executor() returns the EventLoop (an EventExecutor) bound to the next handler
    EventExecutor executor = next.executor();
    // Is the current thread the thread of that EventLoop?
    if (executor.inEventLoop()) {
        // Yes: invoke the next handler directly on the current thread
        next.invokeChannelRead(m);
    } else {
        // No: submit the call as a task so it runs on the next handler's EventLoop
        executor.execute(new Runnable() {
            @Override
            public void run() {
                next.invokeChannelRead(m);
            }
        });
    }
}
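The branch in invokeChannelRead can be tried out without Netty. The sketch below uses a hypothetical `MiniLoop` class (one daemon thread plus an inEventLoop() check) as a stand-in for an EventLoop. It is a toy model of the hand-off, not Netty code.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical stand-in for an EventLoop: a single thread that knows its own identity.
class MiniLoop {
    private volatile Thread thread;
    private final ExecutorService executor;

    MiniLoop(String name) {
        executor = Executors.newSingleThreadExecutor(runnable -> {
            Thread t = new Thread(runnable, name);
            t.setDaemon(true); // do not keep the JVM alive in this demo
            thread = t;
            return t;
        });
    }

    boolean inEventLoop() {
        return Thread.currentThread() == thread;
    }

    void execute(Runnable task) {
        executor.execute(task);
    }
}

class HandOffSketch {
    // Same decision as invokeChannelRead: run inline when already on the
    // target loop's thread, otherwise enqueue the work on that loop.
    static void invoke(MiniLoop next, Runnable handler) {
        if (next.inEventLoop()) {
            handler.run();
        } else {
            next.execute(handler);
        }
    }

    // Returns the name of the thread that ended up running the handler.
    static String runOn(MiniLoop next) {
        CompletableFuture<String> ranOn = new CompletableFuture<>();
        invoke(next, () -> ranOn.complete(Thread.currentThread().getName()));
        return ranOn.join();
    }

    public static void main(String[] args) {
        MiniLoop business = new MiniLoop("business-loop");
        // Called from main, so the handler is handed off to business-loop.
        System.out.println(runOn(business));
    }
}
```

If the inline branch were missing, calling runOn from inside the loop's own thread would queue the handler behind the task that is waiting for it and never finish, which is one reason the inEventLoop() check matters.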
Common Channel methods
Obtaining the Channel once the connection is established, synchronously or asynchronously, and sending data.
package com.xlg.component.netty.eventLoop;
import static com.xlg.component.nio.TestCommon.LOCAL_HOST;
import static com.xlg.component.nio.TestCommon.NETTY_SERVER_PORT;
import java.net.InetSocketAddress;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringEncoder;
/**
* @author wangqingwei
* Created on 2022-06-08
*/
public class EventLoopClient {
private static final Logger logger = LoggerFactory.getLogger(EventLoopClient.class);
public static void main(String[] args) throws InterruptedException {
ChannelFuture channelFuture = new Bootstrap()
.group(new NioEventLoopGroup())
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline().addLast(new StringEncoder());
}
})
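// 1. Establish the connection
// Asynchronous and non-blocking: main only initiates the call; a NIO thread from the NioEventLoopGroup performs the connect
.connect(new InetSocketAddress(LOCAL_HOST, NETTY_SERVER_PORT));
// 2.1 Use sync to wait for the result
// Blocks the current thread until the NIO thread has finished connecting. Without sync, main continues before the connection is established and the channel cannot yet write to the server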
// channelFuture.sync();
// Channel channel = channelFuture.channel();
// System.out.println(channel);
// System.out.println("--");
// channel.writeAndFlush("hello world");
// 2.2 Asynchronous: addListener(callback object)
channelFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
final Channel channel = future.channel();
logger.debug("{}", channel);
channel.writeAndFlush("hello world !!");
}
});
}
}
package com.xlg.component.netty.eventLoop;
import static com.xlg.component.nio.TestCommon.LOCAL_HOST;
import static com.xlg.component.nio.TestCommon.NETTY_SERVER_PORT;
import java.net.InetSocketAddress;
import java.util.Scanner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
/**
* @author wangqingwei
* Created on 2022-06-08
*/
public class CloseFutureClient {
private static final Logger logger = LoggerFactory.getLogger(CloseFutureClient.class);
public static void main(String[] args) throws InterruptedException {
NioEventLoopGroup group = new NioEventLoopGroup();
ChannelFuture channelFuture = new Bootstrap()
.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
ch.pipeline().addLast(new StringEncoder());
}
})
// 1. Establish the connection
// Asynchronous and non-blocking: main only initiates the call; a NIO thread from the NioEventLoopGroup performs the connect
.connect(new InetSocketAddress(LOCAL_HOST, NETTY_SERVER_PORT));
Channel channel = channelFuture.sync().channel();
new Thread(() -> {
Scanner scanner = new Scanner(System.in);
while (true) {
String line = scanner.nextLine();
if ("q".equals(line)) {
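// close() is also asynchronous: the actual close is performed by another thread
channel.close();
// Would be logged by thread1, possibly before the close has actually completed
// logger.debug("post-close work!");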
break;
}
channel.writeAndFlush(line);
}
}, "thread1").start();
// logger.debug("main end ");
// Use closeFuture to run logic after the channel is closed: 1. synchronously 2. asynchronously
ChannelFuture closeFuture = channel.closeFuture();
// logger.debug("waiting close....");
// closeFuture.sync();
// logger.debug("handling post-close work...");
closeFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
logger.debug("handling post-close work...");
// Shut down the NioEventLoopGroup gracefully; its other threads are still running
group.shutdownGracefully();
}
});
}
}
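Asynchronous processing improves throughput: the number of requests handled per unit of time.
Two interfaces come up constantly in asynchronous processing: Future and Promise.
Netty's Future shares its name with the JDK's Future, but they are two separate interfaces related by inheritance: Netty's Future extends the JDK's, and Promise in turn extends Netty's Future.
JDK Future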
package com.xlg.component.netty.FutureAndPromise;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
/**
* @author wangqingwei
* Created on 2022-06-11
*/
public class TestJDKFuture {
public static void main(String[] args) throws ExecutionException, InterruptedException {
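// 1. Thread pool
ExecutorService executorService = Executors.newFixedThreadPool(2);
// 2. Submit the task
Future<Integer> future = executorService.submit(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
System.out.println("computing...");
Thread.sleep(1000);
return 50;
}
});
// 3. Get the result
System.out.println("waiting for the result...");
System.out.println(future.get()); // blocks until the task finishes
// Shut the pool down so its non-daemon threads do not keep the JVM alive
executorService.shutdown();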
}
}
Netty Future
package com.xlg.component.netty.FutureAndPromise;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.netty.channel.EventLoop;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.util.concurrent.Future;
/**
* @author wangqingwei
* Created on 2022-06-11
*/
public class TestNettyFuture {
private static final Logger logger = LoggerFactory.getLogger(TestNettyFuture.class);
public static void main(String[] args) {
NioEventLoopGroup group = new NioEventLoopGroup();
EventLoop loop = group.next();
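// 2. Submit the task
Future<Integer> future = loop.submit(() -> {
logger.debug("computing...");
Thread.sleep(1000);
return 50;
});
// 3. Get the result
logger.debug("waiting for the result...");
// getNow() returns immediately; it yields null while the task is still running
logger.debug("{}", future.getNow());
// Synchronous way: get() blocks until the result is available
// logger.debug("{}", future.get());
// Asynchronous way: the listener is called on completion
future.addListener(future1 -> logger.debug("received result: {}", future1.getNow()));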
}
}
Netty Promise
package com.xlg.component.netty.FutureAndPromise;
import java.util.concurrent.ExecutionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.netty.channel.EventLoop;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.util.concurrent.DefaultPromise;
/**
* @author wangqingwei
* Created on 2022-06-11
*/
public class TestNettyPromise {
private static final Logger logger = LoggerFactory.getLogger(TestNettyPromise.class);
public static void main(String[] args) throws ExecutionException, InterruptedException {
NioEventLoopGroup group = new NioEventLoopGroup();
EventLoop loop = group.next();
DefaultPromise<Object> promise = new DefaultPromise<>(loop);
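new Thread(() -> {
logger.debug("computation started");
try {
int i = 1 / 0; // deliberately throws to exercise the failure path
Thread.sleep(1000);
promise.setSuccess(50);
} catch (Exception e) {
promise.setFailure(e);
}
}).start();
logger.debug("waiting for the result...");
// get() blocks until completion and throws ExecutionException on failure
// logger.debug("received result: {}", promise.get());
// isSuccess() does not block; it is false here because the promise has either failed or not completed yet
logger.debug("succeeded: {}", promise.isSuccess());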
}
}
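For readers more familiar with the JDK, CompletableFuture plays a role similar to Promise: the producer completes it explicitly and the consumer reacts to the outcome. The sketch below is only an analogy, with `PromiseSketch` as a hypothetical class; `complete` and `completeExceptionally` stand in for setSuccess and setFailure.

```java
import java.util.concurrent.CompletableFuture;

class PromiseSketch {
    // The worker thread fills in the result; the caller only holds the future.
    static CompletableFuture<Integer> divideAsync(int a, int b) {
        CompletableFuture<Integer> promise = new CompletableFuture<>();
        Thread worker = new Thread(() -> {
            try {
                promise.complete(a / b);            // analogous to setSuccess
            } catch (ArithmeticException e) {
                promise.completeExceptionally(e);   // analogous to setFailure
            }
        });
        worker.start();
        return promise;
    }

    // Waits for completion and describes the outcome without throwing.
    static String describe(CompletableFuture<Integer> future) {
        return future.handle((value, error) ->
                error == null ? "success: " + value : "failure: " + error.getMessage()).join();
    }

    public static void main(String[] args) {
        System.out.println(describe(divideAsync(100, 2)));
        System.out.println(describe(divideAsync(1, 0)));
    }
}
```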
Code:
package com.xlg.component.netty.Pipeline;
import static com.xlg.component.nio.TestCommon.NETTY_SERVER_PORT;
import java.nio.charset.StandardCharsets;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPromise;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
/**
* @author wangqingwei
* Created on 2022-06-12
*/
public class TestPipelineServer {
private static final Logger logger = LoggerFactory.getLogger(TestPipelineServer.class);
public static void main(String[] args) {
new ServerBootstrap()
.group(new NioEventLoopGroup())
.channel(NioServerSocketChannel.class)
.childHandler(
new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
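// Every channel has a pipeline: a chain of handlers organised as a doubly linked list
// It starts as head -> tail; handlers added to it are either inbound or outbound
// head -> h1 -> h2 -> h3 -> h4 -> tail
ChannelPipeline pipeline = ch.pipeline();
// After an inbound handler has processed the data, it can pass the (possibly modified) msg to the next handler
// To invoke the next handler: ctx.fireChannelRead(msg), which is what super.channelRead(ctx, msg) does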
pipeline.addLast("h1", new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
logger.debug("1");
super.channelRead(ctx, "1 -> 变更数据");
}
});
pipeline.addLast("h2", new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
logger.debug("2, msg = {}", msg);
// 3. Write data out so that the outbound handlers run
ch.writeAndFlush(ctx.alloc().buffer().writeBytes("server..".getBytes(
StandardCharsets.UTF_8)));
super.channelRead(ctx, msg);
}
});
// Outbound handlers run in reverse order, from tail toward head
pipeline.addLast("h3", new ChannelOutboundHandlerAdapter(){
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
throws Exception {
logger.debug("3");
super.write(ctx, msg, promise);
}
});
pipeline.addLast("h4", new ChannelOutboundHandlerAdapter(){
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
throws Exception {
logger.debug("4");
super.write(ctx, msg, promise);
}
});
}
})
.bind(NETTY_SERVER_PORT);
}
}
Output:
2022-06-12 16:47:31,253 DEBUG [nioEventLoopGroup-2-2] c.x.c.n.Pipeline.TestPipelineServer [TestPipelineServer.java : 47] 1
2022-06-12 16:47:31,253 DEBUG [nioEventLoopGroup-2-2] c.x.c.n.Pipeline.TestPipelineServer [TestPipelineServer.java : 54] 2, msg = 1 -> 变更数据
2022-06-12 16:47:31,256 DEBUG [nioEventLoopGroup-2-2] c.x.c.n.Pipeline.TestPipelineServer [TestPipelineServer.java : 75] 4
2022-06-12 16:47:31,257 DEBUG [nioEventLoopGroup-2-2] c.x.c.n.Pipeline.TestPipelineServer [TestPipelineServer.java : 67] 3
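The 1, 2, 4, 3 order in the log can be reproduced with a toy model: inbound handlers are visited from head to tail, and a write issued on the channel starts at the tail and walks back toward the head. `PipelineOrderSketch` and its `Handler` type are hypothetical and model only the visiting order, not Netty's pipeline.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class PipelineOrderSketch {
    // A handler is modelled only by its name and direction.
    static final class Handler {
        final String name;
        final boolean inbound;

        Handler(String name, boolean inbound) {
            this.name = name;
            this.inbound = inbound;
        }
    }

    // Visits inbound handlers head -> tail, then outbound handlers tail -> head,
    // which is what happens when the last inbound handler writes to the channel.
    static List<String> readThenWrite(List<Handler> pipeline) {
        List<String> visited = new ArrayList<>();
        for (Handler h : pipeline) {
            if (h.inbound) {
                visited.add(h.name);
            }
        }
        for (int i = pipeline.size() - 1; i >= 0; i--) {
            if (!pipeline.get(i).inbound) {
                visited.add(pipeline.get(i).name);
            }
        }
        return visited;
    }

    public static void main(String[] args) {
        List<Handler> pipeline = Arrays.asList(
                new Handler("h1", true), new Handler("h2", true),
                new Handler("h3", false), new Handler("h4", false));
        System.out.println(readThenWrite(pipeline));
    }
}
```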
Because the client has a LoggingHandler installed, it also logs the data sent by the server:
server...
EmbeddedChannel can be used to test handlers: pass the handlers under test to its constructor in order, then call the corresponding inbound and outbound methods.
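import java.nio.charset.StandardCharsets;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;
import io.netty.channel.embedded.EmbeddedChannel;
public class TestEmbeddedChannel {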
public static void main(String[] args) {
ChannelInboundHandlerAdapter h1 = new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("1");
super.channelRead(ctx, msg);
}
};
ChannelInboundHandlerAdapter h2 = new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("2");
super.channelRead(ctx, msg);
}
};
ChannelOutboundHandlerAdapter h3 = new ChannelOutboundHandlerAdapter() {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
System.out.println("3");
super.write(ctx, msg, promise);
}
};
ChannelOutboundHandlerAdapter h4 = new ChannelOutboundHandlerAdapter() {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
System.out.println("4");
super.write(ctx, msg, promise);
}
};
// A Channel implementation for testing handlers
EmbeddedChannel channel = new EmbeddedChannel(h1, h2, h3, h4);
// Trigger inbound processing
channel.writeInbound(ByteBufAllocator.DEFAULT.buffer().writeBytes("hello".getBytes(StandardCharsets.UTF_8)));
// Trigger outbound processing
channel.writeOutbound(ByteBufAllocator.DEFAULT.buffer().writeBytes("hello".getBytes(StandardCharsets.UTF_8)));
}
}
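// Requires the static imports io.netty.buffer.ByteBufUtil.appendPrettyHexDump
// and io.netty.util.internal.StringUtil.NEWLINE
public static void log(ByteBuf buffer) {
    int length = buffer.readableBytes();
    // 16 bytes per dump row, rounded up, plus 4 rows for the frame
    int rows = length / 16 + (length % 16 == 0 ? 0 : 1) + 4;
    StringBuilder buf = new StringBuilder(rows * 80 * 2)
            .append("read index:").append(buffer.readerIndex())
            .append(" write index:").append(buffer.writerIndex())
            .append(" capacity:").append(buffer.capacity())
            .append(NEWLINE);
    appendPrettyHexDump(buf, buffer);
    System.out.println(buf.toString());
}
This helper lets us inspect the contents of a ByteBuf in more detail.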
package com.xlg.component.netty.ByteBuf;
import java.nio.charset.StandardCharsets;
import org.junit.Test;
import com.xlg.component.ks.utils.ByteBufUtil;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
/**
* @author wangqingwei
* Created on 2022-06-12
*/
public class ByteBufTest {
@Test
public void byteBufStudy(){
ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();
// Log the freshly allocated, empty buffer
ByteBufUtil.log(buffer);
StringBuilder stringBuilder = new StringBuilder();
for (int i = 0; i < 300; i++) {
stringBuilder.append("a");
}
buffer.writeBytes(stringBuilder.toString().getBytes(StandardCharsets.UTF_8));
ByteBufUtil.log(buffer);
}
}
Output:
read index:0 write index:0 capacity:256
read index:0 write index:300 capacity:512
...
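**ByteBufAllocator.DEFAULT.buffer()** creates a ByteBuf with a default initial capacity of 256 bytes, backed by direct memory by default; the capacity can also be specified explicitly.
As the output shows, a ByteBuf expands automatically when more space is needed, which NIO's ByteBuffer does not do.
When creating a ByteBuf inside a handler, prefer ctx.alloc().buffer() on the ChannelHandlerContext.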
@Test
public void directAndHeapByteBuf(){
// buffer() uses direct memory by default
ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(16);
System.out.println(buffer.getClass());
// Explicitly heap-backed
ByteBuf buf = ByteBufAllocator.DEFAULT.heapBuffer(16);
System.out.println(buf.getClass());
// Explicitly direct
ByteBuf buffer2 = ByteBufAllocator.DEFAULT.directBuffer(16);
System.out.println(buffer2.getClass());
}
**Output:**
class io.netty.buffer.PooledUnsafeDirectByteBuf // pooled direct memory
class io.netty.buffer.PooledUnsafeHeapByteBuf // pooled heap memory
class io.netty.buffer.PooledUnsafeDirectByteBuf
Pooling can be switched on or off with the following system property:
-Dio.netty.allocator.type={unpooled|pooled}
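A ByteBuf consists mainly of the following parts, delimited by the read index, the write index, the capacity, and the max capacity: discardable (already read) bytes, readable bytes, writable bytes, and the expandable region.
Common methods: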
Method signature | Meaning | Notes |
---|---|---|
writeBoolean(boolean value) | Writes a boolean | One byte: 01 for true, 00 for false |
writeByte(int value) | Writes a byte | |
writeShort(int value) | Writes a short | |
writeInt(int value) | Writes an int | Big endian: 0x250 is written as 00 00 02 50 |
writeIntLE(int value) | Writes an int | Little endian: 0x250 is written as 50 02 00 00 |
writeLong(long value) | Writes a long | |
writeChar(int value) | Writes a char | |
writeFloat(float value) | Writes a float | |
writeDouble(double value) | Writes a double | |
writeBytes(ByteBuf src) | Writes a Netty ByteBuf | |
writeBytes(byte[] src) | Writes a byte[] | |
writeBytes(ByteBuffer src) | Writes a NIO ByteBuffer | |
int writeCharSequence(CharSequence sequence, Charset charset) | Writes a string | CharSequence is the interface implemented by String and similar classes; the second argument is the charset to encode with |
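Notes
- Methods listed without an explicit return type return the ByteBuf itself, so calls can be chained to write different kinds of data
- Network protocols conventionally use big endian, so use writeInt(int value)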
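The two byte orders from the table can be checked with the JDK alone. The sketch below uses java.nio.ByteBuffer rather than Netty's ByteBuf, and `EndianSketch` is a hypothetical helper class; it encodes 0x250 both ways and prints the resulting bytes.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

class EndianSketch {
    // Encodes an int with the given byte order and renders it as hex pairs.
    static String hex(int value, ByteOrder order) {
        byte[] bytes = ByteBuffer.allocate(4).order(order).putInt(value).array();
        StringBuilder sb = new StringBuilder();
        for (byte b : bytes) {
            if (sb.length() > 0) {
                sb.append(' ');
            }
            sb.append(String.format("%02x", b));
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.println(hex(0x250, ByteOrder.BIG_ENDIAN));    // 00 00 02 50
        System.out.println(hex(0x250, ByteOrder.LITTLE_ENDIAN)); // 50 02 00 00
    }
}
```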
public class ByteBufStudy {
public static void main(String[] args) {
// Create a ByteBuf with initial capacity 16 and max capacity 20
ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(16, 20);
ByteBufUtil.log(buffer);
// Write data into the buffer
buffer.writeBytes(new byte[]{1, 2, 3, 4});
ByteBufUtil.log(buffer);
buffer.writeInt(5);
ByteBufUtil.log(buffer);
buffer.writeIntLE(6);
ByteBufUtil.log(buffer);
buffer.writeLong(7);
ByteBufUtil.log(buffer);
}
}
read index:0 write index:0 capacity:16
read index:0 write index:4 capacity:16
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 01 02 03 04                                     |....            |
+--------+-------------------------------------------------+----------------+
read index:0 write index:8 capacity:16
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 01 02 03 04 00 00 00 05                         |........        |
+--------+-------------------------------------------------+----------------+
read index:0 write index:12 capacity:16
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 01 02 03 04 00 00 00 05 06 00 00 00             |............    |
+--------+-------------------------------------------------+----------------+
read index:0 write index:20 capacity:20
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 01 02 03 04 00 00 00 05 06 00 00 00 00 00 00 00 |................|
|00000010| 00 00 00 07                                     |....            |
+--------+-------------------------------------------------+----------------+
There is also a family of methods starting with set; they write data as well, but do not move the write index.
Capacity expansion
Expansion rules:
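The outputs in this article (256 growing to 512 once 300 bytes are written, and a 16-byte buffer ending at its max capacity of 20) are consistent with power-of-two growth capped at the max capacity. Below is a minimal sketch of such a rule, assuming the behavior of recent Netty 4.1 allocators for buffers well below 4 MiB. The class and method names are hypothetical, and larger buffers or other versions may grow differently.

```java
class CapacityGrowthSketch {
    // Smallest power of two that is >= value (for 1 <= value <= 2^30).
    static int nextPowerOfTwo(int value) {
        int highest = Integer.highestOneBit(value);
        return highest == value ? value : highest << 1;
    }

    // Assumed rule for small buffers: grow to a power of two of at least 64,
    // but never beyond maxCapacity.
    static int newCapacity(int minNewCapacity, int maxCapacity) {
        if (minNewCapacity > maxCapacity) {
            throw new IllegalArgumentException(
                    "required " + minNewCapacity + " exceeds max " + maxCapacity);
        }
        return Math.min(nextPowerOfTwo(Math.max(minNewCapacity, 64)), maxCapacity);
    }

    public static void main(String[] args) {
        System.out.println(newCapacity(300, Integer.MAX_VALUE)); // 512
        System.out.println(newCapacity(20, 20));                 // 20
    }
}
```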
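Likewise, the methods starting with get read data without changing the read index.
retain & release
Because Netty has ByteBuf implementations backed by off-heap (direct) memory, that memory is best released manually rather than left for the garbage collector.
Netty uses reference counting to control when memory is reclaimed; every ByteBuf implements the ReferenceCounted interface.
Because of the pipeline, a ByteBuf usually has to be passed on to the next ChannelHandler. If every ChannelHandler called release, the buffer could no longer be passed along. (If the ByteBuf has served its purpose within a given ChannelHandler, it does not need to be passed on and can be released there.)
Basic rule: whoever uses the ByteBuf last is responsible for releasing it.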
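Reference counting itself is a small mechanism. The sketch below is a simplified, hypothetical model (`RefCountedSketch` is not a Netty class): the count starts at 1, retain adds one, release subtracts one, and the call that reaches zero is the one that frees the resource.

```java
import java.util.concurrent.atomic.AtomicInteger;

class RefCountedSketch {
    private final AtomicInteger refCnt = new AtomicInteger(1); // a new object starts at 1
    private volatile boolean deallocated;

    RefCountedSketch retain() {
        int previous = refCnt.getAndIncrement();
        if (previous <= 0) {
            refCnt.getAndDecrement(); // undo: the object was already freed
            throw new IllegalStateException("already released");
        }
        return this;
    }

    // Returns true only for the call that drops the count to zero.
    boolean release() {
        int remaining = refCnt.decrementAndGet();
        if (remaining < 0) {
            refCnt.getAndIncrement(); // undo: released more often than retained
            throw new IllegalStateException("released too many times");
        }
        if (remaining == 0) {
            deallocated = true; // a real buffer would return its memory here
            return true;
        }
        return false;
    }

    int refCnt() {
        return refCnt.get();
    }

    boolean isDeallocated() {
        return deallocated;
    }

    public static void main(String[] args) {
        RefCountedSketch buf = new RefCountedSketch();
        buf.retain();                      // a second user holds the buffer
        System.out.println(buf.release()); // first user done, buffer still alive
        System.out.println(buf.release()); // last user done, buffer freed
    }
}
```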
HeadContext.write (delegates to the channel's Unsafe; the excerpt below is AbstractChannel.AbstractUnsafe#write)
@Override
public final void write(Object msg, ChannelPromise promise) {
    assertEventLoop();
    ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
    if (outboundBuffer == null) {
        // If the outboundBuffer is null we know the channel was closed and so
        // need to fail the future right away. If it is not null the handling of the rest
        // will be done in flush0()
        // See https://github.com/netty/netty/issues/2362
        safeSetFailure(promise, newClosedChannelException(initialCloseCause));
        // release message now to prevent resource-leak
        ReferenceCountUtil.release(msg);
        return;
    }
    // ... remainder of the method omitted
}
while (!buffer.release()) {}
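When a ByteBuf reaches the tail (for inbound messages) or the head (for outbound messages) of the pipeline, it is released there, but only if it was actually propagated that far.
TailContext release source: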
protected void onUnhandledInboundMessage(Object msg) {
try {
logger.debug("Discarded inbound message {} that reached at the tail of the pipeline. Please check your pipeline configuration.", msg);
} finally {
// The actual release call
ReferenceCountUtil.release(msg);
}
}
// Releases msg only if it is ReferenceCounted, which ByteBuf implements
public static boolean release(Object msg) {
return msg instanceof ReferenceCounted ? ((ReferenceCounted)msg).release() : false;
}
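Slicing is one expression of ByteBuf's "zero copy": the original ByteBuf is divided into several ByteBufs without any memory being copied. The slices still use the original ByteBuf's memory, but each maintains its own read and write indexes.
After obtaining a slice, call its retain method to increment the reference count, so that releasing the original ByteBuf does not make the slice unusable.
Modifying a value in the original ByteBuf also changes what the slice sees.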
package com.xlg.component.netty.ByteBuf;
import com.xlg.component.ks.utils.ByteBufUtil;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.CompositeByteBuf;
public class TestSlice {
public static void main(String[] args) {
// Create a ByteBuf
ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(16, 20);
// Write data into the buffer
buffer.writeBytes(new byte[]{1, 2, 3, 4, 5, 6, 7, 8, 9, 10});
// Split the buffer into two slices
ByteBuf slice1 = buffer.slice(0, 5);
ByteBuf slice2 = buffer.slice(5, 5);
// Increment the reference count for each slice so that
// releasing the original buffer does not invalidate them
slice1.retain();
slice2.retain();
ByteBufUtil.log(slice1);
ByteBufUtil.log(slice2);
// Change a value in the original buffer
System.out.println("===========modify the original buffer===========");
buffer.setByte(0, 5);
System.out.println("===========print slice1===========");
ByteBufUtil.log(slice1);
// Merge the slices back together
final CompositeByteBuf byteBufs = ByteBufAllocator.DEFAULT.compositeBuffer(10);
// true advances the composite's writer index as components are added
byteBufs.addComponents(true, slice1, slice2);
ByteBufUtil.log(byteBufs);
}
}
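There are also methods for merging slices, as shown with CompositeByteBuf above, and others such as copy, which duplicates the data into newly allocated memory.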
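The difference between sharing memory and copying it can be seen with the JDK's own java.nio.ByteBuffer, used here as a stand-in for ByteBuf (`SliceVsCopySketch` is a hypothetical class). A write to the original is visible through the slice but not through the copy.

```java
import java.nio.ByteBuffer;

class SliceVsCopySketch {
    // Returns {first byte seen through the slice, first byte seen through the copy}
    // after the first byte of the original buffer has been overwritten.
    static byte[] mutateOriginal() {
        ByteBuffer original = ByteBuffer.wrap(new byte[]{1, 2, 3, 4});
        ByteBuffer slice = original.slice();        // shares the backing array
        ByteBuffer copy = ByteBuffer.allocate(4);   // owns separate memory
        copy.put(original.duplicate());             // copies the bytes, leaves original's position alone
        copy.flip();
        original.put(0, (byte) 9);                  // absolute write into the shared memory
        return new byte[]{slice.get(0), copy.get(0)};
    }

    public static void main(String[] args) {
        byte[] seen = mutateOriginal();
        System.out.println("slice sees " + seen[0] + ", copy sees " + seen[1]);
    }
}
```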