package com.example.nettytest01;
import com.google.common.primitives.Chars;
import lombok.extern.slf4j.Slf4j;
import org.omg.SendingContext.RunTime;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
@Slf4j
public class ThreadsServer {
public static void main(String[] args) {
try (ServerSocketChannel server = ServerSocketChannel.open()) {
// 当前线程为Boss线程
Thread.currentThread().setName("Boss");
server.bind(new InetSocketAddress(8081));
// 负责轮询Accept事件的Selector
Selector boss = Selector.open();
server.configureBlocking(false);
server.register(boss, SelectionKey.OP_ACCEPT);
// 创建固定数量的Worker //Runtime.getRuntime().availableProcessors(): 获得可用核心线程数量
// 但是在docker 容器中 假设给容器分配一核 他会拿到 主机的全部 内核数量 !!!jdk10修复
Worker[] workers = new Worker[Runtime.getRuntime().availableProcessors()];
// 用于负载均衡的原子整数
AtomicInteger robin = new AtomicInteger(0);
for(int i = 0; i < workers.length; i++) {
workers[i] = new Worker("worker-"+i);
}
while (true) {
boss.select();
Set selectionKeys = boss.selectedKeys();
Iterator iterator = selectionKeys.iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
iterator.remove();
// BossSelector负责Accept事件
if (key.isAcceptable()) {
// 建立连接
SocketChannel socket = server.accept();
System.out.println("connected...");
socket.configureBlocking(false);
// socket注册到Worker的Selector中
System.out.println("before read...");
// 负载均衡,轮询分配Worker getAndIncrement:自增
workers[robin.getAndIncrement()% workers.length].register(socket);
System.out.println("after read...");
}
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
static class Worker implements Runnable {
private Thread thread;
private volatile Selector selector;
private String name;
private volatile boolean started = false;
/**
* 同步队列,用于Boss线程与Worker线程之间的通信
*/
private ConcurrentLinkedQueue queue;
public Worker(String name) {
this.name = name;
}
public void register(final SocketChannel socket) throws IOException {
// 只启动一次
if (!started) {
thread = new Thread(this, name);
selector = Selector.open();
queue = new ConcurrentLinkedQueue<>();
thread.start();
started = true;
}
// 向同步队列中添加SocketChannel的注册事件
// 在Worker线程中执行注册事件
queue.add(new Runnable() {
@Override
public void run() {
try {
socket.register(selector, SelectionKey.OP_READ);
} catch (IOException e) {
e.printStackTrace();
}
}
});
// 唤醒被阻塞的Selector
// select类似LockSupport中的park,wakeup的原理类似LockSupport中的unpark
selector.wakeup();
}
@Override
public void run() {
while (true) {
try {
selector.select();
log.info("线程已经开启lo~~");
// 通过同步队列获得任务并运行
Runnable task = queue.poll();
if (task != null) {
// 获得任务,执行注册操作
task.run();
}
Set selectionKeys = selector.selectedKeys();
Iterator iterator = selectionKeys.iterator();
while(iterator.hasNext()) {
SelectionKey key = iterator.next();
iterator.remove();
// Worker只负责Read事件
if (key.isReadable()) {
// 简化处理,省略细节
log.info("work.selector收到一个读事件~~");
SocketChannel socket = (SocketChannel) key.channel();
ByteBuffer buffer = ByteBuffer.allocate(16);
socket.read(buffer);
buffer.flip();
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
客户端
package com.example.nettytest01;
import com.google.common.primitives.Chars;
import lombok.extern.slf4j.Slf4j;
import org.omg.SendingContext.RunTime;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
@Slf4j
public class ThreadsServer {
public static void main(String[] args) {
try (ServerSocketChannel server = ServerSocketChannel.open()) {
// 当前线程为Boss线程
Thread.currentThread().setName("Boss");
server.bind(new InetSocketAddress(8081));
// 负责轮询Accept事件的Selector
Selector boss = Selector.open();
server.configureBlocking(false);
server.register(boss, SelectionKey.OP_ACCEPT);
// 创建固定数量的Worker //Runtime.getRuntime().availableProcessors(): 获得可用核心线程数量
// 但是在docker 容器中 假设给容器分配一核 他会拿到 主机的全部 内核数量 !!!jdk10修复
Worker[] workers = new Worker[Runtime.getRuntime().availableProcessors()];
// 用于负载均衡的原子整数
AtomicInteger robin = new AtomicInteger(0);
for(int i = 0; i < workers.length; i++) {
workers[i] = new Worker("worker-"+i);
}
while (true) {
boss.select();
Set selectionKeys = boss.selectedKeys();
Iterator iterator = selectionKeys.iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
iterator.remove();
// BossSelector负责Accept事件
if (key.isAcceptable()) {
// 建立连接
SocketChannel socket = server.accept();
System.out.println("connected...");
socket.configureBlocking(false);
// socket注册到Worker的Selector中
System.out.println("before read...");
// 负载均衡,轮询分配Worker getAndIncrement:自增
workers[robin.getAndIncrement()% workers.length].register(socket);
System.out.println("after read...");
}
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
static class Worker implements Runnable {
private Thread thread;
private volatile Selector selector;
private String name;
private volatile boolean started = false;
/**
* 同步队列,用于Boss线程与Worker线程之间的通信
*/
private ConcurrentLinkedQueue queue;
public Worker(String name) {
this.name = name;
}
public void register(final SocketChannel socket) throws IOException {
// 只启动一次
if (!started) {
thread = new Thread(this, name);
selector = Selector.open();
queue = new ConcurrentLinkedQueue<>();
thread.start();
started = true;
}
// 向同步队列中添加SocketChannel的注册事件
// 在Worker线程中执行注册事件
queue.add(new Runnable() {
@Override
public void run() {
try {
socket.register(selector, SelectionKey.OP_READ);
} catch (IOException e) {
e.printStackTrace();
}
}
});
// 唤醒被阻塞的Selector
// select类似LockSupport中的park,wakeup的原理类似LockSupport中的unpark
selector.wakeup();
}
@Override
public void run() {
while (true) {
try {
selector.select();
log.info("线程已经开启lo~~");
// 通过同步队列获得任务并运行
Runnable task = queue.poll();
if (task != null) {
// 获得任务,执行注册操作
task.run();
}
Set selectionKeys = selector.selectedKeys();
Iterator iterator = selectionKeys.iterator();
while(iterator.hasNext()) {
SelectionKey key = iterator.next();
iterator.remove();
// Worker只负责Read事件
if (key.isReadable()) {
// 简化处理,省略细节
log.info("work.selector收到一个读事件~~");
SocketChannel socket = (SocketChannel) key.channel();
ByteBuffer buffer = ByteBuffer.allocate(16);
socket.read(buffer);
buffer.flip();
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
复制数据是 查询完毕的数据写入 java缓存里面
阻塞io的缺点:在做一件事情的时候做不了另一件事 。 在处理read事件无法 处理新用户的accept事件
一个线程解决多个请求 ,select可以监听多个事件 ,一次性接收多个事件并且处理 。像一个漏斗,可以好几个进(接收事件),
一个个出(处理)
同步:线程自己去获取结果
阻塞io 是同步的 。
非阻塞io也是同步的
多路复用
异步:线程自己不去获取结果 ,由 其他线程递送结果
通过回调方法 来接收结果 ,相当于叫小弟 去买烟 自己等待
不存在 异步阻塞 ,异步天然非阻塞!!异步自己可以去淦别的事情
图1.1
java本身不具备io 读写能力 , 是操作系统帮他完成的 。
图1.1 一共发生了四次io 读写 3次内核态用户态切换 。
通过DirectByteBuf优化
ByteBuffer.allcate(10) HeaoByteBuffer 使用的是java内存
ByteBuffer.allcateDirect(10)DirectByteBuffer 使用的是操作系统内存
相对于图1.1 少一次io读写
零拷贝 : 文件不需要经过java缓冲区 ,但是知识和小文件
异步非阻塞 在springboot响应式编程中发光发热!! 天然干高并发