案例说明:一个简单的群聊实现,支持重复上下线。
服务端
/**
 * Minimal NIO group-chat server.
 *
 * <p>A single selector thread accepts clients on port 9999 and relays every message it
 * reads from one client to all other connected clients. Clients may connect and
 * disconnect repeatedly.
 *
 * <p>Text is decoded and encoded with the platform default charset, so all peers must
 * run with the same default charset.
 */
public class NIOServer {

    /**
     * Runs the accept/read loop forever.
     *
     * @param args unused
     * @throws IOException if the server socket or selector cannot be set up, or if a
     *                     selection operation fails
     */
    public static void main(String[] args) throws IOException {
        ServerSocketChannel serverChannel = ServerSocketChannel.open();
        // Initialize the server
        serverChannel.bind(new InetSocketAddress(9999));
        Selector selector = Selector.open();
        serverChannel.configureBlocking(false);
        serverChannel.register(selector, SelectionKey.OP_ACCEPT);
        while (true) {
            // Wake up at least every two seconds, even when no channel is ready
            if (selector.select(2000) == 0) {
                continue;
            }
            System.out.println("===================");
            Iterator<SelectionKey> it = selector.selectedKeys().iterator();
            while (it.hasNext()) {
                SelectionKey key = it.next();
                // Remove up front so no code path can leave a handled key in the selected set
                it.remove();
                try {
                    // A key may have been cancelled while an earlier key of this batch was handled
                    if (!key.isValid()) {
                        continue;
                    }
                    if (key.isAcceptable()) {
                        acceptClient(serverChannel, selector);
                    } else if (key.isReadable()) {
                        readAndRelay(selector, key);
                    }
                } catch (Exception e) {
                    // A failure on one key must not stop the remaining keys from being served
                    e.printStackTrace();
                }
            }
        }
    }

    /**
     * Sends {@code msg} to every connected client except {@code channel}.
     *
     * <p>Keys whose channel is already closed are cancelled. A recipient whose write
     * fails is taken offline on its own; the failure is not propagated, so the sender
     * and the remaining recipients are unaffected.
     *
     * @param selector selector whose registered keys are the broadcast targets
     * @param channel  the sending channel, which is skipped
     * @param msg      text to send
     * @throws IOException kept for source compatibility with existing callers
     */
    public static void broadCast(Selector selector, SocketChannel channel, String msg) throws IOException {
        byte[] payload = msg.getBytes();
        for (SelectionKey key : selector.keys()) {
            SelectableChannel targetChannel = key.channel();
            if (!targetChannel.isOpen()) {
                // The channel no longer exists: take it offline
                key.cancel();
                continue;
            }
            // Skip the listening channel and the sender itself
            if (!(targetChannel instanceof SocketChannel) || targetChannel == channel) {
                continue;
            }
            try {
                // NOTE(review): a non-blocking write may accept only part of the buffer;
                // queueing the remainder behind OP_WRITE is not implemented here.
                ((SocketChannel) targetChannel).write(ByteBuffer.wrap(payload));
            } catch (IOException e) {
                System.out.println("======================发生异常进行下线操作=========");
                dropClient(key);
            }
        }
    }

    /** Accepts one pending connection and registers it for reads with its own 1 KiB buffer. */
    private static void acceptClient(ServerSocketChannel serverChannel, Selector selector) throws IOException {
        SocketChannel channel = serverChannel.accept();
        // A non-blocking accept returns null when no connection is actually pending
        if (channel == null) {
            return;
        }
        channel.configureBlocking(false);
        channel.register(selector, SelectionKey.OP_READ, ByteBuffer.allocate(1024));
        String ipStr = channel.getRemoteAddress().toString().substring(1);
        System.out.println(ipStr + "上线 ...");
    }

    /** Reads what is available on the key's channel and relays it; takes the client offline on EOF or error. */
    private static void readAndRelay(Selector selector, SelectionKey key) {
        SocketChannel channel = (SocketChannel) key.channel();
        // The channel may already be closed at this point
        if (!channel.isOpen()) {
            key.cancel();
            return;
        }
        try {
            ByteBuffer buffer = (ByteBuffer) key.attachment();
            int count = channel.read(buffer);
            if (count < 0) {
                // End of stream: the peer disconnected. Without this the key would stay
                // readable forever and every select() would return immediately.
                System.out.println(channel.getRemoteAddress().toString().substring(1) + "下线 ...");
                dropClient(key);
                return;
            }
            if (buffer.position() == 0) {
                // Nothing was read; do not broadcast an empty message
                return;
            }
            String msg = new String(buffer.array(), 0, buffer.position());
            buffer.clear();
            System.out.println("receive : " + msg);
            // Broadcast the message
            broadCast(selector, channel, msg);
        } catch (Exception e) {
            System.out.println("======================发生异常进行下线操作=========");
            dropClient(key);
        }
    }

    /** Cancels the key and closes its channel so the socket is released. */
    private static void dropClient(SelectionKey key) {
        key.cancel();
        try {
            key.channel().close();
        } catch (IOException ignored) {
            // Best effort: the client is being discarded anyway
        }
    }
}
客户端
/**
 * Chat client built on a non-blocking {@link SocketChannel}.
 *
 * <p>Text is encoded and decoded with the platform default charset.
 */
public class NIOClient {
    private SocketChannel channel;
    private String userName;
    private String bindIP;
    private int bindPort;

    /**
     * Connects to the chat server.
     *
     * @param userName name prefixed to every outgoing message
     * @param bindIP   server host or IP address
     * @param bindPort server port
     * @throws IOException if the connection cannot be established
     */
    public NIOClient(String userName, String bindIP, int bindPort) throws IOException {
        this.userName = userName;
        this.bindIP = bindIP;
        this.bindPort = bindPort;
        channel = SocketChannel.open();
        try {
            // Connect while the channel is still blocking, then switch to non-blocking.
            // This waits for the connection without spinning on finishConnect().
            channel.connect(new InetSocketAddress(bindIP, bindPort));
            channel.configureBlocking(false);
        } catch (IOException | RuntimeException e) {
            // Do not leak the socket when the connection attempt fails
            channel.close();
            throw e;
        }
    }

    /**
     * Sends one chat message, or closes the connection when {@code msg} is {@code "end"}.
     *
     * @param msg message text; the literal text {@code end} closes the channel instead
     * @throws IOException if the write or the close fails
     */
    public void sendMsg(String msg) throws IOException {
        // Compare by content: == only matches when both strings are the same object
        if ("end".equals(msg)) {
            channel.close();
            return;
        }
        msg = "from " + this.userName + " : " + msg;
        channel.write(ByteBuffer.wrap(msg.getBytes()));
    }

    /**
     * Performs one non-blocking read and prints the received text, if any.
     *
     * @throws IOException if the read fails, including when the channel is already closed
     */
    public void receive() throws IOException {
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        int size = channel.read(buffer);
        if (size > 0) {
            // Decode only the bytes that were actually read
            String msg = new String(buffer.array(), 0, size);
            System.out.println(msg.trim());
        }
    }
}
// Main 函数
/**
 * Starts an interactive chat client named "one" against 127.0.0.1:9999.
 *
 * <p>A background thread polls for incoming messages every three seconds while lines
 * typed on standard input are sent to the server. Typing {@code end}, or reaching the
 * end of standard input, stops the receiver and closes the connection.
 *
 * @param args unused
 * @throws IOException declared for source compatibility; the work happens on a spawned thread
 */
public static void main(String[] args) throws IOException {
    new Thread(() -> {
        final NIOClient nioClient;
        try {
            nioClient = new NIOClient("one", "127.0.0.1", 9999);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
        Thread receiver = new Thread(() -> {
            try {
                while (true) {
                    nioClient.receive();
                    Thread.sleep(3000);
                }
            } catch (IOException e) {
                throw new RuntimeException(e);
            } catch (InterruptedException e) {
                System.out.println("=============== 离线 ===================");
            }
        });
        receiver.start();
        System.out.println( "one pleas input : ");
        Scanner scanner = new Scanner(System.in);
        try {
            // hasNextLine() guards against NoSuchElementException when stdin is exhausted
            while (scanner.hasNextLine()) {
                String msg = scanner.nextLine();
                if (msg.equals("end")) {
                    break;
                }
                nioClient.sendMsg(msg);
            }
        } catch (IOException e) {
            e.printStackTrace();
            throw new RuntimeException(e);
        } finally {
            // Always stop the receiver, even when sending failed; otherwise it would
            // keep the JVM alive forever.
            receiver.interrupt();
            try {
                // Wait for the receiver to finish before closing the channel it reads from
                receiver.join();
                // "end" asks the client to close its channel
                nioClient.sendMsg("end");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }).start();
}
/**
 * Netty-based chat server entry point: binds port 9999 and installs a
 * String decoder/encoder plus {@code NettyChatServerHandler} on every accepted connection.
 */
public class NettyServer {

    /**
     * Boots the server and blocks until the listening channel is closed.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        NioEventLoopGroup acceptGroup = new NioEventLoopGroup();
        NioEventLoopGroup ioGroup = new NioEventLoopGroup();
        try {
            // Pipeline applied to each accepted connection
            ChannelInitializer<SocketChannel> pipelineSetup = new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel socketChannel) throws Exception {
                    socketChannel.pipeline()
                            .addLast("decoder", new StringDecoder())
                            .addLast("encoder", new StringEncoder())
                            .addLast(new NettyChatServerHandler());
                }
            };

            ServerBootstrap server = new ServerBootstrap();
            server.group(acceptGroup, ioGroup);
            server.channel(NioServerSocketChannel.class);
            server.option(ChannelOption.SO_BACKLOG, 128);
            server.childOption(ChannelOption.SO_KEEPALIVE, true);
            server.childHandler(pipelineSetup);

            // Wait for the bind to complete, then park until the server channel closes
            ChannelFuture bound = server.bind(9999).sync();
            bound.channel().closeFuture().sync();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            acceptGroup.shutdownGracefully();
            ioGroup.shutdownGracefully();
            System.out.println("关闭");
        }
    }
}
/**
 * Server-side chat handler: tracks connected channels and relays each received line
 * to every other connected channel.
 */
public class NettyChatServerHandler extends SimpleChannelInboundHandler<String> {
    /**
     * All currently active client channels.
     *
     * <p>The list is static and therefore shared by every handler instance. It is a
     * copy-on-write list so it can be iterated in {@link #channelRead0} while another
     * handler instance adds or removes a channel.
     */
    public static List<Channel> channels = new java.util.concurrent.CopyOnWriteArrayList<>();

    /** Registers the newly connected channel and logs its remote address. */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        Channel channel = ctx.channel();
        channels.add(channel);
        System.out.println(channel.remoteAddress().toString().substring(1) + " online");
    }

    /**
     * Relays {@code s} to every channel except the sender, prefixed with the sender's address.
     *
     * @param ctx context of the channel the message arrived on
     * @param s   decoded message text
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String s) throws Exception {
        Channel sender = ctx.channel();
        // The prefix identifies who spoke, so it is built from the sender, not the recipient
        String line = "[" + sender.remoteAddress().toString().substring(1) + "]" + "said:" + s + "\n";
        for (Channel ch : channels) {
            if (ch != sender) {
                ch.writeAndFlush(line);
            }
        }
    }

    /** Unregisters the disconnected channel and logs its remote address. */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        Channel channel = ctx.channel();
        channels.remove(channel);
        System.out.println(channel.remoteAddress().toString().substring(1) + " off line");
    }
}
/**
 * Netty-based interactive chat client: sends each line typed on standard input to the
 * server and prints whatever {@code NettyClientHandler} receives.
 */
public class ChatClient {
    private String host;
    private int port;

    /**
     * @param host server host name or IP address
     * @param port server port
     */
    public ChatClient(String host, int port) {
        this.host = host;
        this.port = port;
    }

    /**
     * Connects to the configured server, forwards standard input line by line, then
     * waits for the connection to close before releasing the event loop group.
     */
    public void run() {
        NioEventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(group)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            ChannelPipeline pipeline = socketChannel.pipeline();
                            pipeline.addLast("decoder", new StringDecoder());
                            pipeline.addLast("encoder", new StringEncoder());
                            pipeline.addLast(new NettyClientHandler());
                        }
                    });
            // Use the address given to the constructor rather than a hard-coded one
            ChannelFuture sync = bootstrap.connect(new InetSocketAddress(host, port)).sync();
            Channel channel = sync.channel();
            Scanner scanner = new Scanner(System.in);
            while (scanner.hasNextLine()) {
                String msg = scanner.nextLine();
                // Terminate each message with a real CRLF, not the four literal characters \r\n
                channel.writeAndFlush(msg + "\r\n").sync();
            }
            sync.channel().closeFuture().sync();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            group.shutdownGracefully();
        }
    }
}
/** Client-side handler that prints every received message to standard output. */
public class NettyClientHandler extends SimpleChannelInboundHandler<String> {

    /**
     * Prints the incoming text with leading and trailing whitespace removed.
     *
     * @param channelHandlerContext context of the receiving channel (unused)
     * @param s                     decoded message text
     */
    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, String s) throws Exception {
        final String text = s.trim();
        System.out.println(text);
    }
}