
关系继承图

由于无法知道远程节点是否会一次性发送一条完整的消息,TCP 有可能出现粘包、拆包的问题,因此 ByteToMessageDecoder 会对入站数据进行缓冲,直到数据准备好被处理为止。
一个关于ByteToMessageDecoder的实例分析
/**
 * Example decoder that emits one {@code int} message per call when at least
 * four bytes are readable in the inbound buffer.
 */
public class ToIntegerDecoder extends ByteToMessageDecoder {

    /**
     * Reads a single int from {@code in} and appends it to {@code out}.
     * Does nothing when fewer than 4 bytes are currently readable.
     *
     * @param ctx the handler context (unused here)
     * @param in  the inbound buffer to read from
     * @param out the list that receives the decoded value
     */
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        // An int occupies 4 bytes; leave the buffer untouched until that many are readable.
        if (in.readableBytes() < 4) {
            return;
        }
        int decoded = in.readInt();
        out.add(decoded);
    }
}
说明:
package com.jhj.netty.inboundhandlerandoutboundhandler;
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
/**
 * Client entry point: connects to localhost:7000 and blocks until the channel closes.
 */
public class MyClient {

    /**
     * Configures the client bootstrap, connects, waits for the channel's close future,
     * and finally shuts the event loop group down.
     *
     * @param args not used
     * @throws Exception propagated from the {@code sync()} calls
     */
    public static void main(String[] args) throws Exception {
        EventLoopGroup eventLoops = new NioEventLoopGroup();
        try {
            Bootstrap client = new Bootstrap();
            client.group(eventLoops);
            client.channel(NioSocketChannel.class);
            // Custom initializer that sets up the pipeline for the new channel.
            client.handler(new MyClientInitializer());

            ChannelFuture connected = client.connect("localhost", 7000).sync();
            ChannelFuture closed = connected.channel().closeFuture();
            closed.sync();
        } finally {
            eventLoops.shutdownGracefully();
        }
    }
}
package com.jhj.netty.inboundhandlerandoutboundhandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
/**
 * Client-side business handler: sends one long when the channel becomes active
 * and prints every {@code Long} it receives.
 */
public class MyClientHandler extends SimpleChannelInboundHandler<Long> {

    /**
     * Logs a message and writes a single long value to the channel.
     *
     * @param ctx the handler context used to write and flush the value
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("MyClientHandler 发送数据");
        final Long outbound = 123456L;
        ctx.writeAndFlush(outbound);
    }

    /**
     * Prints the received value to standard output.
     *
     * @param ctx the handler context (unused here)
     * @param msg the inbound Long message
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Long msg) throws Exception {
        System.out.println(msg);
    }
}
package com.jhj.netty.inboundhandlerandoutboundhandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
/**
 * Builds the client channel pipeline: Long encoder, Long decoder, then the business handler.
 */
public class MyClientInitializer extends ChannelInitializer<SocketChannel> {

    /**
     * Appends the codec handlers and the business handler to the channel's pipeline,
     * in that order.
     *
     * @param ch the channel being initialized
     */
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ch.pipeline()
                // Outbound handler that encodes the data being sent.
                .addLast(new MyLongToByteEncoder())
                .addLast(new MyByteToLongDecoder())
                // Custom handler that carries the business logic.
                .addLast(new MyClientHandler());
    }
}
package com.jhj.netty.inboundhandlerandoutboundhandler;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
/**
 * Server entry point: binds port 7000 and blocks until the server channel closes.
 */
public class Myserver {

    /**
     * Configures the server bootstrap with a single-threaded boss group and a default
     * worker group, binds, waits for the close future, and finally shuts both groups down.
     *
     * @param args not used
     * @throws Exception propagated from the {@code sync()} calls
     */
    public static void main(String[] args) throws Exception {
        EventLoopGroup boss = new NioEventLoopGroup(1);
        EventLoopGroup workers = new NioEventLoopGroup();
        try {
            ServerBootstrap server = new ServerBootstrap();
            server.group(boss, workers);
            server.channel(NioServerSocketChannel.class);
            // Custom initializer that sets up the pipeline of each child channel.
            server.childHandler(new MyServerInitializer());

            ChannelFuture bound = server.bind(7000).sync();
            ChannelFuture closed = bound.channel().closeFuture();
            closed.sync();
        } finally {
            boss.shutdownGracefully();
            workers.shutdownGracefully();
        }
    }
}
package com.jhj.netty.inboundhandlerandoutboundhandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
/**
 * Server-side business handler: prints each {@code Long} received from the client
 * and answers with a fixed long value.
 */
public class MyServerHandler extends SimpleChannelInboundHandler<Long> {

    /**
     * Prints the inbound value, then writes a long back through the context.
     *
     * @param ctx the handler context used to send the reply
     * @param msg the inbound Long message
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Long msg) throws Exception {
        System.out.println("读取到从客户端" + msg);
        // Reply to the client with a long.
        final Long reply = 98765L;
        ctx.writeAndFlush(reply);
    }

    /**
     * Prints the stack trace of the failure and closes the context.
     *
     * @param ctx   the handler context to close
     * @param cause the exception that was raised
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}
package com.jhj.netty.inboundhandlerandoutboundhandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
/**
 * Builds the server channel pipeline: Long decoder, Long encoder, then the business handler.
 */
public class MyServerInitializer extends ChannelInitializer<SocketChannel> {

    /**
     * Appends the codec handlers and the business handler to the channel's pipeline,
     * in that order.
     *
     * @param ch the channel being initialized
     */
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ch.pipeline()
                // Inbound handler that performs decoding: MyByteToLongDecoder.
                // Author's note: MessageToByteEncoder and ByteToMessageDecoder do not conflict.
                .addLast(new MyByteToLongDecoder())
                .addLast(new MyLongToByteEncoder())
                // Custom handler that carries the business logic.
                .addLast(new MyServerHandler());
    }
}
package com.jhj.netty.inboundhandlerandoutboundhandler;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import io.netty.handler.codec.MessageToByteEncoder;
import java.util.List;
/**
 * Encoder that writes each outbound {@code Long} message into the output buffer as a long.
 */
public class MyLongToByteEncoder extends MessageToByteEncoder<Long> {

    /**
     * Logs the call and the message, then writes the message to {@code out} via
     * {@code writeLong}.
     *
     * @param ctx the handler context (unused here)
     * @param msg the Long to encode
     * @param out the buffer that receives the encoded value
     */
    @Override
    protected void encode(ChannelHandlerContext ctx, Long msg, ByteBuf out) throws Exception {
        System.out.println("encode 被调用");
        System.out.println("msg=" + msg);
        final long value = msg;
        out.writeLong(value);
    }
}
package com.jhj.netty.inboundhandlerandoutboundhandler;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import java.util.List;
/**
 * Decoder that emits one {@code long} per call when at least eight bytes are readable.
 */
public class MyByteToLongDecoder extends ByteToMessageDecoder {

    /**
     * Author's note: decode is invoked repeatedly, depending on the data received, until
     * no new element is added to the list or the ByteBuf has no more readable bytes.
     * When {@code out} is not empty its contents are passed to the next
     * ChannelInboundHandler, whose method is therefore also invoked multiple times.
     *
     * @param ctx the {@link ChannelHandlerContext} which this {@link ByteToMessageDecoder} belongs to
     * @param in  the inbound {@link ByteBuf} from which to read data
     * @param out the {@link List} to which decoded messages are added for the next handler
     * @throws Exception declared by the overridden method
     */
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        System.out.println("decoder被调用");
        // A long is 8 bytes, so one can only be read once 8 bytes are readable.
        if (in.readableBytes() < 8) {
            return;
        }
        long decoded = in.readLong();
        out.add(decoded);
    }
}
不论是解码器handler还是编码器handler,其接收的消息类型都必须与待处理的消息类型一致,否则该handler不会被执行
在解码器进行数据解码时,需要判断缓冲区(ByteBuf)中的数据是否足够,否则接收到的结果和期望结果可能不一致
package com.jhj.netty.inboundhandlerandoutboundhandler;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import io.netty.handler.codec.ReplayingDecoder;
import java.util.List;
/**
 * Variant of the byte-to-long decoder built on {@code ReplayingDecoder}; it reads a long
 * without checking the readable byte count first.
 */
public class MyByteToLongDecoder2 extends ReplayingDecoder<Void> {

    /**
     * Author's note: decode is invoked repeatedly, depending on the data received, until
     * no new element is added to the list or the ByteBuf has no more readable bytes.
     * When {@code out} is not empty its contents are passed to the next
     * ChannelInboundHandler, whose method is therefore also invoked multiple times.
     *
     * @param ctx the {@link ChannelHandlerContext} which this {@link ByteToMessageDecoder} belongs to
     * @param in  the inbound {@link ByteBuf} from which to read data
     * @param out the {@link List} to which decoded messages are added for the next handler
     * @throws Exception declared by the overridden method
     */
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        System.out.println("decoder被调用");
        // Author's note: no need to check whether enough data is readable;
        // that check is performed internally.
        long decoded = in.readLong();
        out.add(decoded);
    }
}

如有问题,欢迎指正!