• SpringBoot 整合 Netty



    GitHub: link. 欢迎star

    注意:本篇博客风格(不多比比就是撸代码!!!)

    一、common工程

    maven依赖

            
            <dependency>
                <groupId>io.nettygroupId>
                <artifactId>netty-allartifactId>
                <version>5.0.0.Alpha2version>
            dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    二、netty-server工程

    1.Netty服务端(NettyServer.java)

    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.Channel;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import lombok.RequiredArgsConstructor;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.boot.CommandLineRunner;
    import org.springframework.stereotype.Component;
    
    import javax.annotation.PreDestroy;
    
    /**
     * @author Andon
     * 2022/7/22
     * 

    * Netty服务端 */ @Slf4j @Component @RequiredArgsConstructor public class NettyServer implements CommandLineRunner { private Channel channel; // boss事件轮询线程组,处理连接事件 private final EventLoopGroup bossGroup = new NioEventLoopGroup(); // worker事件轮询线程组,用于数据处理 private final EventLoopGroup workerGroup = new NioEventLoopGroup(); private final NettyServerInitializer nettyServerInitializer; @Value("${netty.port}") private Integer port; /** * 开启Netty服务 */ @Override public void run(String... args) { try { // 启动类 ServerBootstrap serverBootstrap = new ServerBootstrap(); // 设置参数,组配置 serverBootstrap.group(bossGroup, workerGroup) // 指定channel .channel(NioServerSocketChannel.class) // 初始化服务端可连接队列 .option(ChannelOption.SO_BACKLOG, 1024) // 允许重复使用本地地址和端口,连接关闭后,可以立即重用端口 .option(ChannelOption.SO_REUSEADDR, true) // 设置TCP长连接,TCP会主动探测空闲连接的有效性 .childOption(ChannelOption.SO_KEEPALIVE, true) // 禁用Nagle算法,小数据时可以即时传输 .childOption(ChannelOption.TCP_NODELAY, true) // 发送缓冲区大小 .childOption(ChannelOption.SO_SNDBUF, 256 * 1024) // 接收缓冲区大小 .childOption(ChannelOption.SO_RCVBUF, 256 * 1024) // Netty服务端channel初始化 .childHandler(nettyServerInitializer); // 绑定端口,开始接收进来的连接 ChannelFuture future = serverBootstrap.bind(port).sync(); if (future.isSuccess()) { log.info("Netty服务端启动!! 端口:[{}]", port); } channel = future.channel(); } catch (Exception e) { log.error("Netty服务端启动异常!! error:{}", e.getMessage()); } } @PreDestroy private void destroy() { if (channel != null) { channel.close(); } workerGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); log.warn("Netty服务关闭!!"); } }

    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84

    2.Netty服务端初始化配置(NettyServerInitializer.java)

    import io.netty.channel.Channel;
    import io.netty.channel.ChannelInitializer;
    import io.netty.handler.codec.DelimiterBasedFrameDecoder;
    import io.netty.handler.codec.Delimiters;
    import io.netty.handler.codec.string.StringDecoder;
    import io.netty.handler.codec.string.StringEncoder;
    import io.netty.util.CharsetUtil;
    import lombok.RequiredArgsConstructor;
    import org.springframework.stereotype.Component;
    
    /**
     * @author Andon
     * 2022/7/22
     * 

    * Netty服务端初始化配置 */ @Component @RequiredArgsConstructor public class NettyServerInitializer extends ChannelInitializer<Channel> { private final NettyServerHandler nettyServerHandler; /** * 初始化channel */ @Override protected void initChannel(Channel channel) { channel.pipeline() // 分隔符解码器,处理半包 // maxFrameLength 表示一行最大的长度 // Delimiters.lineDelimiter(),以/n,/r/n作为分隔符 .addLast(new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter())) .addLast(new StringDecoder(CharsetUtil.UTF_8)) .addLast(new StringEncoder(CharsetUtil.UTF_8)) .addLast("nettyServerHandler", nettyServerHandler); } }

    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37

    3.Netty服务端处理器(NettyServerHandler.java)

    import com.alibaba.fastjson.JSONObject;
    import io.netty.channel.ChannelHandler;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelId;
    import io.netty.channel.ChannelInboundHandlerAdapter;
    import io.netty.handler.timeout.IdleState;
    import io.netty.handler.timeout.IdleStateEvent;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.stereotype.Component;
    
    import java.net.InetSocketAddress;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    
    /**
     * @author Andon
     * 2022/7/22
     * 

    * Netty服务端处理器 */ @Slf4j @Component @ChannelHandler.Sharable public class NettyServerHandler extends ChannelInboundHandlerAdapter { // 管理一个全局map,保存连接进服务端的通道数量 public static final Map<ChannelId, ChannelHandlerContext> CHANNEL_MAP = new ConcurrentHashMap<>(); /** * 当客户端主动连接服务端,通道活跃后触发 */ @Override public void channelActive(ChannelHandlerContext ctx) { InetSocketAddress inetSocketAddress = (InetSocketAddress) ctx.channel().remoteAddress(); String clientIp = inetSocketAddress.getAddress().getHostAddress(); int clientPort = inetSocketAddress.getPort(); // 获取连接通道唯一标识 ChannelId channelId = ctx.channel().id(); // 如果map中不包含此连接,就保存连接 if (CHANNEL_MAP.containsKey(channelId)) { log.info("客户端【{}】是连接状态,连接通道数量:{}", channelId, CHANNEL_MAP.size()); } else { // 保存连接 CHANNEL_MAP.put(channelId, ctx); log.info("客户端【{}】连接Netty服务端!![clientIp:{} clientPort:{}]", channelId, clientIp, clientPort); log.info("连接通道数量:{}", CHANNEL_MAP.size()); } } /** * 当客户端主动断开连接,通道不活跃触发 */ @Override public void channelInactive(ChannelHandlerContext ctx) { InetSocketAddress inetSocketAddress = (InetSocketAddress) ctx.channel().remoteAddress(); String clientIp = inetSocketAddress.getAddress().getHostAddress(); int clientPort = inetSocketAddress.getPort(); // 获取终止连接的客户端ID ChannelId channelId = ctx.channel().id(); // 包含此客户端才去删除 if (CHANNEL_MAP.containsKey(channelId)) { // 删除连接 CHANNEL_MAP.remove(channelId); log.warn("客户端【{}】断开Netty连接!![clientIp:{} clientPort:{}]", channelId, clientIp, clientPort); log.info("连接通道数量:{}", CHANNEL_MAP.size()); } } /** * 通道有消息触发 */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { log.info("接收到客户端【{}】的消息:{}", ctx.channel().id(), msg.toString()); StringBuilder sb; Map<String, Object> result; try { // 报文解析处理 sb = new StringBuilder(); result = JSONObject.parseObject(msg.toString()); sb.append(result).append("解析成功!!").append("\n"); // 响应客户端 this.channelWrite(ctx.channel().id(), sb); } catch (Exception e) { ctx.writeAndFlush("-1\n"); log.error("报文解析失败:{}", e.getMessage()); } } public void channelWrite(ChannelId channelId, Object msg) { ChannelHandlerContext ctx = CHANNEL_MAP.get(channelId); if (ctx == null) { log.info("通道【{}】不存在!!", channelId); return; } // 将返回客户端的信息写入ctx ctx.write(msg); // 刷新缓存区 ctx.flush(); } @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) { String socketString = ctx.channel().remoteAddress().toString(); if (evt instanceof IdleStateEvent) { IdleStateEvent event = (IdleStateEvent) evt; if (event.state() == IdleState.READER_IDLE) { log.warn("Client: 【{}】 READER_IDLE 读超时", socketString); ctx.channel().close(); } else if (event.state() == IdleState.WRITER_IDLE) { log.warn("Client: 【{}】 WRITER_IDLE 写超时", socketString); ctx.channel().close(); } else if (event.state() == IdleState.ALL_IDLE) { log.warn("Client: 【{}】 ALL_IDLE 读/写超时", socketString); ctx.channel().close(); } } } /** * 当连接发生异常时触发 */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // 当出现异常就关闭连接 ctx.close(); } }

    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128

    三、netty-client工程

    1.Netty客户端(NettyClient.java)

    import io.netty.bootstrap.Bootstrap;
    import io.netty.channel.*;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.nio.NioSocketChannel;
    import lombok.RequiredArgsConstructor;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.boot.CommandLineRunner;
    import org.springframework.stereotype.Component;
    
    import javax.annotation.PreDestroy;
    import java.util.concurrent.TimeUnit;
    
    /**
     * @author Andon
     * 2022/7/22
     * 

    * Netty客户端 */ @Slf4j @Component @RequiredArgsConstructor public class NettyClient implements CommandLineRunner { private Channel channel; private final EventLoopGroup workGroup = new NioEventLoopGroup(); private final NettyClientInitializer nettyClientInitializer; @Value("${netty.host}") private String host; @Value("${netty.port}") private Integer port; public void sendMsg(String msg) { boolean active = channel.isActive(); if (active) { channel.writeAndFlush(msg); } else { log.warn("channel active:{}", false); } } @Override public void run(String... args) { try { Bootstrap bootstrap = new Bootstrap(); bootstrap.group(workGroup) .channel(NioSocketChannel.class) // 设置TCP长连接,TCP会主动探测空闲连接的有效性 .option(ChannelOption.SO_KEEPALIVE, true) // 禁用Nagle算法,小数据时可以即时传输 .option(ChannelOption.TCP_NODELAY, true) // 发送缓冲区大小 .option(ChannelOption.SO_SNDBUF, 256 * 1024) // 接收缓冲区大小 .option(ChannelOption.SO_RCVBUF, 256 * 1024) // Netty客户端channel初始化 .handler(nettyClientInitializer); // 连接服务器ip、端口 ChannelFuture future = bootstrap.connect(host, port); //客户端断线重连逻辑 future.addListener((ChannelFutureListener) futureListener -> { if (futureListener.isSuccess()) { log.info("连接Netty服务端成功!!"); } else { log.warn("连接Netty服务端失败,准备30s后进行断线重连!!"); futureListener.channel().eventLoop().schedule((Runnable) this::run, 30, TimeUnit.SECONDS); } }); channel = future.channel(); } catch (Exception e) { log.error("连接Netty服务端异常!! error:{}", e.getMessage()); } } @PreDestroy private void destroy() { if (channel != null) { channel.close(); } workGroup.shutdownGracefully(); log.warn("Netty连接关闭!!"); } }

    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85

    2.Netty客户端通道初始化(NettyClientInitializer.java)

    import io.netty.channel.Channel;
    import io.netty.channel.ChannelInitializer;
    import io.netty.handler.codec.DelimiterBasedFrameDecoder;
    import io.netty.handler.codec.Delimiters;
    import io.netty.handler.codec.string.StringDecoder;
    import io.netty.handler.codec.string.StringEncoder;
    import io.netty.handler.timeout.IdleStateHandler;
    import io.netty.util.CharsetUtil;
    import lombok.RequiredArgsConstructor;
    import org.springframework.stereotype.Component;
    
    import java.util.concurrent.TimeUnit;
    
    /**
     * @author Andon
     * 2022/7/22
     * 

    * Netty客户端通道初始化 */ @Component @RequiredArgsConstructor public class NettyClientInitializer extends ChannelInitializer<Channel> { private final NettyClientHandler nettyClientHandler; /** * 初始化channel */ @Override protected void initChannel(Channel channel) { channel.pipeline() .addLast(new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter())) .addLast(new StringDecoder(CharsetUtil.UTF_8)) .addLast(new StringEncoder(CharsetUtil.UTF_8)) .addLast(new IdleStateHandler(0, 5, 0, TimeUnit.SECONDS)) .addLast(nettyClientHandler); } }

    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38

    3.Netty客户端处理器(NettyClientHandler.java)

    import io.netty.channel.ChannelHandler;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;
    import io.netty.handler.timeout.IdleState;
    import io.netty.handler.timeout.IdleStateEvent;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.context.annotation.Lazy;
    import org.springframework.stereotype.Component;
    
    import javax.annotation.Resource;
    import java.util.concurrent.TimeUnit;
    
    /**
     * @author Andon
     * 2022/7/22
     * 

    * Netty客户端处理器 */ @Slf4j @Component @ChannelHandler.Sharable public class NettyClientHandler extends ChannelInboundHandlerAdapter { @Lazy @Resource private NettyClient nettyClient; /** * 建立连接时 */ @Override public void channelActive(ChannelHandlerContext ctx) { log.info("建立Netty连接!!"); ctx.fireChannelActive(); } /** * 关闭连接时 */ @Override public void channelInactive(ChannelHandlerContext ctx) { log.warn("Netty连接关闭!!"); reconnect(ctx); } /** * 心跳处理,每5秒发送一次心跳请求 */ @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if (evt instanceof IdleStateEvent) { IdleStateEvent idleStateEvent = (IdleStateEvent) evt; if (idleStateEvent.state() == IdleState.WRITER_IDLE) { log.info("已经5秒没有发送消息给服务端!!"); // 向服务端发送心跳包 String heartbeat = "{\"msg\":\"client heartbeat\"}\n"; // 发送心跳消息,并在发送失败时关闭该连接 ctx.writeAndFlush(heartbeat); } } else { super.userEventTriggered(ctx, evt); } } /** * 收到服务端消息 */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { log.info("收到服务端消息:{}", msg); } /** * 当连接发生异常时触发 */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // 当出现异常就关闭连接 ctx.close(); } private void reconnect(ChannelHandlerContext ctx) { log.info("准备30s后断线重连!!"); ctx.channel().eventLoop().schedule(() -> nettyClient.run(), 30, TimeUnit.SECONDS); } }

    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86

    4.定时任务模拟业务处理(TaskScheduled.java)

    import com.andon.nettyclient.socket.NettyClient;
    import lombok.RequiredArgsConstructor;
    import org.springframework.boot.CommandLineRunner;
    import org.springframework.stereotype.Component;
    
    import java.util.Date;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;
    
    /**
     * @author Andon
     * 2022/7/25
     */
    @Component
    @RequiredArgsConstructor
    public class TaskScheduled implements CommandLineRunner {
    
        private ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(2);
        private final NettyClient nettyClient;
    
        /**
         * 模拟业务处理
         */
        @Override
        public void run(String... args) throws Exception {
            // 如果任务里面执行的时间大于 period 的时间,下一次的任务会推迟执行。
            // 本次任务执行完后下次的任务还需要延迟period时间后再执行
            scheduledExecutorService.scheduleWithFixedDelay(() -> {
                System.out.println("====定时任务开始====");
                // 发送json字符串
                String msg = "{\"key\":\"hello\",\"value\":\"world\",\"date\":\"" + new Date().toString() + "\"}\n";
                nettyClient.sendMsg(msg);
            }, 2, 10, TimeUnit.SECONDS);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36

    四、Netty服务端启动

    在这里插入图片描述

    五、Netty客户端启动

    在这里插入图片描述

    GitHub: link. 欢迎star

  • 相关阅读:
    开发《俄罗斯方块》的意义
    为什么MySQL索引选择B+树而不使用B树?
    @vue/cli创建项目遇到ERROR Failed to get response from /vue-cli-version-marker 解决方法
    Java夏招必知必会八股文198题,看完offer拿到手软
    Python如何17行代码画一个爱心
    Node.js 模块化及npm概念介绍
    C语言基础知识
    【高级篇 / ZTNA】(7.0) ❀ 06. 域用户自动安装 FortiClient (下) ❀ FortiGate 防火墙
    百万级别或以上的数据如何删除
    kali下对Docker的详细安装
  • 原文地址:https://blog.csdn.net/weixin_39792935/article/details/125980895