• 【Netty】七、Netty自定义编码和解码器


    一、Netty自定义编码和解码)

    编解码器的作用就是讲原始字节数据与自定义的消息对象进行互转,网络中都是以字节码的数据形式来传输数据的,服务器编码数据后发送到客户端,客户端需要对数据进行解码,因为编解码器由两部分组成:

    • Decoder(解码器)
    • Encoder(编码器)
      自定义编码器和解码器,可以参考netty里面内置的一些编码器和解码器的代码,比如:
      StringDecoder、StringEncoder
      ObjectDecoder、ObjectEncoder
      如下的例子是采用Fastjson将一个RpcMessage 的java对象在网络中传输所进行的编码和解码,更多场景可以采用该思路进行;
    • 解码:
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        ByteBuf frame = in.retainedDuplicate();
        final String content = frame.toString(CharsetUtil.UTF_8);
        RpcMessage message = JSON.parseObject(content, RpcMessage.class);
        out.add(message);
        in.skipBytes(in.readableBytes());
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    编码:

    @Override
    protected void encode(ChannelHandlerContext ctx, RpcMessage msg, ByteBuf out) throws Exception {
        out.writeBytes(JSON.toJSONString(msg).getBytes(CharsetUtil.UTF_8));
    }
    
    • 1
    • 2
    • 3
    • 4

    服务端代码

    ProtocolServer

    /**
     * * Netty自定义编码和解码 服务端
     */
    public class ProtocolServer {
    
        public static void main(String[] args) {
            ProtocolServer server = new ProtocolServer();
            server.openSever(6666);
        }
    
        public void openSever(int port) {
            ServerBootstrap bootstrap = new ServerBootstrap();
    
            EventLoopGroup bootGroup = new NioEventLoopGroup(1); //connect \accept \read \write
            EventLoopGroup workGroup = new NioEventLoopGroup();
    
            bootstrap.group(bootGroup, workGroup);
            bootstrap.channel(NioServerSocketChannel.class);
    
            bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    ChannelPipeline pipeline = ch.pipeline();
    
                    //解决粘包拆包
                    pipeline.addLast(new LengthFieldBasedFrameDecoder(65535, 0, 4, 0, 4));
                    pipeline.addLast(new LengthFieldPrepender(4));
    
                    //自定义的编码和解码
                    pipeline.addLast(new RpcDecoder());
                    pipeline.addLast(new RpcEncoder());
    
                    //业务处理handler
                    pipeline.addLast(ProtocolServerHandler.INSTANCE);
                }
            });
            try {
                System.out.println("服务启动成功");
                ChannelFuture f = bootstrap.bind(port).sync();
                f.channel().closeFuture().sync();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                bootGroup.shutdownGracefully();
                workGroup.shutdownGracefully();
            }
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49

    ProtocolServerHandler

    import com.mytest.protocol.codec.RpcMessage;
    import io.netty.channel.ChannelHandler;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;
    
    @ChannelHandler.Sharable
    public class ProtocolServerHandler extends ChannelInboundHandlerAdapter {
    
        public static final ProtocolServerHandler INSTANCE = new ProtocolServerHandler();
    
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            System.out.println("服务端接收消息");
    
            if (msg instanceof RpcMessage) {
                RpcMessage rpcMessage = (RpcMessage) msg;
                System.out.println("服务端接收到的消息:" + rpcMessage);
    
                //ctx.write(msg);
            }
        }
    
        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
            ctx.flush();
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            cause.printStackTrace();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32

    客户端

    ProtocolClient

    import com.mytest.protocol.codec.RpcDecoder;
    import com.mytest.protocol.codec.RpcEncoder;
    import com.mytest.protocol.handler.ProtocolClientHandler;
    import io.netty.bootstrap.Bootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelPipeline;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioSocketChannel;
    import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
    import io.netty.handler.codec.LengthFieldPrepender;
    
    import java.io.IOException;
    
    
    /**
     * Netty自定义编码和解码 客户端
     */
    public class ProtocolClient {
    
        public static void main(String[] args) throws IOException {
            ProtocolClient client = new ProtocolClient("127.0.0.1", 6666);
        }
    
        public ProtocolClient(String host, int port) {
            EventLoopGroup group = new NioEventLoopGroup(1);
    
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(group);
            bootstrap.channel(NioSocketChannel.class);
            bootstrap.remoteAddress(host, port);
            bootstrap.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    ChannelPipeline pipeline = ch.pipeline();
    
                    //netty的粘包和拆包
                    pipeline.addLast(new LengthFieldBasedFrameDecoder(65535, 0, 4, 0, 4));
                    pipeline.addLast(new LengthFieldPrepender(4));
    
                    //自定义的编码和解码
                    pipeline.addLast(new RpcDecoder());
                    pipeline.addLast(new RpcEncoder());
    
                    //业务处理的handler
                    pipeline.addLast(ProtocolClientHandler.INSTANCE);
                }
            });
            try {
                ChannelFuture f = bootstrap.connect().sync();
                f.channel().closeFuture().sync();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                System.out.println("连接关闭,资源释放");
                group.shutdownGracefully();
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61

    ProtocolClientHandler

    import com.mytest.protocol.codec.RpcMessage;
    import com.mytest.protocol.codec.RpcMessageType;
    import io.netty.channel.ChannelHandler;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;
    
    @ChannelHandler.Sharable
    public class ProtocolClientHandler extends ChannelInboundHandlerAdapter {
    
        public static final ProtocolClientHandler INSTANCE = new ProtocolClientHandler();
    
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
    
            //调用远程,传输一个java对象
            RpcMessage rpcMessage = new RpcMessage();
            rpcMessage.setContent("消息");
            rpcMessage.setSender("发送者");
            StringBuffer stringBuffer = new StringBuffer();
            for (int i=0; i<100; i++) {
                stringBuffer.append("这是一个消息字名字的");
            }
            rpcMessage.setReceiver(stringBuffer.toString());
            rpcMessage.setRpcMessageType(RpcMessageType.LOGIN);
            rpcMessage.setTime(System.currentTimeMillis());
    
            for (int i=0; i<20; i++) {
                ctx.writeAndFlush(rpcMessage);
            }
            System.out.println("启动后消息发送完毕");
        }
    
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            System.out.println("客户端接收");
            if (msg instanceof RpcMessage) {
                RpcMessage rpcMessage = (RpcMessage) msg;
                System.out.println("客户端接收到的消息:" + rpcMessage);
            }
        }
    
        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
            ctx.flush();
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            cause.printStackTrace();
            ctx.close();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52

    编解码器

    RpcDecoder 、RpcEncoder

    /**
     * 网络通信传输的 字节流数据 的解码器(字节流解码成程序能识别的数据(比如解码成一个java对象))
     *
     */
    public class RpcDecoder extends ByteToMessageDecoder {
    
        @Override
        protected void decode(ChannelHandlerContext ctx, ByteBuf byteBuf, List<Object> out) throws Exception {
    
            //复制一份 字节流数据
            ByteBuf frame = byteBuf.retainedDuplicate();
    
            //把字节流数据转成字符串
            final String content = frame.toString(CharsetUtil.UTF_8);
    
            //把字符串通过fastjson转成java对象
            RpcMessage message = JSON.parseObject(content, RpcMessage.class);
    
            //把得到的java对象传给下一个handler
            out.add(message);
    
            //buf的数据已经读取过了,跳过已经读取的数据,更新一下buf的index下标
            byteBuf.skipBytes(byteBuf.readableBytes());
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    import com.alibaba.fastjson.JSON;
    import io.netty.buffer.ByteBuf;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.handler.codec.MessageToByteEncoder;
    import io.netty.util.CharsetUtil;
    
    public class RpcEncoder extends MessageToByteEncoder<RpcMessage> {
    
        @Override
        protected void encode(ChannelHandlerContext ctx, RpcMessage msg, ByteBuf out) throws Exception {
            out.writeBytes(JSON.toJSONString(msg).getBytes(CharsetUtil.UTF_8));
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    @Data
    public class RpcMessage {
    
        private RpcMessageType rpcMessageType; //消息类型[LOGIN]或者[SYSTEM]或者[LOGOUT]
    
        private long time;               //消息发送时间
    
        private String sender;           //发送人
    
        private String receiver;         //接收人
    
        private Object content;          //消息内容
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    public enum RpcMessageType {
    
        /**
         * 系统消息
         */
        SYSTEM("SYSTEM"),
    
        /**
         * 登录消息
         */
        LOGIN("LOGIN"),
    
        /**
         * 登出消息
         */
        LOGOUT("LOGOUT"),
    
        /**
         * 调用消息
         */
        INVOKE("INVOKE");
    
        private String name;
    
        public static boolean isRpcMessageType(String content) {
            return content.matches("^\\[(SYSTEM|LOGIN|LOGIN|INVOKE)\\]");
        }
    
        RpcMessageType(String name) {
            this.name = name;
        }
    
        public String getName() {
            return this.name;
        }
    
        public String toString() {
            return this.name;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
  • 相关阅读:
    springboot毕设项目高校宿舍管理系统的设计与实现ukgdt(java+VUE+Mybatis+Maven+Mysql)
    Pytorch加载数据初认识
    跨境电商首选腾讯云轻量应用服务器Lighthouse!
    Django笔记十之values_list指定字段取值及distinct去重处理
    【编程语言大比拼】java vs python vs js 如何编制对象数组的映射索引
    html+css鼠标悬停发光按钮![HTML鼠标悬停的代码]使用HTML + CSS实现鼠标悬停的一些奇幻效果!
    git强制删除本地分支 git branch -D
    【项目小tips】登录状态存储
    Matplotlib入门[01]——Pyplot
    selectTree单选iview+vue
  • 原文地址:https://blog.csdn.net/weixin_43333483/article/details/127715686