• Netty系列(一):Springboot整合Netty,自定义协议实现


    Netty是由JBOSS提供的一个java开源框架,现为 Github上的独立项目。Netty提供异步的、事件驱动的网络应用程序框架和工具,用以快速开发高性能、高可靠性的网络服务器和客户端程序。

    也就是说,Netty 是一个基于NIO的客户、服务器端的编程框架,使用Netty 可以确保你快速和简单的开发出一个网络应用,例如实现了某种协议的客户、服务端应用。Netty相当于简化和流线化了网络应用的编程开发过程。

    Springboot整合Netty

    新建springboot项目,并在项目以来中导入netty包,用fastjson包处理jsonStr。

    		
            <dependency>
                <groupId>io.nettygroupId>
                <artifactId>netty-allartifactId>
                <version>4.1.42.Finalversion>
            dependency>
    
            
            <dependency>
                <groupId>com.alibaba.fastjson2groupId>
                <artifactId>fastjson2artifactId>
                <version>2.0.16version>
            dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    创建netty相关配置信息文件

    1. yml配置文件——application.yml
    # netty 配置
    netty:
      # boss线程数量
      boss: 4
      # worker线程数量
      worker: 2
      # 连接超时时间
      timeout: 6000
      # 服务器主端口
      port: 18023
      # 服务器备用端口
      portSalve: 18026
      # 服务器地址
      host: 127.0.0.1
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    1. netty配置实体类——NettyProperties与yml配置文件绑定
      通过@ConfigurationProperties(prefix = "netty")注解读取配置文件中的netty配置,通过反射注入值,需要在实体类中提供对应的setter和getter方法。

    @ConfigurationProperties(prefix = "netty")对应的实体类属性名称不要求一定相同,只需保证“set”字符串拼接配置文件的属性和setter方法名相同即可。

    @Configuration
    @ConfigurationProperties(prefix = "netty")
    public class NettyProperties {
    
        /**
         * boss线程数量
         */
        private Integer boss;
    
        /**
         * worker线程数量
         */
        private Integer worker;
    
        /**
         * 连接超时时间
         */
        private Integer timeout = 30000;
    
        /**
         * 服务器主端口
         */
        private Integer port = 18023;
    
        /**
         * 服务器备用端口
         */
        private Integer portSalve = 18026;
    
        /**
         * 服务器地址 默认为本地
         */
        private String host = "127.0.0.1";
    	
    	// setter、getter 。。。。
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    1. 对netty进行配置,绑定netty相关配置设置
      Netty通常由一个Bootstrap开始,主要作用是配置整个Netty程序,串联各个组件,Netty中Bootstrap类是客户端程序的启动引导类,ServerBootstrap是服务端启动引导类。
    @Configuration
    @EnableConfigurationProperties
    public class NettyConfig {
        final NettyProperties nettyProperties;
    
        public NettyConfig(NettyProperties nettyProperties) {
            this.nettyProperties = nettyProperties;
        }
    
        /**
         * boss线程池-进行客户端连接
         *
         * @return
         */
        @Bean
        public NioEventLoopGroup boosGroup() {
            return new NioEventLoopGroup(nettyProperties.getBoss());
        }
    
        /**
         * worker线程池-进行业务处理
         *
         * @return
         */
        @Bean
        public NioEventLoopGroup workerGroup() {
            return new NioEventLoopGroup(nettyProperties.getWorker());
        }
    
        /**
         * 服务端启动器,监听客户端连接
         *
         * @return
         */
        @Bean
        public ServerBootstrap serverBootstrap() {
            ServerBootstrap serverBootstrap = new ServerBootstrap()
                    // 指定使用的线程组
                    .group(boosGroup(), workerGroup())
                    // 指定使用的通道
                    .channel(NioServerSocketChannel.class)
                    // 指定连接超时时间
                    .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyProperties.getTimeout())
                    // 指定worker处理器
                    .childHandler(new NettyServerHandler());
            return serverBootstrap;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    1. worker处理器,初始化通道以及配置对应管道的处理器
      自定义了##@##分割符,通过DelimiterBasedFrameDecoder来处理拆包沾包问题;
      通过MessageDecodeHandler将接收消息解码处理成对象实例;
      通过MessageEncodeHandler将发送消息增加分割符后并编码;
      最后通过ServerListenerHandler根据消息类型对应处理不同消息。
    public class NettyServerHandler extends ChannelInitializer<SocketChannel> {
        @Override
        protected void initChannel(SocketChannel socketChannel) throws Exception {
            // 数据分割符
            String delimiterStr = "##@##";
            ByteBuf delimiter = Unpooled.copiedBuffer(delimiterStr.getBytes());
            ChannelPipeline pipeline = socketChannel.pipeline();
            // 使用自定义处理拆包/沾包,并且每次查找的最大长度为1024字节
            pipeline.addLast(new DelimiterBasedFrameDecoder(1024, delimiter));
            // 将上一步解码后的数据转码为Message实例
            pipeline.addLast(new MessageDecodeHandler());
            // 对发送客户端的数据进行编码,并添加数据分隔符
            pipeline.addLast(new MessageEncodeHandler(delimiterStr));
            // 对数据进行最终处理
            pipeline.addLast(new ServerListenerHandler());
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    1. 数据解码
      数据解码和编码都采用UTF8格式
    public class MessageDecodeHandler extends ByteToMessageDecoder {
    
        @Override
        protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf in, List<Object> list) throws Exception {
            ByteBuf frame = in.retainedDuplicate();
            final String content = frame.toString(CharsetUtil.UTF_8);
            Message message = new Message(content);
            list.add(message);
            in.skipBytes(in.readableBytes());
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    1. 数据解码转换的实例
      Message类用于承载消息、转JsonString
    
    public class Message {
        /**
         * 数据长度
         */
        private Integer len;
    
        /**
         * 接收的通讯数据body
         */
        private String content;
    
        /**
         * 消息类型
         */
        private Integer msgType;
    
        public Message(Object object) {
            String str = object.toString();
            JSONObject jsonObject = JSONObject.parseObject(str);
            msgType = Integer.valueOf(jsonObject.getString("msg_type"));
            content = jsonObject.getString("body");
            len = str.length();
        }
    
        public String toJsonString() {
            return "{" +
                    "\"msg_type\": " + msgType + ",\n" +
                    "\"body\": " + content +
                    "}";
        }
    	// setter、getter 。。。。
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    1. 数据编码
      netty服务端回复消息时,对消息转JsonString增加分割符,并进行编码。
    public class MessageEncodeHandler extends MessageToByteEncoder<Message> {
        // 数据分割符
        String delimiter;
    
        public MessageEncodeHandler(String delimiter) {
            this.delimiter = delimiter;
        }
    
        @Override
        protected void encode(ChannelHandlerContext channelHandlerContext, Message message, ByteBuf out) throws Exception {
            out.writeBytes((message.toJsonString() + delimiter).getBytes(CharsetUtil.UTF_8));
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    1. 数据处理器,针对不同类型数据分类处理
      在处理不同接收数据时使用了枚举类型,在使用switch时可以做下处理,具体参考代码,这里只演示如何操作,并没实现数据处理业务类。
    
    public class ServerListenerHandler extends SimpleChannelInboundHandler<Message> {
        private static final Logger log = LoggerFactory.getLogger(ServerListenerHandler.class);
    
        /**
         * 设备接入连接时处理
         *
         * @param ctx
         */
        @Override
        public void handlerAdded(ChannelHandlerContext ctx) {
            log.info("有新的连接:[{}]", ctx.channel().id().asLongText());
        }
    
        /**
         * 数据处理
         *
         * @param ctx
         * @param msg
         */
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, Message msg) {
            // 获取消息实例中的消息体
            String content = msg.getContent();
            // 对不同消息类型进行处理
            MessageEnum type = MessageEnum.getStructureEnum(msg);
            switch (type) {
                case CONNECT:
                    // TODO 心跳消息处理
                case STATE:
                    // TODO 设备状态
                default:
                    System.out.println(type.content + "消息内容" + content);
            }
        }
    
        /**
         * 设备下线处理
         *
         * @param ctx
         */
        @Override
        public void handlerRemoved(ChannelHandlerContext ctx) {
            log.info("设备下线了:{}", ctx.channel().id().asLongText());
        }
    
        /**
         * 设备连接异常处理
         *
         * @param ctx
         * @param cause
         */
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
            // 打印异常
            log.info("异常:{}", cause.getMessage());
            // 关闭连接
            ctx.close();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    1. 数据类型枚举类
    
    public enum MessageEnum {
        CONNECT(1, "心跳消息"),
        STATE(2, "设备状态");
    
        public final Integer type;
        public final String content;
    
        MessageEnum(Integer type, String content) {
            this.type = type;
            this.content = content;
        }
    
        // case中判断使用
        public static MessageEnum getStructureEnum(Message msg) {
            Integer type = Optional.ofNullable(msg)
                    .map(Message::getMsgType)
                    .orElse(0);
            if (type == 0) {
                return null;
            } else {
                List<MessageEnum> objectEnums = Arrays.stream(MessageEnum.values())
                        .filter((item) -> item.getType() == type)
                        .distinct()
                        .collect(Collectors.toList());
                if (objectEnums.size() > 0) {
                    return objectEnums.get(0);
                }
                return null;
            }
        }
    	// setter、getter。。。。
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34

    到此Netty整个配置已经完成,但如果要跟随springboot一起启动,仍需要做一些配置。

    1. netty启动类配置
    @Component
    public class NettyServerBoot {
        private static final Logger log = LoggerFactory.getLogger(NettyServerBoot.class);
        @Resource
        NioEventLoopGroup boosGroup;
        @Resource
        NioEventLoopGroup workerGroup;
        final ServerBootstrap serverBootstrap;
        final NettyProperties nettyProperties;
    
        public NettyServerBoot(ServerBootstrap serverBootstrap, NettyProperties nettyProperties) {
            this.serverBootstrap = serverBootstrap;
            this.nettyProperties = nettyProperties;
        }
    
    
        /**
         * 启动netty
         *
         * @throws InterruptedException
         */
        @PostConstruct
        public void start() throws InterruptedException {
            // 绑定端口启动
            serverBootstrap.bind(nettyProperties.getPort()).sync();
            // 备用端口
            serverBootstrap.bind(nettyProperties.getPortSalve()).sync();
            log.info("启动Netty: {},{}", nettyProperties.getPort(), nettyProperties.getPortSalve());
        }
    
        /**
         * 关闭netty
         */
        @PreDestroy
        public void close() {
            log.info("关闭Netty");
            boosGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41

    增加NettyServerBoot配置后,启动application时,netty服务端会跟随一起启动。
    在这里插入图片描述
    同时,在springboot关闭前,会先销毁netty服务。
    在这里插入图片描述

    完整源码

    https://github.com/BerBai/JavaExample/tree/master/netty

  • 相关阅读:
    携职教育:一个暑假的时间,就能拿下这本证书啦
    jquery列表顺序倒转排序效果
    【数据挖掘】分类与回归预测
    计算机毕业设计springboot+vue基本微信小程序的旅游社系统
    190-Vue中环境变量的配置
    一起用Go做一个小游戏(上)
    【AI大模型-什么是大模型】
    GFS分布式文件系统
    [附源码]计算机毕业设计springboot企业人事管理系统
    day46
  • 原文地址:https://blog.csdn.net/Ber_Bai/article/details/127984963