• Netty编解码器


    1、Java的编解码

            编码(Encode)称为序列化, 它将对象序列化为字节数组,用于网络传输、数据持久化或者其它用途。

            解码(Decode)称为反序列化,它把从网络、磁盘等读取的字节数组还原成原始对象(通常是原始对象的拷贝),以方便后续的业务逻辑操作。

            java序列化对象只需要实现java.io.Serializable接口并生成序列化ID,这个类就能够通过java.io.ObjectInput和java.io.ObjectOutput序列化和反序列化。 

            Java序列化目的:1.网络传输。2.对象持久化。

            Java序列化缺点:1.无法跨语言。 2.序列化后码流太大。3.序列化性能太低。

            Java序列化仅仅是Java编解码技术的一种,由于它的种种缺陷,衍生出了多种编解码技术和框架,这些编解码框架实现消息的高效序列化。

    2、Netty编解码器

    1. 概念

            在网络应用中需要实现某种编解码器,将原始字节数据与自定义的消息对象进行互相转换。网络中都是以字节码的数据形式来传输数据的,服务器编码数据后发送到客户端,客户端需要对数据进行解码。

            对于Netty而言,编解码器由两部分组成:编码器、解码器。

    • 解码器:负责将消息从字节或其他序列形式转成指定的消息对象。
    • 编码器:将消息对象转成字节或其他序列形式在网络上传输。

            Netty 的编(解)码器实现了 ChannelHandlerAdapter,也是一种特殊的 ChannelHandler,所以依赖于 ChannelPipeline,可以将多个编(解)码器链接在一起,以实现复杂的转换逻辑。

    Netty里面的编解码: 解码器:负责处理“入站 InboundHandler”数据。 编码器:负责“出站OutboundHandler” 数据。

    2. 解码器(Decoder)

            解码器负责 解码“入站”数据从一种格式到另一种格式,解码器处理入站数据是抽象ChannelInboundHandler的实现。需要将解码器放在ChannelPipeline中。对于解码器,Netty中主要提供了抽象基类ByteToMessageDecoder和MessageToMessageDecoder

    抽象解码器 

    • ByteToMessageDecoder: 用于将字节转为消息,需要检查缓冲区是否有足够的字节
    • ReplayingDecoder: 继承ByteToMessageDecoder,不需要检查缓冲区是否有足够的字节,但是 ReplayingDecoder速度略慢于ByteToMessageDecoder,同时不是所有的ByteBuf都支持。项目复杂性高则使用ReplayingDecoder,否则使用ByteToMessageDecoder
    • MessageToMessageDecoder: 用于从一种消息解码为另外一种消息(例如POJO到POJO)

    核心方法:

    decode(ChannelHandlerContext ctx, ByteBuf msg, List out) 
    

    代码实现:

    解码器

    1. package com.lagou.code;
    2. import io.netty.buffer.ByteBuf;
    3. import io.netty.channel.ChannelHandlerContext;
    4. import io.netty.handler.codec.MessageToMessageDecoder;
    5. import io.netty.util.CharsetUtil;
    6. import java.util.List;
    7. /**
    8. * 消息解码器
    9. */
    10. public class MessageDecoder extends MessageToMessageDecoder {
    11. @Override
    12. protected void decode(ChannelHandlerContext ctx, Object msg, List out) throws Exception {
    13. System.out.println("正在进行消息解码.....");
    14. ByteBuf byteBuf = (ByteBuf) msg;
    15. out.add(byteBuf.toString(CharsetUtil.UTF_8)); // 传递到下一个Handler
    16. }
    17. }

    通道读取方法:

    1. /**
    2. * 通道读取事件
    3. *
    4. * @param ctx
    5. * @param msg
    6. * @throws Exception
    7. */
    8. @Override
    9. public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    10. System.out.println("客户端发送过来的消息:" + msg);
    11. }

    启动类:

    1. protected void initChannel(SocketChannel ch) throws Exception {
    2. //8. 向pipeline中添加自定义业务处理handler
    3. ch.pipeline().addLast(new MessageDecoder());//添加解码器
    4. ch.pipeline().addLast(new NettyServerHandler());
    5. }

    3. 编码器(Encoder)

            与ByteToMessageDecoder和MessageToMessageDecoder相对应,Netty提供了对应的编码器实现MessageToByteEncoder和MessageToMessageEncoder,二者都实现ChannelOutboundHandler接口。

    抽象编码器 

    • MessageToByteEncoder: 将消息转化成字节
    • MessageToMessageEncoder: 用于从一种消息编码为另外一种消息(例如POJO到POJO)

    核心方法:

    encode(ChannelHandlerContext ctx, String msg, List out)

    代码实现:

    编码器:

    1. package com.lagou.code;
    2. import io.netty.buffer.Unpooled;
    3. import io.netty.channel.ChannelHandlerContext;
    4. import io.netty.handler.codec.MessageToMessageEncoder;
    5. import io.netty.util.CharsetUtil;
    6. import java.util.List;
    7. /**
    8. * 消息的编码
    9. */
    10. public class MessageEncoder extends MessageToMessageEncoder {
    11. @Override
    12. protected void encode(ChannelHandlerContext ctx, Object msg, List out) throws Exception {
    13. System.out.println("消息正在进行编码.....");
    14. String str = (String) msg;
    15. out.add(Unpooled.copiedBuffer(str, CharsetUtil.UTF_8));
    16. }
    17. }

    消息发送:

    1. @Override
    2. public void channelActive(ChannelHandlerContext ctx) throws Exception {
    3. // ctx.writeAndFlush(Unpooled.copiedBuffer("你好呀,我是Netty客户端", CharsetUtil.UTF_8));
    4. ChannelFuture future = ctx.writeAndFlush("你好呀,我是Netty客户端");
    5. future.addListener(new ChannelFutureListener() {
    6. @Override
    7. public void operationComplete(ChannelFuture future) throws Exception {
    8. if (future.isSuccess()) {
    9. System.out.println("数据发送成功");
    10. } else {
    11. System.out.println("数据发送失败");
    12. }
    13. }
    14. });
    15. }

    启动类

    1. @Override
    2. protected void initChannel(SocketChannel ch) throws Exception {
    3. //6. 向pipeline中添加自定义业务处理handler
    4. ch.pipeline().addLast(new MessageDecoder());//添加解码器
    5. ch.pipeline().addLast(new MessageEncoder());//添加编码器
    6. ch.pipeline().addLast(new NettyClientHandler());
    7. }

    4. 编码解码器Codec

            编码解码器: 同时具有编码与解码功能,特点同时实现了ChannelInboundHandler和ChannelOutboundHandler接口,因此在数据输入和输出时都能进行处理。

    Netty提供提供了一个ChannelDuplexHandler适配器类,编码解码器的抽象基类ByteToMessageCodec ,MessageToMessageCodec都继承与此类. 

    代码实现:

    1. package com.lagou.code;
    2. import io.netty.buffer.ByteBuf;
    3. import io.netty.buffer.Unpooled;
    4. import io.netty.channel.ChannelHandlerContext;
    5. import io.netty.handler.codec.MessageToMessageCodec;
    6. import io.netty.util.CharsetUtil;
    7. import java.util.List;
    8. /**
    9. * 消息编解码器
    10. */
    11. public class MessageCodec extends MessageToMessageCodec {
    12. /**
    13. * 编码
    14. *
    15. * @param channelHandlerContext
    16. * @param o
    17. * @param list
    18. * @throws Exception
    19. */
    20. @Override
    21. protected void encode(ChannelHandlerContext ctx, Object msg, List out) throws Exception {
    22. System.out.println("消息正在进行编码.....");
    23. String str = (String) msg;
    24. out.add(Unpooled.copiedBuffer(str, CharsetUtil.UTF_8));
    25. }
    26. /**
    27. * 解码
    28. *
    29. * @param channelHandlerContext
    30. * @param o
    31. * @param list
    32. * @throws Exception
    33. */
    34. @Override
    35. protected void decode(ChannelHandlerContext ctx, Object msg, List out) throws Exception {
    36. System.out.println("正在进行消息解码.....");
    37. ByteBuf byteBuf = (ByteBuf) msg;
    38. out.add(byteBuf.toString(CharsetUtil.UTF_8)); // 传递到下一个Handler
    39. }
    40. }

    启动类:

    1. protected void initChannel(SocketChannel ch) throws Exception {
    2. //8. 向pipeline中添加自定义业务处理handler
    3. ch.pipeline().addLast(new MessageCoder());//添加编解码器
    4. ch.pipeline().addLast(new NettyServerHandler());
    5. }

    注意:编解码器handler一定要定义在业务处理handler的前面

    3、客户端完整代码示例

    1. package com.lagou.code;
    2. import io.netty.bootstrap.Bootstrap;
    3. import io.netty.channel.ChannelFuture;
    4. import io.netty.channel.ChannelInitializer;
    5. import io.netty.channel.EventLoopGroup;
    6. import io.netty.channel.nio.NioEventLoopGroup;
    7. import io.netty.channel.socket.SocketChannel;
    8. import io.netty.channel.socket.nio.NioSocketChannel;
    9. /**
    10. * Netty客户端
    11. */
    12. public class NettyClient {
    13. public static void main(String[] args) throws InterruptedException {
    14. // 1. 创建线程组
    15. EventLoopGroup group = new NioEventLoopGroup();
    16. // 2. 创建客户端启动助手
    17. Bootstrap bootstrap = new Bootstrap();
    18. // 3. 设置线程组
    19. bootstrap.group(group)
    20. .channel(NioSocketChannel.class) // 4. 设置客户端通道实现为NIO
    21. .handler(new ChannelInitializer() { // 5. 创建一个通道初始化对象
    22. @Override
    23. protected void initChannel(SocketChannel ch) throws Exception {
    24. /* // 添加解码器
    25. ch.pipeline().addLast("messageDecoder", new MessageDecoder());
    26. // 添加编码器
    27. ch.pipeline().addLast("messageEncoder", new MessageEncoder()); */
    28. // 添加编解码器
    29. ch.pipeline().addLast("", new MessageCodec());
    30. // 6. 向pipeline中添加自定义业务处理handler
    31. ch.pipeline().addLast(new NettyClientHandler());
    32. }
    33. });
    34. // 7. 启动客户端,等待连接服务端,同时将异步改为同步
    35. ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 9999).sync();
    36. // 8. 关闭通道和关闭连接池
    37. channelFuture.channel().closeFuture().sync();
    38. group.shutdownGracefully();
    39. }
    40. }

    注:服务端与之类似,省略。

  • 相关阅读:
    台湾飞凌FM8PB513B单片机提供单片机方案开发 产品设计
    协议类型(总结为主,非详细)
    Pr怎么消除人声?三个方法解决!
    RabbitMQ小结
    Java项目:SSM汽车维修预约平台
    【Linux】《Linux命令行与shell脚本编程大全 (第4版) 》笔记-Chapter5-理解 shell
    最大流的算法 Algorithm for Maximum Flow
    Spring面试题(2022)
    y85.第四章 Prometheus大厂监控体系及实战 -- prometheus告警机制进阶、pushgateway和prometheus存储(十六)
    [附源码]Python计算机毕业设计钓鱼爱好者交流平台
  • 原文地址:https://blog.csdn.net/weixin_52851967/article/details/126116192