Netty 作为一个网络框架,提供了诸多功能,比如编码解码等,Netty 还提供了非常重要的一个服务-----心跳机制 heartbeat。通过心跳检查对方是否有效,这是 RPC 框架中是必不可少的功能。下面我们分析一下 Netty 内部心跳服务源码实现。
Netty 提供了 IdleStateHandler ,ReadTimeoutHandler,WriteTimeoutHandler 三个 Handler 检测连接的有效性,
重点分析 IdleStateHandler
名称 | 描述 |
---|---|
IdleStateHandler | 如果连接闲置时间过长,则会触发 IdleStateEvent 事件。在 ChannelInboundHandler 中可以覆盖 userEventTriggered(…) 方法来处理 IdleStateEvent。 |
ReadTimeoutHandler | 在指定的时间间隔内没有接收到入站数据则会抛出 ReadTimeoutException 并关闭 Channel。ReadTimeoutException 可以通过覆盖 ChannelHandler 的 exceptionCaught(…) 方法检测到。 |
WriteTimeoutHandler | 在指定的时间间隔内没有接收到入站数据则会抛出 WriteTimeoutException 并关闭 Channel。WriteTimeoutException 可以通过覆盖 ChannelHandler 的 exceptionCaught(…) 方法检测到。 |
ReadTimeout 事件和 WriteTimeout 事件都会自动关闭连接,而且,属于异常处理,所以,这里只是介绍以下,我们重点看 IdleStateHandler。
private final boolean observeOutput; // 是否考虑出站时较慢的情况。默认值是 false
private final long readerIdleTimeNanos; // 读事件空闲时间,0 则禁用事件
private final long writerIdleTimeNanos; // 写事件空闲时间,0 则禁用事件
private final long allIdleTimeNanos; // 读或写空闲时间,0 则禁用事件
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
if (ctx.channel().isActive() && ctx.channel().isRegistered()) {
// channelActive() event has been fired already, which means this.channelActive() will
// not be invoked. We have to initialize here instead.
initialize(ctx); // 进行初始化
} else {
// channelActive() event has not been fired yet. this.channelActive() will be invoked
// and initialization will occur there.
}
}
private void initialize(ChannelHandlerContext ctx) {
// Avoid the case where destroy() is called before scheduling timeouts.
// See: https://github.com/netty/netty/issues/143
switch (state) {
case 1:
case 2:
return;
default:
break;
}
state = 1;
initOutputChanged(ctx);
// 获取当前系统纳秒时间
lastReadTime = lastWriteTime = ticksInNanos();
// 读超时 大于0 说明关注了该事件
if (readerIdleTimeNanos > 0) {
readerIdleTimeout = schedule(ctx, new ReaderIdleTimeoutTask(ctx),
readerIdleTimeNanos, TimeUnit.NANOSECONDS);
}
// 写超时 大于0 说明关注了该事件
if (writerIdleTimeNanos > 0) {
writerIdleTimeout = schedule(ctx, new WriterIdleTimeoutTask(ctx),
writerIdleTimeNanos, TimeUnit.NANOSECONDS);
}
// 读或写空闲时间, 大于0 说明关注了该事件
if (allIdleTimeNanos > 0) {
allIdleTimeout = schedule(ctx, new AllIdleTimeoutTask(ctx),
allIdleTimeNanos, TimeUnit.NANOSECONDS);
}
}
schedule 方法
Future<?> schedule(ChannelHandlerContext ctx, Runnable task, long delay, TimeUnit unit) {
// 往当前处理器中的执行器添加了定时任务, ReaderIdleTimeoutTask,WriterIdleTimeoutTask,AllIdleTimeoutTask
return ctx.executor().schedule(task, delay, unit); // 单位都是纳秒
}
他们的父类
private abstract static class AbstractIdleTask implements Runnable {
private final ChannelHandlerContext ctx;
AbstractIdleTask(ChannelHandlerContext ctx) {
this.ctx = ctx;
}
@Override
public void run() {
// 当所有的定时任务, 必须是在当前通道是打开状态下, 才真正执行 run
if (!ctx.channel().isOpen()) {
return;
}
run(ctx);
}
protected abstract void run(ChannelHandlerContext ctx);
}
ReaderIdleTimeoutTask
protected void run(ChannelHandlerContext ctx) {
// 读取超时时间, 纳秒
long nextDelay = readerIdleTimeNanos;
// 当通道阅读时候 这个值是true, 当通道读取完毕后, 这个值是false
if (!reading) {
// 当前系统时间纳秒 减 最后一次读取时间 channelReadComplete(ChannelHandlerContext ctx)赋值
nextDelay -= ticksInNanos() - lastReadTime;
}
// 当小于或等于0 说明已经到达超时时间
if (nextDelay <= 0) {
// Reader is idle - set a new timeout and notify the callback.
// 设置新的监听超时的定时任务
readerIdleTimeout = schedule(ctx, this, readerIdleTimeNanos, TimeUnit.NANOSECONDS);
boolean first = firstReaderIdleEvent;
firstReaderIdleEvent = false;
try {
// 创建对应事件类型, 是否是第一次触发事件
IdleStateEvent event = newIdleStateEvent(IdleState.READER_IDLE, first);
// 触发事件
channelIdle(ctx, event);
} catch (Throwable t) {
ctx.fireExceptionCaught(t);
}
} else {
// Read occurred before the timeout - set a new timeout with shorter delay.
// 没有超时情况下, 设置下一次监听触发的定时任务,
readerIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);
}
}
protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) throws Exception {
// 触发(调用)当前处理器的 fireUserEventTriggered 事件
ctx.fireUserEventTriggered(evt);
}
WriterIdleTimeoutTask
protected void run(ChannelHandlerContext ctx) {
// 最后一次写时间 hasOutputChanged() 赋值
long lastWriteTime = IdleStateHandler.this.lastWriteTime;
// writerIdleTimeNanos是设置写超时时间, 当前纳秒 - 最后一次写时间
long nextDelay = writerIdleTimeNanos - (ticksInNanos() - lastWriteTime);
// 说明超时
if (nextDelay <= 0) {
// Writer is idle - set a new timeout and notify the callback.
// 设置新的监听超时的定时任务
writerIdleTimeout = schedule(ctx, this, writerIdleTimeNanos, TimeUnit.NANOSECONDS);
boolean first = firstWriterIdleEvent;
firstWriterIdleEvent = false;
try {
// 出站较慢数据的判断
if (hasOutputChanged(ctx, first)) {
return;
}
// 创建对应事件类型, 是否是第一次触发事件
IdleStateEvent event = newIdleStateEvent(IdleState.WRITER_IDLE, first);
// 触发事件
channelIdle(ctx, event);
} catch (Throwable t) {
ctx.fireExceptionCaught(t);
}
} else {
// Write occurred before the timeout - set a new timeout with shorter delay.
// 没有超时情况下, 设置下一次监听触发的定时任务,
writerIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);
}
}
写任务的 run 代码逻辑基本和读任务的逻辑一样,唯一不同的就是有一个针对 出站较慢数据的判断 hasOutputChanged
AllIdleTimeoutTask
protected void run(ChannelHandlerContext ctx) {
// 读写超时纳秒
long nextDelay = allIdleTimeNanos;
if (!reading) {
// 当前纳秒时间减去 最后一次写或读 的时间
nextDelay -= ticksInNanos() - Math.max(lastReadTime, lastWriteTime);
}
// 若大于 0,说明超时了
if (nextDelay <= 0) {
// Both reader and writer are idle - set a new timeout and
// notify the callback.
// 设置新的监听超时的定时任务
allIdleTimeout = schedule(ctx, this, allIdleTimeNanos, TimeUnit.NANOSECONDS);
boolean first = firstAllIdleEvent;
firstAllIdleEvent = false;
try {
// 出站较慢数据的判断
if (hasOutputChanged(ctx, first)) {
return;
}
// 创建对应事件类型, 是否是第一次触发事件
IdleStateEvent event = newIdleStateEvent(IdleState.ALL_IDLE, first);
// 触发事件
channelIdle(ctx, event);
} catch (Throwable t) {
ctx.fireExceptionCaught(t);
}
} else {
// Either read or write occurred before the timeout - set a new
// timeout with shorter delay.
// 没有超时情况下, 设置下一次监听触发的定时任务,
allIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);
}
}
这个监控着所有的事件。当读写事件发生时,都会记录。代码逻辑和写事件的的基本一致:
这里的时间计算是取读写事件中的最大值来的。然后像写事件一样,判断是否发生了写的慢的情况。
1