<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.77.Final</version>
</dependency>
package com.message.after;
import com.message.websocket.WebSocketServer;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;
/**
 * Startup hook that launches the Netty-based WebSocket server once the
 * Spring Boot application has started.
 *
 * @author kuaiting
 */
@Component
public class AfterExecuteMethods implements CommandLineRunner {

    /** Name given to the thread that hosts the blocking WebSocket server loop. */
    private static final String SERVER_THREAD_NAME = "netty-websocket-server";

    /**
     * Runs immediately after the application has started; a place for
     * initialization work and tasks that must begin with the application.
     *
     * <p>{@code WebSocketServer.start()} blocks until the server channel is
     * closed, so it is started on its own thread. Calling it directly here
     * would block the Spring Boot startup thread indefinitely, so this method
     * would never return to its caller.
     *
     * @param args command-line arguments passed to the application (unused)
     * @throws Exception never thrown by this implementation; declared by the interface
     */
    @Override
    public void run(String... args) throws Exception {
        // Non-daemon on purpose: like the original blocking call, the running
        // server keeps the JVM alive.
        Thread serverThread = new Thread(() -> new WebSocketServer().start(), SERVER_THREAD_NAME);
        serverThread.start();
    }
}
package com.message.websocket;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
/**
 * WebSocket server implemented on top of Netty.
 *
 * @author kuaiting
 */
public class WebSocketServer {

    /** Port used by {@link #start()} when no explicit port is given. */
    private static final int DEFAULT_PORT = 8082;

    /**
     * Starts the server on the default port (8082) and blocks until the
     * server channel is closed.
     */
    public void start() {
        start(DEFAULT_PORT);
    }

    /**
     * Starts the server on the given port and blocks the calling thread until
     * the server channel is closed. Both event loop groups are always shut
     * down before this method returns.
     *
     * @param port TCP port to bind the server socket to
     */
    public void start(int port) {
        // Group passed as the parent (acceptor) group to the bootstrap.
        NioEventLoopGroup mainGroup = new NioEventLoopGroup();
        // Group passed as the child (worker) group to the bootstrap.
        NioEventLoopGroup subGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(mainGroup, subGroup)
                    // Size of the pending-connection queue.
                    .option(ChannelOption.SO_BACKLOG, 1024)
                    .channel(NioServerSocketChannel.class)
                    // Enable TCP keep-alive probes on idle accepted connections
                    // (probe interval is decided by the OS, commonly two hours).
                    .childOption(ChannelOption.SO_KEEPALIVE, true)
                    // Install the custom pipeline initializer for each connection.
                    .childHandler(new WsServerInitialzer());
            ChannelFuture channelFuture = serverBootstrap.bind(port).sync();
            // Block here until the server channel is closed.
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can observe the interruption.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } catch (Exception e) {
            // sync() rethrows the failure cause (e.g. a bind error) of the future.
            e.printStackTrace();
        } finally {
            // Shut down the parent group.
            mainGroup.shutdownGracefully();
            // Shut down the worker group.
            subGroup.shutdownGracefully();
        }
    }
}
package com.message.websocket;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
/**
 * Builds the handler pipeline for every accepted WebSocket connection.
 *
 * @author kuaiting
 */
public class WsServerInitialzer extends ChannelInitializer<SocketChannel> {

    /** Maximum size, in bytes, of an aggregated HTTP message (64 KiB). */
    private static final int MAX_HTTP_CONTENT_LENGTH = 64 * 1024;

    /** Path on which WebSocket connections are served. */
    private static final String WEBSOCKET_PATH = "/ws";

    /**
     * Installs the HTTP support handlers, the WebSocket protocol handler and
     * the application's chat handler, in that order.
     *
     * @param ch the newly accepted channel whose pipeline is configured
     * @throws Exception declared by the overridden method
     */
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ch.pipeline()
                // --- HTTP support: the WebSocket handshake starts as an HTTP request ---
                // HTTP request decoder / response encoder.
                .addLast(new HttpServerCodec())
                // Support for writing large data streams in chunks.
                .addLast(new ChunkedWriteHandler())
                // Aggregates HTTP message parts into a single full message.
                .addLast(new HttpObjectAggregator(MAX_HTTP_CONTENT_LENGTH))
                // --- WebSocket ---
                // Serves the WebSocket protocol on the given path, the address
                // clients connect to; per the original note it also takes care
                // of handshake-type actions such as ping/pong.
                .addLast(new WebSocketServerProtocolHandler(WEBSOCKET_PATH))
                // Application-specific handler.
                .addLast(new ChatHandler());
    }
}
package com.message.websocket;
import com.alibaba.fastjson.JSON;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.util.concurrent.GlobalEventExecutor;
import lombok.extern.slf4j.Slf4j;
/**
* @author kuaiting
*/
@Slf4j
public class ChatHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
private static ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
log.info("已创建WebSocket链接:{}", ctx.channel().remoteAddress());
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
String text = msg.text();
sendAllMessages(ctx,text);
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
channels.add(ctx.channel());
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
log.info("断开链接的ID", ctx.channel().id().asLongText());
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.channel().closeFuture();
}
//给每个人发送消息,除发消息人外
private void sendAllMessages(ChannelHandlerContext ctx, String msg) {
for (Channel channel : channels) {
if (!channel.id().asLongText().equals(ctx.channel().id().asLongText())) {
channel.writeAndFlush(new TextWebSocketFrame(JSON.toJSONString(msg)));
}
}
}
//给每个人发送消息,除发消息人外
private void sendMessages(String msg) {
for (Channel channel : channels) {
channel.writeAndFlush(new TextWebSocketFrame(JSON.toJSONString(msg)));
}
}
}