Netty 高级应用
1. 编解码器
概念:在网络应用中,需要实现某种编解码器。将原始字节数据与自定义消息数据进行相互转换。网络中都是以字节码的形式传输的。
对Netty而言,编解码器由两部分组成:编码器、解码器
- 编码器:将消息对象转为字节或其他序列形式在网络上传输
- 解码器:负责将字节或其他序列形式转为指定的消息对象
Netty的编解码器实现了ChannelHandlerAdapter,也是一种特殊的ChannelHandler,所以依赖与ChannelPipeline,可以将多个编解码器链接在一起,以实现复杂的转换逻辑。
- 解码器
- ByteToMessageDecoder:用于将字节转为消息,需要检查缓冲区是否有足够的字节
- ReplayingDecoder:继承ByteToMessageDecoder,不需要检查缓冲区是否有足够的字节,但是ReplayingDecoder速度略慢于ByteToMessageDecoder,同时不是所有的ByteBuf都支持。项目复杂性高则使用ReplayingDecoder,否则使用ByteToMessageDecode
- MessageToMessageDecoder:用于从一种消息解码为另一种消息(如POJO到POJO)
解码器示例:
public class DemoDecoder extends MessageToMessageDecoder<ByteBuf> {
@Override
protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
String msg = byteBuf.toString(CharsetUtil.UTF_8);
list.add(msg);
}
}
通道里加入解码器:
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new DemoDecoder());
socketChannel.pipeline().addLast(new DemoNettyServerHandle());
}
- 编码器
- MessageToByteEncoder:将消息转为字节
- MessageToMessageEncoder:用于从一种消息编码为另外一种消息(例如POJO到POJO)
编码器示例:
public class DemoEncoder extends MessageToMessageEncoder<String> {
@Override
protected void encode(ChannelHandlerContext channelHandlerContext, String s, List<Object> list) throws Exception {
list.add(Unpooled.copiedBuffer(s,CharsetUtil.UTF_8));
}
}
- 编码解码器Codec
同时具备编码与解码功能
- ByteToMessageCodec
- MessageToMessageCodec
2. 基于Netty的HTTP服务器开发
效果如图:
代码如下:
public class NettyHttpServer {
private int port;
public NettyHttpServer(int port) {
this.port = port;
}
public static void main(String[] args) {
new NettyHttpServer(8090).run();
}
public void run(){
EventLoopGroup bossGroup=null;
EventLoopGroup workerGroup=null;
try{
bossGroup=new NioEventLoopGroup(1);
workerGroup=new NioEventLoopGroup();
ServerBootstrap serverBootstrap=new ServerBootstrap();
serverBootstrap.group(bossGroup,workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG,128)
.childOption(ChannelOption.SO_KEEPALIVE,Boolean.TRUE)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
//添加编解码器
socketChannel.pipeline().addLast(new HttpServerCodec());
socketChannel.pipeline().addLast(new NettyHttpServerHandler());
}
});
ChannelFuture channelFuture = serverBootstrap.bind(port).sync();
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
public class NettyHttpServerHandler extends SimpleChannelInboundHandler<HttpObject> {
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, HttpObject httpObject) throws Exception {
if(httpObject instanceof HttpRequest){
DefaultHttpRequest request=(DefaultHttpRequest)httpObject;
if(request.uri().equals("/favicon.ico")){
//图标不响应
return;
}
System.out.println("接收到请求:"+request.uri());
ByteBuf byteBuf = Unpooled.copiedBuffer("你好,我是服务端", CharsetUtil.UTF_8);
DefaultFullHttpResponse response=new DefaultFullHttpResponse(HttpVersion.HTTP_1_1,HttpResponseStatus.OK,byteBuf);
//设置响应头
response.headers().set(HttpHeaderNames.CONTENT_TYPE,"text/html;charset=utf-8");
response.headers().set(HttpHeaderNames.CONTENT_LENGTH,byteBuf.readableBytes());
channelHandlerContext.writeAndFlush(response);
}
}
}
3. 粘包和拆包
简介:粘包和拆包是TCP网络编程中不可避免的,无论客户端还是服务端,当我们读取或发送消息的时候都要考虑TCP底层的粘包/拆包机制。
粘包产生的原因:
- 应用程序写入的数据小于套接字缓冲区大小,网卡将应用多次写入的数据发送到网络上
- 接收方不及时读取套接字缓冲区数据
- TCP默认使用Nagle算法,将小数据包合并
拆包产生的原因:
- 数据太大超过剩余缓冲区的大小
- 数据太大超过MSS最大报文长度
粘包和拆包的解决方案
- 消息长度固定,累计读取到定长的报文就认为是一个完整的信息
- 将换行符作为消息结束符
- 将特殊的分隔符作为消息的结束标志
- 通过在消息头中定义长度字段来标识消息总长度
Netty中粘包和拆包的解决方案
Netty提供了4种解码器来解决:
- 固定长度拆包器FixedLengthFrameDecoder
- 行拆包器LineBasedFrameDecoder,以换行符作为分隔符
- 分隔符拆包器DelimiterBasedFrameDecoder,通过自定义的分隔符进行拆分
- 基于数据包长度的拆包器LengthFieldBasedFrameDecoder,将应用层数据包的长度最为拆分一句。要求应用层协议中包含数据包的长度。
DelimiterBasedFrameDecoder示例:
ByteBuf byteBuf =
Unpooled.copiedBuffer("$".getBytes(StandardCharsets.UTF_8));
ch.pipeline().addLast(new DelimiterBasedFrameDecoder(2048, byteBuf));
LengthFieldBasedFrameDecoder构造器参数讲解:
public LengthFieldBasedFrameDecoder(
ByteOrder byteOrder,
int lengthFieldOffset,
int lengthFieldLength,
int lengthAdjustment,
int initialBytesToStrip,
boolean failFast)
-
byteOrder是指明Length字段是大端序还是小端序,因为Netty要读取Length字段的值,所以大端小端要设置好,默认Netty是大端序ByteOrder.BIG_ENDIAN。
-
maxFrameLength是指最大包长度,如果Netty最终生成的数据包超过这个长度,Netty就会报错。
-
lengthFieldOffset是指明Length的偏移位
-
lengthFieldLength是Length字段长度
-
lengthAdjustment 这个参数很多时候设为负数,这是最让小伙伴们迷惑的。下面我用一整段话来解释这个参数
当Netty利用lengthFieldOffset(偏移位)和lengthFieldLength(Length字段长度)成功读出Length字段的值后,Netty认为这个值是指从Length字段之后,到包结束一共还有多少字节,如果这个值是13,那么Netty就会再等待13个Byte的数据到达后,拼接成一个完整的包。但是更多时候,Length字段的长度,是指整个包的长度,如果是这种情况,当Netty读出Length字段的时候,它已经读取了包的4个Byte的数据,所以,后续未到达的数据只有9个Byte,即13 - 4 = 9,这个时候,就要用lengthAdjustment来告诉Netty,后续的数据并没有13个Byte,要减掉4个Byte,所以lengthAdjustment要设为 -4!!!
-
initialBytesToStrip,跳过的个数。比如这里initialBytesToStrip设置为4,那么Netty就会跳过前4位解析后面的内容
-
failFast 参数一般设置为true,当这个参数为true时,netty一旦读到Length字段,并判断Length超过maxFrameLength,就立即抛出异常。
示例:
@Override
public void channelActive(ChannelHandlerContext channelHandlerContext) throws Exception {
for (int i=0;i<100;i++){
byte[] bytes = "你好,我是客户端".getBytes(CharsetUtil.UTF_8);
ByteBuf byteBuf = Unpooled.buffer();
byteBuf.writeInt(bytes.length);
byteBuf.writeBytes(bytes);
channelHandlerContext.writeAndFlush(byteBuf);
}
}
第2个参数和第三个参数表示:0-4个字节是内容长度字段,第五个参数的4代表跳过前4个字节。
socketChannel.pipeline().addLast(new LengthFieldBasedFrameDecoder(60535,0,4,0,4));
最后输出的内容:
4. 基于Netty和WebSocket的聊天室案例
1. WebSocket简介
WebSocket是一种在单个TCP连接上进行全双工通信的协议。相比HTTP协议,WebSocket具备如下特点:
- 支持双向通信,实时性更强
- 更好的二进制支持
- 较少的开销:协议控制的数据包头部较小
应用场景:
- 社交订阅
- 协同编辑
- 股票基金报价
- 体育实况更新
- 多媒体聊天
- 在线教育
2. 服务端开发
- 引入依赖
基于SpringBoot环境
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!--添加thymeleaf依赖 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-thymeleaf</artifactId>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.72.Final</version>
</dependency>
- 核心后端代码
@Component
public class NettyWebSocketServer implements Runnable {
@Autowired
private NettyConfig nettyConfig;
@Autowired
private WebSocketChannelInit webSocketChannelInit;
private EventLoopGroup bossGroup = new NioEventLoopGroup(1);
private EventLoopGroup wokerGroup = new NioEventLoopGroup();
@PreDestroy
public void close(){
bossGroup.shutdownGracefully();
wokerGroup.shutdownGracefully();
}
@Override
public void run() {
try{
ServerBootstrap serverBootstrap=new ServerBootstrap();
serverBootstrap.group(bossGroup,wokerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(webSocketChannelInit);
ChannelFuture channelFuture = serverBootstrap.bind(nettyConfig.getPort()).sync();
System.out.println("Netty服务端启动成功");
channelFuture.channel().closeFuture().sync();
}catch (Exception e){
e.printStackTrace();
}finally {
bossGroup.shutdownGracefully();
wokerGroup.shutdownGracefully();
}
}
}
@Component
public class WebSocketChannelInit extends ChannelInitializer {
@Autowired
private NettyConfig nettyConfig;
@Autowired
private WebSocketHandler webSocketHandler;
@Override
protected void initChannel(Channel channel) throws Exception {
ChannelPipeline pipeline = channel.pipeline();
//对http协议的支持
pipeline.addLast(new HttpServerCodec());
//对大数据流的支持
pipeline.addLast(new ChunkedWriteHandler());
//post请求分为3部分。request line、request header、body
//HttpObjectAggregator将多个信息转化为单一的request或者response对象
pipeline.addLast(new HttpObjectAggregator(8000));
//将http协议升级为ws协议,websocket的支持
pipeline.addLast(new WebSocketServerProtocolHandler(nettyConfig.getPath()));
pipeline.addLast(webSocketHandler);
}
}
@Component
@ChannelHandler.Sharable //设置通道共享
public class WebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
private List<Channel> channels=new ArrayList<>();
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
channels.add(ctx.channel());
System.out.println("有新的连接了...");
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
channels.remove(ctx.channel());
System.out.println("连接下线了");
}
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, TextWebSocketFrame textWebSocketFrame) throws Exception {
String text = textWebSocketFrame.text();
Channel currentChannel = channelHandlerContext.channel();
for (Channel channel:channels){
//自己不给自己发消息
if(!channel.equals(currentChannel)){
channel.writeAndFlush(new TextWebSocketFrame(text));
}
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
Channel channel = ctx.channel();
channels.remove(channel);
}
}
3. 前端js代码
$(function () {
//这里需要注意的是,prompt有两个参数,前面是提示的话,后面是当对话框出来后,在对话框里的默认值
var username = "";
while (true) {
//弹出一个输入框,输入一段文字,可以提交
username = prompt("请输入您的名字", ""); //将输入的内容赋给变量 name ,
if (username.trim() === "")//如果返回的有内容
{
alert("名称不能输入空")
} else {
$("#username").text(username);
break;
}
}
var ws = new WebSocket("ws://localhost:8081/chatService");
ws.onopen = function () {
console.log("连接成功.")
};
ws.onmessage = function (evt) {
showMessage(evt.data);
};
ws.onclose = function (){
console.log("连接关闭")
};
ws.onerror = function (){
console.log("连接异常")
};
function showMessage(message) {
// 张三:你好
var str = message.split(":");
$("#msg_list").append('<li class="active"}>\n' +
' <div class="main">\n' +
' <img class="avatar" width="30" height="30" src="/img/user.png">\n' +
' <div>\n' +
' <div class="user_name">'+str[0]+'</div>\n' +
' <div class="text">'+str[1]+'</div>\n' +
' </div> \n' +
' </div>\n' +
' </li>');
// 置底
setBottom();
}
$('#my_test').bind({
focus: function (event) {
event.stopPropagation();
$('#my_test').val('');
$('.arrow_box').hide()
},
keydown: function (event) {
event.stopPropagation();
if (event.keyCode === 13) {
if ($('#my_test').val().trim() === '') {
this.blur();
$('.arrow_box').show();
setTimeout(this.focus(),1000);
} else {
$('.arrow_box').hide();
//发送消息
sendMsg();
this.blur();
setTimeout(this.focus())
}
}
}
});
$('#send').on('click', function (event) {
event.stopPropagation();
if ($('#my_test').val().trim() === '') {
$('.arrow_box').show()
} else {
sendMsg();
}
});
function sendMsg() {
var message = $("#my_test").val();
$("#msg_list").append('<li class="active"}>\n' +
' <div class="main self">\n' +
' <div class="text">'+message+'</div>\n' +
' </div>\n' +
' </li>');
$("#my_test").val('');
//发送消息
message = username + ":" + message;
ws.send(message);
// 置底
setBottom();
}
// 置底
function setBottom() {
// 发送消息后滚动到底部
var container = $('.m-message');
var scroll = $('#msg_list');
container.animate({
scrollTop: scroll[0].scrollHeight - container[0].clientHeight + container.scrollTop() + 100
});
}
});