• 10.netty客户端与服务器使用protobuf传输报文


    【README】

    本文总结自B站《尚硅谷-netty》,很不错; 内容如下:

    •  netty的编码器与解码器;
    • netty客户端与服务器通过 protobuf  传输报文的开发方式;
    • 文末po出了所有代码;

    【1】netty的编码器与解码器 codec

    1)编解码器应用场景

    • 编写网络应用程序时,因为数据在网络中传输的都是二进制字节码数据,在发送数据时就需要编码,接收数据时就需要解码 ;如下图

                                                      【图1】 编码与解码流程图

    2) codec(编解码器) 的组成部分有两个:decoder(解码器)和encoder(编码器)。

    • encoder 负责把业务数据(如pojo对象)转换成字节码数据;
    • decoder 负责把字节码数据转换成业务数据(如pojo对象);

    补充: 编解码器对应的英文“codec”(compress和decompress简化而成的合成词语 ),from wikipedia;

    3)java序列化的问题(所以才引入了 protobuf):

    • Netty 本身自带的 ObjectDecoder 和 ObjectEncoder 可以用来实现POJO对象或各种业务对象的编码和解码,底层使用的仍是 Java 序列化技术 , 而Java 序列化技术本身效率就不高,存在如下问题
      •  无法跨语言;
      •  序列化后的体积太大,是二进制编码的 5 倍多;
      •  序列化性能太低;

    引入新的解决方案: google 的 Protobuf


    【2】Protobuf

    【2.1】概述

    0) intro2 protobuf, refer2 https://developers.google.com/protocol-buffers ;

    1)ProtoBuf定义:

    • 是 Protocol Buffer 的缩写,即协议缓冲区;它是google实现的一种开源跨平台的序列化资料结构的协议

    2)ProtoBuf 是google提出的结构数据序列化方法,可简单类比于 XML,其具有以下特点:

    • ① 语言无关、平台无关。即 ProtoBuf 支持 Java、C++、Python 等多种语言,支持多个平台;
    • ② 高效。即比 XML 更小(3 ~ 10倍)、更快(20 ~ 100倍)、更为简单;
    • ③ 扩展性、兼容性好。你可以更新数据结构,而不影响和破坏原有的旧程序;

    【小结】

    • 说的直接点, protobuf序列化性能 优于  java序列化
    • 因为protobuf 很适合做数据存储或 RPC[远程过程调用]数据交换格式,所以目前有些公司把远程调用的报文传输方式从 http+json 修改为 tcp+protobuf

    3)基于protobuf的编码与解码流程

    【图解】 使用protobuf序列化 java对象的步骤;

    • 步骤1:编写 proto 文件;
    • 步骤2:使用 protoc.exe 编译把 proto文件编译为 java文件(pojo文件);
    • 步骤3:创建pojo对象并赋值,发送到服务器;

    【补充】关于 protobuf的更多介绍,refer2  https://www.jianshu.com/p/a24c88c0526a   


     【2.2】 netty客户端服务器通过protobuf传输报文

    说明:

    • 为了清晰展示 protobuf 的开发方式,下文使用了截图;但 netty通过 protobuf 传输报文的所有代码,会在文末po出;

    【步骤1】 引入 protobuf 依赖

    1. <dependency>
    2. <groupId>com.google.protobufgroupId>
    3. <artifactId>protobuf-javaartifactId>
    4. <version>3.6.1version>
    5. dependency>

    【步骤2】编写 proto 文件 

    Student.proto

    1. syntax="proto3"; // 版本
    2. option java_outer_classname = "StudentPOJO"; // 生成的外部类名,同时也是文件名
    3. // protobuf 使用 message 管理数据
    4. message Student { // 会在 StudentPOJO 外部类生成一个内部类 Student,他是真正方发送的POJO对象
    5. int32 id = 1; // Student类中有一个属性,属性名为 id,类型为 int32; 1 表示属性序号并不是属性值;
    6. string name = 2;
    7. }

    【步骤3】编译 proto文件到 POJO javabean 文件 

    编译命令:

    protoc.exe --java_out=. Student.proto

    【补充】


    【步骤4】 客户端发送 Student 对象到服务器

    1)客户端添加 protobuf 编码器 ProtobufEncoder;

    2) 客户端处理器

    • 根据 protobuf编译成的 POJO类文件,创建对应javabean对象,并发送到服务器;


    【步骤5】 服务器解码protobuf字节码

    添加 protobuf解码器 ProtobufDecoder

    netty服务器 处理器,接收POJO类对象,pojo类是由protobuf编译而成;

     【演示结果】netty客户端与服务器使用 probobuf传输报文


    【3】netty 使用 protobuf传输报文的客户端与服务器代码(源代码)

    【3.1】netty服务器

    1)netty服务器 (注意它添加的protobuf解码器 ProtobufDecoder

    1. public class ProtobufNettyServer76 {
    2. public static void main(String[] args) throws InterruptedException {
    3. // 创建 BossGroup 和 WorkerGroup
    4. // 1. 创建2个线程组 bossGroup, workerGroup
    5. // 2 bossGroup 仅处理连接请求; 真正的业务逻辑,交给workerGroup完成;
    6. // 3 两个线程组都是无限循环
    7. // 4 bossGroup 和 workerGroup 含有的子线程(NIOEventLoop)个数
    8. // 默认是 cpu核数 * 2
    9. EventLoopGroup boosGroup = new NioEventLoopGroup();
    10. EventLoopGroup workerGruop = new NioEventLoopGroup();
    11. try {
    12. // 创建服务器端的启动对象, 配置参数
    13. ServerBootstrap bootstrap = new ServerBootstrap();
    14. bootstrap.group(boosGroup, workerGruop) // 设置2个线程组
    15. .channel(NioServerSocketChannel.class) // 使用NIOSocketChannel 作为服务器的通道实现
    16. .option(ChannelOption.SO_BACKLOG, 128) // 设置线程队列等待连接的个数
    17. .childOption(ChannelOption.SO_KEEPALIVE, true) // 设置保持活动连接状态
    18. .childHandler(new ChannelInitializer() { // 创建一个通道初始化对象
    19. // 给 pipeline 设置处理器
    20. @Override
    21. protected void initChannel(SocketChannel socketChannel) throws Exception {
    22. ChannelPipeline pipeline = socketChannel.pipeline();
    23. // 添加 protobuf 解码器, 指定对哪种对象进行解码
    24. pipeline.addLast("decoder", new ProtobufDecoder(StudentPOJO.Student.getDefaultInstance()));
    25. // 添加业务处理器
    26. // pipeline.addLast(new ProtobufNettyServerHandler());
    27. pipeline.addLast(new ProtobufNettyServerHandler2());
    28. }
    29. }); // 给我们的workerGroup 的 EventLoop 对应的管道设置处理器
    30. System.out.println("... server is ready.");
    31. // 启动服务器, 绑定端口并同步处理 ,生成一个 ChannelFuture对象
    32. ChannelFuture channelFuture = bootstrap.bind(6668).sync();
    33. channelFuture.addListener((future1) -> System.out.println("Finish binding"));
    34. // 给 channelFuture 注册监听器,监听我们关心的事件
    35. channelFuture.addListener(future -> {
    36. if (future.isSuccess()) {
    37. System.out.println("监听端口6668 成功");
    38. }else {
    39. System.out.println("监听端口6668 失败");
    40. }
    41. });
    42. // 对关闭通道进行监听
    43. channelFuture.channel().closeFuture().sync();
    44. } finally {
    45. // 优雅关闭
    46. boosGroup.shutdownGracefully();
    47. workerGruop.shutdownGracefully();
    48. }
    49. }
    50. }

    2)netty服务器处理器(channelRead0方法中的 StudentPOJO.Student javabean就是由 protobuf编译生成的java类

    1. public class ProtobufNettyServerHandler2 extends SimpleChannelInboundHandler {
    2. // 读写数据事件(读取客户端发送的消息)
    3. // 1. ChannelHandlerContext ctx: 上下文信息,包括管道pipeline,通道channel,地址
    4. // 2. Object msg: 客户端发送的数据,默认是 Object
    5. @Override
    6. public void channelRead0(ChannelHandlerContext ctx, StudentPOJO.Student student) throws Exception {
    7. System.out.println("SimpleChannelInboundHandler子类handler-客户端发送的数据 id=" + student.getId() + " name=" + student.getName());
    8. }
    9. // 数据读取完毕,回复客户端
    10. @Override
    11. public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
    12. // writeAndFlush 是 write + flush ;把数据写入到缓冲,并刷新
    13. ChannelFuture channelFuture = ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 客户端", StandardCharsets.UTF_8));
    14. channelFuture.addListener(future -> System.out.println("回复成功"));
    15. }
    16. // 处理异常,关闭通道
    17. @Override
    18. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    19. ctx.channel().close();
    20. }
    21. }

    【3.2】netty客户端

    1)netty客户端 (注意其添加的 Protobuf编码器 ProtobufEncoder

    1. public class ProtobufNettyClient76 {
    2. public static void main(String[] args) throws InterruptedException {
    3. // 客户端需要一个事件循环组
    4. EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
    5. try {
    6. // 创建客户端启动对象, 注意是 BootStrap
    7. Bootstrap bootstrap = new Bootstrap();
    8. // 设置相关参数
    9. bootstrap.group(eventLoopGroup) // 设置线程组
    10. .channel(NioSocketChannel.class) // 设置客户端通道实现类(反射)
    11. .handler(new ChannelInitializer() {
    12. @Override
    13. protected void initChannel(SocketChannel socketChannel) throws Exception {
    14. ChannelPipeline pipeline = socketChannel.pipeline();
    15. // 添加 ProtobufEncoder 编码器
    16. pipeline.addLast(new ProtobufEncoder());
    17. // 添加业务处理器
    18. pipeline.addLast(new ProtobufNettyClientHandler()); // 加入自己的处理器
    19. }
    20. });
    21. System.out.println("client is ok");
    22. // 启动客户端去连接服务器
    23. // ChannelFuture, 设计到netty的异步模型
    24. ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 6668).sync();
    25. // 给关闭通道进行监听
    26. channelFuture.channel().closeFuture().sync();
    27. } finally {
    28. eventLoopGroup.shutdownGracefully();
    29. }
    30. }
    31. }

    2)netty客户端处理器 (注意 channelActive方法中的 StudentPOJO.Student,是由Protobuf编译生成的POJO类) 

    1. public class ProtobufNettyClientHandler extends ChannelInboundHandlerAdapter {
    2. // 当通道就绪就会触发该方法
    3. @Override
    4. public void channelActive(ChannelHandlerContext ctx) throws Exception {
    5. // 发送一个 StudentPOJO 对象到服务器
    6. StudentPOJO.Student student = StudentPOJO.Student.newBuilder().setId(4).setName("zhangsan").build();
    7. ctx.writeAndFlush(student);
    8. }
    9. // 当通道有读取事件时,会触发
    10. @Override
    11. public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    12. ByteBuf byteBuf = (ByteBuf) msg;
    13. System.out.println("服务器回复消息:" + byteBuf.toString(StandardCharsets.UTF_8) + ", 当前时间=" + DateUtils.getNowTimestamp());
    14. System.out.println("服务器地址:" + ctx.channel().remoteAddress());
    15. }
    16. // 捕获异常
    17. @Override
    18. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    19. cause.printStackTrace();
    20. ctx.close();
    21. }
    22. }

  • 相关阅读:
    康耐视VisionPro 9.0 R2破解安装教程
    Android自动化测试32--Appium Desktop
    使用共享内存进行进程间通信
    Spring框架之IOC入门
    【数据库系统概论】第二章关系数据库
    Java开发Word转PDF技术栈汇总
    《优化接口设计的思路》系列:第四篇—接口的权限控制
    数据库级别的审计
    go ntp时间同步
    【数据结构】数组和字符串(四):特殊矩阵的压缩存储:稀疏矩阵——三元组表
  • 原文地址:https://blog.csdn.net/PacosonSWJTU/article/details/126692946