黑马程序员Netty笔记合集
注意:由于章节连贯,此套笔记更适合学习《黑马Netty全套课程》的同学参考、复习使用。
Netty is an asynchronous event-driven network application framework
for rapid development of maintainable high performance protocol servers & clients.
Netty 是一个异步的、基于事件驱动的网络应用框架,用于快速开发可维护、高性能的网络服务器和客户端

他还是另一个著名网络应用框架 Mina 的重要贡献者
Netty 在 Java 网络应用框架中的地位就好比:Spring 框架在 JavaEE 开发中的地位
以下的框架都使用了 Netty,因为它们有网络通信需求!
单线程没法异步提高效率,必须配合多线程、多核 cpu 才能发挥异步的优势

开发一个简单的服务器端和客户端
加入依赖
<dependency>
<groupId>io.nettygroupId>
<artifactId>netty-allartifactId>
<version>4.1.39.Finalversion>
dependency>
new ServerBootstrap() //1
.channel(NioServerSocketChannel.class) //2
.group(new NioEventLoopGroup()) //3
.childHandler( //4
new ChannelInitializer<NioSocketChannel>() { //5
@Override
protected void initChannel(NioSocketChannel ch) throws Exception { //6
//SocketChannel 处理器:解码 ByteBuf => String
ch.pipeline().addLast(new StringDecoder());
//业务处理器:使用上一个处理器的处理结果
ch.pipeline().addLast(new SimpleChannelInboundHandler<String>() {
//读事件
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, String msg) throws Exception {
System.out.println(msg);
}
});
}
})
.bind(8080); //7
启动器:负责组装 netty 组件,启动服务器
选择服务器的 ServerSocketChannel 实现类。 NioServerSocketChannel 表示基于 NIO 的服务器端实现

创建 Selector 线程处理组:简单理解为 线程池+Selector。[BossEventLoop、WorkerEventLoop(selector,thread)]
SocketChannel 连接处理器:决定了worker能执行哪些操作
初始化器:待客户端 SocketChannel 建立连接后,执行 initChannel 以便添加更多的处理器。仅执行一次。
添加具体 handler:处理 SocketChannel
绑定监听端口
new Bootstrap() //1
.channel(NioSocketChannel.class) //2
.group(new NioEventLoopGroup()) //3
.handler(new ChannelInitializer<Channel>() { //4
@Override //建立连接后调用
protected void initChannel(Channel ch) throws Exception {
//消息会经过通道 handler 处理:将 String => ByteBuf 发出
ch.pipeline().addLast(new StringEncoder());
}
})
.connect("127.0.0.1",8080) //5
.sync() //6
.channel() //7
.writeAndFlush(new Date()+":Hello World!"); //8
启动类
选择客户 Channel 实现类。NioSocketChannel 表示基于 NIO 的客户端实现

创建 Selector 线程处理组
添加 SocketChannel 的处理器:待客户端 SocketChannel 建立连接后,执行 initChannel 以添加更多的处理器。ChannelInitializer 仅执行一次。
指定连接的服务器和端口
同步操作:阻塞等待Netty中异步方法的完成,如等待 connect 建立连接完毕
获取 channel 对象:它即为通道抽象,可以进行数据读写操作
写入消息并清空缓冲区

EventLoop:事件循环对象
EventLoop 本质是一个单线程执行器(同时维护了一个 Selector),里面有 run 方法处理 Channel 上源源不断的 io 事件。
它的继承关系比较复杂
EventLoopGroup:事件循环组
EventLoopGroup 是一组 EventLoop,Channel 一般会调用 EventLoopGroup 的 register 方法来绑定其中一个 EventLoop,后续这个 Channel 上的 io 事件都由此 EventLoop 来处理(保证了 io 事件处理时的线程安全)
public static void main(String[] args) {
//1.创建事件循环组
//没有指定线程数:max(1,SystemPropertyUtil.getInt("io.netty.eventLoopThreads",NettyRuntime.availableProcessors()*2))
EventLoopGroup group=new NioEventLoopGroup(2);
//2.获取下一个事件循环对象
log.debug(group.next().toString());
log.debug(group.next().toString());
log.debug(group.next().toString());
log.debug(group.next().toString());
//3.执行普通事件
group.execute(()->{
try {
Thread.sleep(1000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.debug("事件循环对象执行-普通任务");
});
//4.执行定时任务:0秒后开始执行,间隔1秒再次执行
group.scheduleAtFixedRate(()->{
log.debug("事件循环对象执行-定时任务");
},2,2, TimeUnit.SECONDS);
log.debug("主线程");
}
10:43:13 [DEBUG] [main] o.e.j.e.EventLoopTest : io.netty.channel.nio.NioEventLoop@3bfdc050
10:43:13 [DEBUG] [main] o.e.j.e.EventLoopTest : io.netty.channel.nio.NioEventLoop@1bce4f0a
10:43:13 [DEBUG] [main] o.e.j.e.EventLoopTest : io.netty.channel.nio.NioEventLoop@3bfdc050
10:43:13 [DEBUG] [main] o.e.j.e.EventLoopTest : io.netty.channel.nio.NioEventLoop@1bce4f0a
10:43:13 [DEBUG] [main] o.e.j.e.EventLoopTest : 主线程
10:43:14 [DEBUG] [nioEventLoopGroup-2-1] o.e.j.e.EventLoopTest : 事件循环对象执行-普通任务
10:43:15 [DEBUG] [nioEventLoopGroup-2-2] o.e.j.e.EventLoopTest : 事件循环对象执行-定时任务
10:43:17 [DEBUG] [nioEventLoopGroup-2-2] o.e.j.e.EventLoopTest : 事件循环对象执行-定时任务
细分1:两个事件循环组分别处理 accept事件 和 读写事件
public static void main(String[] args) {
new ServerBootstrap()
//细分一:将只负责accept事件的 Boss 和 只处理读写事件的 woker 分成两个事件循环组
.group(new NioEventLoopGroup(),new NioEventLoopGroup(2))
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf= (ByteBuf) msg;
log.debug(buf.toString(Charset.forName("UTF-8")));
}
});
}
})
.bind(8080);
}
/**
* 客户端
*/
public static void main(String[] args) throws InterruptedException {
Channel channel = new Bootstrap()
.group(new NioEventLoopGroup())
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline().addLast(new StringEncoder());
}
})
.connect(new InetSocketAddress("127.0.0.1", 8080))
.sync()
.channel();
System.out.println(channel);
System.out.println("");
}


细分2:增加处理 非IO事件 的事件循环组对象,让 IO循环组对象 可以及时处理多个客户端的IO请求
/**
* 服务端
*/
public static void main(String[] args) {
//细分二:增加处理 非IO事件 的事件循环组对象。让 IO循环组对象 可以及时处理多个客户端的IO请求
DefaultEventLoopGroup defaultGroup = new DefaultEventLoopGroup();
new ServerBootstrap()
//细分一:将只负责accept事件的 Boss 和 只处理读写事件的 woker 分成两个事件循环组
.group(new NioEventLoopGroup(),new NioEventLoopGroup(2))
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
//IO循环组对象 处理
ch.pipeline().addLast("handler1",new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf= (ByteBuf) msg;
log.debug(buf.toString(Charset.forName("UTF-8")));
ctx.fireChannelRead(msg); // 让消息传递给下一个handler
}
});
//非IO事件循环组对象 处理
ch.pipeline().addLast(defaultGroup,"handler2",new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf= (ByteBuf) msg;
log.debug(buf.toString(Charset.forName("UTF-8")));
}
});
}
})
.bind(8080);
}
/**
* 客户端
*/
public static void main(String[] args) throws InterruptedException {
Channel channel = new Bootstrap()
.group(new NioEventLoopGroup())
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline().addLast(new StringEncoder());
}
})
.connect(new InetSocketAddress("127.0.0.1", 8080))
.sync()
.channel();
System.out.println(channel);
System.out.println("");
}


/**
* 关键代码 io.netty.channel.AbstractChannelHandlerContext#invokeChannelRead()
*/
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
// 1.获取下一个 hander 的事件循环对象
EventExecutor executor = next.executor();
// 2.判断:下一道工序的处理工人 与 当前工人 是否是同一个?
if (executor.inEventLoop()) {
//3.是,让该工人继续处理
next.invokeChannelRead(m);
}
// 4.不是,将 下一道工序的代码 作为任务提交给 另一个工人 处理(换人)
else {
executor.execute(new Runnable() {
@Override
public void run() {
next.invokeChannelRead(m);
}
});
}
}
优雅关闭 shutdownGracefully 方法。该方法会首先切换 EventLoopGroup 到关闭状态从而拒绝新的任务的加入,然后在任务队列的任务都处理完成后,停止线程的运行。从而确保整体应用是在正常有序的状态下退出的
注意:在完成连接建立后通道才能进行操作
/**
* 客户端方式一:主线程同步等待连接建立,然后写入并刷出数据
*/
public static void main(String[] args) throws InterruptedException {
ChannelFuture channelFuture = new Bootstrap()
.group(new NioEventLoopGroup())
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline().addLast(new StringEncoder());
}
})
.connect(new InetSocketAddress("127.0.0.1", 8080));
//1.channel()
Channel beforeConnect = channelFuture.channel();
beforeConnect.writeAndFlush("连接建立前!");
System.out.println(beforeConnect); //[id: 0xf9941f21]
//方式一:主线程同步等待
//2.sync()
channelFuture.sync();
Channel afterConnect = channelFuture.channel();
afterConnect.writeAndFlush("连接建立后:主线程写出!");
System.out.println(afterConnect); //[id: 0xf9941f21, L:/127.0.0.1:61900 - R:/127.0.0.1:8080]
}

/**
* 客户端方式二:主线程异步等待连接建立,使用另一线程写入并刷出数据
*/
public static void main(String[] args) throws InterruptedException {
ChannelFuture channelFuture = new Bootstrap()
.group(new NioEventLoopGroup())
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline().addLast(new StringEncoder());
}
})
.connect(new InetSocketAddress("127.0.0.1", 8080));
//1.channel()
Channel beforeConnect = channelFuture.channel();
beforeConnect.writeAndFlush("连接建立前!");
System.out.println(beforeConnect); //[id: 0xaf48a651]
//方式二:主线程异步等待
//3.addListener(GenericFutureListener)
channelFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture channelFuture) throws Exception {
channelFuture.channel().writeAndFlush("连接建立后:另一线程写出");
System.out.println(channelFuture.channel()); //[id: 0x5201a68b, L:/127.0.0.1:62337 - R:/127.0.0.1:8080]
}
});
}

Channel channel=channelFuture.channel();
同步善后
/**
* 客户端-同步善后
*/
public static void main(String[] args) throws InterruptedException {
ChannelFuture channelFuture = new Bootstrap()
.group(new NioEventLoopGroup())
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
ch.pipeline().addLast(new StringEncoder());
}
})
.connect(new InetSocketAddress("127.0.0.1", 8080));
Channel channel = channelFuture.sync().channel();
new Thread(()->{
Scanner scanner=new Scanner(System.in);
while(true){
String s = scanner.nextLine();
if("q".equalsIgnoreCase(s)) break;
channel.writeAndFlush(s);
}
//客户端输入任务完成,关闭通道
channel.close();
},"write").start();
//方式一:主线程同步等待通道关闭,进行善后处理
channel.closeFuture().sync();
log.debug("通道关闭,进行善后处理...");
}

异步善后
/**
* 客户端-异步善后
*/
public static void main(String[] args) throws InterruptedException {
NioEventLoopGroup group = new NioEventLoopGroup();
ChannelFuture channelFuture = new Bootstrap()
.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
ch.pipeline().addLast(new StringEncoder());
}
})
.connect(new InetSocketAddress("127.0.0.1", 8080));
Channel channel = channelFuture.sync().channel();
new Thread(()->{
Scanner scanner=new Scanner(System.in);
while(true){
String s = scanner.nextLine();
if("q".equalsIgnoreCase(s)) break;
channel.writeAndFlush(s);
}
//客户端输入任务完成,关闭通道
channel.close();
},"write").start();
ChannelFuture closeFuture = channel.closeFuture();
closeFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture channelFuture) throws Exception {
log.debug("通道关闭,进行善后处理...");
group.shutdownGracefully(); //优化关闭事件循环组
}
});
}

在异步处理时,经常用到这两个接口
Netty Promise —>(继承) Netty Future —>(继承) Jdk Future
| 功能/名称 | jdk Future | netty Future | Promise |
|---|---|---|---|
| cancel | 取消任务 | - | - |
| isCanceled | 任务是否取消 | - | - |
| isDone | 任务是否完成,不能区分成功失败 | - | - |
| get | 获取任务结果,阻塞等待。如果任务失败,抛出异常 | - | - |
| getNow | - | 获取任务结果,非阻塞,还未产生结果时返回 null | - |
| await | - | 等待任务结束,如果任务失败,不会抛异常,而是通过 isSuccess 判断 | - |
| sync | - | 等待任务结束,如果任务失败,抛出异常 | - |
| isSuccess | - | 判断任务是否成功 | - |
| cause | - | 获取失败信息,非阻塞,如果没有失败,返回null | - |
| addLinstener | - | 添加回调,异步接收结果 | - |
| setSuccess | - | - | 设置成功结果 |
| setFailure | - | - | 设置失败结果 |
public static void main(String[] args) throws ExecutionException, InterruptedException {
//1.创建 EventLoop
EventLoop eventLoop = new NioEventLoopGroup(2).next();
//2.获取 Promise
DefaultPromise<Integer> promise=new DefaultPromise<>(eventLoop);
//3.主线程创建随机线程执行任务,将任务结果设置到 promise容器 中
eventLoop.execute(()->{
log.debug("执行任务!");
try {
Thread.sleep(1000);
} catch (Exception e) {
e.printStackTrace();
}
promise.setSuccess(50);
});
//4.主线程同步获取结果
log.debug("主线程非阻塞获取结果:{}",promise.getNow());
log.debug("主线程同步阻塞获取结果:{}",promise.get());
}
10:12:49 [DEBUG] [main] o.e.j.f.PromiseSynSuccess : 主线程非阻塞获取结果:null
10:12:49 [DEBUG] [nioEventLoopGroup-2-1] o.e.j.f.PromiseSynSuccess : 执行任务!
10:12:50 [DEBUG] [main] o.e.j.f.PromiseSynSuccess : 主线程同步阻塞获取结果:50
public static void main(String[] args) {
//1.创建 EventLoop
EventLoop eventLoop = new NioEventLoopGroup(2).next();
//2.获取 Promise
DefaultPromise<Integer> promise=new DefaultPromise<>(eventLoop);
//3.主线程创建随机线程执行任务,将任务结果设置到 promise容器 中
eventLoop.execute(()->{
log.debug("执行任务!");
try {
Thread.sleep(1000);
} catch (Exception e) {
e.printStackTrace();
}
promise.setSuccess(50);
});
//4.主线程异获取结果
promise.addListener(new GenericFutureListener<Future<? super Integer>>() {
@Override
public void operationComplete(Future<? super Integer> future) throws Exception {
log.debug("主线程异步获取结果{}",future.getNow());
}
});
log.debug("主线程在异步获取结果时处理其他事情...");
}
10:15:59 [DEBUG] [main] o.e.j.f.PromiseAsynSuccess : 主线程在异步获取结果时处理其他事情...
10:15:59 [DEBUG] [nioEventLoopGroup-2-1] o.e.j.f.PromiseAsynSuccess : 执行任务!
10:16:00 [DEBUG] [nioEventLoopGroup-2-1] o.e.j.f.PromiseAsynSuccess : 主线程异步获取结果50
sync & get
/**
* get()
* sync():也会出现异常,只是 get 会再用 ExecutionException 包一层
*/
public static void main(String[] args) throws ExecutionException, InterruptedException {
//1.创建 EventLoop
EventLoop eventLoop = new NioEventLoopGroup(2).next();
//2.获取 Promise
DefaultPromise<Integer> promise = new DefaultPromise<>(eventLoop);
//3.主线程创建随机线程执行任务,将任务结果设置到 promise容器 中
eventLoop.execute(() -> {
log.debug("执行任务!")
try {
Thread.sleep(1000);
} catch (Exception e) {
e.printStackTrace();
}
promise.setFailure(new RuntimeException("同步失败"));
});
//4.主线程同步获取结果
log.debug("主线程非阻塞获取结果:{}", promise.getNow());
log.debug("主线程同步阻塞获取结果:{}", promise.get());
}
10:21:03 [DEBUG] [main] o.e.j.f.PromiseSynFail : 主线程非阻塞获取结果:null
10:21:03 [DEBUG] [nioEventLoopGroup-2-1] o.e.j.f.PromiseSynFail : 执行任务!
Exception in thread "main" java.util.concurrent.ExecutionException: java.lang.RuntimeException: 同步失败
at io.netty.util.concurrent.AbstractFuture.get(AbstractFuture.java:41)
at org.example.java.future.PromiseSynFail.main(PromiseSynFail.java:30)
Caused by: java.lang.RuntimeException: 同步失败
at org.example.java.future.PromiseSynFail.lambda$main$0(PromiseSynFail.java:25)
at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:416)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:515)
at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:918)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.lang.Thread.run(Thread.java:750)
await
/**
* await():不会抛异常
*/
@Test
public void test() throws InterruptedException {
//1.创建 EventLoop
EventLoop eventLoop = new NioEventLoopGroup(2).next();
//2.获取 Promise
DefaultPromise<Integer> promise=new DefaultPromise<>(eventLoop);
//3.主线程创建随机线程执行任务,将任务结果设置到 promise容器 中
eventLoop.execute(()->{
log.debug("执行任务!");
try {
Thread.sleep(1000);
} catch (Exception e) {
e.printStackTrace();
}
promise.setFailure(new RuntimeException("同步失败"));
});
//4.主线程同步获取结果
log.debug("主线程非阻塞获取结果:{}",promise.getNow());
promise.await();
log.debug("主线程同步阻塞获取结果:{}",promise.isSuccess()?promise.getNow():promise.cause().toString());
}
10:26:56 [DEBUG] [main] o.e.j.f.PromiseSynFail : 主线程非阻塞获取结果:null
10:26:56 [DEBUG] [nioEventLoopGroup-2-1] o.e.j.f.PromiseSynFail : 执行任务!
10:26:57 [DEBUG] [main] o.e.j.f.PromiseSynFail : 主线程同步阻塞获取结果:java.lang.RuntimeException: 同步失败
/**
* 不会抛异常
*/
public static void main(String[] args) {
//1.创建 EventLoop
EventLoop eventLoop = new NioEventLoopGroup(2).next();
//2.获取 Promise
DefaultPromise<Integer> promise=new DefaultPromise<>(eventLoop);
//3.主线程创建随机线程执行任务,将任务结果设置到 promise容器 中
eventLoop.execute(()->{
log.debug("执行任务!");
try {
Thread.sleep(1000);
} catch (Exception e) {
e.printStackTrace();
}
promise.setFailure(new RuntimeException("异步失败"));
});
//4.主线程异获取结果
promise.addListener(new GenericFutureListener<Future<? super Integer>>() {
@Override
public void operationComplete(Future<? super Integer> future) throws Exception {
log.debug("主线程异步获取结果{}",promise.getNow());
log.debug("主线程异步获取结果{}",promise.isSuccess()?promise.getNow():promise.cause().toString());
}
});
log.debug("主线程在异步获取结果时处理其他事情...");
}
10:33:26 [DEBUG] [main] o.e.j.f.PromiseAsynFail : 主线程在异步获取结果时处理其他事情...
10:33:26 [DEBUG] [nioEventLoopGroup-2-1] o.e.j.f.PromiseAsynFail : 执行任务!
10:33:27 [DEBUG] [nioEventLoopGroup-2-1] o.e.j.f.PromiseAsynFail : 主线程异步获取结果null
10:33:27 [DEBUG] [nioEventLoopGroup-2-1] o.e.j.f.PromiseAsynFail : 主线程异步获取结果java.lang.RuntimeException: 异步失败
public static void main(String[] args) {
//1.创建 EventLoop
EventLoop eventLoop = new NioEventLoopGroup(2).next();
//2.获取 Promise
DefaultPromise<Integer> promise=new DefaultPromise<>(eventLoop);
//3.主线程创建随机线程执行任务,将任务结果设置到 promise容器 中
eventLoop.execute(()->{
System.out.println("1");
try {
promise.await();
// 注意不能仅捕获 InterruptedException 异常
// 否则 死锁检查抛出的 BlockingOperationException 会继续向上传播
// 而提交的任务会被包装为 PromiseTask,它的 run 方法中会 catch 所有异常然后设置为 Promise 的失败结果而不会抛出
} catch (Exception e) {
e.printStackTrace();
}
System.out.println("2");
});
eventLoop.submit(()->{
System.out.println("3");
try {
promise.await();
} catch (Exception e) {
e.printStackTrace();
}
System.out.println("4");
});
}
1
2
3
4
io.netty.util.concurrent.BlockingOperationException: DefaultPromise@3c788ced(incomplete)
at io.netty.util.concurrent.DefaultPromise.checkDeadLock(DefaultPromise.java:384)
at io.netty.util.concurrent.DefaultPromise.await(DefaultPromise.java:212)
at org.example.java.future.PromiseAwaitDeadlock.lambda$main$0(PromiseAwaitDeadlock.java:21)
at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:416)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:515)
at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:918)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.lang.Thread.run(Thread.java:750)
io.netty.util.concurrent.BlockingOperationException: DefaultPromise@3c788ced(incomplete)
at io.netty.util.concurrent.DefaultPromise.checkDeadLock(DefaultPromise.java:384)
at io.netty.util.concurrent.DefaultPromise.await(DefaultPromise.java:212)
at org.example.java.future.PromiseAwaitDeadlock.lambda$main$1(PromiseAwaitDeadlock.java:33)
at io.netty.util.concurrent.PromiseTask$RunnableAdapter.call(PromiseTask.java:38)
at io.netty.util.concurrent.PromiseTask.run(PromiseTask.java:73)
at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:416)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:515)
at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:918)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.lang.Thread.run(Thread.java:750)
ChannelHandler 用来处理 Channel 上的各种事件,分为入站、出站两种。所有 ChannelHandler 被连成一串,就是 Pipeline
打个比喻,每个 Channel 是一个产品的加工车间,Pipeline 是车间中的流水线,ChannelHandler 就是流水线上的各道工序,而后面要讲的 ByteBuf 是原材料,经过很多工序的加工:先经过一道道入站工序,再经过一道道出站工序最终变成产品
Handler执行顺序
/**
* 注意:
* 1.写数据时应该使用channel写,才会从内部往外部的handler传递处理
* 2.处理到通道的数据应该是ByteBuf
*/
new ServerBootstrap()
.group(new NioEventLoopGroup(),new NioEventLoopGroup(2))
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast("h1",new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
System.out.println(1);
String name=((ByteBuf)msg).toString(Charset.forName("UTF-8"));
//1.启动下一个入站handler,并传递处理结果
ctx.fireChannelRead(name);
}
});
ch.pipeline().addLast("h2",new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object name) {
System.out.println(2);
//2.启动下一个入站handler,并传递处理结果
ctx.fireChannelRead(name);
}
});
ch.pipeline().addLast("h3",new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object name) {
System.out.println(3);
//3.从内部开始启动第一个出站handler,并传递处理结果
ch.writeAndFlush(ctx.alloc().buffer().writeBytes(((String)name).getBytes()));
//ctx.channel().writeAndFlush(ctx.alloc().buffer().writeBytes(((String)name).getBytes()));
//注意:该操作是从当前位置往外找处理器,而外面没有出站处理器导致内部的出站处理器无法执行
//ctx.write(ctx.alloc().buffer().writeBytes(((String)name).getBytes()), promise);
}
});
ch.pipeline().addLast("h4",new ChannelOutboundHandlerAdapter(){
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
System.out.println(4);
//4.往外启动下一个出站handler,并传递处理结果
ctx.write(msg, promise);
}
});
ch.pipeline().addLast("h5",new ChannelOutboundHandlerAdapter(){
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
System.out.println(5);
//5.往外启动下一个出站handler,并传递处理结果
ctx.write(msg, promise);
}
});
ch.pipeline().addLast("h6",new ChannelOutboundHandlerAdapter(){
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
System.out.println(6);
//6.往外启动下一个出站handler,并传递处理结果
ctx.write(msg, promise);
}
});
}
})
.bind(8080);
1
2
3
6
5
4

/**
* - 不指定容量时,默认为256
* - 创建了一个默认的 ByteBuf(池化基于直接内存的 ByteBuf),初始容量是 10
*/
ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(10);
System.out.println(buffer);
PooledUnsafeDirectByteBuf(ridx: 0, widx: 0, cap: 10)
调试方法
private static void log(ByteBuf buffer) {
int length = buffer.readableBytes();
int rows = length / 16 + (length % 15 == 0 ? 0 : 1) + 4;
StringBuilder buf = new StringBuilder(rows * 80 * 2)
.append("read index:").append(buffer.readerIndex())
.append(" write index:").append(buffer.writerIndex())
.append(" capacity:").append(buffer.capacity())
.append(NEWLINE);
appendPrettyHexDump(buf, buffer);
System.out.println(buf.toString());
}
//堆内存
ByteBuf buffer = ByteBufAllocator.DEFAULT.heapBuffer(10);
//直接内存
ByteBuf buffer = ByteBufAllocator.DEFAULT.directBuffer(10);
池化的最大意义:重用 ByteBuf
池化功能是否开启,可以通过系统环境变量来设置:-Dio.netty.allocator.type={unpooled|pooled}
| 创建实例 | 高并发时 | 分配效率 | 直接内存 | 堆内存 | |
|---|---|---|---|---|---|
| 有了池化 | 重用池中的实例 | 节约内存,减少内存溢出 | 采用了类似 jemalloc 的内存分配算法提升分配效率 | ||
| 没有池化 | 创建新的实例 | 创建、销毁的代价昂贵 | 增加 GC 压力 |

| 方法签名 | 含义 | 备注 |
|---|---|---|
| writeBoolean(boolean value) | 写入 boolean 值 | 用一字节 01|00 代表 true|false |
| writeByte(int value) | 写入 byte 值 | |
| writeShort(int value) | 写入 short 值 | |
| writeInt(int value) | 写入 int 值 | Big Endian,即 0x250,写入后 00 00 02 50 |
| writeIntLE(int value) | 写入 int 值 | Little Endian,即 0x250,写入后 50 02 00 00 |
| writeLong(long value) | 写入 long 值 | |
| writeChar(int value) | 写入 char 值 | |
| writeFloat(float value) | 写入 float 值 | |
| writeDouble(double value) | 写入 double 值 | |
| writeBytes(ByteBuf src) | 写入 netty 的 ByteBuf | |
| writeBytes(byte[] src) | 写入 byte[] | |
| writeBytes(ByteBuffer src) | 写入 nio 的 ByteBuffer | |
| int writeCharSequence(CharSequence sequence, Charset charset) | 写入字符串 | 写入字符串 |
| markReaderIndex() | 标记读指针 | |
| resetReaderIndex() | 重置读指针 |
使用案例
//1.先写入 4 个字节
buffer.writeBytes(new byte[]{1, 2, 3, 4});
log(buffer);
//结果是:
read index:0 write index:4 capacity:10
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 01 02 03 04 |.... |
+--------+-------------------------------------------------+----------------+
//2.再写入一个 int 整数,也是 4 个字节
buffer.writeInt(5);
log(buffer);
//结果是:
read index:0 write index:8 capacity:10
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 01 02 03 04 00 00 00 05 |........ |
+--------+-------------------------------------------------+----------------+
//3.再写入一个 int 整数时,容量不够了(初始容量是 10),这时会引发扩容
buffer.writeInt(6);
log(buffer);
//结果是:
read index:0 write index:12 capacity:16
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 01 02 03 04 00 00 00 05 00 00 00 06 |............ |
+--------+-------------------------------------------------+----------------+
使用案例
//1.读取 4 次,每次一个字节
System.out.println(buffer.readByte());
System.out.println(buffer.readByte());
System.out.println(buffer.readByte());
System.out.println(buffer.readByte());
log(buffer);
//结果是:
1
2
3
4
read index:4 write index:12 capacity:16
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 00 00 00 05 00 00 00 06 |........ |
+--------+-------------------------------------------------+----------------+
//2.如果需要重复读取 int 整数 5,怎么办?
//方式一:可以在 read 前先做个标记 mark
//方式二:采用 get 开头的一系列方法,不会改变 read index
buffer.markReaderIndex();
System.out.println(buffer.readInt());
log(buffer);
//结果是:
5
read index:8 write index:12 capacity:16
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 00 00 00 06 |.... |
+--------+-------------------------------------------------+----------------+
//3.这时要重复读取的话,重置到标记位置 reset
buffer.resetReaderIndex();
log(buffer);
//结果是:
read index:4 write index:12 capacity:16
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 00 00 00 05 00 00 00 06 |........ |
+--------+-------------------------------------------------+----------------+
由于 Netty 中有堆外内存的 ByteBuf 实现,堆外内存最好是手动来释放,而不是等 GC 垃圾回收。
回收内存的源码实现,请关注此方法的不同实现:protected abstract void deallocate()
引用计数法
当计数为 0 时,底层内存会被回收,这时即使 ByteBuf 对象还在,其各个方法均无法正常使用
释放规则
在 pipeline 中,ByteBuf 一般需要传递给下一个 handler 。所以释放规则是,谁是最后使用者,谁负责 release:
起点,对于 NIO 实现来讲,在 io.netty.channel.nio.AbstractNioByteChannel.NioByteUnsafe#read 方法中首次创建 ByteBuf 放入 pipeline(line 163 pipeline.fireChannelRead(byteBuf))
入站 ByteBuf 处理原则
对原始 ByteBuf 不做处理,调用 ctx.fireChannelRead(msg) 向后传递,这时无须 release
将原始 ByteBuf 转换为其它类型的 Java 对象,这时 ByteBuf 就没用了,必须 release
如果不调用 ctx.fireChannelRead(msg) 向后传递,那么也必须 release
注意各种异常,如果 ByteBuf 没有成功传递到下一个 ChannelHandler,必须 release
假设消息一直向后传,那么 TailContext 会负责释放未处理消息(原始的 ByteBuf)
//io.netty.channel.DefaultChannelPipeline.TailContext ——>
public void channelRead(ChannelHandlerContext ctx, Object msg) {
DefaultChannelPipeline.this.onUnhandledInboundMessage(ctx, msg);
}
protected void onUnhandledInboundMessage(ChannelHandlerContext ctx, Object msg) {
this.onUnhandledInboundMessage(msg);
if (logger.isDebugEnabled()) {
logger.debug("Discarded message pipeline : {}. Channel : {}.", ctx.pipeline().names(), ctx.channel());
}
}
protected void onUnhandledInboundMessage(Object msg) {
try {
logger.debug("Discarded inbound message {} that reached at the tail of the pipeline. Please check your pipeline configuration.", msg);
} finally {
ReferenceCountUtil.release(msg);
}
}
public static boolean release(Object msg) {
return msg instanceof ReferenceCounted ? ((ReferenceCounted)msg).release() : false;
}
出站 ByteBuf 处理原则
**异常 **处理原则
对原始 ByteBuf 切片成多个 ByteBuf,切片后的 ByteBuf 并没有发生内存复制,还是使用原始 ByteBuf 的内存,切片后的 ByteBuf 维护独立的 read,write 指针
注意:切片后的 max capacity 被固定为这个区间的大小,因此不能追加 write

使用案例
//1.初始化
ByteBuf origin = ByteBufAllocator.DEFAULT.buffer(10);
origin.writeBytes(new byte[]{1, 2, 3, 4});
origin.readByte();
System.out.println(ByteBufUtil.prettyHexDump(origin));
//输出
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 02 03 04 |... |
+--------+-------------------------------------------------+----------------+
//2.切片:如果写入会报 IndexOutOfBoundsException 异常
ByteBuf slice = origin.slice();
System.out.println(ByteBufUtil.prettyHexDump(slice));
//输出
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 02 03 04 |... |
+--------+-------------------------------------------------+----------------+
//3.原始 ByteBuf 进行读操作
origin.readByte();
System.out.println(ByteBufUtil.prettyHexDump(origin));
//输出
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 03 04 |.. |
+--------+-------------------------------------------------+----------------+
//4.切片得到的 slice 不受影响,因为它有独立的读写指针
System.out.println(ByteBufUtil.prettyHexDump(slice));
//输出
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 02 03 04 |... |
+--------+-------------------------------------------------+----------------+
//5.slice 的内容发生了更改
slice.setByte(2, 5);
System.out.println(ByteBufUtil.prettyHexDump(slice));
//输出
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 02 03 05 |... |
+--------+-------------------------------------------------+----------------+
//6.原始 ByteBuf 也会受影响,因为底层都是同一块内存
System.out.println(ByteBufUtil.prettyHexDump(origin));
//输出
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 03 05 |.. |
+--------+-------------------------------------------------+----------------+
注意:没有 max capacity 的限制,只是前部分与原始 ByteBuf 使用同一块底层内存,且读写指针独立

CompositeByteBuf 是一个组合的 ByteBuf,它内部维护了一个 Component 数组,每个 Component 管理一个 ByteBuf,记录了这个 ByteBuf 相对于整体偏移量等信息,代表着整体中某一段的数据。
使用案例
//1.有两个 ByteBuf 如下
ByteBuf buf1 = ByteBufAllocator.DEFAULT.buffer(5);
ByteBuf buf2 = ByteBufAllocator.DEFAULT.buffer(5);
buf1.writeBytes(new byte[]{1, 2, 3, 4, 5});
buf2.writeBytes(new byte[]{6, 7, 8, 9, 10});
System.out.println(ByteBufUtil.prettyHexDump(buf1));
System.out.println(ByteBufUtil.prettyHexDump(buf2));
//输出
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 01 02 03 04 05 |..... |
+--------+-------------------------------------------------+----------------+
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 06 07 08 09 0a |..... |
+--------+-------------------------------------------------+----------------+
//2.现在需要一个新的 ByteBuf,内容来自于刚才的 buf1 和 buf2,如何实现?
//方式一:深拷贝
//方式二:零拷贝
CompositeByteBuf buf3 = ByteBufAllocator.DEFAULT.compositeBuffer();
buf3.addComponents(true, buf1, buf2);
//buf3结果
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 01 02 03 04 05 06 07 08 09 0a |.......... |
+--------+-------------------------------------------------+----------------+
Unpooled 是一个工具类,类如其名,提供了非池化的 ByteBuf 创建、组合、复制等操作
注意:当包装个数超过一个时, 底层使用了 CompositeByteBuf
使用案例
//1.有两个 ByteBuf 如下
ByteBuf buf1 = ByteBufAllocator.DEFAULT.buffer(5);
ByteBuf buf2 = ByteBufAllocator.DEFAULT.buffer(5);
buf1.writeBytes(new byte[]{1, 2, 3, 4, 5});
buf2.writeBytes(new byte[]{6, 7, 8, 9, 10});
//2.包装 ByteBuf
ByteBuf buf3 = Unpooled.wrappedBuffer(buf1, buf2);
System.out.println(ByteBufUtil.prettyHexDump(buf3));
//输出
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 01 02 03 04 05 06 07 08 09 0a |.......... |
+--------+-------------------------------------------------+----------------+
//3.包装普通字节数组
ByteBuf buf4 = Unpooled.wrappedBuffer(new byte[]{1, 2, 3}, new byte[]{4, 5, 6});
System.out.println(buf4.getClass());
System.out.println(ByteBufUtil.prettyHexDump(buf4));
//输出
class io.netty.buffer.CompositeByteBuf
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 01 02 03 04 05 06 |...... |
+--------+-------------------------------------------------+----------------+
实现一个 echo server
服务器
//1.创建 非IO事件循环组、IO事件循环组、接收事件循环组
DefaultEventLoopGroup defaultGroup=new DefaultEventLoopGroup(2);
NioEventLoopGroup group=new NioEventLoopGroup(2);
//2.监听 8080 端口
ChannelFuture channelFuture = new ServerBootstrap()
.channel(NioServerSocketChannel.class)
.group(new NioEventLoopGroup(),group)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline().addLast(defaultGroup,new StringDecoder(Charset.forName("UTF-8")));
//接收信息
ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("客户端:"+msg);
//将信息进行封装
ByteBuf buf=(ByteBuf) msg;
ByteBuf message = ctx.alloc().buffer();
message.writeBytes("服务器:".getBytes());
message.writeBytes(buf);
//思考:释放 msg 缓冲区。无需释放 message ,由 HeadContext 进行释放。
buf.release();
ctx.channel().writeAndFlush(message);
}
});
//发出信息
ch.pipeline().addLast(new ChannelOutboundHandlerAdapter(){
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
//无需释放 msg ,由 HeadContext 进行释放
ctx.write(msg);
}
});
ch.pipeline().addLast(defaultGroup,new StringEncoder(Charset.forName("UTF-8")));
}
})
.bind(8080);
channelFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture channelFuture) throws Exception {
System.out.println("服务器启动成功!");
}
});
客户端
//1.创建 非IO事件循环组、IO事件循环组
DefaultEventLoopGroup defaultGroup = new DefaultEventLoopGroup(2);
NioEventLoopGroup group = new NioEventLoopGroup(2);
//2.连接 127.0.0.1:8080
ChannelFuture connect = new Bootstrap()
.channel(NioSocketChannel.class)
.group(group)
.handler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline().addLast(new StringDecoder(Charset.forName("UTF-8")));
ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println(msg);
System.out.print("客户端:");
//思考:无需释放 msg ,由 TailContext 进行释放。
ctx.fireChannelRead(msg);
}
});
ch.pipeline().addLast(new StringEncoder(Charset.forName("UTF-8")));
}
})
.connect(new InetSocketAddress("127.0.0.1", 8080));
//3.等待连接建立后执行操作
Channel channel = connect.sync().channel();
//4.接收控制台输入,并发送
defaultGroup.submit(()->{
System.out.println("客户端启动完成!");
Scanner in=new Scanner(System.in);
String message="";
System.out.print("客户端:");
while(!"q".equalsIgnoreCase(message)){
message=in.nextLine();
channel.writeAndFlush(message);
}
//5.关闭连接
channel.close();
});
//6.关闭连接之后释放资源
ChannelFuture closeFuture = channel.closeFuture();
closeFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture channelFuture) throws Exception {
defaultGroup.shutdownGracefully();
group.shutdownGracefully();
}
});
我最初在认识上有这样的误区,认为只有在 netty,nio 这样的多路复用 IO 模型时,读写才不会相互阻塞,才可以实现高效的双向通信,但实际上,Java Socket 是全双工的:在任意时刻,线路上存在A 到 B 和 B 到 A 的双向信号传输。即使是阻塞 IO,读和写是可以同时进行的,只要分别采用读线程和写线程即可,读不会阻塞写、写也不会阻塞读
服务器
public class TestServer {
public static void main(String[] args) throws IOException {
ServerSocket ss = new ServerSocket(8888);
Socket s = ss.accept();
new Thread(() -> {
try {
BufferedReader reader = new BufferedReader(new InputStreamReader(s.getInputStream()));
while (true) {
System.out.println(reader.readLine());
}
} catch (IOException e) {
e.printStackTrace();
}
}).start();
new Thread(() -> {
try {
BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(s.getOutputStream()));
// 例如在这个位置加入 thread 级别断点,可以发现即使不写入数据,也不妨碍前面线程读取客户端数据
for (int i = 0; i < 100; i++) {
writer.write(String.valueOf(i));
writer.newLine();
writer.flush();
}
} catch (IOException e) {
e.printStackTrace();
}
}).start();
}
}
客户端
public class TestClient {
public static void main(String[] args) throws IOException {
Socket s = new Socket("localhost", 8888);
new Thread(() -> {
try {
BufferedReader reader = new BufferedReader(new InputStreamReader(s.getInputStream()));
while (true) {
System.out.println(reader.readLine());
}
} catch (IOException e) {
e.printStackTrace();
}
}).start();
new Thread(() -> {
try {
BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(s.getOutputStream()));
for (int i = 0; i < 100; i++) {
writer.write(String.valueOf(i));
writer.newLine();
writer.flush();
}
} catch (IOException e) {
e.printStackTrace();
}
}).start();
}
}