Netty是一个高性能,异步事件驱动的NIO框架,基于Java进行开发。所有的IO操作都是异步非阻塞的,能够通过Future-Listener机制获取异步IO的结果。
public class NettyServer {
public static void main(String[] args) {
// 1. 服务端组件,组装netty的组件
new ServerBootstrap()
// 2. BootEventLoop, 包含selector, thread
.group(new NioEventLoopGroup())
// 3. 连接服务器的ServerSocketChannel实现,accept事件
.channel(NioServerSocketChannel.class)
// 4. 添加响应的事件处理器
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline().addLast(new StringDecoder()); // 将ByteBuf转换为字符
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override // 添加读事件
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println(msg);
}
});
}
})
// 6. 绑定端口
.bind(8888);
}
}
public class NettyClient {
public static void main(String[] args) throws InterruptedException {
// 1. 启动类
new Bootstrap()
// 2. 添加EventLoop
.group(new NioEventLoopGroup())
// 3. 添加客户端ServerSocket的实现
.channel(NioSocketChannel.class)
// 4. 添加处理器
.handler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline().addLast(new StringEncoder());
}
})
// 5. 连接服务器
.connect(new InetSocketAddress("localhost", 8888))
// 6. 连接建立前进行阻塞
.sync()
// 7. 创建Channel
.channel()
// 8. 向服务端发送数据并进行刷新
.writeAndFlush("hello netty");
}
}
channel: 数据传输的通道
handler:对于数据处理操作
pipeline:多个handler的集合可以称为pipeline
eventLoop: 包含boss和worker,boss线程负责连接的建立,worker具体处理数据。每一个线程对每一个channel负责到底(即使中途阻塞切换任务,但是在这个任务数据准备完成后仍是刚才的worker线程进行处理)
EventLoop: 单线程的执行器,用来处理Channel上的io事件
而EventLoopGroup是一组EventLoop,对于每一个Channel会通过register()方法注册到一个EventLoop上,后续的操作都由这个EventLoop进行处理
/* NioEventLoopGroup() 可以处理io请求,普通任务,定时任务
参数含义:可以传递执行任务的线程数量,如果不传递参数会使用默认线程数:DEFAULT_EVENT_LOOP_THREADS
默认分配为 系统Math.max(1, CPU核数 * 2)
*/
EventLoopGroup eventExecutors = new NioEventLoopGroup(2);
// DefaultEventLoop() 处理普通任务,定时任务
// EventLoopGroup eventExecutors = new DefaultEventLoop();
System.out.println("系统CPU核数:" + NettyRuntime.availableProcessors());
// next() 获取下一个执行的EventLoop对象,内部会根据分配的线程数提供轮询策略
System.out.println(eventExecutors.next()); // io.netty.channel.nio.NioEventLoop@18ef96
System.out.println(eventExecutors.next()); // io.netty.channel.nio.NioEventLoop@6956de9
// System.out.println(eventExecutors.next()); // io.netty.channel.nio.NioEventLoop@18ef96
// 添加一个普通任务,异步的进行处理
eventExecutors.next().execute(() -> {
System.out.println("internal");
});
System.out.println("main");
// 添加一个定时任务
eventExecutors.next().scheduleAtFixedRate(() -> {
System.out.println("schedule task");
}, 0, 1, TimeUnit.SECONDS);
public static void main(String[] args) throws InterruptedException {
Channel ch = new Bootstrap()
.group(new NioEventLoopGroup())
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline().addLast(new StringEncoder());
}
})
.connect(new InetSocketAddress("localhost", 8080))
.sync()
.channel();
System.out.println("Channel Object:" + ch);
ch.writeAndFlush("");
}
debug模式下,开启IDEA的并行运行,然后使用Evalute Expression
结论:EventLoopGroup是多线程环境,可以同时处理多个客户端;但是对于每一个客户端中的channel会和其中一个EventLoop进行绑定,下次的信息发送仍由这个EventLoop进行处理。
new ServerBootstrap()
// 职责划分:Netty中具有boss和worker线程组的概念,目的在于boss线程专门用于建立连接,worker用来处理任务
// 对于parentGroup中由于服务端只有一个,所以最多绑定到一个线程上
.group(new NioEventLoopGroup(), new NioEventLoopGroup())
第二种情况:每个EventLoop的底层采用多路复用技术,因此可以处理多个客户端的请求;但是一旦某一个客户端执行时间过长,会导致该EventLoop下的其他客户端阻塞;
解决:将耗时的服务交给其他的线程组进行处理
public static void main(String[] args) {
EventLoopGroup group = new DefaultEventLoop();
new ServerBootstrap()
// 职责划分:Netty中具有boss和worker线程组的概念,目的在于boss线程专门用于建立连接,worker用来处理任务
// 对于parentGroup中由于服务端只有一个,所以最多绑定到一个线程上
.group(new NioEventLoopGroup(), new NioEventLoopGroup(2))
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
// 处理读事件
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// 当没有使用StringDecoder的时候,此时msg为ByteBuf类型
ByteBuf buf = (ByteBuf) msg;
System.out.println(Thread.currentThread().getName() + ": " + buf.toString(Charset.defaultCharset()));
ctx.fireChannelRead(msg); // 将数据将给下一个Handler进行处理
}
}).addLast(group, "other group", new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// 当没有使用StringDecoder的时候,此时msg为ByteBuf类型
ByteBuf buf = (ByteBuf) msg;
Thread.sleep(5000); // 模拟耗时
System.out.println("耗时操作 =》" + Thread.currentThread().getName() + ": " + buf.toString(Charset.defaultCharset()));
}
});
}
})
.bind(8080);
}
这里在外部额外创建一个DefaultEventLoop用于处理第二个Handler中出现的耗时处理状态;
如何在粉色h1和绿色h2中间切换不同的EventLoop?
底层实现:AbstractChannelHandlerContext.java # invokeChannelRead
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
// 获取下一个需要执行的EventLoop
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
// 如果下一个执行的线程和当前线程是同一个EventLoop,在当前线程中继续处理。
next.invokeChannelRead(m);
} else {
// 下一个线程executor和当前线程不同,将任务交付给executor进行处理
executor.execute(new Runnable() {
@Override
public void run() {
next.invokeChannelRead(m);
}
});
}
}
channel是数据传输的载体,在处理客户端连接建立的过程中,使用connet()方法可以获得ChannelFuture对象,在Netty中IO的操作都是异步的,因此对于每一个IO操作无法保证操作被实际完成,而ChannelFutrure对象就是异步非阻塞的。
在前面实先客户端的时候,代码如下:
ChannelFuture channelFuture = new Bootstrap()
....
.connect(new InetSocketAddress("localhost", 8080));
Channel channel = channelFuture
.sync()
.channel();
channel.writeAndFlush("hello");
上述代码获取ChannelFuture后会使用sync()方法进行阻塞,保证连接成功创建后,才会继续执行后续的数据发送操作。
上述的模式为同步,执行 *channel.writeAndFlush(“hello”);*的时候由 main 线程主导。
channelFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
Channel channel = future.channel();
System.out.println(Thread.currentThread().getName() + " : " + channel);
channel.writeAndFlush("hello");
}
});
在监听到连接创建后,会通过回调方式执行 operationComplete 方法,此时运行的线程为 nio 线程
2. channel的关闭
问题描述:修改上述代码,想要提供用户一个持续发送数据,并且输入 'q’程序便会退出的,以及提供关闭后的额外操作场景。
public static void main(String[] args) throws InterruptedException {
ChannelFuture channelFuture = new Bootstrap()
.group(new NioEventLoopGroup())
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline().addLast(new StringEncoder());
}
})
.connect(new InetSocketAddress("localhost", 8080));
channelFuture.addListener((ChannelFutureListener) future -> {
Channel channel = future.channel();
System.out.println(Thread.currentThread().getName() + " : " + channel);
channel.writeAndFlush("hello");
});
Channel channel = channelFuture.channel();
// 创建一个新的线程支持用户持续输入
new Thread(new Runnable() {
@Override
public void run() {
Scanner scanner = new Scanner(System.in);
while (true) {
String msg = scanner.nextLine();
if ("q".equals(msg)) {
channel.close(); // 异步操作
System.out.println(Thread.currentThread().getName() + " 程序退出");
break;
}
channel.writeAndFlush(msg);
}
}
}, "input").start();
但是这里的 channel.close(); 属于Netty中的异步操作,因此无法保证 channel关闭和后续的处理之间的同步关系。
解决方案: Netty中对于channel的关闭提供了CloseFuture的对象,和ChannelFuture类似,也包含同步和异步两种模式。
// 代码省略
new Thread(new Runnable() {
@Override
public void run() {
Scanner scanner = new Scanner(System.in);
while (true) {
String msg = scanner.nextLine();
if ("q".equals(msg)) {
channel.close();
break;
}
channel.writeAndFlush(msg);
}
}
}, "input").start();
ChannelFuture closeFuture = channel.closeFuture();
System.out.println("关闭前阻塞");
closeFuture.sync();
System.out.println(Thread.currentThread().getName() + " 程序退出");
此时具体执行后续处理的部分为 main 线程
ChannelFuture closeFuture = channel.closeFuture();
closeFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
System.out.println(Thread.currentThread().getName() + " 程序退出"); // nioEventLoopGroup-2-1 程序退出
}
});
问题:在上述方案添加后,Java程序并没有正常终止
原因: Netty中的EventLoopGroup中的其他线程仍在工作,需要手动进行关闭
NioEventLoopGroup group = new NioEventLoopGroup();
ChannelFuture closeFuture = channel.closeFuture();
closeFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
System.out.println(Thread.currentThread().getName() + " 程序退出"); // nioEventLoopGroup-2-1 程序退出
group.shutdownGracefully(); // 优雅关闭EventLoopGroup
}
});
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService executors = Executors.newFixedThreadPool(2);
Future<Integer> future = executors.submit(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
log.info("开始计算");
Thread.sleep(1000);
return 80;
}
});
log.info("等待获取结果");
Integer res = future.get(); // 主线程阻塞,等待结果返回
log.info("计算结果:{}", res);
}
public static void main(String[] args) throws ExecutionException, InterruptedException {
NioEventLoopGroup group = new NioEventLoopGroup();
EventLoop eventLoop = group.next(); // 获取一个EventLoop对象
// 提交一个异步任务
Future<Integer> future = eventLoop.submit(() -> {
log.info("开始计算");
Thread.sleep(1000);
return 80;
});
log.info("等待获取结果");
Integer res = future.get();
log.info("计算结果:{}", res);
}
整体代码的编写类似,只是在处理线程池的时候对于Netty中使用的EventLoop; Netty中还提供异步获取结果的方式(不需要get阻塞)。
public static void main(String[] args) throws ExecutionException, InterruptedException {
NioEventLoopGroup group = new NioEventLoopGroup();
EventLoop eventLoop = group.next(); // 获取一个EventLoop对象
// 提交一个异步任务
Future<Integer> future = eventLoop.submit(() -> {
log.info("开始计算");
Thread.sleep(1000);
return 80;
});
future.addListener(f -> {
log.info("等待获取结果");
// 立即获取数据
log.info("{} 计算结果:{}", Thread.currentThread().getName(), f.getNow()); // nioEventLoopGroup-2-1 计算结果:80
});
}
// Promise是对Future对象的进一步封装
public interface Promise<V> extends Future<V>
public static void main(String[] args) throws ExecutionException, InterruptedException {
EventLoop eventLoop = new NioEventLoopGroup().next();
// 提供一个异步对象promise
Promise<Integer> promise = new DefaultPromise<>(eventLoop);
eventLoop.submit(() -> {
log.info("开始计算");
try {
Thread.sleep(1000);
promise.setSuccess(808); // 运行正常返回结果
} catch (InterruptedException e) {
e.printStackTrace();
}
});
log.info("等待获取结果");
promise.addListener(future -> {
// 立即获取数据
log.info("{} 计算结果:{}", Thread.currentThread().getName(), future.getNow()); // nioEventLoopGroup-2-1 计算结果:808
});
}
Pileline是一组Handler组成的链式结构,用来对chennel中的数据进行额外的处理。
对于Handler包含InBound以及OutBound两种,对应入站和出战两个操作;InBound在读数据的时候可以进行二次的修改然后传递给后续的Handler;而OutBound在处理写数据的时候才会触发。
整个pipeline是双向链表;inBoundHandler从Head处向后遍历;而对于OutBound从tail向前进行查找,二者的顺序是相反的。
new ServerBootstrap()
.group(new NioEventLoopGroup(), new NioEventLoopGroup(2)).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
// 处理读事件
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
String newMsg = buf.toString(Charset.defaultCharset()) + "-hello";
log.info("server: {}", newMsg);
// 注意这里的两行代码
// ctx.writeAndFlush(msg);
ch.writeAndFlush(newMsg);
}
}).addLast(new ChannelOutboundHandlerAdapter() {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
log.info("outBound 1");
super.write(ctx, msg, promise);
}
});
}
}).bind(8080);
}
客户端发送hello, 服务端在原有基础上加工打印:
// ctx.writeAndFlush(msg);
ch.writeAndFlush(newMsg);
在InBound中存在这两行代码。发现,只有使用ch.writeAndFlush的时候才能正常打印OutBound中的内容。
原因:
Netty中对于ByteBuffer的封装,是数据的载体
public static void main(String[] args) {
// 默认创建一个ByteBuf; 默认初始容量为256;
ByteBuf buf = ByteBufAllocator.DEFAULT.buffer();
System.out.println(buf);
// 支持动态扩容
StringBuilder sb = new StringBuilder();
for (int i = 0; i < 300; i++) {
sb.append("a");
}
buf.writeBytes(sb.toString().getBytes(StandardCharsets.UTF_8));
// (ridx: 0, widx: 300, cap: 512), 最大容量512,进行2倍扩容
System.out.println(buf);
}
使用 ByteBufAllocator.DEFAULT.buffer(); 默认使用直接内存进行创建,并开启池化技术;减少ByteBuf的创建和销毁,提高整体性能,使用直接内容提高读写效率,相比于堆内存创建减少GC带来的影响。
// 创建的类对象:PooledUnsafeDirectByteBuf
ByteBuf buf = ByteBufAllocator.DEFAULT.buffer();
使用堆内存创建
ByteBuf buf = ByteBufAllocator.DEFAULT.heapBuffer(); // PooledUnsafeHeapByteBuf
ByteBuf的组成
扩容规则