一、案例
1、编写一个NIO入门案例,实现服务器端和客户端之间的数据简单通讯(非阻塞)
2、目的:理解NIO非阻塞网络编程机制
3、代码
NIOServer.java
- package netty.niostart;
-
- import java.io.IOException;
- import java.net.InetSocketAddress;
- import java.nio.ByteBuffer;
- import java.nio.channels.SelectionKey;
- import java.nio.channels.Selector;
- import java.nio.channels.ServerSocketChannel;
- import java.nio.channels.SocketChannel;
- import java.util.Iterator;
- import java.util.Set;
-
- public class NIOServer {
- public static void main(String[] args) throws Exception {
- //创建ServerSocketChannel -> 类似于ServerSocket
- ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
-
- //得到一个Selector对象
- Selector selector = Selector.open();
-
- //绑定一个端口6666,在服务器端监听
- serverSocketChannel.socket().bind(new InetSocketAddress(6666));
- //设置为非阻塞
- serverSocketChannel.configureBlocking(false);
-
- //把serverSocketChannel注册到selector,关心事件为OP_ACCEPT
- serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
-
- //循环等待客户端连接
- while(true) {
- if (selector.select(5*1000) == 0) {
- //等待5秒钟,如果没有事件发生,返回
- System.out.println("服务器等待了5秒,无连接");
- continue;
- }
-
- //如果返回的>0,就获取到相关的selectionKey集合
- //1.如果返回的>0,表示已经获取到关注的事件
- //2.selector.selectedKeys()返回关注的集合
- //3.通过selectionKeys反向获取通道
- Set
selectionKeys = selector.selectedKeys(); -
- //遍历集合
- Iterator
keyIterator = selectionKeys.iterator(); -
- while (keyIterator.hasNext()) {
- //获取到SelectionKey
- SelectionKey key = keyIterator.next();
-
- //根据key,对应的通道发生的事件,做相应的处理
- if (key.isAcceptable()) {
- //如果是OP_ACCEPT,有新的客户端连接
- //给该客户端生成一个SocketChannel
- SocketChannel socketChannel = serverSocketChannel.accept();
- //将socketChannel设置为非阻塞模式
- socketChannel.configureBlocking(false);
- //将客户端的socketChannel也注册到selector,关注事件为SelectionKey.OP_READ
- //同时给该socketChannel关联一个buffer
- socketChannel.register(selector, SelectionKey.OP_READ, ByteBuffer.allocate(10240));
-
- System.out.println("from " + socketChannel.hashCode() + " 客户端建立了连接");
-
- } else if (key.isReadable()) {
- //发生OP_READ
- //通过key反向获取到对应的channel
- SocketChannel socketChannel = (SocketChannel)key.channel();
- //获取到该channel关联的buffer
- ByteBuffer byteBuffer = (ByteBuffer)key.attachment();
-
- byteBuffer.clear();
- //把当前通道的数据读到buffer中
- try {
- if (socketChannel.read(byteBuffer) == -1) {
- System.out.println("from " + socketChannel.hashCode() + " 客户端断开了连接");
- keyIterator.remove();
- socketChannel.close();
- continue;
- }
-
- //解析客户端数据
- //socket通讯格式可以自己定义:4字节报文长度+报文体
- String request = new String(byteBuffer.array(), 0, byteBuffer.position(), "utf-8");
- System.out.println("from " + socketChannel.hashCode() + " 客户端:" + request);
-
-
- //----------------------------------------------------------------//
- /**
- * 业务模块处理(能否做成异步,传入socketChannel对象)主线程只负责网络IO读写
- */
-
- //业务处理
- System.out.println("业务处理...");
-
- //使用新buffer返回
- ByteBuffer newByteBuffer = ByteBuffer.allocate(10240);
- newByteBuffer.clear();
-
- //获取返回数据
- String response = "result ok";
- newByteBuffer.put(response.getBytes("utf-8"));
- newByteBuffer.put((byte)'9');
-
- //注册写事件
- socketChannel.register(selector, SelectionKey.OP_WRITE, newByteBuffer);
- //----------------------------------------------------------------//
-
-
- } catch (IOException ioe) {
- ioe.printStackTrace();
- keyIterator.remove();
- socketChannel.close();
- continue;
- }
-
- } else if (key.isWritable()) {
- //通过key反向获取到对应的channel
- SocketChannel socketChannel = (SocketChannel)key.channel();
- //获取到该channel关联的buffer
- ByteBuffer byteBuffer = (ByteBuffer)key.attachment();
-
- //读写切换
- byteBuffer.flip(); //只有buffer出数据需要切换
-
- //把buffer数据写入到channel
- try {
- socketChannel.write(byteBuffer);
- } catch (IOException ioe) {
- ioe.printStackTrace();
- keyIterator.remove();
- socketChannel.close();
- continue;
- }
-
- String response = new String(byteBuffer.array(), 0, byteBuffer.position(), "utf-8");
- System.out.println("to " + socketChannel.hashCode() + " 服务端:" + response);
-
- //如果长连接继续注册事件等待
- //注册读事件
- //socketChannel.register(selector, SelectionKey.OP_READ, ByteBuffer.allocate(10240));
- //如果短连接则断开
- socketChannel.close();
-
- } else if (key.isConnectable()) {
- System.out.println("Connectable");
- } else if (key.isValid()) {
- System.out.println("Valid");
- }
-
- //手动从集合中移除当前的SelectionKey,防止重复操作
- keyIterator.remove();
-
- }
-
- }
-
- }
- }
NIOClient.java
- package netty.niostart;
-
- import java.net.InetSocketAddress;
- import java.nio.ByteBuffer;
- import java.nio.channels.SocketChannel;
-
- public class NIOClient {
- public static void main(String[] args) throws Exception {
- //得到一个网络通道
- SocketChannel socketChannel = SocketChannel.open();
- //设置非阻塞模式
- socketChannel.configureBlocking(false);
- //提供服务器端的ip和端口
- InetSocketAddress inetSocketAddress = new InetSocketAddress("127.0.0.1", 6666);
-
- //连接服务器
- if (!socketChannel.connect(inetSocketAddress)) {
- while(!socketChannel.finishConnect()) {
- System.out.println("因为连接需要时间,客户端不会阻塞,可以做其他工作");
- }
- }
-
- //如果连接成功,就发送数据
- String str = "111";
- //客户端也要关联buffer
- ByteBuffer byteBuffer = ByteBuffer.wrap(str.getBytes("utf-8")); //根据字节数组产生buffer
-
- //Thread.sleep(30*1000); //模拟线程阻塞
-
- //发送数据
- //将buffer数据写入channel
- socketChannel.write(byteBuffer);
-
- //获取返回
- byteBuffer.clear();
-
- int numBytesRead;
- while ((numBytesRead = socketChannel.read(byteBuffer)) != -1) { //-1是读完
- if (numBytesRead == 0) { //0是读到0个
- if (byteBuffer.limit() == byteBuffer.position()) {
- byteBuffer.clear();
- }
- continue;
- }
-
- //buffer是数组用这个
- //System.out.println("客户端收到:" + new String(byteBuffer.array(), byteBuffer.arrayOffset(), byteBuffer.arrayOffset()+byteBuffer.position(), "utf-8"));
- //单个buffer用这个
- System.out.println("客户端收到:" + new String(byteBuffer.array(), 0, byteBuffer.position(), "utf-8"));
- }
-
- System.out.println("断开连接...");
- socketChannel.close();
-
- //System.in.read();
-
- }
- }
二、用telnet测试
1、打开cmd
2、执行:chcp 65001,将编码方式改为“utf-8”
3、telnet连接
4、按Ctrl + ]
5、执行send xxx
6、但是telnet传输字符长度有限制?
三、服务端设想
1、前端请求建立socket连接
2、网络通讯为NIO模型
3、将请求消息和socketChannel对象传到一个队列
4、业务模块维护一个线程池从队列获取请求并处理,还是一请求一线程模式
5、业务线程处理完成后注册写事件,结束
这样网络读写是异步的,通讯和业务处理也是异步的