• SpringBoot 整和 Netty 并监听多端口


    SpringBoot 整和 Netty 并监听多端口

    Netty 是由 JBOSS 提供的一个 Java 开源框架。Netty 提供异步的、基于事件驱动的网络应用程序框架,用以快速开发高性能、高可靠性的网络 IO 程序,是目前最流行的 NIO 框架,Netty 在互联网领域、大数据分布式计算领域、游戏行业、通信行业等获得了广泛的应用,知名的 Elasticsearch 、Dubbo 框架内部都采用了 Netty。

    1.依赖

    <dependency>
         <groupId>io.netty</groupId>
         <artifactId>netty-all</artifactId>
         <version>4.1.28.Final</version>
    </dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5

    2.PortDefinition

    读取yml 配置中的多端口配置

    //netty:
    //  port: {8300: A, 8500: B}
    @Data
    @Configuration
    @ConfigurationProperties(prefix = "netty")
    public class PortDefinition {
    
        Map<Integer, String> port;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    3.GatewayType

    多端口判断使用的常量类

    public class GatewayType {
    
        //个人设备
        public final static String GERNESHEBEI_SHOUHUA="A";
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    4.NettyServer

    实现Netty服务端

    @Slf4j
    @RefreshScope
    @Component
    public class NettyServer {
    
        @Autowired
        private PortDefinition portDefinition;
    
        public void start() throws InterruptedException {
            /**
             * 创建两个线程组 bossGroup 和 workerGroup
             * bossGroup 只是处理连接请求,真正的和客户端业务处理,会交给 workerGroup 完成
             *  两个都是无线循环
             */
            EventLoopGroup bossGroup = new NioEventLoopGroup(1);
            EventLoopGroup workerGroup = new NioEventLoopGroup();
            try {
                //创建服务器端的启动对象,配置参数
                ServerBootstrap bootstrap = new ServerBootstrap();
                //设置两个线程组
                bootstrap.group(bossGroup, workerGroup)
                        //使用NioServerSocketChannel 作为服务器的通道实现
                        .channel(NioServerSocketChannel.class)
                        //设置线程队列得到连接个数
                        .option(ChannelOption.SO_BACKLOG, 128)
                        //设置保持活动连接状态
                        .childOption(ChannelOption.SO_KEEPALIVE, true)
                        //通过NoDelay禁用Nagle,使消息立即发出去,不用等待到一定的数据量才发出去
                        .childOption(ChannelOption.TCP_NODELAY, true)
                        //可以给 bossGroup 加个日志处理器
                        .handler(new LoggingHandler(LogLevel.INFO))
    					//监听多个端口
                        .childHandler(new SocketChannelInitHandler(portDefinition.getPort()))
                ;
    
    
    //            监听多个端口
                Map<Integer, String> ports = portDefinition.getPort();
                log.info("netty服务器在{}端口启动监听", JSONObject.toJSONString(ports));
    
    
                for (Map.Entry<Integer, String> p : ports.entrySet()) {
                    final int port = p.getKey();
                    // 绑定端口
                    ChannelFuture cf = bootstrap.bind(new InetSocketAddress(port)).sync();
    
                    if (cf.isSuccess()) {
                        log.info("netty 启动成功,端口:{}", port);
                    } else {
                        log.info("netty 启动失败,端口:{}", port);
                    }
                    //对关闭通道进行监听
                    cf.channel().closeFuture().sync();
                }
    
            } finally {
                //发送异常关闭
                bossGroup.shutdownGracefully();
                workerGroup.shutdownGracefully();
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62

    5.SocketChannelInitHandler

    根据多端口去判断去执行那个业务的 Handler 方法

    @Slf4j
    public class SocketChannelInitHandler extends ChannelInitializer<SocketChannel> {
    
    
        /**
         * 用来存储每个连接上来的设备
         */
        public static final Map<ChannelId, ChannelPipeline> CHANNEL_MAP = new ConcurrentHashMap<>();
    
        /**
         * 端口信息,用来区分这个端口属于哪种类型的连接 如:8300 属于 A
         */
        Map<Integer, String> ports;
    
        public SocketChannelInitHandler(Map<Integer, String> ports) {
            this.ports = ports;
        }
    
        @Override
        protected void initChannel(SocketChannel socketChannel) throws Exception {
            //每次连接上来 对通道进行保存
            CHANNEL_MAP.put(socketChannel.id(), socketChannel.pipeline());
            ChannelPipeline pipeline = socketChannel.pipeline();
            int port = socketChannel.localAddress().getPort();
            String type = ports.get(port);
    
            pipeline.addLast(new StringEncoder(StandardCharsets.UTF_8));
            pipeline.addLast(new StringDecoder(StandardCharsets.UTF_8));
            pipeline.addLast(new LengthFieldBasedFrameDecoder(24*1024,0,2));
    
            log.info("【initChannel】端口号: "+port+"  类型: "+type);
    
            //不同类型连接,处理链中加入不同处理协议
            switch (type) {
                case GatewayType.GERNESHEBEI_SHOUHUA:
                    //手环
                    pipeline.addLast(new NettyServerHandler());
                    break;
                default:
                    log.error("当前网关类型并不存在于配置文件中,无法初始化通道");
                    break;
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44

    6.NettyServerHandler

    业务员的Handler 方法

    @Slf4j
    public class NettyServerHandler extends SimpleChannelInboundHandler<Object> {
        //private static final Logger log = LoggerFactory.getLogger(NettyServerHandler.class);
    
        protected void channelRead0(ChannelHandlerContext context, Object obj) throws Exception {
            log.info(">>>>>>>>>>>服务端接收到客户端的消息:{}",obj);
    
            String message = (String)obj;
    
    		//之后写自己的业务逻辑即可
            String b=message.replace("[", "")
                    .replace("]","");
            String[] split1 = b.split("\\*");
    
            String key = split1[1];
    
            SocketChannel socketChannel = (SocketChannel) context.channel();
    
    		//调用业务代码类并执行
            WristWatchSocket.runSocket(key,message,socketChannel,true);
    
            ReferenceCountUtil.release(obj);
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            ctx.close();
        }
    
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31

    7.WristWatchSocket 业务代码调用,根据自己的业务需要

    @Slf4j
    //@Data
    public class WristWatchSocket{
    
    
        //业务代码
        public static void runSocket(String key, String message, SocketChannel socketChannel, Boolean isNotFirst){
            try {
    
                if(message==null||message.trim().equals("")){
                    return;
                }
                
                String restr="测试发送";
                
    
                if(!"".equals(restr)){
                    socketChannel.writeAndFlush(restr);
                }
    
    
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26

    8.配置文件配置多端口

    netty:
      port: {8300: A, 8500: B}
    
    • 1
    • 2

    9.配置Netty 启动

    在启动类中配置

    @EnableAsync
    @EnableSwagger2
    @EnableFeignClients
    @EnableTransactionManagement
    @Slf4j
    @MapperScan({"com.yuandian.platform.mapper"})
    @SpringBootApplication(exclude={DataSourceAutoConfiguration.class})
    public class YunYiplatformApplication {
    
        public static void main(String[] args) {
            ApplicationContext run = SpringApplication.run(YunYiplatformApplication.class, args);
            log.info("\n\n【【【【平台成功启动!】】】】\n");
            //netty 启动配置
            try {
                run.getBean(NettyServer.class).start();
            }catch (Exception e){
                e.printStackTrace();
            }
    
    
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23

    截图
    在这里插入图片描述
    在这里插入图片描述

  • 相关阅读:
    vite项目启动use `--host` to expose
    Java最强大的技术之一:反射
    C语言 —— sizeof与strlen对比表 (持续更新中....)
    一文概览NLP句法分析:从理论到PyTorch实战解读
    【Java】之继承
    Linux使用wget下载文件时报错
    Vscode爆红Delete `␍`eslintprettier/prettier
    【总线】AXI第十课时:AXI协议的Ordering Model 使用ID tag
    5.1 内存CRC32完整性检测
    Java-类和对象
  • 原文地址:https://blog.csdn.net/xyjcfucdi128/article/details/133860301