注意:本篇博客风格(不多比比就是撸代码!!!)
<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>5.0.0.Alpha2</version>
</dependency>
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;
import javax.annotation.PreDestroy;
/**
* @author Andon
* 2022/7/22
*
* Netty服务端
*/
@Slf4j
@Component
@RequiredArgsConstructor
public class NettyServer implements CommandLineRunner {
private Channel channel;
// boss事件轮询线程组,处理连接事件
private final EventLoopGroup bossGroup = new NioEventLoopGroup();
// worker事件轮询线程组,用于数据处理
private final EventLoopGroup workerGroup = new NioEventLoopGroup();
private final NettyServerInitializer nettyServerInitializer;
@Value("${netty.port}")
private Integer port;
/**
* 开启Netty服务
*/
@Override
public void run(String... args) {
try {
// 启动类
ServerBootstrap serverBootstrap = new ServerBootstrap();
// 设置参数,组配置
serverBootstrap.group(bossGroup, workerGroup)
// 指定channel
.channel(NioServerSocketChannel.class)
// 初始化服务端可连接队列
.option(ChannelOption.SO_BACKLOG, 1024)
// 允许重复使用本地地址和端口,连接关闭后,可以立即重用端口
.option(ChannelOption.SO_REUSEADDR, true)
// 设置TCP长连接,TCP会主动探测空闲连接的有效性
.childOption(ChannelOption.SO_KEEPALIVE, true)
// 禁用Nagle算法,小数据时可以即时传输
.childOption(ChannelOption.TCP_NODELAY, true)
// 发送缓冲区大小
.childOption(ChannelOption.SO_SNDBUF, 256 * 1024)
// 接收缓冲区大小
.childOption(ChannelOption.SO_RCVBUF, 256 * 1024)
// Netty服务端channel初始化
.childHandler(nettyServerInitializer);
// 绑定端口,开始接收进来的连接
ChannelFuture future = serverBootstrap.bind(port).sync();
if (future.isSuccess()) {
log.info("Netty服务端启动!! 端口:[{}]", port);
}
channel = future.channel();
} catch (Exception e) {
log.error("Netty服务端启动异常!! error:{}", e.getMessage());
}
}
@PreDestroy
private void destroy() {
if (channel != null) {
channel.close();
}
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
log.warn("Netty服务关闭!!");
}
}
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.Delimiters;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.util.CharsetUtil;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Component;
/**
 * Pipeline configuration for channels handled by the Netty server.
 *
 * @author Andon
 * 2022/7/22
 */
@Component
@RequiredArgsConstructor
public class NettyServerInitializer extends ChannelInitializer<Channel> {
    private final NettyServerHandler nettyServerHandler;

    /**
     * Installs the frame decoder, the string codec and the business handler, in that order.
     *
     * @param channel the channel whose pipeline is being set up
     */
    @Override
    protected void initChannel(Channel channel) {
        // Delimiter-based frame decoder: copes with partial packets by splitting on
        // line delimiters; 8192 is the maximum length of a single line.
        channel.pipeline().addLast(new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter()));
        // UTF-8 string decoding and encoding.
        channel.pipeline().addLast(new StringDecoder(CharsetUtil.UTF_8));
        channel.pipeline().addLast(new StringEncoder(CharsetUtil.UTF_8));
        // Business handler, registered under an explicit name.
        channel.pipeline().addLast("nettyServerHandler", nettyServerHandler);
    }
}
import com.alibaba.fastjson.JSONObject;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelId;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.net.InetSocketAddress;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* @author Andon
* 2022/7/22
*
* Netty服务端处理器
*/
@Slf4j
@Component
@ChannelHandler.Sharable
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
// 管理一个全局map,保存连接进服务端的通道数量
public static final Map<ChannelId, ChannelHandlerContext> CHANNEL_MAP = new ConcurrentHashMap<>();
/**
* 当客户端主动连接服务端,通道活跃后触发
*/
@Override
public void channelActive(ChannelHandlerContext ctx) {
InetSocketAddress inetSocketAddress = (InetSocketAddress) ctx.channel().remoteAddress();
String clientIp = inetSocketAddress.getAddress().getHostAddress();
int clientPort = inetSocketAddress.getPort();
// 获取连接通道唯一标识
ChannelId channelId = ctx.channel().id();
// 如果map中不包含此连接,就保存连接
if (CHANNEL_MAP.containsKey(channelId)) {
log.info("客户端【{}】是连接状态,连接通道数量:{}", channelId, CHANNEL_MAP.size());
} else {
// 保存连接
CHANNEL_MAP.put(channelId, ctx);
log.info("客户端【{}】连接Netty服务端!![clientIp:{} clientPort:{}]", channelId, clientIp, clientPort);
log.info("连接通道数量:{}", CHANNEL_MAP.size());
}
}
/**
* 当客户端主动断开连接,通道不活跃触发
*/
@Override
public void channelInactive(ChannelHandlerContext ctx) {
InetSocketAddress inetSocketAddress = (InetSocketAddress) ctx.channel().remoteAddress();
String clientIp = inetSocketAddress.getAddress().getHostAddress();
int clientPort = inetSocketAddress.getPort();
// 获取终止连接的客户端ID
ChannelId channelId = ctx.channel().id();
// 包含此客户端才去删除
if (CHANNEL_MAP.containsKey(channelId)) {
// 删除连接
CHANNEL_MAP.remove(channelId);
log.warn("客户端【{}】断开Netty连接!![clientIp:{} clientPort:{}]", channelId, clientIp, clientPort);
log.info("连接通道数量:{}", CHANNEL_MAP.size());
}
}
/**
* 通道有消息触发
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
log.info("接收到客户端【{}】的消息:{}", ctx.channel().id(), msg.toString());
StringBuilder sb;
Map<String, Object> result;
try {
// 报文解析处理
sb = new StringBuilder();
result = JSONObject.parseObject(msg.toString());
sb.append(result).append("解析成功!!").append("\n");
// 响应客户端
this.channelWrite(ctx.channel().id(), sb);
} catch (Exception e) {
ctx.writeAndFlush("-1\n");
log.error("报文解析失败:{}", e.getMessage());
}
}
public void channelWrite(ChannelId channelId, Object msg) {
ChannelHandlerContext ctx = CHANNEL_MAP.get(channelId);
if (ctx == null) {
log.info("通道【{}】不存在!!", channelId);
return;
}
// 将返回客户端的信息写入ctx
ctx.write(msg);
// 刷新缓存区
ctx.flush();
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
String socketString = ctx.channel().remoteAddress().toString();
if (evt instanceof IdleStateEvent) {
IdleStateEvent event = (IdleStateEvent) evt;
if (event.state() == IdleState.READER_IDLE) {
log.warn("Client: 【{}】 READER_IDLE 读超时", socketString);
ctx.channel().close();
} else if (event.state() == IdleState.WRITER_IDLE) {
log.warn("Client: 【{}】 WRITER_IDLE 写超时", socketString);
ctx.channel().close();
} else if (event.state() == IdleState.ALL_IDLE) {
log.warn("Client: 【{}】 ALL_IDLE 读/写超时", socketString);
ctx.channel().close();
}
}
}
/**
* 当连接发生异常时触发
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
// 当出现异常就关闭连接
ctx.close();
}
}
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;
import javax.annotation.PreDestroy;
import java.util.concurrent.TimeUnit;
/**
* @author Andon
* 2022/7/22
*
* Netty客户端
*/
@Slf4j
@Component
@RequiredArgsConstructor
public class NettyClient implements CommandLineRunner {
private Channel channel;
private final EventLoopGroup workGroup = new NioEventLoopGroup();
private final NettyClientInitializer nettyClientInitializer;
@Value("${netty.host}")
private String host;
@Value("${netty.port}")
private Integer port;
public void sendMsg(String msg) {
boolean active = channel.isActive();
if (active) {
channel.writeAndFlush(msg);
} else {
log.warn("channel active:{}", false);
}
}
@Override
public void run(String... args) {
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(workGroup)
.channel(NioSocketChannel.class)
// 设置TCP长连接,TCP会主动探测空闲连接的有效性
.option(ChannelOption.SO_KEEPALIVE, true)
// 禁用Nagle算法,小数据时可以即时传输
.option(ChannelOption.TCP_NODELAY, true)
// 发送缓冲区大小
.option(ChannelOption.SO_SNDBUF, 256 * 1024)
// 接收缓冲区大小
.option(ChannelOption.SO_RCVBUF, 256 * 1024)
// Netty客户端channel初始化
.handler(nettyClientInitializer);
// 连接服务器ip、端口
ChannelFuture future = bootstrap.connect(host, port);
//客户端断线重连逻辑
future.addListener((ChannelFutureListener) futureListener -> {
if (futureListener.isSuccess()) {
log.info("连接Netty服务端成功!!");
} else {
log.warn("连接Netty服务端失败,准备30s后进行断线重连!!");
futureListener.channel().eventLoop().schedule((Runnable) this::run, 30, TimeUnit.SECONDS);
}
});
channel = future.channel();
} catch (Exception e) {
log.error("连接Netty服务端异常!! error:{}", e.getMessage());
}
}
@PreDestroy
private void destroy() {
if (channel != null) {
channel.close();
}
workGroup.shutdownGracefully();
log.warn("Netty连接关闭!!");
}
}
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.Delimiters;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.CharsetUtil;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Component;
import java.util.concurrent.TimeUnit;
/**
 * Pipeline configuration for the Netty client channel.
 *
 * @author Andon
 * 2022/7/22
 */
@Component
@RequiredArgsConstructor
public class NettyClientInitializer extends ChannelInitializer<Channel> {
    private final NettyClientHandler nettyClientHandler;

    /**
     * Installs the frame decoder, the string codec, the idle-state detector and
     * the business handler, in that order.
     *
     * @param channel the channel whose pipeline is being set up
     */
    @Override
    protected void initChannel(Channel channel) {
        // Split inbound data on line delimiters; a frame may be at most 8192 long.
        channel.pipeline().addLast(new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter()));
        // UTF-8 string decoding and encoding.
        channel.pipeline().addLast(new StringDecoder(CharsetUtil.UTF_8));
        channel.pipeline().addLast(new StringEncoder(CharsetUtil.UTF_8));
        // Only the writer-idle timeout is enabled (5 seconds); reader and all-idle are off.
        channel.pipeline().addLast(new IdleStateHandler(0, 5, 0, TimeUnit.SECONDS));
        channel.pipeline().addLast(nettyClientHandler);
    }
}
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.concurrent.TimeUnit;
/**
* @author Andon
* 2022/7/22
*
* Netty客户端处理器
*/
@Slf4j
@Component
@ChannelHandler.Sharable
public class NettyClientHandler extends ChannelInboundHandlerAdapter {
@Lazy
@Resource
private NettyClient nettyClient;
/**
* 建立连接时
*/
@Override
public void channelActive(ChannelHandlerContext ctx) {
log.info("建立Netty连接!!");
ctx.fireChannelActive();
}
/**
* 关闭连接时
*/
@Override
public void channelInactive(ChannelHandlerContext ctx) {
log.warn("Netty连接关闭!!");
reconnect(ctx);
}
/**
* 心跳处理,每5秒发送一次心跳请求
*/
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
IdleStateEvent idleStateEvent = (IdleStateEvent) evt;
if (idleStateEvent.state() == IdleState.WRITER_IDLE) {
log.info("已经5秒没有发送消息给服务端!!");
// 向服务端发送心跳包
String heartbeat = "{\"msg\":\"client heartbeat\"}\n";
// 发送心跳消息,并在发送失败时关闭该连接
ctx.writeAndFlush(heartbeat);
}
} else {
super.userEventTriggered(ctx, evt);
}
}
/**
* 收到服务端消息
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
log.info("收到服务端消息:{}", msg);
}
/**
* 当连接发生异常时触发
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
// 当出现异常就关闭连接
ctx.close();
}
private void reconnect(ChannelHandlerContext ctx) {
log.info("准备30s后断线重连!!");
ctx.channel().eventLoop().schedule(() -> nettyClient.run(), 30, TimeUnit.SECONDS);
}
}
import com.andon.nettyclient.socket.NettyClient;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;
import javax.annotation.PreDestroy;
import java.util.Date;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
/**
* @author Andon
* 2022/7/25
*/
@Component
@RequiredArgsConstructor
public class TaskScheduled implements CommandLineRunner {
private ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(2);
private final NettyClient nettyClient;
/**
* 模拟业务处理
*/
@Override
public void run(String... args) throws Exception {
// 如果任务里面执行的时间大于 period 的时间,下一次的任务会推迟执行。
// 本次任务执行完后下次的任务还需要延迟period时间后再执行
scheduledExecutorService.scheduleWithFixedDelay(() -> {
System.out.println("====定时任务开始====");
// 发送json字符串
String msg = "{\"key\":\"hello\",\"value\":\"world\",\"date\":\"" + new Date().toString() + "\"}\n";
nettyClient.sendMsg(msg);
}, 2, 10, TimeUnit.SECONDS);
}
}


GitHub: link. 欢迎star