package com.yjx23332.netty.test;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class HelloWorldServer {
void start(){
NioEventLoopGroup boss = new NioEventLoopGroup();
NioEventLoopGroup worker = new NioEventLoopGroup();
try{
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap
.channel(NioServerSocketChannel.class)
/**
* 滑动缓冲区-接收大小设置,方便展示半包现象
* 不设置会自适应,TCP协议的连接双方会自动协调
* */
.option(ChannelOption.SO_RCVBUF,10)
.group(boss,worker)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
}
});
ChannelFuture channelFuture = serverBootstrap.bind(8080).sync();
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
log.error("server error {}",e);
}finally {
boss.shutdownGracefully();
worker.shutdownGracefully();
}
}
public static void main(String[] args){
HelloWorldServer helloWorldServer = new HelloWorldServer();
helloWorldServer.start();
}
}
package com.yjx23332.netty.test;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class HelloWorldClient {
public static void main(String[] args){
NioEventLoopGroup worker = new NioEventLoopGroup();
try{
Bootstrap boostrap = new Bootstrap();
boostrap
.channel(NioSocketChannel.class)
.group(worker)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
for(int i = 0;i < 10;i++) {
ByteBuf byteBuf = ctx.alloc().buffer(16);
byteBuf.writeBytes(new byte[]{0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15});
ctx.writeAndFlush(byteBuf);
}
}
});
}
});
ChannelFuture channelFuture = boostrap.connect("localhost",8080).sync();
channelFuture.channel().closeFuture().sync();
}catch (InterruptedException e) {
log.error("client error {}",e);
}finally {
worker.shutdownGracefully();
}
}
}
参考滑动窗口协议详解
黏包
半包
package com.yjx23332.netty.test;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class HelloWorldServer {
void start(){
NioEventLoopGroup boss = new NioEventLoopGroup();
NioEventLoopGroup worker = new NioEventLoopGroup();
try{
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap
.channel(NioServerSocketChannel.class)
//.option(ChannelOption.SO_RCVBUF,10)
.group(boss,worker)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
}
});
ChannelFuture channelFuture = serverBootstrap.bind(8080).sync();
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
log.error("server error {}",e);
}finally {
boss.shutdownGracefully();
worker.shutdownGracefully();
}
}
public static void main(String[] args){
HelloWorldServer helloWorldServer = new HelloWorldServer();
helloWorldServer.start();
}
}
package com.yjx23332.netty.test;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class HelloWorldClient {
private static void send(){
NioEventLoopGroup worker = new NioEventLoopGroup();
try{
Bootstrap boostrap = new Bootstrap();
boostrap
.channel(NioSocketChannel.class)
.group(worker)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ByteBuf byteBuf = ctx.alloc().buffer(16);
byteBuf.writeBytes(new byte[]{0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15});
ctx.writeAndFlush(byteBuf);
ctx.channel().close();
}
});
}
});
ChannelFuture channelFuture = boostrap.connect("localhost",8080).sync();
channelFuture.channel().closeFuture().sync();
}catch (InterruptedException e) {
log.error("client error {}",e);
}finally {
worker.shutdownGracefully();
}
}
public static void main(String[] args){
for(int i = 0;i <10;i++){
send();
}
log.debug("finish !");
}
}
通过发一次就断开,保证数据可以分开。
但无法解决半包问题,我们可以让滑动窗口的接收缓冲区变小,来测试。
调整服务端缓冲区
/**
 * Receive-buffer size setting.
 * Applied through option() on the ServerBootstrap itself, i.e. a global setting.
 */
//.option(ChannelOption.SO_RCVBUF,10)
/**
 * Receive-buffer setting for each accepted child connection.
 * Arguments, in order: minimum (a multiple of 16), initial, maximum.
 */
.childOption(ChannelOption.RCVBUF_ALLOCATOR,new AdaptiveRecvByteBufAllocator(16,16,16))
调整客户端
// 18-byte payload: two bytes more than the 16-byte buffer the client allocates above,
// so one message no longer fits into a single 16-byte server-side read.
byteBuf.writeBytes(new byte[]{0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,17,18});
缺点:
客户端与服务器约定固定的消息长度,少于则等待,大于则截断。
package com.yjx23332.netty.test;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import lombok.extern.slf4j.Slf4j;
import java.nio.charset.Charset;
import java.util.Arrays;
import java.util.Random;
@Slf4j
public class HelloWorldClient {
private static void send(){
NioEventLoopGroup worker = new NioEventLoopGroup();
try{
Bootstrap boostrap = new Bootstrap();
boostrap
.channel(NioSocketChannel.class)
.group(worker)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ByteBuf buf = ctx.alloc().buffer();
char c = '0';
Random r = new Random();
for(int i = 0;i < 10;i++){
byte[] msg = fill10Bytes(c,r.nextInt(10)+1);
c++;
buf.writeBytes(msg);
}
ctx.writeAndFlush(buf);
}
});
}
});
ChannelFuture channelFuture = boostrap.connect("localhost",8080).sync();
channelFuture.channel().closeFuture().sync();
}catch (InterruptedException e) {
log.error("client error {}",e);
}finally {
worker.shutdownGracefully();
}
}
public static void main(String[] args){
send();
}
/**
* 长度填充,总长度为10
* */
public static byte[] fill10Bytes(char c, int len) {
if(len > 10)
len = 10;
byte[] bytes = new byte[10];
Arrays.fill(bytes, (byte) '_');
for (int i = 0; i < len; i++) {
bytes[i] = (byte) c;
}
log.info(new String(bytes, Charset.forName("utf-8")));
return bytes;
}
}
}
package com.yjx23332.netty.test;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.AdaptiveRecvByteBufAllocator;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.FixedLengthFrameDecoder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class HelloWorldServer {
void start(){
NioEventLoopGroup boss = new NioEventLoopGroup();
NioEventLoopGroup worker = new NioEventLoopGroup();
try{
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap
.channel(NioServerSocketChannel.class)
.childOption(ChannelOption.RCVBUF_ALLOCATOR,new AdaptiveRecvByteBufAllocator(16,16,16))
.group(boss,worker)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
//设置定长解码器
ch.pipeline().addLast(new FixedLengthFrameDecoder(10));
ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
}
});
ChannelFuture channelFuture = serverBootstrap.bind(8080).sync();
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
log.error("server error {}",e);
}finally {
boss.shutdownGracefully();
worker.shutdownGracefully();
}
}
public static void main(String[] args){
HelloWorldServer helloWorldServer = new HelloWorldServer();
helloWorldServer.start();
}
}
缺点:
通过分割符号来对消息进行分割。
行解码器就是以换行符号作为消息的分割。指定构造时,需要确定最大长度。当达到最大长度时,仍然不能找到分隔符号,(如果failFast为true)就会抛出异常。
/**
 * Binds port 8080 with a line-based frame decoder in front of the logging
 * handler and blocks until the server channel is closed.
 */
void start() {
    NioEventLoopGroup boss = new NioEventLoopGroup();
    NioEventLoopGroup worker = new NioEventLoopGroup();
    try {
        ServerBootstrap serverBootstrap = new ServerBootstrap();
        serverBootstrap
                .channel(NioServerSocketChannel.class)
                .childOption(ChannelOption.RCVBUF_ALLOCATOR, new AdaptiveRecvByteBufAllocator(16, 16, 16))
                .group(boss, worker)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        // Line-based frame decoder: frames end at a line break;
                        // 1024 is the maximum frame length.
                        ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
                        ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
                    }
                });
        ChannelFuture channelFuture = serverBootstrap.bind(8080).sync();
        channelFuture.channel().closeFuture().sync();
    } catch (InterruptedException e) {
        // Restore the interrupt flag so code further up the stack can see it.
        Thread.currentThread().interrupt();
        // The throwable is passed as the last argument; no "{}" placeholder is needed.
        log.error("server error", e);
    } finally {
        boss.shutdownGracefully();
        worker.shutdownGracefully();
    }
}
/**
 * Connects to localhost:8080 and sends ten newline-terminated messages of
 * random length (1..256 characters) in a single buffer, then blocks until the
 * channel is closed.
 */
private static void send() {
    NioEventLoopGroup worker = new NioEventLoopGroup();
    try {
        Bootstrap boostrap = new Bootstrap();
        boostrap
                .channel(NioSocketChannel.class)
                .group(worker)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
                            @Override
                            public void channelActive(ChannelHandlerContext ctx) throws Exception {
                                ByteBuf buf = ctx.alloc().buffer();
                                char c = '0';
                                Random r = new Random();
                                for (int i = 0; i < 10; i++) {
                                    StringBuilder stringBuilder = makeString(c, r.nextInt(256) + 1);
                                    c++;
                                    buf.writeBytes(stringBuilder.toString().getBytes(StandardCharsets.UTF_8));
                                }
                                ctx.writeAndFlush(buf);
                            }
                        });
                    }
                });
        ChannelFuture channelFuture = boostrap.connect("localhost", 8080).sync();
        channelFuture.channel().closeFuture().sync();
    } catch (InterruptedException e) {
        // Restore the interrupt flag so code further up the stack can see it.
        Thread.currentThread().interrupt();
        // The throwable is passed as the last argument; no "{}" placeholder is needed.
        log.error("client error", e);
    } finally {
        worker.shutdownGracefully();
    }
}
/** Entry point: runs send() once. */
public static void main(String[] args){
send();
}
/**
 * Builds one line of the test payload: {@code len} copies of {@code c}
 * terminated by a single line feed.
 *
 * @param c   the character to repeat
 * @param len number of repetitions
 * @return a new builder holding the repeated characters followed by {@code '\n'}
 */
public static StringBuilder makeString(char c, int len) {
    StringBuilder line = new StringBuilder(len + 2);
    int remaining = len;
    while (remaining > 0) {
        line.append(c);
        remaining--;
    }
    return line.append('\n');
}
发送消息时,会把有关消息内容长度的信息一起发送,接收消息时,就可以处理黏包半包问题。
包含如下参数。
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
// Install the length-field based frame decoder.
/**
 * Constructor arguments, in order:
 * maxFrameLength, lengthFieldOffset, lengthFieldLength, lengthAdjustment, initialBytesToStrip
 */
ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(1024,0,4,0,0));
ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
}
});
/**
 * Connects to localhost:8080, sends two length-prefixed messages in a single
 * buffer and blocks until the channel is closed.
 */
private static void send() {
    NioEventLoopGroup worker = new NioEventLoopGroup();
    try {
        Bootstrap boostrap = new Bootstrap();
        boostrap
                .channel(NioSocketChannel.class)
                .group(worker)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
                            @Override
                            public void channelActive(ChannelHandlerContext ctx) throws Exception {
                                ByteBuf buf = ctx.alloc().buffer();
                                // Each call appends a length field followed by the content.
                                sendMsg(buf, "hello,world");
                                sendMsg(buf, "hi!");
                                ctx.writeAndFlush(buf);
                            }
                        });
                    }
                });
        ChannelFuture channelFuture = boostrap.connect("localhost", 8080).sync();
        channelFuture.channel().closeFuture().sync();
    } catch (InterruptedException e) {
        // Restore the interrupt flag so code further up the stack can see it.
        Thread.currentThread().interrupt();
        // The throwable is passed as the last argument; no "{}" placeholder is needed.
        log.error("client error", e);
    } finally {
        worker.shutdownGracefully();
    }
}
/** Entry point: runs send() once. */
public static void main(String[] args){
send();
}
/**
 * Appends one length-prefixed message to the buffer: a 4-byte length written
 * with {@code writeInt}, followed by the content bytes.
 *
 * @param byteBuf buffer to append to
 * @param content message text; encoded as UTF-8 so the byte length does not
 *                depend on the platform default charset
 */
private static void sendMsg(ByteBuf byteBuf, String content) {
    byte[] bytes = content.getBytes(java.nio.charset.StandardCharsets.UTF_8);
    byteBuf.writeInt(bytes.length);
    byteBuf.writeBytes(bytes);
}
set key value
依据 Redis 协议,上述命令需要转换为如下格式,各部分之间以回车换行(\r\n)分隔:
*3 $3 set $4 name $8 zhangsan
其中 *3 表示命令共有 3 个词;$3 表示下一个词的字节长度为 3,随后一行是该词本身;其余各词依此类推。
package com.yjx23332.netty.test;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import java.nio.charset.StandardCharsets;
/**
 * Minimal Redis client built on Netty: sends the command {@code set name zhangsan}
 * encoded by hand and prints whatever the server answers.
 */
public class helloRedis {
    /**
     * Connects to a Redis server, sends the command and blocks until the
     * channel is closed.
     *
     * @param args optional: {@code args[0]} host (default "localhost"),
     *             {@code args[1]} port (default 6379)
     */
    public static void main(String[] args) {
        // The original snippet had placeholders for the address; take it from the
        // command line and fall back to a local server on the standard Redis port.
        final String host = args.length > 0 ? args[0] : "localhost";
        final int port = args.length > 1 ? Integer.parseInt(args[1]) : 6379;
        final byte[] LINE = {13, 10}; // carriage return + line feed
        NioEventLoopGroup worker = new NioEventLoopGroup();
        try {
            Bootstrap boostrap = new Bootstrap();
            boostrap
                    .channel(NioSocketChannel.class)
                    .group(worker)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
                                @Override
                                public void channelActive(ChannelHandlerContext ctx) throws Exception {
                                    // *3 = three words; each word is preceded by $<byte length>.
                                    ByteBuf buf = ctx.alloc().buffer();
                                    buf.writeBytes("*3".getBytes(StandardCharsets.UTF_8));
                                    buf.writeBytes(LINE);
                                    buf.writeBytes("$3".getBytes(StandardCharsets.UTF_8));
                                    buf.writeBytes(LINE);
                                    buf.writeBytes("set".getBytes(StandardCharsets.UTF_8));
                                    buf.writeBytes(LINE);
                                    buf.writeBytes("$4".getBytes(StandardCharsets.UTF_8));
                                    buf.writeBytes(LINE);
                                    buf.writeBytes("name".getBytes(StandardCharsets.UTF_8));
                                    buf.writeBytes(LINE);
                                    buf.writeBytes("$8".getBytes(StandardCharsets.UTF_8));
                                    buf.writeBytes(LINE);
                                    buf.writeBytes("zhangsan".getBytes(StandardCharsets.UTF_8));
                                    buf.writeBytes(LINE);
                                    ctx.writeAndFlush(buf);
                                }

                                @Override
                                public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                    ByteBuf byteBuf = (ByteBuf) msg;
                                    try {
                                        System.out.println(byteBuf.toString(StandardCharsets.UTF_8));
                                    } finally {
                                        // The message is not passed on to the next handler, so this
                                        // handler is the last owner and has to release it.
                                        byteBuf.release();
                                    }
                                }
                            });
                        }
                    });
            ChannelFuture channelFuture = boostrap.connect(host, port).sync();
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            // Restore the interrupt flag before propagating.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } finally {
            worker.shutdownGracefully();
        }
    }
}


Netty已经帮助我们封装好了,我们只需要关注业务即可。
HttpServerCodec:解码器,组合了HttpRequestDecoder与HttpResponseEncoder
包括了请求解码与回复编码
将请求拆分为了:请求行请求头+请求体

package com.yjx23332.netty.test;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.*;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import lombok.extern.slf4j.Slf4j;
import static org.springframework.http.HttpHeaders.CONTENT_LENGTH;
@Slf4j
public class TestHttp {
public static void main(String[] args){
NioEventLoopGroup boss = new NioEventLoopGroup();
NioEventLoopGroup workers = new NioEventLoopGroup();
try{
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap
.channel(NioServerSocketChannel.class)
.group(boss,workers)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
ch.pipeline().addLast(new HttpServerCodec());
// ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){
// @Override
// public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// log.debug("{}",msg.getClass());
// if(msg instanceof HttpRequest){//请求头
//
// }else if (msg instanceof HttpContent){//请求体
//
// }
// }
// });
//只关心某种类型的消息
ch.pipeline().addLast(new SimpleChannelInboundHandler<HttpRequest>() {
@Override
protected void channelRead0(ChannelHandlerContext ctx, HttpRequest msg) throws Exception {
//请求行
log.debug("请求行:{}",msg.uri());
//请求头
log.debug("请求头:{}",msg.headers());
//返回响应
DefaultFullHttpResponse response = new DefaultFullHttpResponse(msg.protocolVersion(),HttpResponseStatus.OK);
byte[] bytes = "hello,world!
".getBytes();
response.headers().setInt(CONTENT_LENGTH,bytes.length);
response.content().writeBytes(bytes);
ctx.writeAndFlush(response);
}
});
}
});
ChannelFuture channelFuture = serverBootstrap.bind(8080).sync();
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
boss.shutdownGracefully();
workers.shutdownGracefully();
}
}
}

创建大致如下结构

package com.yjx23332.netty.test.entity;
import com.yjx23332.netty.test.entity.vo.req.*;
import com.yjx23332.netty.test.entity.vo.resp.*;
import lombok.Data;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
/**
 * Base class of every protocol message. It carries a sequence id and a message
 * type, and keeps a registry that maps each message-type code to its class.
 */
@Data
public abstract class Message implements Serializable {
/**
 * Looks up the message class registered for a type code.
 * Returns null when the code is not present in the registry.
 */
public static Class<?> getMessageClass(int messageType){ return messageClasses.get(messageType);}
private int sequenceId;
private int messageType;
/** Returns the type code of the concrete message; implemented by each subclass. */
public abstract int getMessageType();
/**
 * Message-type codes. Each constant deliberately has the same name as the
 * message class it identifies.
 */
public static final int LoginRequestMessage = 0;
public static final int LoginResponseMessage = 1;
public static final int ChatRequestMessage = 2;
public static final int ChatResponseMessage = 3;
public static final int GroupCreateRequestMessage = 4;
public static final int GroupCreateResponseMessage = 5;
public static final int GroupJoinRequestMessage = 6;
public static final int GroupJoinResponseMessage = 7;
public static final int GroupQuitRequestMessage = 8;
public static final int GroupQuitResponseMessage = 9;
public static final int GroupChatRequestMessage = 10;
public static final int GroupChatResponseMessage = 11;
public static final int GroupMembersRequestMessage = 12;
public static final int GroupMembersResponseMessage = 13;
public static final int PingMessage = 14;
// Registry: type code -> message class. In each put() the first argument is the
// int constant, the second is the class literal of the same name.
private static final Map<Integer,Class<?>> messageClasses = new HashMap<>();
static{
messageClasses.put(LoginRequestMessage, LoginRequestMessage.class);
messageClasses.put(LoginResponseMessage, LoginResponseMessage.class);
messageClasses.put(ChatRequestMessage, ChatRequestMessage.class);
messageClasses.put(ChatResponseMessage, ChatResponseMessage.class);
messageClasses.put(GroupCreateRequestMessage, GroupCreateRequestMessage.class);
messageClasses.put(GroupCreateResponseMessage, GroupCreateResponseMessage.class);
messageClasses.put(GroupJoinRequestMessage, GroupJoinRequestMessage.class);
messageClasses.put(GroupJoinResponseMessage, GroupJoinResponseMessage.class);
messageClasses.put(GroupQuitRequestMessage, GroupQuitRequestMessage.class);
messageClasses.put(GroupQuitResponseMessage, GroupQuitResponseMessage.class);
messageClasses.put(GroupChatRequestMessage, GroupChatRequestMessage.class);
messageClasses.put(GroupChatResponseMessage, GroupChatResponseMessage.class);
messageClasses.put(GroupMembersRequestMessage, GroupMembersRequestMessage.class);
messageClasses.put(GroupMembersResponseMessage, GroupMembersResponseMessage.class);
messageClasses.put(PingMessage,PingMessage.class);
}
}
以LoginRequestMessage为例,先创建类似如下格式文件。主要先把getMessageType实现了。
package com.yjx23332.netty.test.entity.vo.req;
import com.yjx23332.netty.test.entity.Message;
import lombok.Data;
import lombok.ToString;
/** Login request carrying the user name and password. */
@Data
// Include the fields of the parent class in the generated toString().
@ToString(callSuper = true)
public class LoginRequestMessage extends Message {
private String username;
private String password;
/** Creates an empty request. */
public LoginRequestMessage(){}
/** Creates a request with the given credentials. */
public LoginRequestMessage(String username,String password){
this.username = username;
this.password = password;
}
/** Returns the LoginRequestMessage type code declared in Message. */
@Override
public int getMessageType() {
return LoginRequestMessage;
}
}
package com.yjx23332.netty.test.entity.vo.resp;
import com.yjx23332.netty.test.entity.Message;
import lombok.Data;
import lombok.ToString;
/** Login response: whether the login succeeded, plus a human-readable message. */
@Data
@ToString(callSuper = true)
public class LoginResponseMessage extends Message {
    private boolean success;
    private String message;

    /**
     * Creates a response. The constructor is public so that code outside this
     * package (for example server-side handlers) can build one.
     *
     * @param success whether the login succeeded
     * @param message explanatory text for the client
     */
    public LoginResponseMessage(boolean success, String message) {
        this.success = success;
        this.message = message;
    }

    /** Returns the LoginResponseMessage type code declared in Message. */
    @Override
    public int getMessageType() {
        return LoginResponseMessage;
    }
}
package com.yjx23332.netty.test.protocol;
import com.yjx23332.netty.test.entity.Message;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageCodec;
import lombok.extern.slf4j.Slf4j;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.List;
/**
 * Codec for the custom chat protocol.
 *
 * <p>Frame layout, in write order: 4-byte magic number, 1-byte version, 1-byte
 * serializer type, 1-byte message type, 4-byte sequence id, 1 padding byte,
 * 4-byte content length, then the serialized content.
 */
@Slf4j
public class MessageCodec extends ByteToMessageCodec<Message> {
    /**
     * Encodes one message into {@code out} using JDK serialization for the body.
     *
     * @param ctx not used by this method
     * @param msg message to encode
     * @param out buffer the frame is appended to
     * @throws Exception if serialization fails
     */
    @Override
    protected void encode(ChannelHandlerContext ctx, Message msg, ByteBuf out) throws Exception {
        // Serialize first, so a serialization failure leaves nothing half-written in `out`.
        byte[] bytes;
        try (ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
             ObjectOutputStream objectOutputStream = new ObjectOutputStream(byteArrayOutputStream)) {
            objectOutputStream.writeObject(msg);
            objectOutputStream.flush();
            bytes = byteArrayOutputStream.toByteArray();
        }
        // 1. 4 bytes: magic number
        out.writeBytes(new byte[]{1, 2, 3, 4});
        // 2. 1 byte: protocol version
        out.writeByte(1);
        // 3. 1 byte: serializer type, 0 = jdk, 1 = json
        out.writeByte(0);
        // 4. 1 byte: message type
        out.writeByte(msg.getMessageType());
        // 5. 4 bytes: sequence id
        out.writeInt(msg.getSequenceId());
        // The fixed fields including the length field add up to 15 bytes; one
        // meaningless padding byte brings the header to 16, a power of two.
        out.writeByte(0xff);
        // 6. 4 bytes: content length
        out.writeInt(bytes.length);
        // 7. content
        out.writeBytes(bytes);
    }

    /**
     * Decodes one frame. The whole frame must already be available in {@code in};
     * this method does not wait for missing bytes.
     *
     * @param ctx not used by this method
     * @param in  buffer positioned at the start of a frame
     * @param out receives the decoded message
     * @throws Exception if the body cannot be deserialized
     */
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        int magicNum = in.readInt();
        byte version = in.readByte();
        byte serializerType = in.readByte();
        byte messageType = in.readByte();
        int sequenceId = in.readInt();
        in.readByte(); // skip the padding byte
        int length = in.readInt();
        byte[] bytes = new byte[length];
        // Reads `length` bytes starting at the current reader index.
        in.readBytes(bytes, 0, length);
        if (serializerType == 0) {
            // SECURITY NOTE(review): native Java deserialization of bytes received from the
            // network is unsafe for untrusted peers; consider an ObjectInputFilter or a
            // different serializer before using this outside a demo.
            try (ObjectInputStream objectInputStream = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
                Message message = (Message) objectInputStream.readObject();
                log.debug("{},{},{},{},{},{}", magicNum, version, serializerType, messageType, sequenceId, length);
                log.debug("{}", message);
                out.add(message);
            }
        } else {
            // The frame has been consumed but cannot be decoded; make that visible.
            log.warn("unsupported serializer type {}, frame dropped", serializerType);
        }
    }
}
测试,放在同一个包下,方便调用内部方法
package com.yjx23332.netty.test.protocol;
import com.yjx23332.netty.test.entity.vo.req.LoginRequestMessage;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.handler.logging.LoggingHandler;
/**
 * Manual test of MessageCodec using an EmbeddedChannel: one outbound pass
 * (encode) and one inbound pass (decode).
 */
public class TestMessage {
    public static void main(String[] args) throws Exception {
        EmbeddedChannel channel = new EmbeddedChannel(
                new LoggingHandler(),
                new MessageCodec()
        );
        // encode: LoginRequestMessage only offers (username, password), so exactly
        // those two arguments are passed.
        LoginRequestMessage loginRequestMessage = new LoginRequestMessage("zhangsan", "123");
        channel.writeOutbound(loginRequestMessage);
        // decode: build the bytes of one frame with a second codec instance ...
        ByteBuf byteBuf = ByteBufAllocator.DEFAULT.buffer();
        new MessageCodec().encode(null, loginRequestMessage, byteBuf);
        // ... and feed them into the channel as inbound data.
        channel.writeInbound(byteBuf);
    }
}
这里黏包不大会有问题,但是半包问题仍会发生。因此我们加入帧解码器
// The frame decoder goes first so that MessageCodec only ever sees complete frames.
// Arguments: maxFrameLength 1024, lengthFieldOffset 12, lengthFieldLength 4,
// lengthAdjustment 0, initialBytesToStrip 0.
EmbeddedChannel channel = new EmbeddedChannel(
new LengthFieldBasedFrameDecoder(1024,12,4,0,0),
new LoggingHandler(),
new MessageCodec()
);
半包测试
我们把内容进行切片,来测试半包即可
// First 100 bytes of the encoded frame.
ByteBuf s1 = byteBuf.slice(0,100);
s1.retain();
// The remainder of the frame.
ByteBuf s2 = byteBuf.slice(100, byteBuf.readableBytes() - 100);
s2.retain();
// Inbound
channel.writeInbound(s1);// s1.release() is called automatically
channel.writeInbound(s2);
一个Handler实例,让其它管道共用可以吗?比如如下写法
// Handler instances created once, with the intent of reusing them in several pipelines.
LengthFieldBasedFrameDecoder lengthFieldBasedFrameDecoder = new LengthFieldBasedFrameDecoder(1024,12,4,0,0);
LoggingHandler loggingHandler = new LoggingHandler();
EmbeddedChannel channel = new EmbeddedChannel(
lengthFieldBasedFrameDecoder,
loggingHandler,
new MessageCodec()
);
对于Netty,如果它的Handler类上加了 @Sharable 的话,就说明它考虑到了多线程,可以被共享。

那我们自己的编解码器可以用吗?
我们的编解码器继承了 ByteToMessageCodec,该类要求其子类不能使用 @Sharable
它认为我们可能会需要处理中间状态,因此强制要求不能共享

可以看到有这一句
// Quoted constructor of ByteToMessageCodec: the first thing it does is reject sharable subclasses.
protected ByteToMessageCodec(boolean preferDirect) {
    ensureNotSharable();
    outboundMsgMatcher = TypeParameterMatcher.find(this, ByteToMessageCodec.class, "I");
    encoder = new Encoder(preferDirect);
}
通过下列语句,来判断是否有该注解,
/** Throws IllegalStateException when isSharable() reports this handler as sharable. */
protected void ensureNotSharable() {
if (this.isSharable()) {
throw new IllegalStateException("ChannelHandler " + this.getClass().getName() + " is not allowed to be shared");
}
}
/**
 * Reports whether this handler's class carries the ChannelHandler.Sharable annotation.
 * The answer is looked up in a per-class cache and computed on the first miss.
 */
public boolean isSharable() {
Class<?> clazz = this.getClass();
Map<Class<?>, Boolean> cache = InternalThreadLocalMap.get().handlerSharableCache();
Boolean sharable = (Boolean)cache.get(clazz);
if (sharable == null) {
// Not cached yet: check the annotation and remember the result.
sharable = clazz.isAnnotationPresent(ChannelHandler.Sharable.class);
cache.put(clazz, sharable);
}
return sharable;
}
所以换一个父类即可:改为继承 MessageToMessageCodec,它假定收到的消息已经是完整的(因此需要配合前面的帧解码器使用)
package com.yjx23332.netty.test.protocol;
import com.yjx23332.netty.test.entity.Message;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageCodec;
import lombok.extern.slf4j.Slf4j;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.List;
/**
* 必须确保收到的消息是完整的,才可以使用
* */
@ChannelHandler.Sharable
@Slf4j
public class MessageCodecSharable extends MessageToMessageCodec<ByteBuf,Message> {
/***
* 编码
*/
@Override
protected void encode(ChannelHandlerContext ctx, Message msg, List<Object> out) throws Exception {
ByteBuf outTemp = ctx.alloc().buffer();
//1. 4 字节表示魔数
outTemp.writeBytes(new byte[]{1,2,3,4});
//2. 1 字节表示版本
outTemp.writeByte(1);
//3. 1 字节表示序列化方式,0 jdk,1 json
outTemp.writeByte(0);
//4. 1 字节表示指令类型
outTemp.writeByte(msg.getMessageType());
//5. 4 字节表示序列号
outTemp.writeInt(msg.getSequenceId());
// 上述和为15,为了满足2^n倍,让内存对齐。填入一个无意义字节
outTemp.writeByte(0xff);
//6. 获取内容的字节数组
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
ObjectOutputStream objectOutputStream = new ObjectOutputStream(byteArrayOutputStream);
objectOutputStream.writeObject(msg);
byte[] bytes = byteArrayOutputStream.toByteArray();
//7. 4字节表示长度
outTemp.writeInt(bytes.length);
//8. 写入内容
outTemp.writeBytes(bytes);
if(objectOutputStream != null)
objectOutputStream.close();
if(byteArrayOutputStream != null)
byteArrayOutputStream.close();
out.add(outTemp);
}
/**
* 解码
* */
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
int magicNum = in.readInt();
byte version = in.readByte();
byte serializerType = in.readByte();
byte messageType = in.readByte();
int sequenceId = in.readInt();
in.readByte();
int length = in.readInt();
byte[] bytes = new byte[length];
/**
* 从当前读指针位置开始读
* */
in.readBytes(bytes,0,length);
if(serializerType == 0){
ObjectInputStream objectInputStream = new ObjectInputStream(new ByteArrayInputStream(bytes));
Message message = (Message) objectInputStream.readObject();
log.debug("{},{},{},{},{},{}",magicNum,version,serializerType,messageType,sequenceId,length);
log.debug("{}",message);
out.add(message);
}
}
}
就着前面的代码,完成一个简易聊天室

package com.yjx23332.netty.test.server.service;
/** User-related operations offered by the chat server. */
public interface UserService {
/**
 * Logs a user in.
 * @param username user name
 * @param password password
 * */
boolean login(String username,String password);
}
此处不连接数据库,直接获取值
package com.yjx23332.netty.test.server.service.Impl;
import com.yjx23332.netty.test.server.service.UserService;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
 * In-memory {@link UserService}: credentials come from a fixed map that is
 * filled when the instance is created; no database is involved.
 */
public class UserServiceMemoryImpl implements UserService {
    private final Map<String, String> allUserMap = new ConcurrentHashMap<>();

    {
        allUserMap.put("zhangsan", "123");
        allUserMap.put("lisi", "123");
        allUserMap.put("wangwu", "123");
    }

    /**
     * Checks the credentials against the in-memory map.
     *
     * @param username user name; {@code null} is treated as a failed login
     * @param password password; {@code null} is treated as a failed login
     * @return {@code true} only if the user exists and the password matches
     */
    @Override
    public boolean login(String username, String password) {
        // ConcurrentHashMap rejects null keys with an exception, so guard first.
        if (username == null || password == null) {
            return false;
        }
        String pass = allUserMap.get(username);
        return pass != null && pass.equals(password);
    }
}
package com.yjx23332.netty.test.entity.dto;
import lombok.Data;
import java.util.Collections;
import java.util.Set;
/** A chat group: a name plus the set of member user names. */
@Data
public class Group {
private String name;
private Set<String> members;
// Placeholder group named "empty" whose member set is Collections.emptySet().
public static final Group EMPTY_GROUP = new Group("empty", Collections.emptySet());
/** Creates a group; the given set is stored as-is, without copying. */
public Group(String name, Set<String> members) {
this.name = name;
this.members = members;
}
}
package com.yjx23332.netty.test.server.session;
import io.netty.channel.Channel;
/** Session management: associates channels with user names and per-channel attributes. */
public interface Session {
/**
 * Binds a session.
 * @param channel the channel to bind
 * @param username the user to bind it to
 * */
void bind(Channel channel, String username);
/**
 * Unbinds a session.
 * @param channel the channel to unbind
 * */
void unbind(Channel channel);
/**
 * Reads an attribute.
 * @param channel the channel the attribute belongs to
 * @param name attribute name
 * @return attribute value
 * */
Object getAttribute(Channel channel,String name);
/**
 * Sets an attribute.
 * @param channel the channel the attribute belongs to
 * @param name attribute name
 * @param value attribute value
 * */
void setAttribute(Channel channel, String name, Object value);
/**
 * Looks up the channel of a user.
 * @param username the user whose channel is wanted
 * */
Channel getChannel(String username);
}
可以发现,很类似WebSocket搭建聊天室的设计
package com.yjx23332.netty.test.server.session;
import io.netty.channel.Channel;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
 * In-memory {@link Session}: three concurrent maps hold user name -> channel,
 * channel -> user name, and channel -> attribute map.
 */
public class SessionMemoryImpl implements Session {
    private final Map<String, Channel> usernameChannelMap = new ConcurrentHashMap<>();
    private final Map<Channel, String> channelUsernameMap = new ConcurrentHashMap<>();
    private final Map<Channel, Map<String, Object>> channelAttributesMap = new ConcurrentHashMap<>();

    /** Registers the channel for the user and gives it an empty attribute map. */
    @Override
    public void bind(Channel channel, String username) {
        usernameChannelMap.put(username, channel);
        channelUsernameMap.put(channel, username);
        channelAttributesMap.put(channel, new ConcurrentHashMap<>());
    }

    /**
     * Removes every entry of the channel. Calling this for a channel that was
     * never bound is a no-op.
     */
    @Override
    public void unbind(Channel channel) {
        String username = channelUsernameMap.remove(channel);
        if (username != null) {
            // Conditional remove: if the user has since been bound to another
            // channel, that newer binding must stay in place.
            usernameChannelMap.remove(username, channel);
        }
        channelAttributesMap.remove(channel);
    }

    /**
     * Returns the attribute value, or {@code null} when the attribute is not
     * set or the channel is not bound.
     */
    @Override
    public Object getAttribute(Channel channel, String name) {
        Map<String, Object> attributes = channelAttributesMap.get(channel);
        return attributes == null ? null : attributes.get(name);
    }

    /** Stores an attribute for a bound channel. */
    @Override
    public void setAttribute(Channel channel, String name, Object value) {
        channelAttributesMap.get(channel).put(name, value);
    }

    /** Returns the channel bound to the user, or {@code null} if there is none. */
    @Override
    public Channel getChannel(String username) {
        return usernameChannelMap.get(username);
    }
}
package com.yjx23332.netty.test.server.session;
import io.netty.channel.Channel;
import java.util.List;
import java.util.Set;
/** Management of chat groups and their members. */
public interface GroupSession {
/**
 * Creates a chat group. Creation only succeeds if no group with that name exists yet.
 * @param name group name
 * @param members members
 * @return the group object on success, otherwise null
 * NOTE(review): GroupSessionMemoryImpl returns the result of Map.putIfAbsent, i.e. null
 * when the group was newly created and the existing group otherwise - the opposite of
 * what is stated here. Confirm which behaviour callers rely on.
 * */
Group createGroup(String name, Set<String> members);
/**
 * Adds a member to a chat group.
 * @param name group name
 * @param member member
 * @return null if the group does not exist, otherwise the group object
 * */
Group joinMember(String name ,String member);
/**
 * Removes a member from a group.
 * @param name group name
 * @param member member
 * @return null if the group does not exist, otherwise the group object
 * */
Group removeMember(String name,String member);
/**
 * Removes a chat group.
 * @param name group name
 * @return null if the group does not exist, otherwise the group object
 * */
Group removeGroup(String name);
/**
 * Returns the members of a group.
 * @param name group name
 * @return the member set
 * NOTE(review): the original text promised null for an unknown group, while
 * GroupSessionMemoryImpl returns the members of Group.EMPTY_GROUP in that case - confirm.
 */
Set<String> getMembers(String name);
/**
 * Returns the channels of a group's members; only members that are online are included.
 * @param name group name
 * */
List<Channel> getMembersChannel(String name);
}
package com.yjx23332.netty.test.server.session.Impl;
import com.yjx23332.netty.test.entity.dto.Group;
import com.yjx23332.netty.test.server.session.GroupSession;
import com.yjx23332.netty.test.server.factory.SessionFactory;
import io.netty.channel.Channel;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
/**
 * In-memory {@link GroupSession}: groups live in a {@link ConcurrentHashMap} keyed by group name.
 */
public class GroupSessionMemoryImpl implements GroupSession {
    private final Map<String, Group> groupMap = new ConcurrentHashMap<>();

    /**
     * Registers a new group unless the name is already taken.
     *
     * @param name    group name
     * @param members initial members
     * @return {@code null} when the group was created, or the already-registered group when
     *         the name was taken (this is the {@code putIfAbsent} result; callers in this
     *         file treat {@code null} as success)
     */
    @Override
    public Group createGroup(String name, Set<String> members) {
        Group group = new Group(name, members);
        return groupMap.putIfAbsent(name, group);
    }

    /**
     * Adds {@code member} to the named group.
     *
     * @return the group, or {@code null} when no group has that name
     */
    @Override
    public Group joinMember(String name, String member) {
        return groupMap.computeIfPresent(name, (key, value) -> {
            value.getMembers().add(member);
            return value;
        });
    }

    /**
     * Removes {@code member} from the named group.
     *
     * @return the group, or {@code null} when no group has that name
     */
    @Override
    public Group removeMember(String name, String member) {
        return groupMap.computeIfPresent(name, (key, value) -> {
            value.getMembers().remove(member);
            return value;
        });
    }

    /**
     * Deletes the named group.
     *
     * @return the removed group, or {@code null} when no group had that name
     */
    @Override
    public Group removeGroup(String name) {
        return groupMap.remove(name);
    }

    /**
     * Returns the member names of the group, falling back to the members of
     * {@code Group.EMPTY_GROUP} when the group is unknown.
     */
    @Override
    public Set<String> getMembers(String name) {
        return groupMap.getOrDefault(name, Group.EMPTY_GROUP).getMembers();
    }

    /**
     * Resolves the channels of the group's members that currently have a bound session.
     *
     * @param name group name
     * @return the online members' channels; an empty list (never {@code null}) when the
     *         group does not exist, so callers can iterate the result unconditionally
     */
    @Override
    public List<Channel> getMembersChannel(String name) {
        // Single lookup: avoids the group disappearing between an existence check and the read.
        Group group = groupMap.get(name);
        if (group == null || group.getMembers() == null) {
            return new ArrayList<>();
        }
        return group.getMembers().stream()
                // The session lookup is backed by a ConcurrentHashMap in this file, which rejects null keys.
                .filter(Objects::nonNull)
                .map(member -> SessionFactory.getSession().getChannel(member))
                .filter(Objects::nonNull)
                .collect(Collectors.toList());
    }
}
替换掉原来默认的基于长度字段的帧解码器
package com.yjx23332.netty.test.protocol;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
/**
 * {@link LengthFieldBasedFrameDecoder} preconfigured with this project's frame layout,
 * so every pipeline uses the same settings.
 */
public class ProcotolFrameDecoder extends LengthFieldBasedFrameDecoder {
    // Names follow the parameter order of the LengthFieldBasedFrameDecoder constructor.
    private static final int MAX_FRAME_LENGTH = 1024;
    private static final int LENGTH_FIELD_OFFSET = 12;
    private static final int LENGTH_FIELD_LENGTH = 4;
    private static final int LENGTH_ADJUSTMENT = 0;
    private static final int INITIAL_BYTES_TO_STRIP = 0;

    /** Creates a decoder with the fixed frame settings above. */
    public ProcotolFrameDecoder() {
        super(MAX_FRAME_LENGTH,
                LENGTH_FIELD_OFFSET,
                LENGTH_FIELD_LENGTH,
                LENGTH_ADJUSTMENT,
                INITIAL_BYTES_TO_STRIP);
    }
}
服务器运行类
package com.yjx23332.netty.test.server;
import com.yjx23332.netty.test.protocol.MessageCodec;
import com.yjx23332.netty.test.protocol.ProcotolFrameDecoder;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LoggingHandler;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class ChatServer {
public static void main(String[] args){
NioEventLoopGroup boss = new NioEventLoopGroup();
NioEventLoopGroup worker = new NioEventLoopGroup();
LoggingHandler LOGGING_HANDLER = new LoggingHandler();
MessageCodec MESSAGE_CODEC = new MessageCodec();
try{
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap
.channel(NioServerSocketChannel.class)
.group(boss,worker)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new ProcotolFrameDecoder());
socketChannel.pipeline().addLast(LOGGING_HANDLER);
socketChannel.pipeline().addLast(MESSAGE_CODEC);
}
});
ChannelFuture channelFuture = serverBootstrap.bind(8080).sync();
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
log.debug("{}",e);
}finally {
boss.shutdownGracefully();
worker.shutdownGracefully();
}
}
}
package com.yjx23332.netty.test.server.factory;
import com.yjx23332.netty.test.server.session.Impl.SessionMemoryImpl;
import com.yjx23332.netty.test.server.session.Session;
/**
 * Static holder handing out the single {@link Session} instance used by the server.
 */
public abstract class SessionFactory {
    private static final Session SHARED_SESSION = new SessionMemoryImpl();

    /**
     * @return the shared session registry
     */
    public static Session getSession() {
        return SHARED_SESSION;
    }
}
package com.yjx23332.netty.test.server.factory;
import com.yjx23332.netty.test.server.service.Impl.UserServiceMemoryImpl;
import com.yjx23332.netty.test.server.service.UserService;
/**
 * Static holder handing out the single {@link UserService} instance used by the server.
 */
public abstract class UserServiceFacotory {
    private static final UserService SHARED_USER_SERVICE = new UserServiceMemoryImpl();

    /**
     * @return the shared user service
     */
    public static UserService getUserService() {
        return SHARED_USER_SERVICE;
    }
}
package com.yjx23332.netty.test.server.factory;
import com.yjx23332.netty.test.server.session.GroupSession;
import com.yjx23332.netty.test.server.session.Impl.GroupSessionMemoryImpl;
/**
 * Static holder handing out the single {@link GroupSession} instance used by the server.
 */
public abstract class GroupSessionFactory {
    private static final GroupSession SHARED_GROUP_SESSION = new GroupSessionMemoryImpl();

    /**
     * @return the shared group registry
     */
    public static GroupSession getGroupSession() {
        return SHARED_GROUP_SESSION;
    }
}
登录相关的消息在2.3就已贴出源码,此处就不再给出
package com.yjx23332.netty.test.entity.vo.req;
import com.yjx23332.netty.test.entity.Message;
import lombok.Data;
import lombok.ToString;
/**
 * Request for a one-to-one chat message.
 */
@Data
@ToString(callSuper = true)
public class ChatRequestMessage extends Message {
    /** Message text. */
    private String content;
    /** User name of the recipient. */
    private String to;
    /** User name of the sender. */
    private String from;

    /**
     * @param from    sender's user name
     * @param to      recipient's user name
     * @param content message text
     */
    public ChatRequestMessage(String from, String to, String content) {
        super();
        this.from = from;
        this.to = to;
        this.content = content;
    }

    /**
     * @return the type code constant that shares this class's name (declared outside this class)
     */
    @Override
    public int getMessageType() {
        final int typeCode = ChatRequestMessage;
        return typeCode;
    }
}
package com.yjx23332.netty.test.entity.vo.resp;
import com.yjx23332.netty.test.entity.Message;
import lombok.Data;
import lombok.ToString;
/**
 * Response for a one-to-one chat: either a delivered message or a status report.
 */
@Data
@ToString(callSuper = true)
public class ChatResponseMessage extends Message {
    /** User name of the original sender; left unset by the status constructor. */
    private String from;
    /** Message text or status description. */
    private String content;
    /** Outcome flag. */
    private boolean success;

    /**
     * Builds a delivered message; {@code success} is set to {@code true}.
     *
     * @param from    sender's user name
     * @param content message text
     */
    public ChatResponseMessage(String from, String content) {
        super();
        this.success = true;
        this.from = from;
        this.content = content;
    }

    /**
     * Builds a status report.
     *
     * @param success outcome flag
     * @param content status description
     */
    public ChatResponseMessage(boolean success, String content) {
        super();
        this.content = content;
        this.success = success;
    }

    /**
     * @return the type code constant that shares this class's name (declared outside this class)
     */
    @Override
    public int getMessageType() {
        final int typeCode = ChatResponseMessage;
        return typeCode;
    }
}
package com.yjx23332.netty.test.entity.vo.req;
import com.yjx23332.netty.test.entity.Message;
import lombok.Data;
import lombok.ToString;
/**
 * Request to post a message to a chat group.
 */
@Data
@ToString(callSuper = true)
public class GroupChatRequestMessage extends Message {
    /** Message text. */
    private String content;
    /** Name of the target group. */
    private String groupName;
    /** User name of the sender. */
    private String from;

    /**
     * @param from      sender's user name
     * @param groupName target group name
     * @param content   message text
     */
    public GroupChatRequestMessage(String from, String groupName, String content) {
        super();
        this.content = content;
        this.groupName = groupName;
        this.from = from;
    }

    /**
     * @return the type code constant that shares this class's name (declared outside this class)
     */
    @Override
    public int getMessageType() {
        final int typeCode = GroupChatRequestMessage;
        return typeCode;
    }
}
package com.yjx23332.netty.test.entity.vo.resp;
import com.yjx23332.netty.test.entity.Message;
import lombok.Data;
import lombok.ToString;
/**
 * Group chat message as delivered to a group member.
 */
@Data
@ToString(callSuper = true)
public class GroupChatResponseMessage extends Message {
    /** Origin label of the message. */
    private String from;
    /** Message text. */
    private String content;

    /**
     * @param from    origin label
     * @param content message text
     */
    public GroupChatResponseMessage(String from, String content) {
        super();
        this.content = content;
        this.from = from;
    }

    /**
     * @return the type code constant that shares this class's name (declared outside this class)
     */
    @Override
    public int getMessageType() {
        final int typeCode = GroupChatResponseMessage;
        return typeCode;
    }
}
package com.yjx23332.netty.test.entity.vo.req;
import com.yjx23332.netty.test.entity.Message;
import lombok.Data;
import lombok.ToString;
import java.util.Set;
/**
 * Request to create a chat group with an initial member set.
 */
@Data
@ToString(callSuper = true)
public class GroupCreateRequestMessage extends Message {
    /** Name of the group to create. */
    private String groupName;
    /** User names of the initial members. */
    private Set<String> members;

    /**
     * @param groupName name of the group to create
     * @param members   user names of the initial members (stored as passed, not copied)
     */
    public GroupCreateRequestMessage(String groupName, Set<String> members) {
        super();
        this.members = members;
        this.groupName = groupName;
    }

    /**
     * @return the type code constant that shares this class's name (declared outside this class)
     */
    @Override
    public int getMessageType() {
        final int typeCode = GroupCreateRequestMessage;
        return typeCode;
    }
}
package com.yjx23332.netty.test.entity.vo.resp;
import com.yjx23332.netty.test.entity.Message;
import lombok.Data;
import lombok.ToString;
/**
 * Outcome of a group-creation request; also used to notify members that they were added.
 */
@Data
@ToString(callSuper = true)
public class GroupCreateResponseMessage extends Message {
    /** Human-readable outcome text. */
    String message;
    /** Outcome flag. */
    boolean success;

    /**
     * @param success outcome flag
     * @param message human-readable outcome text
     */
    public GroupCreateResponseMessage(boolean success, String message) {
        super();
        this.success = success;
        this.message = message;
    }

    /**
     * @return the type code constant that shares this class's name (declared outside this class)
     */
    @Override
    public int getMessageType() {
        final int typeCode = GroupCreateResponseMessage;
        return typeCode;
    }
}
package com.yjx23332.netty.test.entity.vo.req;
import com.yjx23332.netty.test.entity.Message;
import lombok.Data;
import lombok.ToString;
/**
 * Request for the member list of a chat group.
 */
@Data
@ToString(callSuper = true)
public class GroupMembersRequestMessage extends Message {
    /** Name of the group being queried. */
    private String groupName;

    /**
     * @param groupName name of the group being queried
     */
    public GroupMembersRequestMessage(String groupName) {
        super();
        this.groupName = groupName;
    }

    /**
     * @return the type code constant that shares this class's name (declared outside this class)
     */
    @Override
    public int getMessageType() {
        final int typeCode = GroupMembersRequestMessage;
        return typeCode;
    }
}
package com.yjx23332.netty.test.entity.vo.req;
import com.yjx23332.netty.test.entity.Message;
import lombok.Data;
import lombok.ToString;
/**
 * Request to add a user to a chat group.
 */
@Data
@ToString(callSuper = true)
public class GroupJoinRequestMessage extends Message {
    /** Name of the group to join. */
    private String groupName;
    /** User name of the member to add. */
    private String username;

    /**
     * @param username  user name of the member to add
     * @param groupName name of the group to join
     */
    public GroupJoinRequestMessage(String username, String groupName) {
        super();
        this.groupName = groupName;
        this.username = username;
    }

    /**
     * @return the type code constant that shares this class's name (declared outside this class)
     */
    @Override
    public int getMessageType() {
        final int typeCode = GroupJoinRequestMessage;
        return typeCode;
    }
}
package com.yjx23332.netty.test.entity.vo.resp;
import com.yjx23332.netty.test.entity.Message;
import lombok.Data;
import lombok.ToString;
/**
 * Outcome of a group-join request.
 */
@Data
@ToString(callSuper = true)
public class GroupJoinResponseMessage extends Message {
    /** Human-readable outcome text. */
    String message;
    /** Outcome flag. */
    boolean success;

    /**
     * @param success outcome flag
     * @param message human-readable outcome text
     */
    public GroupJoinResponseMessage(boolean success, String message) {
        super();
        this.success = success;
        this.message = message;
    }

    /**
     * @return the type code constant that shares this class's name (declared outside this class)
     */
    @Override
    public int getMessageType() {
        final int typeCode = GroupJoinResponseMessage;
        return typeCode;
    }
}
package com.yjx23332.netty.test.entity.vo.req;
import com.yjx23332.netty.test.entity.Message;
import lombok.Data;
import lombok.ToString;
/**
 * Request to remove a user from a chat group.
 */
@Data
@ToString(callSuper = true)
public class GroupQuitRequestMessage extends Message {
    /** Name of the group to leave. */
    private String groupName;
    /** User name of the member to remove. */
    private String username;

    /**
     * @param username  user name of the member to remove
     * @param groupName name of the group to leave
     */
    public GroupQuitRequestMessage(String username, String groupName) {
        super();
        this.groupName = groupName;
        this.username = username;
    }

    /**
     * @return the type code constant that shares this class's name (declared outside this class)
     */
    @Override
    public int getMessageType() {
        final int typeCode = GroupQuitRequestMessage;
        return typeCode;
    }
}
package com.yjx23332.netty.test.entity.vo.resp;
import com.yjx23332.netty.test.entity.Message;
import lombok.Data;
import lombok.ToString;
/**
 * Outcome of a group-quit request.
 */
@Data
@ToString(callSuper = true)
public class GroupQuitResponseMessage extends Message {
    /** Outcome flag. */
    boolean success;
    /** Human-readable outcome text. */
    String message;

    /**
     * @param success outcome flag
     * @param message human-readable outcome text
     */
    public GroupQuitResponseMessage(boolean success, String message) {
        super();
        this.message = message;
        this.success = success;
    }

    /**
     * @return the type code constant that shares this class's name (declared outside this class)
     */
    @Override
    public int getMessageType() {
        final int typeCode = GroupQuitResponseMessage;
        return typeCode;
    }
}
图方便,这里就不检验各个输入的正确性
且用一个比较简陋的处理方式,接收与处理所有消息
package com.yjx23332.netty.test.client;
import ch.qos.logback.classic.pattern.SyslogStartConverter;
import com.yjx23332.netty.test.entity.vo.req.*;
import com.yjx23332.netty.test.entity.vo.resp.LoginResponseMessage;
import com.yjx23332.netty.test.protocol.MessageCodec;
import com.yjx23332.netty.test.protocol.ProcotolFrameDecoder;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.logging.LoggingHandler;
import lombok.extern.slf4j.Slf4j;
import java.util.Arrays;
import java.util.Scanner;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
@Slf4j
public class ChatClient {
public static void main(String[] args) {
NioEventLoopGroup group = new NioEventLoopGroup();
LoggingHandler LOGGING_HANDLER = new LoggingHandler();
MessageCodec MESSAGE_CODEC = new MessageCodec();
// 计数器,用于检测是否有消息
CountDownLatch WAIT_FOR_LOGIN = new CountDownLatch(1);
// 记录登录状态
AtomicBoolean LoginStatus = new AtomicBoolean(false);
try{
Bootstrap bootstrap = new Bootstrap();
bootstrap.channel(NioSocketChannel.class)
.group(group)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new ProcotolFrameDecoder());
socketChannel.pipeline().addLast(LOGGING_HANDLER);
socketChannel.pipeline().addLast(MESSAGE_CODEC);
socketChannel.pipeline().addLast("client handler",new ChannelInboundHandlerAdapter(){
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
//建立连接后,创建一个线程去接收用户在控制的输入,负责向服务器发送各种消息
new Thread(()->{
Scanner scanner = new Scanner(System.in);
System.out.println("请输入用户名:");
String userName = scanner.nextLine();
System.out.println("请输入密码:");
String password = scanner.nextLine();
//此处忽略校验
ctx.writeAndFlush(new LoginRequestMessage(userName,password));
//阻塞,直到计数为0
try {
WAIT_FOR_LOGIN.await();
} catch (InterruptedException e) {
log.error("client login error {}",e);
}
//登录失败则关闭连接
if(!LoginStatus.get()){
ctx.channel().close();
return;
}
while(true){
System.out.println("=============================");
System.out.println("send [userName] [content]");
System.out.println("gsend [group name] [content]");
System.out.println("gcreate [group name] [m1,m2,m3...]");
System.out.println("gmemebers [group name]");
System.out.println("gjoin [group name]");
System.out.println("gquit [group name]");
System.out.println("quit");
System.out.println("=============================");
String command = scanner.nextLine();
String[] s = command.split(" ");
switch (s[0]){
case "send":
ctx.writeAndFlush(new ChatRequestMessage(userName,s[1],s[2]));
break;
case "gsend":
ctx.writeAndFlush(new GroupChatRequestMessage(userName,s[1],s[2]));
break;
case "gcreate":
Set<String> members = Arrays.stream(s[2].split(",")).collect(Collectors.toSet());
members.add(userName);
ctx.writeAndFlush(new GroupCreateRequestMessage(s[1],members));
break;
case "gmembers":
ctx.writeAndFlush(new GroupMembersRequestMessage(s[1]));
break;
case "gjoin":
ctx.writeAndFlush(new GroupJoinRequestMessage(userName,s[1]));
break;
case "gquit":
ctx.writeAndFlush(new GroupQuitRequestMessage(userName,s[1]));
break;
case "quit":
ctx.channel().close();
return;
default:
System.out.println("请参照输入格式");
break;
}
}
},"system.in").start();
super.channelActive(ctx);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("msg:" + msg);
if(msg instanceof LoginResponseMessage){
LoginResponseMessage responseMessage = (LoginResponseMessage) msg;
if(responseMessage.isSuccess()){
LoginStatus.set(true);
}
//数字 -1,唤醒阻塞的线程
WAIT_FOR_LOGIN.countDown();
}
super.channelRead(ctx, msg);
}
});
}
});
ChannelFuture channelFuture = bootstrap.connect("localhost",8080).sync();
channelFuture.channel().closeFuture().sync();
}catch (Exception e){
log.error("client error {}",e);
}
finally {
group.shutdownGracefully();
log.debug("连接正在关闭");
}
}
}
由于服务端使用的处理器比较多,匿名的话比较难阅读,我们再建一个handler类,用于流水线上的处理方法

package com.yjx23332.netty.test.handler;
import com.yjx23332.netty.test.entity.vo.req.LoginRequestMessage;
import com.yjx23332.netty.test.entity.vo.resp.LoginResponseMessage;
import com.yjx23332.netty.test.server.factory.SessionFactory;
import com.yjx23332.netty.test.server.factory.UserServiceFacotory;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
/**
 * Handles {@link LoginRequestMessage} only, which is why it extends
 * {@link SimpleChannelInboundHandler} typed to that message.
 */
@ChannelHandler.Sharable
public class LoginRequestMessageHandler extends SimpleChannelInboundHandler<LoginRequestMessage> {
    /**
     * Checks the credentials with the user service. On success the channel is bound to the
     * user name in the session registry before the success response is written; on failure
     * only the failure response is written.
     */
    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, LoginRequestMessage loginRequestMessage) throws Exception {
        final String username = loginRequestMessage.getUsername();
        final String password = loginRequestMessage.getPassword();
        if (!UserServiceFacotory.getUserService().login(username, password)) {
            channelHandlerContext.writeAndFlush(new LoginResponseMessage(false, "用户名或密码不正确"));
            return;
        }
        // Record which channel belongs to the user who just logged in.
        SessionFactory.getSession().bind(channelHandlerContext.channel(), username);
        channelHandlerContext.writeAndFlush(new LoginResponseMessage(true, "登录成功"));
    }
}
package com.yjx23332.netty.test.handler;
import com.yjx23332.netty.test.entity.vo.req.ChatRequestMessage;
import com.yjx23332.netty.test.entity.vo.resp.ChatResponseMessage;
import com.yjx23332.netty.test.server.factory.SessionFactory;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
/**
 * Relays a one-to-one chat message to the recipient's channel, or tells the sender
 * that the recipient is unavailable.
 */
@ChannelHandler.Sharable
public class ChatRequestMessageHandler extends SimpleChannelInboundHandler<ChatRequestMessage> {
    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, ChatRequestMessage chatRequestMessage) throws Exception {
        final Channel recipient = SessionFactory.getSession().getChannel(chatRequestMessage.getTo());
        if (recipient == null) {
            // No channel bound to that user name: report back to the sender.
            channelHandlerContext.writeAndFlush(new ChatResponseMessage(false, "对方用户不存在或未在线"));
            return;
        }
        // Recipient is online: forward the message on the recipient's channel.
        recipient.writeAndFlush(new ChatResponseMessage(chatRequestMessage.getFrom(), chatRequestMessage.getContent()));
    }
}
package com.yjx23332.netty.test.handler;
import com.yjx23332.netty.test.entity.dto.Group;
import com.yjx23332.netty.test.entity.vo.req.GroupCreateRequestMessage;
import com.yjx23332.netty.test.entity.vo.resp.GroupCreateResponseMessage;
import com.yjx23332.netty.test.server.factory.GroupSessionFactory;
import com.yjx23332.netty.test.server.session.GroupSession;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import java.util.List;
import java.util.Set;
@ChannelHandler.Sharable
public class GroupCreateRequestMessageHandler extends SimpleChannelInboundHandler<GroupCreateRequestMessage> {
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, GroupCreateRequestMessage groupCreateRequestMessage) throws Exception {
String groupName = groupCreateRequestMessage.getGroupName();
Set<String> members = groupCreateRequestMessage.getMembers();
//群管理器
GroupSession groupSession = GroupSessionFactory.getGroupSession();
Group group = groupSession.createGroup(groupName,members);
if(group == null){
channelHandlerContext.writeAndFlush(new GroupCreateResponseMessage(true,groupName + ",创建成功"));
//发送拉群消息
List<Channel> channels = groupSession.getMembersChannel(groupName);
for(Channel channel : channels){
channel.writeAndFlush(new GroupCreateResponseMessage(true,"您已被拉入:" + groupName));
}
}
else{
channelHandlerContext.writeAndFlush(new GroupCreateResponseMessage(false,groupName + ",已经存在"));
}
}
}
package com.yjx23332.netty.test.handler;
import com.yjx23332.netty.test.entity.dto.Group;
import com.yjx23332.netty.test.entity.vo.req.GroupQuitRequestMessage;
import com.yjx23332.netty.test.entity.vo.resp.GroupQuitResponseMessage;
import com.yjx23332.netty.test.server.factory.GroupSessionFactory;
import com.yjx23332.netty.test.server.factory.SessionFactory;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
@ChannelHandler.Sharable
public class GroupQuitRequestMessageHandler extends SimpleChannelInboundHandler<GroupQuitRequestMessage> {
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, GroupQuitRequestMessage groupQuitRequestMessage) throws Exception {
String groupName = groupQuitRequestMessage.getGroupName();
String userName = groupQuitRequestMessage.getUsername();
Group group = GroupSessionFactory.getGroupSession().removeMember(groupName,userName);
if(group.getMembers().contains(userName)){
channelHandlerContext.writeAndFlush(new GroupQuitResponseMessage(false,"删除失败"));
}
else{
channelHandlerContext.writeAndFlush(new GroupQuitResponseMessage(true,"删除成功,当前成员为:" + group.toString()));
SessionFactory.getSession().getChannel(userName).writeAndFlush(new GroupQuitResponseMessage(true,"您已被移除群聊:" + groupName));
}
}
}
package com.yjx23332.netty.test.handler;
import com.yjx23332.netty.test.entity.dto.Group;
import com.yjx23332.netty.test.entity.vo.req.GroupJoinRequestMessage;
import com.yjx23332.netty.test.entity.vo.resp.GroupJoinResponseMessage;
import com.yjx23332.netty.test.server.factory.GroupSessionFactory;
import com.yjx23332.netty.test.server.factory.SessionFactory;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
@ChannelHandler.Sharable
public class GroupJoinRequestMessageHandler extends SimpleChannelInboundHandler<GroupJoinRequestMessage> {
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, GroupJoinRequestMessage groupJoinRequestMessage) throws Exception {
String groupName = groupJoinRequestMessage.getGroupName();
String userName = groupJoinRequestMessage.getUsername();
Group group = GroupSessionFactory.getGroupSession().joinMember(groupName,userName);
if(group.getMembers().contains(userName)){
channelHandlerContext.writeAndFlush(new GroupJoinResponseMessage(true,"添加成员成功,当前成员:" + group.toString()));
SessionFactory.getSession().getChannel(userName).writeAndFlush(new GroupJoinResponseMessage(true,"您已加入:" + groupName));
}
else{
channelHandlerContext.writeAndFlush(new GroupJoinResponseMessage(false,"添加失败 "));
}
}
}
package com.yjx23332.netty.test.handler;
import com.yjx23332.netty.test.entity.vo.req.GroupChatRequestMessage;
import com.yjx23332.netty.test.entity.vo.resp.GroupChatResponseMessage;
import com.yjx23332.netty.test.server.factory.GroupSessionFactory;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import java.util.List;
/**
 * Fans a group chat message out to every online member of the target group.
 */
@ChannelHandler.Sharable
public class GroupChatRequestMessageHandler extends SimpleChannelInboundHandler<GroupChatRequestMessage> {
    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, GroupChatRequestMessage groupChatRequestMessage) throws Exception {
        String groupName = groupChatRequestMessage.getGroupName();
        String userName = groupChatRequestMessage.getFrom();
        List<Channel> channelList = GroupSessionFactory.getGroupSession().getMembersChannel(groupName);
        if (channelList == null) {
            // The in-file implementation returns null for an unknown group. Iterating it would throw,
            // and the resulting exception reaches QuitHandler, which unbinds the sender's session.
            return;
        }
        for (Channel channel : channelList) {
            channel.writeAndFlush(new GroupChatResponseMessage(groupName + ":" + userName, groupChatRequestMessage.getContent()));
        }
    }
}
这里就不详细处理了
package com.yjx23332.netty.test.handler;
import com.yjx23332.netty.test.entity.vo.req.GroupMembersRequestMessage;
import com.yjx23332.netty.test.entity.vo.resp.GroupMembersResponseMessage;
import com.yjx23332.netty.test.server.factory.GroupSessionFactory;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
/**
 * Answers a member-list query with the member names the group registry reports for the group.
 */
@ChannelHandler.Sharable
public class GroupMembersRequestMessageHanlder extends SimpleChannelInboundHandler<GroupMembersRequestMessage> {
    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, GroupMembersRequestMessage groupMembersRequestMessage) throws Exception {
        final String groupName = groupMembersRequestMessage.getGroupName();
        final GroupMembersResponseMessage reply =
                new GroupMembersResponseMessage(GroupSessionFactory.getGroupSession().getMembers(groupName));
        channelHandlerContext.channel().writeAndFlush(reply);
    }
}
注意,此处没有处理Group中的失效连接
package com.yjx23332.netty.test.handler;
import com.yjx23332.netty.test.server.factory.SessionFactory;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import lombok.extern.slf4j.Slf4j;
/**
 * Cleans up the session registry when a connection goes away.
 * Stale members inside groups are not cleaned up here.
 */
@ChannelHandler.Sharable
@Slf4j
public class QuitHandler extends ChannelInboundHandlerAdapter {
    /**
     * Connection closed: drop the channel's session binding.
     * This is the single place where the binding is removed.
     */
    @Override
    public void channelInactive(ChannelHandlerContext channelHandlerContext) throws Exception {
        SessionFactory.getSession().unbind(channelHandlerContext.channel());
        log.debug("{},已经正常断开", channelHandlerContext.channel());
    }

    /**
     * Abnormal termination: log the cause and close the channel.
     * <p>
     * The binding is not removed here. Closing an active channel fires
     * {@link #channelInactive}, which removes it exactly once. Previously this method
     * unbound the session but left the connection open, and the later channelInactive
     * unbound the same channel a second time.
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        // The Throwable is passed as the trailing argument with no placeholder of its own,
        // which is how SLF4J logs it with its stack trace.
        log.debug("{},已经异常断开", ctx.channel(), cause);
        ctx.close();
    }
}
package com.yjx23332.netty.test.server;
import com.yjx23332.netty.test.handler.*;
import com.yjx23332.netty.test.protocol.MessageCodecSharable;
import com.yjx23332.netty.test.protocol.ProcotolFrameDecoder;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.timeout.IdleStateHandler;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class ChatServer {
public static void main(String[] args){
NioEventLoopGroup boss = new NioEventLoopGroup();
NioEventLoopGroup worker = new NioEventLoopGroup();
LoggingHandler LOGGING_HANDLER = new LoggingHandler();
MessageCodecSharable MESSAGE_CODEC = new MessageCodecSharable();
LoginRequestMessageHandler LOGIN_HANDLER = new LoginRequestMessageHandler();
ChatRequestMessageHandler CHAT_HANDLER = new ChatRequestMessageHandler();
GroupCreateRequestMessageHandler GROUP_CREATE_HANDLER = new GroupCreateRequestMessageHandler();
GroupJoinRequestMessageHandler GROUP_JOIN_HANDLER = new GroupJoinRequestMessageHandler();
GroupMembersRequestMessageHanlder GROUP_MEMBERS_HANDLER = new GroupMembersRequestMessageHanlder();
GroupQuitRequestMessageHandler GROUP_QUIT_HANDLER = new GroupQuitRequestMessageHandler();
GroupChatRequestMessageHandler GROUP_CHAT_HANDLER = new GroupChatRequestMessageHandler();
QuitHandler QUIT_HANDLER = new QuitHandler();
try{
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap
.channel(NioServerSocketChannel.class)
.group(boss,worker)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new ProcotolFrameDecoder());
socketChannel.pipeline().addLast(LOGGING_HANDLER);
socketChannel.pipeline().addLast(MESSAGE_CODEC);
socketChannel.pipeline().addLast(LOGIN_HANDLER);
socketChannel.pipeline().addLast(CHAT_HANDLER);
socketChannel.pipeline().addLast(GROUP_CREATE_HANDLER);
socketChannel.pipeline().addLast(GROUP_JOIN_HANDLER);
socketChannel.pipeline().addLast(GROUP_MEMBERS_HANDLER);
socketChannel.pipeline().addLast(GROUP_QUIT_HANDLER);
socketChannel.pipeline().addLast(GROUP_CHAT_HANDLER);
socketChannel.pipeline().addLast(QUIT_HANDLER);
}
});
ChannelFuture channelFuture = serverBootstrap.bind(8080).sync();
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
log.debug("{}",e);
}finally {
boss.shutdownGracefully();
worker.shutdownGracefully();
}
}
}
原因
问题
// Detects connections whose read side or write side has been idle for too long.
/**
 * IdleStateHandler arguments, in seconds (0 disables that check):
 * reader idle time, writer idle time, all (read and write) idle time.
 * When a limit is exceeded an IdleStateEvent is fired as a user event.
 * */
socketChannel.pipeline().addLast(new IdleStateHandler(5,0,0));
/**
 * Duplex handler (inbound and outbound) that reacts to the idle event.
 * */
socketChannel.pipeline().addLast(new ChannelDuplexHandler(){
    /**
     * Called for user events travelling through the pipeline.
     * Only a reader-idle IdleStateEvent is handled here; anything else is passed on,
     * because other event types also arrive through this callback and must not be cast blindly.
     * */
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent && ((IdleStateEvent) evt).state() == IdleState.READER_IDLE) {
            log.debug("已经5秒没有读取到数据");
            // Actively close the silent connection.
            ctx.channel().close();
            return;
        }
        super.userEventTriggered(ctx, evt);
    }
});
但是如果连接好好的,只是没有东西发怎么办?这时可以由客户端定时自动发送一个心跳包;心跳包与业务无关,只用于证明连接仍然存活。注意客户端的写空闲时间(下面是3秒)必须小于服务器的读空闲时间(上面是5秒),否则服务器会在心跳到达之前就把连接关闭。
package com.yjx23332.netty.test.entity.vo.resp;
import com.yjx23332.netty.test.entity.Message;
/**
 * Heartbeat message without payload; the client sends it when it has written nothing for a while.
 */
public class PingMessage extends Message {
    /** Creates an empty heartbeat message. */
    public PingMessage() {
        super();
    }

    /**
     * @return the type code constant that shares this class's name (declared outside this class)
     */
    @Override
    public int getMessageType() {
        final int typeCode = PingMessage;
        return typeCode;
    }
}
// Fire an event when nothing has been written to the server for 3 seconds.
socketChannel.pipeline().addLast(new IdleStateHandler(0,3,0));
socketChannel.pipeline().addLast(new ChannelDuplexHandler(){
    /**
     * Sends a heartbeat on a writer-idle IdleStateEvent; anything else is passed on,
     * because other event types also arrive through this callback and must not be cast blindly.
     */
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent && ((IdleStateEvent) evt).state() == IdleState.WRITER_IDLE) {
            log.debug("已经3秒没有写数据了,发送一个心跳消息");
            ctx.writeAndFlush(new PingMessage());
            return;
        }
        super.userEventTriggered(ctx, evt);
    }
});
优化以及拓展见下一篇笔记。