• # SpringBoot 集成 Netty


    SpringBoot 集成 Netty

    背景描述

    • 如果需要在SpringBoot开发的app中,提供Socket服务,那么Netty是不错的选择。

    Netty与SpringBoot整合关注点

    • NettySpringboot生命周期保持一致,同生共死
    • Netty能用上ioc中的Bean
    • Netty能读取到全局的配置

    Netty组件

    Bootstrap、ServerBootstrap

    • 帮助 Netty 使用者更加方便地组装和配置 Netty ,也可以更方便地启动 Netty 应用程序
    • Bootstrap 用于启动一个 Netty TCP 客户端,或者 UDP 的一端。
    • ServerBootstrap 往往是用于启动一个 Netty 服务端。

    Channel

    • ChannelNetty 网络操作抽象类,它除了包括基本的 I/O 操作,如 bind、connect、read、write 之外,还包括了 Netty 框架相关的一些功能,如获取该 ChannelEventLoop
    • 其实就是我们平常网络编程中经常使用的socket套接字对象

    EventLoop、EventLoopGroup

    • EventLoop定义了Netty的核心对象,用于处理IO事件,多线程模型、并发
    • 一个EventLoopGroup包含一个或者多个EventLoop
    • 一个EventLoop在它的生命周期内只和一个Thread绑定
    • 所有有EventLoop处理的I/O事件都将在它专有的Thread上被处理
    • 一个Channel在它的生命周期内只注册于一个EventLoop
    • 一个EventLoop可能会被分配给一个或者多个Channel

    ChannelHandler

    • ChannelHandler其实就是用于负责处理接收和发送数据的的业务逻辑,Netty中可以注册多个handler,以链式的方式进行处理,根据继承接口的不同,实现的顺序也不同。
    • ChannelHandler主要用于对出站和入站数据进行处理,它有两个重要的子接口:
      • ChannelInboundHandler——处理入站数据
      • ChannelOutboundHandler——处理出站数据

    ChannelPipeline

    • ChannelPipelineChannelHandler的容器,通过ChannelPipeline可以将ChannelHandler组织成一个逻辑链,该逻辑链可以用来拦截流经Channel的入站和出站事件,当 Channel被创建时,它会被自动地分配到它的专属的 ChannelPipeline

    ByteBuf

    • ByteBuf就是字节缓冲区,用于高效处理输入输出。

    Pom依赖

    • 引入springboot starter web netty
    <!-- SpringBoot 初始化依赖 -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
        <version>2.3.5.RELEASE</version>
    </dependency>
    
    <!-- https://mvnrepository.com/artifact/io.netty/netty-all -->
    <dependency>
        <groupId>io.netty</groupId>
        <artifactId>netty-all</artifactId>
        <version>4.1.85.Final</version>
    </dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    Yml 配置

    # Springboot 端口
    server:
      port: 2345
    
    netty:
      websocket:
        # Websocket服务端口
        port: 1024
        # 绑定的网卡
        ip: 0.0.0.0
        # 消息帧最大体积
        max-frame-size: 10240
        # URI路径
        path: /channel
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    整合Netty步骤

    服务端

    • 使用 SpringBoot Runner 机制启动 Netty 服务。
    @Component
    @Order
    public class NettyStartListener implements ApplicationRunner {
    
        @Resource
        private SocketServer socketServer;
    
        @Override
        public void run(ApplicationArguments args) {
            this.socketServer.start();
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • SocketServe.java
    @Component
    public class SocketServer {
    
        private static final Logger logger = LoggerFactory.getLogger(SocketServer.class);
    
        /**
         * 负责初始化 netty 服务器
         */
        private ServerBootstrap serverBootstrap;
    
        @Autowired
        private SocketInitializer socketInitializer;
    
        @Value("${netty.websocket.port}")
        private int port;
    
        /**
         * 启动 netty 服务器
         */
        public void start() {
            this.init();
            this.serverBootstrap.bind(this.port);
            logger.info("Netty started on port: {} (TCP) with boss thread {}", this.port, 2);
        }
    
        /**
         * 初始化 netty 配置
         */
        private void init() {
            // 创建两个线程组 bossGroup 为接收请求的线程组 一般1-2个就行
            NioEventLoopGroup bossGroup = new NioEventLoopGroup(2);
            // 实际工作的线程组
            NioEventLoopGroup workerGroup = new NioEventLoopGroup();
            this.serverBootstrap = new ServerBootstrap();
            // 两个线程组加入进来 加入自己的初始化器
            this.serverBootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(this.socketInitializer);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 编写Netty服务端监听消息处理器
    public class SocketHandler extends ChannelInboundHandlerAdapter {
    
        private static final Logger log = LoggerFactory.getLogger(SocketHandler.class);
    
        public static final ChannelGroup clients = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
    
        /**
         * 读取到客户端发来的消息
         *
         * @param ctx ChannelHandlerContext
         * @param msg msg
         */
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) {
            // 由于我们配置的是 字节数组 编解码器,所以这里取到的用户发来的数据是 byte数组
            byte[] data = (byte[]) msg;
            log.info("收到消息: " + new String(data));
            // 给其他人转发消息
            for (Channel client : clients) {
                if (!client.equals(ctx.channel())) {
                    client.writeAndFlush(data);
                }
            }
        }
    
        @Override
        public void handlerAdded(ChannelHandlerContext ctx) {
            log.info("新的客户端链接:" + ctx.channel().id().asShortText());
            clients.add(ctx.channel());
        }
    
        @Override
        public void handlerRemoved(ChannelHandlerContext ctx) {
            clients.remove(ctx.channel());
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
            cause.printStackTrace();
            ctx.channel().close();
            clients.remove(ctx.channel());
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 设置出站解码器和入站编码器
    @Component
    public class SocketInitializer extends ChannelInitializer<SocketChannel> {
    
        @Override
        protected void initChannel(SocketChannel socketChannel) {
            ChannelPipeline pipeline = socketChannel.pipeline();
            // 添加对byte数组的编解码,netty提供了很多编解码器,你们可以根据需要选择
            pipeline.addLast(new ByteArrayDecoder());
            pipeline.addLast(new ByteArrayEncoder());
            // 添加上自己的处理器
            pipeline.addLast(new SocketHandler());
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    客户端

    • 编写 socket连接
    public class ChatClient {
    
        public void start(String name) throws IOException {
            SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress("127.0.0.1", 1024));
            socketChannel.configureBlocking(false);
            Selector selector = Selector.open();
            socketChannel.register(selector, SelectionKey.OP_READ);
            // 监听服务端发来得消息
            new Thread(new ClientThread(selector)).start();
            // 监听用户输入
            Scanner scanner = new Scanner(System.in);
            while (scanner.hasNextLine()) {
                String message = scanner.next();
                if (StringUtils.hasText(message)) {
                    socketChannel.write(StandardCharsets.UTF_8.encode(name + ": " + message));
                }
            }
        }
    
        private class ClientThread implements Runnable {
    
            private final Logger logger = LoggerFactory.getLogger(ClientThread.class);
    
            private final Selector selector;
    
            public ClientThread(Selector selector) {
                this.selector = selector;
            }
    
            @Override
            public void run() {
                try {
                    while (true) {
                        int channels = selector.select();
                        if (channels == 0) {
                            continue;
                        }
                        Set<SelectionKey> selectionKeySet = selector.selectedKeys();
                        Iterator<SelectionKey> keyIterator = selectionKeySet.iterator();
                        while (keyIterator.hasNext()) {
                            SelectionKey selectionKey = keyIterator.next();
                            // 移除集合当前得selectionKey,避免重复处理
                            keyIterator.remove();
                            if (selectionKey.isReadable()) {
                                handleRead(selector, selectionKey);
                            }
                        }
                    }
                } catch (IOException e) {
                    logger.error(e.getMessage(), e);
                }
            }
        }
    
        // 处理可读状态
        private void handleRead(Selector selector, SelectionKey selectionKey) throws IOException {
            SocketChannel channel = (SocketChannel) selectionKey.channel();
            ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
            StringBuilder message = new StringBuilder();
            if (channel.read(byteBuffer) > 0) {
                byteBuffer.flip();
                message.append(StandardCharsets.UTF_8.decode(byteBuffer));
            }
            // 再次注册到选择器上,继续监听可读状态
            channel.register(selector, SelectionKey.OP_READ);
            System.out.println(message);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • start()
    public static void main(String[] args) throws IOException {
        new ChatClient().start("张三");
    }
    
    • 1
    • 2
    • 3
  • 相关阅读:
    LeetCode220902_93、搜索二维矩阵 II
    B+tree - B+树深度解析+C语言实现+opencv绘图助解
    C语言指针操作(四)通过指针引用字符串
    什么是设计模式?总结
    【算法题】小红书2023秋招提前批算法真题解析
    【Mybatis】动态 SQL
    Unity接入SQLite (一):SQLite介绍
    三、RocketMQ的JAVAAPI的基础概念
    Google Guava Cache LoadingCache 基本使用
    input输入系统
  • 原文地址:https://blog.csdn.net/qq_37248504/article/details/127938165