补充:文末附带了 log4j整合到netty的配置;
1)概述:
2)入站事件如下(inbound):
3)出站事件很简单,如下(outbound):
4)netty应用包含许多网络和应用逻辑事件及它们的处理器。
1)入站和出栈事件都会经过预设的处理器链(多个处理器);
2)处理器举例:
3)以客户端服务器模式介绍入站与出站处理器的事件处理过程
【图解】
客户端的处理器有:
服务端的处理器有:
补充:多个处理器封装到通道管道 ChannelPipeline;
1)需求描述:
2)通道管道ChannelPipeline 可以封装多个处理器;其处理器执行顺序特别重要(前后关系特别重要,如入栈解码处理器要第1个执行,又如出站编码器要最后一个执行),否则客户端与服务器将无法通信(因为事件或数据要经过所有的处理器);类似于如下:
- for (event event : events) {
- handler1(event);
- handler2(event);
- handler3(event);
- }
3)入站与出站处理器执行顺序:
3.1)服务器初始化器,添加处理器到管道;
3.2)客户端初始化器:
1)服务器:用于监听网络端口,处理请求;
- /**
- * @Description 测试handler链的服务器
- * @author xiao tang
- * @version 1.0.0
- * @createTime 2022年09月10日
- */
- public class NettyServerForHandlerChain80 {
-
- public static void main(String[] args) throws InterruptedException {
- EventLoopGroup bossGroup = new NioEventLoopGroup(1);
- EventLoopGroup workerGroup = new NioEventLoopGroup();
- try {
- ServerBootstrap serverBootstrap = new ServerBootstrap();
- serverBootstrap.group(bossGroup, workerGroup)
- .channel(NioServerSocketChannel.class)
- .childHandler(new NettyServerInitializer()); // 自定义一个初始化类
- // 自动服务器
- ChannelFuture channelFuture = serverBootstrap.bind(8089).sync();
- System.out.println("服务器启动成功");
- // 监听关闭
- channelFuture.channel().closeFuture().sync();
- } finally {
- bossGroup.shutdownGracefully();
- workerGroup.shutdownGracefully();
- }
- }
- }
2)通道初始化器:把多个处理器添加到通道管道;
- /**
- * @Description 初始化器
- * @author xiao tang
- * @version 1.0.0
- * @createTime 2022年09月10日
- */
- public class NettyServerInitializer extends ChannelInitializer
{ - @Override
- protected void initChannel(SocketChannel ch) throws Exception {
- ChannelPipeline pipeline = ch.pipeline();
- // 入站handler,把字节型数据解码为long型
- // pipeline.addLast(new MyByte2LongDecoder()); // MyByte2LongDecoder 与 MyByte2LongDecoder2 等价
- pipeline.addLast(new MyByte2LongDecoder2());
-
- // 出站handler, 把long型数据编码为字节(编码器)
- pipeline.addLast(new MyLong2ByteEncoder());
-
- // 添加业务逻辑handler
- pipeline.addLast(new NettyServerHandler());
-
- System.out.println("NettyServerInitializer.initChannel 执行成功.");
- }
- }
3)服务端业务处理器:业务逻辑处理;
- /**
- * @Description nety服务器handler
- * @author xiao tang
- * @version 1.0.0
- * @createTime 2022年09月10日
- */
- public class NettyServerHandler extends SimpleChannelInboundHandler
{ - // 被调用多次
- @Override
- protected void channelRead0(ChannelHandlerContext ctx, Long msg) throws Exception {
- System.out.println("从客户端" + ctx.channel().remoteAddress() + "读取到long" + msg);
- // 给客户端回送消息
- ctx.writeAndFlush(98765L);
- }
-
- @Override
- public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
- cause.printStackTrace();
- ctx.close();
- }
- }
1)客户端:建立与服务器的连接;
- /**
- * @Description netty客户端
- * @author xiao tang
- * @version 1.0.0
- * @createTime 2022年09月10日
- */
- public class NettyClientForHandlerChain81 {
- public static void main(String[] args) throws InterruptedException {
- EventLoopGroup group = new NioEventLoopGroup();
- try {
- Bootstrap bootstrap = new Bootstrap();
- bootstrap.group(group)
- .channel(NioSocketChannel.class)
- .handler(new NettyClientInitializer()); // 自定义一个初始化类
- // 连接服务器
- ChannelFuture channelFuture = bootstrap.connect("localhost", 8089).sync();
- channelFuture.channel().closeFuture().sync();
- } finally {
- group.shutdownGracefully();
- }
- }
- }
2)通道初始化器:添加多个处理器到通道管道
- /**
- * @Description netty客户端初始化器
- * @author xiao tang
- * @version 1.0.0
- * @createTime 2022年09月10日
- */
- public class NettyClientInitializer extends ChannelInitializer
{ - @Override
- protected void initChannel(SocketChannel ch) throws Exception {
- ChannelPipeline pipeline = ch.pipeline();
- // 出站handler,把long型数据解码为字节型
- pipeline.addLast(new MyLong2ByteEncoder());
-
- // 入站handler,把字节型数据解码为long型
- pipeline.addLast(new MyByte2LongDecoder());
-
- // 添加一个自定义handler(入站),处理业务逻辑
- pipeline.addLast(new NettyClientHandler());
- }
- }
3)客户端业务处理器:业务逻辑处理 ;
- /**
- * @Description 客户端处理器
- * @author xiao tang
- * @version 1.0.0
- * @createTime 2022年09月10日
- */
- public class NettyClientHandler extends SimpleChannelInboundHandler
{ -
- @Override
- protected void channelRead0(ChannelHandlerContext ctx, Long msg) throws Exception {
- System.out.println("得到服务器ip = " + ctx.channel().remoteAddress());
- System.out.println("收到服务器消息 = " + msg);
- }
-
- @Override
- public void channelActive(ChannelHandlerContext ctx) throws Exception {
- System.out.println("NettyClientHandler 发送数据");
- // 1 分析
- // 1.1 abcdefgabcdefgab 是16个字节, long型占8个字节,所以服务器需要解码2次,每次解码8个字节
- // 1.2 该处理器的前一个handler是 MyLong2ByteEncoder,
- // 1.3 MyLong2ByteEncoder 的父类是 MessageToByteEncoder
- // 1.4 MessageToByteEncoder.write()方法通过acceptOutboundMessage判断当前msg是否为要处理的数据类型;
- // 若不是,则跳过encode方法, 否则执行对应的encode 方法(处理方法)
-
- // 客户端发送一个ByteBuf,不走Long型编码器
- ctx.writeAndFlush(Unpooled.copiedBuffer("abcdefgabcdefgab", StandardCharsets.UTF_8));
- // 客户端发送一个Long,走Long型编码器
- // ctx.writeAndFlush(123456L); // 发送一个long
- }
- }
1)字节转 Long型的解码器处理器
- /**
- * @Description 字节转long的解码器
- * @author xiao tang
- * @version 1.0.0
- * @createTime 2022年09月10日
- */
- public class MyByte2LongDecoder extends ByteToMessageDecoder {
-
- /**
- * @description decode 会根据接收的数据,被调用多次,直到确定没有新元素被添加到list为止, 或者 ByteBuf没有更多的可读字节为止;
- * 如果list out 不为空,就会将list的内容传递给下一个 ChannelInboundHandler,
- * 且下一个 ChannelInboundHandler的处理方法也会被调用多次
- * @param ctx 处 理器上下文
- * @param in 字节输入缓冲
- * @param out 集合,把处理后的数据传给下一个 ChannelInboundHandler
- *
- * @return
- * @author xiao tang
- * @date 2022/9/10
- */
- @Override
- protected void decode(ChannelHandlerContext ctx, ByteBuf in, List throws Exception {
- System.out.println("decode解码");
- // long 8个字节,需要判断有8个字节,才能读取一个long
- if (in.readableBytes() >= 8) {
- out.add(in.readLong());
- }
- }
- }
一个 解码器 变体;
- /**
- * @Description 字节转long的解码器
- * @author xiao tang
- * @version 1.0.0
- * @createTime 2022年09月10日
- */
- public class MyByte2LongDecoder2 extends ReplayingDecoder {
-
- /**
- * @description decode 会根据接收的数据,被调用多次,直到确定没有新元素被添加到list为止, 或者 ByteBuf没有更多的可读字节为止;
- * 如果list out 不为空,就会将list的内容传递给下一个 ChannelInboundHandler,
- * 且下一个 ChannelInboundHandler的处理方法也会被调用多次
- * @param ctx 处 理器上下文
- * @param in 字节输入缓冲
- * @param out 集合,把处理后的数据传给下一个 ChannelInboundHandler
- *
- * @return
- * @author xiao tang
- * @date 2022/9/10
- */
- @Override
- protected void decode(ChannelHandlerContext ctx, ByteBuf in, List throws Exception {
- System.out.println("MyByte2LongDecoder2被调用, decode解码");
- // ReplayingDecoder,无需判断字节流是否足够读取,内部会进行处理判断
- // if (in.readableBytes() >= 8) { // 无需判断
- out.add(in.readLong());
- // }
- }
- }
2)Long型转字节的编码器处理器
- /**
- * @Description long转字节的编码器
- * @author xiao tang
- * @version 1.0.0
- * @createTime 2022年09月10日
- */
- public class MyLong2ByteEncoder extends MessageToByteEncoder
{ - @Override
- protected void encode(ChannelHandlerContext ctx, Long msg, ByteBuf out) throws Exception {
- System.out.println("MyLong2ByteEncoder.encode 被调用");
- System.out.println("MyLong2ByteEncoder msg = " + msg);
- out.writeLong(msg);
- }
- }
1)客户端:
- decode解码
- 得到服务器ip = localhost/127.0.0.1:8089
- 收到服务器消息 = 98765
- decode解码
- 得到服务器ip = localhost/127.0.0.1:8089
- 收到服务器消息 = 98765
2)服务器:
- MyByte2LongDecoder2被调用, decode解码
- 从客户端/127.0.0.1:56272读取到long7017280452245743457
- MyLong2ByteEncoder.encode 被调用
- MyLong2ByteEncoder msg = 98765
- MyByte2LongDecoder2被调用, decode解码
- 从客户端/127.0.0.1:56272读取到long7089620625083818338
- MyLong2ByteEncoder.encode 被调用
- MyLong2ByteEncoder msg = 98765
1)引入log4j maven依赖;
- <dependency>
- <groupId>log4jgroupId>
- <artifactId>log4jartifactId>
- <version>1.2.17version>
- dependency>
- <dependency>
- <groupId>org.slf4jgroupId>
- <artifactId>slf4j-apiartifactId>
- <version>1.7.25version>
- dependency>
- <dependency>
- <groupId>org.slf4jgroupId>
- <artifactId>slf4j-log4j12artifactId><version>1.7.25version>
- <scope>testscope>
- dependency>
- <dependency>
- <groupId>org.slf4jgroupId>
- <artifactId>slf4j-simpleartifactId><version>1.7.25version>
- <scope>testscope>
- dependency>
2)配置log4j;
- log4j.rootLogger=DEBUG, stdout
- log4j.appender.stdout=org.apache.log4j.ConsoleAppender
- log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
- log4j.appender.stdout.layout.ConversionPattern=[%p] %C{1} - %m%n
3)效果: