• Springboot中使用netty 实现 WebSocket 服务


    依赖

    
        <dependency>
            <groupId>io.nettygroupId>
            <artifactId>netty-allartifactId>
            <version>4.1.77.Finalversion>
        dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    创建启动类
    package com.message.after;
    
    import com.message.websocket.WebSocketServer;
    import org.springframework.boot.CommandLineRunner;
    import org.springframework.stereotype.Component;
    
    /**
     * @author kuaiting
     */
    @Component
    public class AfterExecuteMethods implements CommandLineRunner {
    	/**
    	 * 项目启动之后立即执行的方法,可以做些初始化项目的操作以及需要启动项目立即执行的任务
    	 * @param args
    	 * @throws Exception
    	 */
    	@Override
    	public void run(String... args) throws Exception {
    		/**
    		 * 启动WebSocketServer 服务使用netty实现
    		 */
    		new WebSocketServer().start();
    	}
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    创建WebSocket 服务
    package com.message.websocket;
    
    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    
    /**
     * @author kuaiting
     * WebSocket
     */
    
    public class WebSocketServer {
    
    
    	public void start() {
    		// 一个主线程组
    		NioEventLoopGroup mainGroup = new NioEventLoopGroup();
    		//一个工作线程组
    		NioEventLoopGroup subGroup = new NioEventLoopGroup();
    		try {
    			ServerBootstrap serverBootstrap = new ServerBootstrap();
    			serverBootstrap.group(mainGroup, subGroup)
    					//设置队列大小
    					.option(ChannelOption.SO_BACKLOG, 1024)
    					.channel(NioServerSocketChannel.class)
    					// 两小时内没有数据的通信时,TCP会自动发送一个活动探测数据报文
    					.childOption(ChannelOption.SO_KEEPALIVE, true)
    					//添加自定义初始化处理器
    					.childHandler(new WsServerInitialzer());
    			ChannelFuture channelFuture = serverBootstrap.bind(8082).sync();
    			channelFuture.channel().closeFuture().sync();
    
    		}catch (Exception e){
    			e.printStackTrace();
    		}finally {
    			//关闭主线程组
    			mainGroup.shutdownGracefully();
    			//关闭工作线程组
    			subGroup.shutdownGracefully();
    		}
    
    
    	}
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    WsServerInitialzer 初始化
    package com.message.websocket;
    
    
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelPipeline;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.handler.codec.http.HttpObjectAggregator;
    import io.netty.handler.codec.http.HttpServerCodec;
    import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
    import io.netty.handler.stream.ChunkedWriteHandler;
    
    /**
     * @author kuaiting
     */
    public class WsServerInitialzer extends ChannelInitializer<SocketChannel> {
    	@Override
    	protected void initChannel(SocketChannel ch) throws Exception {
    
    		ChannelPipeline pipeline = ch.pipeline();
    		//websocket基于http协议,所以需要http编解码器
    		pipeline.addLast(new HttpServerCodec());
    		//添加对于读写大数据流的支持
    		pipeline.addLast(new ChunkedWriteHandler());
    		//对httpMessage进行聚合
    		pipeline.addLast(new HttpObjectAggregator(1024*64));
    
    		// ================= 上述是用于支持http协议的 ==============
    
    		//websocket 服务器处理的协议,用于给指定的客户端进行连接访问的路由地址
    		//比如处理一些握手动作(ping,pong)
    		pipeline.addLast(new WebSocketServerProtocolHandler("/ws"));
    		//自定义handler
    		pipeline.addLast(new ChatHandler());
    
    	}
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    创建信息ChatHandler 处理类
    package com.message.websocket;
    
    import com.alibaba.fastjson.JSON;
    import io.netty.channel.Channel;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.SimpleChannelInboundHandler;
    import io.netty.channel.group.ChannelGroup;
    import io.netty.channel.group.DefaultChannelGroup;
    import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
    import io.netty.util.concurrent.GlobalEventExecutor;
    import lombok.extern.slf4j.Slf4j;
    
    /**
     * @author kuaiting
     */
    @Slf4j
    public class ChatHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
    	private static  ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
    	@Override
    	public void channelActive(ChannelHandlerContext ctx) throws Exception {
    		log.info("已创建WebSocket链接:{}", ctx.channel().remoteAddress());
    	}
    
    	@Override
    	protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
    		String text = msg.text();
    		sendAllMessages(ctx,text);
    	}
    
    	@Override
    	public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
    		channels.add(ctx.channel());
    	}
    
    	@Override
    	public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
    		log.info("断开链接的ID", ctx.channel().id().asLongText());
    
    	}
    
    	@Override
    	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    		cause.printStackTrace();
    		ctx.channel().closeFuture();
    	}
    
    
    	//给每个人发送消息,除发消息人外
    	private void sendAllMessages(ChannelHandlerContext ctx, String msg) {
    		for (Channel channel : channels) {
    			if (!channel.id().asLongText().equals(ctx.channel().id().asLongText())) {
    				channel.writeAndFlush(new TextWebSocketFrame(JSON.toJSONString(msg)));
    			}
    		}
    	}
    
    	//给每个人发送消息,除发消息人外
    	private void sendMessages(String msg) {
    		for (Channel channel : channels) {
    			channel.writeAndFlush(new TextWebSocketFrame(JSON.toJSONString(msg)));
    		}
    	}
    
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
  • 相关阅读:
    【记录一个问题】hexo+butterfly+自定义CSS格式浏览器无法识别报错为(text/html)
    浅谈游戏音效测试点
    <el-input-number>显示两位数字;如果是一位数字的话前面补0
    大数据Hadoop之——部署hadoop+hive+Mysql环境(Linux)
    vscode忽略某些文件
    企业网络安全:威胁情报解决方案
    Figma语言设置教程:简易切换至中文,提高操作便捷性!
    【Django学习笔记 - 13】:关联查询(日期查询、一对一查询、一对多查询、多对多查询)
    qlistwidget不显示内容
    FastestDet---ncnn及多线程部署
  • 原文地址:https://blog.csdn.net/qq_45834006/article/details/127715919