本文总结自B站《尚硅谷-netty》,很不错; 内容如下:
1)编解码器应用场景
【图1】 编码与解码流程图

2) codec(编解码器) 的组成部分有两个:decoder(解码器)和encoder(编码器)。
补充: 编解码器对应的英文“codec”(compress和decompress简化而成的合成词语 ),from wikipedia;
3)java序列化的问题(所以才引入了 protobuf):
引入新的解决方案: google 的 Protobuf ;
0) intro2 protobuf, refer2 https://developers.google.com/protocol-buffers ;
1)ProtoBuf定义:
2)ProtoBuf 是google提出的结构数据序列化方法,可简单类比于 XML,其具有以下特点:
【小结】
3)基于protobuf的编码与解码流程

【图解】 使用protobuf序列化 java对象的步骤;
【补充】关于 protobuf的更多介绍,refer2 https://www.jianshu.com/p/a24c88c0526a
说明:
- <dependency>
- <groupId>com.google.protobufgroupId>
- <artifactId>protobuf-javaartifactId>
- <version>3.6.1version>
- dependency>
Student.proto
- syntax="proto3"; // 版本
- option java_outer_classname = "StudentPOJO"; // 生成的外部类名,同时也是文件名
- // protobuf 使用 message 管理数据
- message Student { // 会在 StudentPOJO 外部类生成一个内部类 Student,他是真正方发送的POJO对象
- int32 id = 1; // Student类中有一个属性,属性名为 id,类型为 int32; 1 表示属性序号并不是属性值;
- string name = 2;
-
- }
编译命令:
protoc.exe --java_out=. Student.proto
【补充】
1)客户端添加 protobuf 编码器 ProtobufEncoder;

2) 客户端处理器

添加 protobuf解码器 ProtobufDecoder ;

netty服务器 处理器,接收POJO类对象,pojo类是由protobuf编译而成; 

1)netty服务器 (注意它添加的protobuf解码器 ProtobufDecoder )
- public class ProtobufNettyServer76 {
-
- public static void main(String[] args) throws InterruptedException {
- // 创建 BossGroup 和 WorkerGroup
- // 1. 创建2个线程组 bossGroup, workerGroup
- // 2 bossGroup 仅处理连接请求; 真正的业务逻辑,交给workerGroup完成;
- // 3 两个线程组都是无限循环
- // 4 bossGroup 和 workerGroup 含有的子线程(NIOEventLoop)个数
- // 默认是 cpu核数 * 2
- EventLoopGroup boosGroup = new NioEventLoopGroup();
- EventLoopGroup workerGruop = new NioEventLoopGroup();
-
- try {
- // 创建服务器端的启动对象, 配置参数
- ServerBootstrap bootstrap = new ServerBootstrap();
- bootstrap.group(boosGroup, workerGruop) // 设置2个线程组
- .channel(NioServerSocketChannel.class) // 使用NIOSocketChannel 作为服务器的通道实现
- .option(ChannelOption.SO_BACKLOG, 128) // 设置线程队列等待连接的个数
- .childOption(ChannelOption.SO_KEEPALIVE, true) // 设置保持活动连接状态
- .childHandler(new ChannelInitializer
() { // 创建一个通道初始化对象 - // 给 pipeline 设置处理器
- @Override
- protected void initChannel(SocketChannel socketChannel) throws Exception {
- ChannelPipeline pipeline = socketChannel.pipeline();
- // 添加 protobuf 解码器, 指定对哪种对象进行解码
- pipeline.addLast("decoder", new ProtobufDecoder(StudentPOJO.Student.getDefaultInstance()));
- // 添加业务处理器
- // pipeline.addLast(new ProtobufNettyServerHandler());
- pipeline.addLast(new ProtobufNettyServerHandler2());
- }
- }); // 给我们的workerGroup 的 EventLoop 对应的管道设置处理器
- System.out.println("... server is ready.");
-
- // 启动服务器, 绑定端口并同步处理 ,生成一个 ChannelFuture对象
- ChannelFuture channelFuture = bootstrap.bind(6668).sync();
- channelFuture.addListener((future1) -> System.out.println("Finish binding"));
-
- // 给 channelFuture 注册监听器,监听我们关心的事件
- channelFuture.addListener(future -> {
- if (future.isSuccess()) {
- System.out.println("监听端口6668 成功");
- }else {
- System.out.println("监听端口6668 失败");
- }
- });
-
- // 对关闭通道进行监听
- channelFuture.channel().closeFuture().sync();
- } finally {
- // 优雅关闭
- boosGroup.shutdownGracefully();
- workerGruop.shutdownGracefully();
- }
- }
- }
2)netty服务器处理器(channelRead0方法中的 StudentPOJO.Student javabean就是由 protobuf编译生成的java类)
- public class ProtobufNettyServerHandler2 extends SimpleChannelInboundHandler
{ -
- // 读写数据事件(读取客户端发送的消息)
- // 1. ChannelHandlerContext ctx: 上下文信息,包括管道pipeline,通道channel,地址
- // 2. Object msg: 客户端发送的数据,默认是 Object
- @Override
- public void channelRead0(ChannelHandlerContext ctx, StudentPOJO.Student student) throws Exception {
- System.out.println("SimpleChannelInboundHandler子类handler-客户端发送的数据 id=" + student.getId() + " name=" + student.getName());
- }
-
- // 数据读取完毕,回复客户端
- @Override
- public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
- // writeAndFlush 是 write + flush ;把数据写入到缓冲,并刷新
- ChannelFuture channelFuture = ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 客户端", StandardCharsets.UTF_8));
- channelFuture.addListener(future -> System.out.println("回复成功"));
- }
-
- // 处理异常,关闭通道
- @Override
- public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
- ctx.channel().close();
- }
- }
1)netty客户端 (注意其添加的 Protobuf编码器 ProtobufEncoder)
- public class ProtobufNettyClient76 {
- public static void main(String[] args) throws InterruptedException {
- // 客户端需要一个事件循环组
- EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
- try {
- // 创建客户端启动对象, 注意是 BootStrap
- Bootstrap bootstrap = new Bootstrap();
- // 设置相关参数
- bootstrap.group(eventLoopGroup) // 设置线程组
- .channel(NioSocketChannel.class) // 设置客户端通道实现类(反射)
- .handler(new ChannelInitializer
() { - @Override
- protected void initChannel(SocketChannel socketChannel) throws Exception {
- ChannelPipeline pipeline = socketChannel.pipeline();
- // 添加 ProtobufEncoder 编码器
- pipeline.addLast(new ProtobufEncoder());
- // 添加业务处理器
- pipeline.addLast(new ProtobufNettyClientHandler()); // 加入自己的处理器
- }
- });
- System.out.println("client is ok");
- // 启动客户端去连接服务器
- // ChannelFuture, 设计到netty的异步模型
- ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 6668).sync();
- // 给关闭通道进行监听
- channelFuture.channel().closeFuture().sync();
- } finally {
- eventLoopGroup.shutdownGracefully();
- }
- }
- }
2)netty客户端处理器 (注意 channelActive方法中的 StudentPOJO.Student,是由Protobuf编译生成的POJO类)
- public class ProtobufNettyClientHandler extends ChannelInboundHandlerAdapter {
- // 当通道就绪就会触发该方法
- @Override
- public void channelActive(ChannelHandlerContext ctx) throws Exception {
- // 发送一个 StudentPOJO 对象到服务器
- StudentPOJO.Student student = StudentPOJO.Student.newBuilder().setId(4).setName("zhangsan").build();
- ctx.writeAndFlush(student);
- }
-
- // 当通道有读取事件时,会触发
- @Override
- public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
- ByteBuf byteBuf = (ByteBuf) msg;
- System.out.println("服务器回复消息:" + byteBuf.toString(StandardCharsets.UTF_8) + ", 当前时间=" + DateUtils.getNowTimestamp());
- System.out.println("服务器地址:" + ctx.channel().remoteAddress());
- }
-
- // 捕获异常
- @Override
- public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
- cause.printStackTrace();
- ctx.close();
- }
- }