本次采用jdk1.8、springboot2.7.5、Netty4.1.84.Final版本
<dependency>
<groupId>org.springframework.bootgroupId>
<artifactId>spring-boot-starter-webartifactId>
dependency>
<dependency>
<groupId>io.nettygroupId>
<artifactId>netty-allartifactId>
dependency>
主要包括以下几步:
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
/**
* @author lics
* @version 1.0.0
* @date 2022/10/31 15:14
*/
public class MyNettyServer {
public void startNettyServer(int... ports) {
/*
* 配置服务端的NIO线程组
* NioEventLoopGroup 是用来处理I/O操作的Reactor线程组
* bossGroup:用来接收进来的连接,workerGroup:用来处理已经被接收的连接,进行socketChannel的网络读写,
* bossGroup接收到连接后就会把连接信息注册到workerGroup
* workerGroup的EventLoopGroup默认的线程数是CPU核数的二倍
*/
EventLoopGroup bossGroup = new NioEventLoopGroup(2);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
// ServerBootstrap 是一个启动NIO服务的辅助启动类
ServerBootstrap serverBootstrap = new ServerBootstrap();
// 设置group,将bossGroup, workerGroup线程组传递到ServerBootstrap
serverBootstrap = serverBootstrap.group(bossGroup, workerGroup);
// ServerSocketChannel是以NIO的selector为基础进行实现的,用来接收新的连接,这里告诉Channel通过NioServerSocketChannel获取新的连接
serverBootstrap = serverBootstrap.channel(NioServerSocketChannel.class);
// 初始化通道,主要用于网络I/O事件,记录日志,编码、解码消息
serverBootstrap = serverBootstrap.childHandler(new MyNettyChannelInitializer());
// 绑定端口,同步等待成功
ChannelFuture[] channelFutures;
ChannelFuture futureTcp;
if (ports.length > 0) {
channelFutures = new ChannelFuture[ports.length];
int i = 0;
for (Integer port : ports) {
// 绑定端口,同步等待成功 绑定的服务器
futureTcp = serverBootstrap.bind(port).sync();
channelFutures[i++] = futureTcp;
futureTcp.addListener(future -> {
if (future.isSuccess()) {
System.out.println("netty server 启动成功!" + port);
} else {
System.out.println("netty server 启动失败!" + port);
}
});
}
for (ChannelFuture future : channelFutures) {
// 等待服务器监听端口关闭
future.channel().closeFuture().sync();
}
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
// 退出,释放线程池资源
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
继承ChannelInitializer
类,重写initChannel()
方法
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
/**
* 通道初始化
*
* @author lics
* @version 1.0.0
* @date 2022/10/31 15:16
*/
public class MyNettyChannelInitializer extends ChannelInitializer<Channel> {
@Override
protected void initChannel(Channel channel) {
/**
* 处理TCP请求
*/
// ChannelOutboundHandler,依照逆序执行
channel.pipeline().addLast("encoder", new StringEncoder());
// 属于ChannelInboundHandler,依照顺序执行
channel.pipeline().addLast("decoder", new StringDecoder());
// 自定义TCP请求的处理器
channel.pipeline().addLast(new TcpRequestHandler());
/**
* 下面代码是处理HTTP请求,测试时请把上面的TCP请求设置注释掉
*/
// HTTP协议解析,用于握手阶段;此处放开则会处理HTTP请求,TCP请求不会处理
channel.pipeline().addLast(new HttpServerCodec());
channel.pipeline().addLast(new HttpObjectAggregator(1024 * 1024 * 100));
// 自定义解析HTTP请求处理器
channel.pipeline().addLast(new HttpRequestHandler());
}
}
TCP请求,创建TcpRequestHandler类,继承ChannelInboundHandlerAdapter
,重写里面的方法,channelRead()
方法读取客户端传入的数据
import java.net.InetSocketAddress;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import lombok.extern.slf4j.Slf4j;
/**
* I/O数据读写处理类 TCP连接
*
* @author lics
* @version 1.0.0
* @date 2022/10/31 15:17
*/
@Slf4j
public class TcpRequestHandler extends ChannelInboundHandlerAdapter {
/**
* 从客户端收到新的数据时,这个方法会在收到消息时被调用
*
* @param ctx
* @param msg
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
System.out.println("channelRead: " + msg.toString());
//回应客户端
ctx.write("I got it");
}
/**
* 从客户端收到新的数据、读取完成时调用
*
* @param ctx
*/
@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
System.out.println("channel Read Complete");
ctx.flush();
}
/**
* 当出现 Throwable 对象才会被调用,即当 Netty 由于 IO 错误或者处理器在处理事件时抛出的异常时
*
* @param ctx
* @param cause
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
System.out.println("exceptionCaught");
cause.printStackTrace();
ctx.close();//抛出异常,断开与客户端的连接
}
/**
* 客户端与服务端第一次建立连接时 执行
*
* @param ctx
* @throws Exception
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
super.channelActive(ctx);
ctx.channel().read();
InetSocketAddress insocket = (InetSocketAddress) ctx.channel().remoteAddress();
String clientIp = insocket.getAddress().getHostAddress();
//此处不能使用ctx.close(),否则客户端始终无法与服务端建立连接
System.out.println("channelActive: " + clientIp + ctx.name());
}
/**
* 客户端与服务端 断连时 执行
*
* @param ctx
* @throws Exception
*/
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
super.channelInactive(ctx);
InetSocketAddress insocket = (InetSocketAddress) ctx.channel().remoteAddress();
String clientIp = insocket.getAddress().getHostAddress();
ctx.close(); //断开连接时,必须关闭,否则造成资源浪费,并发量很大情况下可能造成宕机
System.out.println("channelInactive: " + clientIp);
}
/**
* 服务端当read超时, 会调用这个方法
*
* @param ctx
* @param evt
* @throws Exception
*/
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
super.userEventTriggered(ctx, evt);
InetSocketAddress insocket = (InetSocketAddress) ctx.channel().remoteAddress();
String clientIp = insocket.getAddress().getHostAddress();
ctx.close();//超时时断开连接
System.out.println("userEventTriggered: " + clientIp);
}
@Override
public void channelRegistered(ChannelHandlerContext ctx) {
System.out.println("channelRegistered");
}
@Override
public void channelUnregistered(ChannelHandlerContext ctx) {
System.out.println("channelUnregistered");
}
@Override
public void channelWritabilityChanged(ChannelHandlerContext ctx) {
System.out.println("channelWritabilityChanged");
}
}
HTTP请求,创建HttpRequestHandler类,继承SimpleChannelInboundHandler
,重写channelRead0()
方法
import com.alibaba.fastjson.JSONObject;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.*;
import io.netty.handler.codec.http.multipart.Attribute;
import io.netty.handler.codec.http.multipart.HttpPostRequestDecoder;
import io.netty.handler.codec.http.multipart.InterfaceHttpData;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
/**
* HTTP连接请求处理
*
* @author lics
* @version 1.0.0
* @date 2022/11/2 11:03
*/
public class HttpRequestHandler extends SimpleChannelInboundHandler<FullHttpRequest> {
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, FullHttpRequest fullHttpRequest) {
String uri = fullHttpRequest.uri();
System.out.println(uri);
switch (fullHttpRequest.method().name()) {
case "GET":
processGetRequest(fullHttpRequest);
break;
case "POST":
if (fullHttpRequest.headers().get("Content-Type").contains("x-www-form-urlencoded")) {
processPostFormRequest(fullHttpRequest);
} else if (fullHttpRequest.headers().get("Content-Type").contains("application/json")) {
processPostJsonRequest(fullHttpRequest);
}
break;
default:
}
ByteBuf buf = PooledByteBufAllocator.DEFAULT.buffer();
buf.writeCharSequence("sucess", StandardCharsets.UTF_8);
FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, buf);
response.headers().set("Content-Type","application/json;charset=UTF-8");
response.headers().set("Content-Length",response.content().readableBytes());
channelHandlerContext.channel().writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
}
/**
* 处理get请求
* @param request
*/
private void processGetRequest(FullHttpRequest request){
QueryStringDecoder decoder = new QueryStringDecoder(request.uri());
Map<String, List<String>> params = decoder.parameters();
params.forEach((key, value) -> System.out.println(key + " ==> "+ value));
}
/**
* 处理post form请求
* @param request
*/
private void processPostFormRequest(FullHttpRequest request){
HttpPostRequestDecoder decoder = new HttpPostRequestDecoder(request);
List<InterfaceHttpData> httpDataList = decoder.getBodyHttpDatas();
httpDataList.forEach(item -> {
if (item.getHttpDataType() == InterfaceHttpData.HttpDataType.Attribute){
Attribute attribute = (Attribute) item;
try {
System.out.println(attribute.getName() + " ==> " + attribute.getValue());
}catch (Exception e){
e.printStackTrace();
}
}
});
}
/**
* 处理post json请求
* @param request
*/
private void processPostJsonRequest(FullHttpRequest request){
ByteBuf content = request.content();
byte[] bytes = new byte[content.readableBytes()];
content.readBytes(bytes);
JSONObject jsonObject = JSONObject.parseObject(new String(bytes));
jsonObject.getInnerMap().forEach((key,value) -> System.out.println(key + " ==> " + value));
}
}
import com.primeton.netty.server.MyNettyServer;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.EnableAsync;
/**
* 启动类
*
* @author lics
* @version 1.0.0
* @date 2022/10/31 15:17
*/
@EnableAsync
@SpringBootApplication
public class NettyApplication implements CommandLineRunner {
public static void main(String[] args) {
SpringApplication app = new SpringApplication(NettyApplication.class);
app.run(args);
}
@Async
@Override
public void run(String... args) {
/*
* 使用异步注解方式启动netty服务端服务
*/
new MyNettyServer().startNettyServer(5678,8888);
}
}
启动成功后,可以看到控制台会打印两行输出
使用客户端连接工具进行连接测试
发送 ‘Hello world’ 数据,控制台打印并发送 ‘I got it’ 到客户端
TCP/UDP连接工具下载
链接:https://pan.baidu.com/s/1YMg602U9xPgGVsFZx5zVaw
提取码:144t
这里进行HTTP连接测试时,要把MyNettyChannelInitializer
类中有关TCP连接的代码注掉,否则会连接失败
到此,spring boot整合Netty完成,后面将根据这个demo去跟踪学习Netty的源码,喜欢可以关注一下,大家一起学习,共同进步。