可以理解为:事件循环对象
EventLoop 本质是一个单线程执行器(同时维护了一个 Selector),里面有 run 方法处理 Channel 上源源不断的 io 事件
继承关系,查看源码可知:



可以理解为:事件循环组
一般我们不会直接使用EventLoop,而是使用EventLoopGroup
EventLoopGroup 是一组 EventLoop,
Channel 一般会调用 EventLoopGroup 的 register 方法来绑定其中一个 EventLoop,后续这个 Channel 上的 io 事件都由此 EventLoop 来处理(保证了 io 事件处理时的线程安全)继承自 netty 自己的 EventExecutorGroup

// 1. 创建事件循环组
/*
NioEventLoopGroup:
默认创建的线程个数为:
1. 首先读取netty自己的性能参数,读取到了就以这个为准备(就是自己指定的数)
2. 如果上面的没有读取到,会找到当前电脑的的cpu核心数 * 2
3. 如果指定的为错误(如0),他是会去取范围中间的一个数
功能最为全面,可以做下面的事情:
1. 处理IO事件
2. 处理普通任务
3. 处理定时任务
DefaultEventLoopGroup:
可以做下面的事情:
1. 处理普通任务
2. 处理定时任务
*/
// 内部创建了两个 EventLoop, 每个 EventLoop 维护一个线程
EventLoopGroup group = new NioEventLoopGroup(2); // io 事件,普通任务,定时任务
//EventLoopGroup group = new DefaultEventLoopGroup(); // 普通任务,定时任务
// 2. 获取下一个事件循环对象
System.out.println(group.next());
System.out.println(group.next());
System.out.println(group.next());
System.out.println(group.next());
/*
io.netty.channel.nio.NioEventLoop@548e7350
io.netty.channel.nio.NioEventLoop@1a968a59
io.netty.channel.nio.NioEventLoop@548e7350
io.netty.channel.nio.NioEventLoop@1a968a59
*/
好处:
// 1. 内部创建了两个 EventLoop, 每个 EventLoop 维护一个线程
EventLoopGroup group = new NioEventLoopGroup(2); // io 事件,普通任务,定时任务
// 2. 获取下一个事件循环对象
System.out.println(group.next());
System.out.println(group.next());
System.out.println(group.next());
System.out.println(group.next());
// 3. 执行普通任务
//group.next().submit()类似
group.next().execute(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.debug("ok");
});
log.debug("main");
/*
io.netty.channel.nio.NioEventLoop@548e7350
io.netty.channel.nio.NioEventLoop@1a968a59
io.netty.channel.nio.NioEventLoop@548e7350
io.netty.channel.nio.NioEventLoop@1a968a59
23:44:33 [DEBUG] [main] c.i.n.c.TestEventLoop - main
23:44:34 [DEBUG] [nioEventLoopGroup-2-1] c.i.n.c.TestEventLoop - ok
*/
用途:在实现keep-alive时,实现任务的保活
// 1. 内部创建了两个 EventLoop, 每个 EventLoop 维护一个线程
EventLoopGroup group = new NioEventLoopGroup(2); // io 事件,普通任务,定时任务
// 2. 获取下一个事件循环对象
System.out.println(group.next());
System.out.println(group.next());
System.out.println(group.next());
System.out.println(group.next());
// 4. 执行定时任务
/*
scheduleAtFixedRate:执行一个定时任务,以一定的频率去执行
参数1:任务对象
参数2:初始化延迟时间,设置为1就是1s后执行,设置为0代表立即执行
参数3:间隔时间
参数4:时间单位
*/
group.next().scheduleAtFixedRate(() -> {
log.debug("ok");
}, 0, 1, TimeUnit.SECONDS);
log.debug("main");
/*
io.netty.channel.nio.NioEventLoop@548e7350
io.netty.channel.nio.NioEventLoop@1a968a59
io.netty.channel.nio.NioEventLoop@548e7350
io.netty.channel.nio.NioEventLoop@1a968a59
23:48:26 [DEBUG] [nioEventLoopGroup-2-1] c.i.n.c.TestEventLoop - ok
23:48:26 [DEBUG] [main] c.i.n.c.TestEventLoop - main
23:48:27 [DEBUG] [nioEventLoopGroup-2-1] c.i.n.c.TestEventLoop - ok
23:48:28 [DEBUG] [nioEventLoopGroup-2-1] c.i.n.c.TestEventLoop - ok
23:48:29 [DEBUG] [nioEventLoopGroup-2-1] c.i.n.c.TestEventLoop - ok
23:48:30 [DEBUG] [nioEventLoopGroup-2-1] c.i.n.c.TestEventLoop - ok
23:48:31 [DEBUG] [nioEventLoopGroup-2-1] c.i.n.c.TestEventLoop - ok
23:48:32 [DEBUG] [nioEventLoopGroup-2-1] c.i.n.c.TestEventLoop - ok
Process finished with exit code -1
*/
在服务器端做了两个细分:
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import lombok.extern.slf4j.Slf4j;
import java.nio.charset.Charset;
@Slf4j//后面使用的log.debug需要这个注解
public class EventLoopServer {
public static void main(String[] args) {
// 细分2:创建一个独立的 EventLoopGroup
/*
如果遇见某一个handler的耗时较长,最好不要使这个handler占用worker的NIO线程,否则就会影响NIO的读写操作
下面这个EventLoopGroup专门处理耗时较长的操作,而不是NioEventLoopGroup去操作这种耗时较长的事件
DefaultEventLoopGroup:不需要处理IO事件,只能处理定时和普通任务
*/
EventLoopGroup group = new DefaultEventLoopGroup();
new ServerBootstrap()
/*细分1*/
// 把EventLoop分为: boss 和 worker
// 第一个参数: boss 只负责 ServerSocketChannel 上 accept 事件
// 第二个参数: worker 只负责 ServerSocketChannel 上的读写
/* 参数1:boss
ServerSocketChannel 只会跟一个EventLoop 绑定,更多的ServerSocketChannel不可能有,因为服务器只有一个,
其只会占用EventLoopGroup中的一个线程,剩余的也不会创建,这个地方不写参数也是可以的
参数2:worker
这个根据实际情况去设置,不设置的话就是cpu核心数 * 2 ;
下面设置为2,说明以后的worker线程只有两个
*/
.group(new NioEventLoopGroup(), new NioEventLoopGroup(2))
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override//连接建立后调用
protected void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline().addLast("handler1", new ChannelInboundHandlerAdapter() {//自己处理数据
@Override// 没有StringDeCode,所以数据是一个ByteBuf类型(字节数据)
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {//关注读事件
ByteBuf buf = (ByteBuf) msg;
// 将buf 转为字符串, 最好指定哪一种字节码
log.debug(buf.toString(Charset.defaultCharset()));
//必须使用下面这个方法,否则,消息将会在这个地方断掉
ctx.fireChannelRead(msg); // 将消息传递给下一个handler
}
}).addLast(group, "handler2", new ChannelInboundHandlerAdapter() {
@Override // ByteBuf
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
log.debug(buf.toString(Charset.defaultCharset()));
}
});
}
})
.bind(8080);
}
}
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringEncoder;
import java.net.InetSocketAddress;
public class EventLoopClient{
public static void main(String[] args) throws InterruptedException {
// 1. 创建启动类,启动客户端
new Bootstrap()
// 2. 添加 EventLoop:比如服务器端发送数据过来,客户端的eventLoop就可以从选择器里触发读事件,去进行进一步的处理
.group(new NioEventLoopGroup())
// 3. 选择客户端 channel 实现,NioSocketChannel(封装了jdk的NioSocketChannel)
.channel(NioSocketChannel.class)
// 4. 添加处理器,ChannelInitializer(连接建立后会被调用,调用后便会执行initChannel方法)
.handler(new ChannelInitializer<NioSocketChannel>() {
@Override // 在连接建立后被调用
protected void initChannel(NioSocketChannel ch) throws Exception {
//客户端将字符串编码成为ByteBuf,服务器端将ByteBuf解码为字符串
ch.pipeline().addLast(new StringEncoder());
}
})
// 5. 连接到服务器
.connect(new InetSocketAddress("localhost", 8080))
.sync()//
.channel();
// 6. 向服务器发送数据,即写数据
System.out.println(channel);
System.out.println("");
}
}
按下面的操作,便可以实现debug启动


第一步

按如下选择

点击apply–》ok
以后,直接右键,便可以启动多个客户端



一旦建立连接,Channel就会跟一个NioEventLoop绑定,后续的所有请求,都会由同一个EventLoop处理

举例说明:
问题也可以描述为:多个handler之间,使用的是不同的EventLoop,是如何进行线程的切换的
EventLoop就可以当做是一个个人,多个工人的工作是如何进行交换的
以下面的图举例:
在这里,将上面的NIOEventLoopGroup简称为Nio线程,DefaultEventLoopGroup简称为普通线程
核心代码在这个类中:io.netty.channel.AbstractChannelHandlerContext#invokeChannelRead()
其关键代码如下
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
// 下一个 handler 的事件循环是否与当前的事件循环是同一个线程
/*
next.executor():返回下一个handler的EventLoop
*/
EventExecutor executor = next.executor();
// 是,直接调用
/*
executor.inEventLoop():当handler中的线程,是否与executor 是同一个线程
成立的进入if,不成立进入else
*/
if (executor.inEventLoop()) {
//让下一个handler也是在同一个线程去执行
next.invokeChannelRead(m);
}
// 不是,将要执行的代码作为任务提交给下一个事件循环处理(换人)
else {
//给另一个线程executor去执行
executor.execute(new Runnable() {
@Override
public void run() {
next.invokeChannelRead(m);
}
});
}
}
简而言之:
如果两个handler绑定的是同一个线程,那么就直接调用; 否则,把要调用的代码封装为一个任务对象,由下一一个 handler的线程来调用
步骤:
next.invokeChannelRead(m);,便会进入下一个handlerchannel 的主要作用
close() 可以用来关闭 channel
closeFuture() 用来处理 channel 的关闭
当close事件发生后,去进行关闭后的善后处理
sync 方法作用是同步等待 channel 关闭
而 addListener 方法是异步等待 channel 关闭
pipeline() 方法添加处理器
当连接建立时,调用触发器时,会在触发器中的initChannel这个方法里去调用channel的pipeline() 方法。作用就是给 channel的pipeline流水线中加入一个个handler处理器
write() 方法将数据写入
将数据写入channel,但是不会立刻将数据通过·网络发出去,会触发一定条件时才会进行发送
writeAndFlush() 方法将数据写入并刷出
可以保证数据立即写入并发出
注意 connect 方法是异步的,意味着不等连接建立,方法执行就返回了。因此 channelFuture 对象中不能【立刻】获得到正确的 Channel 对象
connect是异步非阻塞的,主线程(main)发起了调用,真正执行connect(底层的)的是Nio线程,主线程不会被阻塞,将继续向下运行
只要是带Future、Promise的,一般都是配合异步线程,一起使用的
有两种方法去进行结果的处理
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringEncoder;
import lombok.extern.slf4j.Slf4j;
import java.net.InetSocketAddress;
@Slf4j
public class EventLoopClient {
public static void main(String[] args) throws InterruptedException {
// 2. 带有 Future,Promise 的类型都是和异步方法配套使用,用来处理结果
ChannelFuture channelFuture = new Bootstrap()
.group(new NioEventLoopGroup())
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<NioSocketChannel>() {
@Override // 在连接建立后被调用
protected void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline().addLast(new StringEncoder());
}
})
// 1. 连接到服务器
// 异步非阻塞, main 发起了调用,真正执行 connect 是 nio 线程
.connect(new InetSocketAddress("localhost", 8080)); // 1s 秒后
// 2.1 使用 sync 方法同步处理结果,谁发起的调用,谁就等结果
channelFuture.sync(); // 阻塞住当前线程,直到nio线程连接建立完毕
Channel channel = channelFuture.channel();
log.debug("{}", channel);
channel.writeAndFlush("hello, world");
}
}
将等结果和处理结果的内容都交给另一个线程去处理
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringEncoder;
import lombok.extern.slf4j.Slf4j;
import java.net.InetSocketAddress;
@Slf4j
public class EventLoopClient {
public static void main(String[] args) throws InterruptedException {
// 2. 带有 Future,Promise 的类型都是和异步方法配套使用,用来处理结果
ChannelFuture channelFuture = new Bootstrap()
.group(new NioEventLoopGroup())
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<NioSocketChannel>() {
@Override // 在连接建立后被调用
protected void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline().addLast(new StringEncoder());
}
})
// 1. 连接到服务器
// 异步非阻塞, main 发起了调用,真正执行 connect 是 nio 线程
.connect(new InetSocketAddress("localhost", 8080)); // 1s 秒后
// 2.2 使用 addListener(回调对象) 方法异步处理结果
channelFuture.addListener(new ChannelFutureListener() {
@Override
// 在 nio 线程连接建立好之后,会调用 operationComplete
/*
这里的channelFuture对象与调用时的channelFuture是同一个
*/
public void operationComplete(ChannelFuture future) throws Exception {
Channel channel = future.channel();
log.debug("{}", channel);
channel.writeAndFlush("hello, world");
}
});
}
}
close()方法是一个异步操作,我们希望在close关闭后再去做一些操作
真正处理关闭的线程是在NioEventLoopGroup这个线程里面进行的
这个是handler是netty提供的,定义了日志级别,如下所示

这个handler里面的日志级别将被定义为debug级别进行输出

在当前线程进行阻塞,当真正调用了channel.close()方法后,sync方法才会继续向下运行
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import lombok.extern.slf4j.Slf4j;
import java.net.InetSocketAddress;
import java.util.Scanner;
@Slf4j
public class CloseFutureClient {
public static void main(String[] args) throws InterruptedException {
NioEventLoopGroup group = new NioEventLoopGroup();
ChannelFuture channelFuture = new Bootstrap()
.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<NioSocketChannel>() {
@Override // 在连接建立后被调用
protected void initChannel(NioSocketChannel ch) throws Exception {
// ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
ch.pipeline().addLast(new StringEncoder());
}
})
.connect(new InetSocketAddress("localhost", 8080));
System.out.println(channelFuture.getClass());
Channel channel = channelFuture.sync().channel();
log.debug("{}", channel);
new Thread(()->{
Scanner scanner = new Scanner(System.in);
while (true) {
String line = scanner.nextLine();
if ("q".equals(line)) {
channel.close(); // close 异步操作 1s 之后
// log.debug("处理关闭之后的操作"); // 不能在这里善后
break;
}
channel.writeAndFlush(line);
}
}, "input").start();
// 获取 CloseFuture 对象, 1) 同步处理关闭, 2) 异步处理关闭
ChannelFuture closeFuture = channel.closeFuture();
log.debug("waiting close...");
closeFuture.sync();//同步,会在当前线程进行阻塞
log.debug("处理关闭之后的操作");
}
}
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import lombok.extern.slf4j.Slf4j;
import java.net.InetSocketAddress;
import java.util.Scanner;
@Slf4j
public class CloseFutureClient {
public static void main(String[] args) throws InterruptedException {
NioEventLoopGroup group = new NioEventLoopGroup();
ChannelFuture channelFuture = new Bootstrap()
.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<NioSocketChannel>() {
@Override // 在连接建立后被调用
protected void initChannel(NioSocketChannel ch) throws Exception {
// ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
ch.pipeline().addLast(new StringEncoder());
}
})
.connect(new InetSocketAddress("localhost", 8080));
System.out.println(channelFuture.getClass());
Channel channel = channelFuture.sync().channel();
log.debug("{}", channel);
new Thread(()->{
Scanner scanner = new Scanner(System.in);
while (true) {
String line = scanner.nextLine();
if ("q".equals(line)) {
channel.close(); // close 异步操作 1s 之后
// log.debug("处理关闭之后的操作"); // 不能在这里善后
break;
}
channel.writeAndFlush(line);
}
}, "input").start();
// 获取 CloseFuture 对象, 1) 同步处理关闭, 2) 异步处理关闭
ChannelFuture closeFuture = channel.closeFuture();
closeFuture.addListener((ChannelFutureListener) future -> {
log.debug("处理关闭之后的操作");
//使NioEventLoopGroup 的线程都关闭掉
group.shutdownGracefully();
});
}
}
要点
单线程没法异步提高效率,必须配合多线程、多核 cpu 才能发挥异步的优势
异步并没有缩短响应时间,反而有所增加
异步的响应时间其实是增长了,
提高的是吞吐量(提高了单位时间的访问的个数)
合理进行任务拆分,也是利用异步的关键
合理拆分
提升的是是什么:吞吐量
吞吐量:单位时间内能够处理请求的速度
这两个异步处理时,经常用到的接口
netty 中的 Future 与 jdk 中的 Future 同名,但是是两个接口,netty 的 Future 继承自 jdk 的 Future,而 Promise 又对 netty Future 进行了扩展(netty中的Future继承了JDK中的Future,netty中的Promise继承了netty中的Future接口)
只能同步等待任务结束(或成功、或失败)才能得到结果同步等待任务结束得到结果,也可以异步方式得到结果,但都是要等任务结束脱离了任务独立存在,只作为两个线程间传递结果的容器| 功能/名称 | jdk Future | netty Future | Promise |
|---|---|---|---|
| cancel | 取消任务 | - | - |
| isCanceled | 任务是否取消 | - | - |
| isDone | 任务是否完成,不能区分成功失败 | - | - |
get | 获取任务结果,阻塞等待 | - | - |
| getNow | - | 等待任务结束,获取任务结果,非阻塞,还未产生结果时返回 null | - |
| await | - | 等待任务结束,如果任务失败,不会抛异常,而是通过 isSuccess 判断任务是成功还是失败 | - |
| sync | - | 等待任务结束,不去获取结果,如果任务失败,抛出异常 | - |
| isSuccess | - | 判断任务是否成功 | - |
| cause | - | 获取失败信息,非阻塞,如果没有失败,返回null | - |
| addLinstener | - | 添加回调,异步接收结果 | - |
| setSuccess | - | - | 设置成功结果 |
| setFailure | - | - | 设置失败结果 |
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.*;
@Slf4j
public class TestJdkFuture {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// 1. 线程池
ExecutorService service = Executors.newFixedThreadPool(2);
// 2. 提交任务
/*
Callable:有返回结果
Runable:没有返回结果
*/
Future<Integer> future = service.submit(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
log.debug("执行计算");
Thread.sleep(1000);
return 50;
}
});
// 3. 主线程通过 future 来获取结果
log.debug("等待结果");
log.debug("结果是 {}", future.get());
}
}
运行结果
21:36:05 [DEBUG] [pool-1-thread-1] c.i.n.c.TestJdkFuture - 执行计算
21:36:05 [DEBUG] [main] c.i.n.c.TestJdkFuture - 等待结果
21:36:06 [DEBUG] [main] c.i.n.c.TestJdkFuture - 结果是 50
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.EventLoop;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
@Slf4j
public class TestNettyFuture {
public static void main(String[] args) throws ExecutionException, InterruptedException {
NioEventLoopGroup group = new NioEventLoopGroup();
EventLoop eventLoop = group.next();
Future<Integer> future = eventLoop.submit(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
log.debug("执行计算");
Thread.sleep(1000);
return 70;
}
});
//同步的方式获取
// log.debug("等待结果");
// log.debug("结果是 {}", future.get());
//异步的方式获取
future.addListener(new GenericFutureListener<Future<? super Integer>>(){
@Override
public void operationComplete(Future<? super Integer> future) throws Exception {
log.debug("接收结果:{}", future.getNow());
}
});
}
}
运行结果
21:53:37 [DEBUG] [main] c.i.n.c.TestNettyFuture - 等待结果
21:53:37 [DEBUG] [nioEventLoopGroup-2-1] c.i.n.c.TestNettyFuture - 执行计算
21:53:38 [DEBUG] [main] c.i.n.c.TestNettyFuture - 结果是 70
21:53:38 [DEBUG] [nioEventLoopGroup-2-1] c.i.n.c.TestNettyFuture - addListener接收结果:70
Process finished with exit code -1
import io.netty.channel.EventLoop;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.util.concurrent.DefaultPromise;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.ExecutionException;
@Slf4j
public class TestNettyPromise {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// 1. 准备 EventLoop 对象
EventLoop eventLoop = new NioEventLoopGroup().next();
// 2. 可以主动创建 promise;promise就类似于结果容器,存放结果
DefaultPromise<Integer> promise = new DefaultPromise<>(eventLoop);
new Thread(() -> {
// 3. 任意一个线程执行计算,计算完毕后向 promise 填充结果
log.debug("开始计算...");
try {
int i = 1 / 0;
Thread.sleep(1000);
//填充正常的结果
promise.setSuccess(80);
} catch (Exception e) {
e.printStackTrace();
//填充异常的结果
promise.setFailure(e);
}
}).start();
// 4. 接收结果的线程
log.debug("等待结果...");
log.debug("结果是: {}", promise.get());
}
}
运行结果:
21:57:52 [DEBUG] [Thread-0] c.i.n.c.TestNettyPromise - 开始计算...
21:57:52 [DEBUG] [main] c.i.n.c.TestNettyPromise - 等待结果...
java.lang.ArithmeticException: / by zero
at cn.itcast.netty.c3.TestNettyPromise.lambda$main$0(TestNettyPromise.java:22)
at java.lang.Thread.run(Thread.java:745)
Exception in thread "main" java.util.concurrent.ExecutionException: java.lang.ArithmeticException: / by zero
at io.netty.util.concurrent.AbstractFuture.get(AbstractFuture.java:41)
at cn.itcast.netty.c3.TestNettyPromise.main(TestNettyPromise.java:34)
Caused by: java.lang.ArithmeticException: / by zero
at cn.itcast.netty.c3.TestNettyPromise.lambda$main$0(TestNettyPromise.java:22)
at java.lang.Thread.run(Thread.java:745)
ChannelHandler 用来处理 Channel 上的各种事件,分为入站、出站两种。所有 ChannelHandler 被连成一串,就是 Pipeline
入站处理器通常是 ChannelInboundHandlerAdapter 的子类,主要用来读取客户端数据,写回结果出站处理器通常是 ChannelOutboundHandlerAdapter 的子类,主要对写回结果进行加工
只有向channel里写入数据(write或writeAndFlush方法),才会触发
打个比喻,每个 Channel 是一个产品的加工车间,Pipeline 是车间中的流水线,ChannelHandler 就是流水线上的各道工序,而后面要讲的 ByteBuf 是原材料,经过很多工序的加工:先经过一道道入站工序,再经过一道道出站工序最终变成产品
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import java.nio.charset.Charset;
@Slf4j
public class TestPipeline {
public static void main(String[] args) {
new ServerBootstrap()
.group(new NioEventLoopGroup())
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
// 1. 通过 channel 拿到 pipeline
ChannelPipeline pipeline = ch.pipeline();
// 2. 添加处理器,下面的构成这种链: head -> h1 -> h2 -> h4 -> h3 -> h5 -> h6 -> tail
/*
netty会自动创建两个handler:head和tail
上面的一条链会依次执行
*/
//入栈
pipeline.addLast("h1", new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log.debug("1");
super.channelRead(ctx, msg);
}
});
//入栈
pipeline.addLast("h2", new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object name) throws Exception {
log.debug("2");
super.channelRead(ctx, name); // 将数据传递给下个 handler,如果不调用,调用链会断开 或者调用 ctx.fireChannelRead(student);
}
});
//入栈
pipeline.addLast("h3", new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log.debug("3");
//写入操作,只是为了触发下面的出栈处理器
ch.writeAndFlush(ctx.alloc().buffer().writeBytes("server...".getBytes()));
}
});
//出栈
pipeline.addLast("h4", new ChannelOutboundHandlerAdapter(){
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
log.debug("4");
super.write(ctx, msg, promise);
}
});
//出栈
pipeline.addLast("h5", new ChannelOutboundHandlerAdapter(){
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
log.debug("5");
super.write(ctx, msg, promise);
}
});
//出栈
pipeline.addLast("h6", new ChannelOutboundHandlerAdapter(){
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
log.debug("6");
super.write(ctx, msg, promise);
}
});
}
})
.bind(8080);
}
}
运行结果

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import jdk.nashorn.internal.objects.annotations.Constructor;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import java.nio.charset.Charset;
@Slf4j
public class TestPipeline {
public static void main(String[] args) {
new ServerBootstrap()
.group(new NioEventLoopGroup())
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
// 1. 通过 channel 拿到 pipeline
ChannelPipeline pipeline = ch.pipeline();
// 2. 添加处理器,下面的构成这种链: head -> h1 -> h2 -> h4 -> h3 -> h5 -> h6 -> tail
/*
netty会自动创建两个handler:head和tail
上面的一条链会依次执行
*/
//入栈
pipeline.addLast("h1", new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log.debug("1");
ByteBuf buf = (ByteBuf) msg;
String name = buf.toString();
super.channelRead(ctx, name);
}
});
//入栈
pipeline.addLast("h2", new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object name) throws Exception {
log.debug("2");
Student student = new Student(name.toString());
super.channelRead(ctx, student); // 将数据传递给下个 handler,如果不调用,调用链会断开 或者调用 ctx.fireChannelRead(student);
}
});
//入栈
pipeline.addLast("h3", new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log.debug("3, 结果{}, class:{}", msg, msg.getClass());
ctx.writeAndFlush(ctx.alloc().buffer().writeBytes("server...".getBytes()));
//写入操作,只是为了触发下面的出栈处理器
// ch.writeAndFlush(ctx.alloc().buffer().writeBytes("server...".getBytes()));
}
});
//出栈
pipeline.addLast("h4", new ChannelOutboundHandlerAdapter(){
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
log.debug("4");
super.write(ctx, msg, promise);
}
});
//出栈
pipeline.addLast("h5", new ChannelOutboundHandlerAdapter(){
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
log.debug("5");
super.write(ctx, msg, promise);
}
});
//出栈
pipeline.addLast("h6", new ChannelOutboundHandlerAdapter(){
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
log.debug("6");
super.write(ctx, msg, promise);
}
});
}
})
.bind(8080);
}
@Data
@AllArgsConstructor
static class Student {
private String name;
public Student(String name) {
this.name = name;
}
}
}
运行结果:

ctx(ChannelHandlerContext)是向前找outbound(出站)
ch(NioSocketChannel)是向后去找outbound(出站)
代码如下所示,如果将h4换到h3之前,并在h3里面使用ctx的话,会出下面的结果
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import java.nio.charset.Charset;
@Slf4j
public class TestPipeline {
public static void main(String[] args) {
new ServerBootstrap()
.group(new NioEventLoopGroup())
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
// 1. 通过 channel 拿到 pipeline
ChannelPipeline pipeline = ch.pipeline();
// 2. 添加处理器,下面的构成这种链: head -> h1 -> h2 -> h4 -> h3 -> h5 -> h6 -> tail
/*
netty会自动创建两个handler:head和tail
上面的一条链会依次执行
*/
//入栈
pipeline.addLast("h1", new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log.debug("1");
super.channelRead(ctx, msg);
}
});
//入栈
pipeline.addLast("h2", new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object name) throws Exception {
log.debug("2");
super.channelRead(ctx, name); // 将数据传递给下个 handler,如果不调用,调用链会断开 或者调用 ctx.fireChannelRead(student);
}
});
//出栈
pipeline.addLast("h4", new ChannelOutboundHandlerAdapter(){
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
log.debug("4");
super.write(ctx, msg, promise);
}
});
//入栈
pipeline.addLast("h3", new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log.debug("3");
//ctx是向前去找出站(outbound)处理器
ctx.writeAndFlush(ctx.alloc().buffer().writeBytes("server...".getBytes()));
//写入操作,只是为了触发下面的出栈处理器
// ch.writeAndFlush(ctx.alloc().buffer().writeBytes("server...".getBytes()));
}
});
//出栈
pipeline.addLast("h5", new ChannelOutboundHandlerAdapter(){
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
log.debug("5");
super.write(ctx, msg, promise);
}
});
//出栈
pipeline.addLast("h6", new ChannelOutboundHandlerAdapter(){
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
log.debug("6");
super.write(ctx, msg, promise);
}
});
}
})
.bind(8080);
}
}

如果h4没有换到h3之前,是只能打印到3的位置的
ChannelInboundHandlerAdapter 是按照 addLast 的顺序执行的,而 ChannelOutboundHandlerAdapter 是按照 addLast 的逆序执行的。ChannelPipeline 的实现是一个 ChannelHandlerContext(包装了 ChannelHandler) 组成的双向链表(链表如下:)

数字代表了处理步骤的先后次序

如果在 in_3 使用的是ctx.write的话,是会向前找out