socket()
创建套接字,返回一个网络连接的文件描述符
bind()
将 socket() 函数创建出来的套接字与特定的ip和端口进行绑定,经过该ip地址和端口的数据才能交给套接字处理
listen()
让套接字进入被动监听状态
accept()
当套接字处于监听状态时,可以通过 accept() 函数来接收客户端请求。accept() 函数如果正确返回一个新的套接字的文件描述符
,则表示服务器和客户端成功建立一个TCP连接。
connect()
connect() 函数用来建立连接
write()
将缓冲区 buf 中的 nbytes 个字节写入文件 fd,成功则返回写入的字节数,失败则返回 -1
read()
从 fd 文件中读取 nbytes 个字节并保存到缓冲区 buf,成功则返回读取到的字节数(但遇到文件结尾则返回0),失败则返回 -1
send()
可看成网络编程中高级的write(),提供第四个参数来控制写入
recv()
可看成网络编程中高级的read函数(),提供第四个参数来控制读
首先从一个问题出发:服务器如何支持多个并发连接
同步阻塞IO,应用进程通过系统调用 recvfrom 接收数据,但由于内核还未准备好数据报,应用进程就会阻塞住,直到内核准备好数据报,recvfrom 完成数据报复制工作,应用进程才能结束阻塞状态。
在 BIO 模型中,主线程内 accept() 与 recv() 均是阻塞操作,所以若想实现一台服务器支持多个并发连接,则需要主线程内 accept() 接受客户端请求,有新的连接进来后 clone 线程去 recv() 读取请求信息
1. socket()->fd3 创建套接字,返回文件描述符 fd3
2. bind(fd3, 8090)
3. listen(fd3)
4. 接受客户端连接 accept(fd3)->fd4 BLOCKING ,返回文件描述符 fd4
5. clone 线程 recv(fd4) BLOCKING,等待接受信息
代码实现
public static void main(String[] args) throws Exception {
ServerSocket server = new ServerSocket(9090,20);
while (true) {
//阻塞1
Socket client = server.accept();
new Thread(new Runnable(){
public void run() {
InputStream in = null;
try {
in = client.getInputStream();
BufferedReader reader = new BufferedReader(new InputStreamReader(in));
while(true){
//阻塞2
String dataline = reader.readLine();
if(null != dataline){
System.out.println(dataline);
}else{
client.close();
break;
}
}
System.out.println("客户端断开");
} catch (IOException e) {
e.printStackTrace();
}
}
}).start();
}
}
此种方法可以解决我们上述的问题,但是在 C10k 问题上,此实现会使得资源占用过多,同时对于多线程的管理也会对系统照成不小的压力,所以我们能否让 accept() 和 recv() 操作不阻塞,然后在一个线程中去实现,避免clone更多的线程?答案是可以的,在 NIO 中可以实现
同步非阻塞IO,应用进程通过 recvfrom 调用不停的去和内核交互,直到内核准备好数据。如果没有准备好,内核会返回error,应用进程在得到error后,过一段时间再发送 recvfrom 请求。在两次发送请求的时间段,进程可以先做别的事情,实现了非阻塞
通过设置属性值可以将 accept() 与 recv() 均设置为非阻塞
的,那样就可以在单一线程中串行的去处理了
1. socket()->fd3 创建套接字,返回文件描述符 fd3
2. bind(fd3, 8090)
3. listen(fd3)
4. 将 fd3 设置为 nonblocking
5. 接受客户端连接 accept(fd3)-> fd4 | -1 ,存在连接返回文件描述符 fd4,不存在返回 -1
6. 将 fd4 设置为 nonblocking
7. 循环去 recv(fd4)
代码实现
public static void main(String[] args) throws Exception {
LinkedList<SocketChannel> clients = new LinkedList<>();
ServerSocketChannel ss = ServerSocketChannel.open();
ss.bind(new InetSocketAddress(9090));
// 接受客户端连接设置为不阻塞
// 将 fd3 设置为 nonblocking
ss.configureBlocking(false);
while (true) {
//此时不阻塞,存在连接返回文件描述符 fd4,不存在返回null
SocketChannel client = ss.accept();
if (client == null) {
// System.out.println("null.....");
} else {
// 将 fd4 设置为 nonblocking
client.configureBlocking(false);
int port = client.socket().getPort();
System.out.println("client..port: " + port);
clients.add(client);
}
ByteBuffer buffer = ByteBuffer.allocateDirect(4096);
//遍历已经连接进来的客户端能不能读写数据
for (SocketChannel c : clients) {
// 不阻塞 >0 -1 0
int num = c.read(buffer);
if (num > 0) {
buffer.flip();
byte[] aaa = new byte[buffer.limit()];
buffer.get(aaa);
String b = new String(aaa);
System.out.println(c.socket().getPort() + " : " + b);
buffer.clear();
}
}
}
}
上面貌似就又解决了问题,避免了过多线程的创建,在单一线程中实现了多个客户端的连接,但是我们知道 recv() 会从用户态切换到内核态去查看数据有没有准备好
(即有没有放入page cache),在C10k 问题下,在用户态中会遍历所有的 fds,逐一去内核态询问状态,会频繁的在用户态与内核态间切换,那这个成本也太高了,那有没有一种方法,只需要经过一次系统调用就得到所有的 fds 的状态呢?当然是有的,多路复用
就该登场了
多个进程的IO可以注册到同一个管道上,这个管道会统一和内核进行交互。当管道中的某一个请求需要的数据准备好之后,进程再把对应的数据拷贝到用户空间中。
在 SELECT & POLL 中,一次系统调用传入一批 fds,由内核来遍历传过来的 fds,避免频繁的在用户态与内核态间切换,如果数据准备好了,则修改 fd 状态,遍历结束后,返回一批准备好数据的 fds 给到用户态,在由程序自己来进行 R/W
其中SELECT 对 fds 存在 1024 的最大限制,而POLL则没有此限制
但是我们看到每次调用SELECT 或 POLL 都需要把全部的 fds 拷贝到内核态,在内核中全量遍历完之后,在由内核态拷贝到用户态,如果存在 100w 个fd,但是只有两三个就绪的,那就会造成大量的资源浪费:
1. 用户态每次都要重新重复传递fds到内核态
2. 内核态每次都要全量遍历传入的fds
能否避免这种浪费呢?那么 EPOLL 就可以对它进行优化
内置集合
1. 所有fd的总集(红黑树)
2. 就绪fd的集合(队列)
三个API
epoll_create() ===》创建红黑树的根节点
epoll_ctl() ===》add,del,mod 增加、删除、修改结点
epoll_wait() ===》把就绪队列的结点copy到用户态放到events里面,跟recv函数很像
epoll只要有新的io就调用epoll_ctl()加入到红黑树里面,数据准备好后由协议栈进行回调加入到就绪队列,一旦调用epoll_wait()就将就绪队列的fd带出来
代码实现
public class SocketMultiplexingSingleThreadv1 {
private ServerSocketChannel server = null;
//linux 多路复用器
private Selector selector = null;
int port = 9090;
public void initServer() {
try {
server = ServerSocketChannel.open();
server.configureBlocking(false);
server.bind(new InetSocketAddress(port));
//epoll_create()
selector = Selector.open();
//epoll_ctl()
server.register(selector, SelectionKey.OP_ACCEPT);
} catch (IOException e) {
e.printStackTrace();
}
}
public void start() {
initServer();
System.out.println("服务器启动了。。。。。");
try {
while (true) { //死循环
Set<SelectionKey> keys = selector.keys();
System.out.println(keys.size()+" size");
//epoll_wait()
while (selector.select() > 0) {
//返回的有状态的fd集合
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> iter = selectionKeys.iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
iter.remove();
if (key.isAcceptable()) {
//新连接,接入连接加入总集
acceptHandler(key);
} else if (key.isReadable()) {
//R/w
readHandler(key);
}
}
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
public void acceptHandler(SelectionKey key) {
try {
ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
SocketChannel client = ssc.accept();
client.configureBlocking(false);
ByteBuffer buffer = ByteBuffer.allocate(8192);
//epoll_ctl()
client.register(selector, SelectionKey.OP_READ, buffer);
System.out.println("-------------------------------------------");
System.out.println("新客户端:" + client.getRemoteAddress());
System.out.println("-------------------------------------------");
} catch (IOException e) {
e.printStackTrace();
}
}
public void readHandler(SelectionKey key) {
SocketChannel client = (SocketChannel) key.channel();
ByteBuffer buffer = (ByteBuffer) key.attachment();
buffer.clear();
int read = 0;
try {
while (true) {
read = client.read(buffer);
if (read > 0) {
buffer.flip();
while (buffer.hasRemaining()) {
client.write(buffer);
}
buffer.clear();
} else if (read == 0) {
break;
} else {
client.close();
break;
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
SocketMultiplexingSingleThreadv1 service = new SocketMultiplexingSingleThreadv1();
service.start();
}
}
对于 服务器如何支持多个并发连接
这个问题,对 IO 模型的演变路程进行了推导
BIO
主线程内 accept() 阻塞接受客户端请求,有新的连接进来后 clone 线程去 recv()阻塞读取请求信息,此时过多线程的创建会造成资源浪费,多线程的管理也会对系统照成不小的压力
NIO
使 accept() 与 recv() 变为非阻塞,可在一个线程中串行的去执行,减少了线程的创建,但循环的recv()又会造成频繁的用户态与内核态间的切换,也会照成很高的成本
多路复用 - SELECT\POLL
一次系统调用传入一批 fds,由内核来遍历传过来的 fds 数据是否准备好了,避免频繁的在用户态与内核态间切换,但每次又必须传入全量的 fds,内核也就需要遍历全量的fds
多路复用 - EPOLL
开辟两个集合:1. 所有fd的总集(红黑树)2. 就绪fd的集合(队列),epoll只要有新的io就调用epoll_ctl()加入到红黑树里面,数据准备好后由协议栈进行回调加入到就绪队列,一旦调用epoll_wait()就将就绪队列的fd带出来