Boss:负责建立连接
Worker:worker负责数据的读写
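在编写多线程代码时,要注意执行顺序:如果worker线程已经阻塞在selector.select()上,boss线程此时再向同一个selector注册channel就可能被阻塞,因此下面通过任务队列加wakeup()的方式,把注册动作交给worker线程自己执行。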
Client
package com.yjx23332.netty.test;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
/**
 * Minimal blocking TCP client used to exercise the selector-based server.
 *
 * <p>Connects to localhost:8080, sends a single greeting encoded with the
 * platform default charset, then blocks on standard input so the connection
 * stays open until the user provides input.
 */
public class TestClient {
    /**
     * Runs the client.
     *
     * @param args unused
     * @throws IOException if connecting, writing, or reading from stdin fails
     */
    public static void main(String[] args) throws IOException {
        // try-with-resources closes the channel on every exit path,
        // including a failed connect or write.
        try (SocketChannel socketChannel = SocketChannel.open()) {
            socketChannel.connect(new InetSocketAddress("localhost", 8080));
            socketChannel.write(Charset.defaultCharset().encode("helloword"));
            // Block here so the connection is not torn down immediately.
            System.in.read();
        }
    }
}
Boss
package com.yjx23332.netty.test;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
/**
 * Boss role: accepts incoming TCP connections on port 8080 and hands every
 * accepted channel to a single {@code Worker}, which is registered for
 * {@link SelectionKey#OP_READ}.
 */
@Slf4j
public class MultiThreadServer {
    /**
     * Runs the accept loop on the calling thread (renamed to "boss").
     *
     * @param args unused
     * @throws IOException if opening, binding, selecting or accepting fails
     */
    public static void main(String[] args) throws IOException {
        Thread.currentThread().setName("boss");
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.configureBlocking(false);
        Selector boss = Selector.open();
        serverSocketChannel.register(boss, SelectionKey.OP_ACCEPT, null);
        serverSocketChannel.bind(new InetSocketAddress(8080));
        // Create the worker that will own all accepted channels.
        Worker worker = new Worker("worker-0");
        while (true) {
            boss.select();
            Iterator<SelectionKey> iterator = boss.selectedKeys().iterator();
            while (iterator.hasNext()) {
                SelectionKey key = iterator.next();
                // Selected keys must be removed by hand, otherwise they are seen again.
                iterator.remove();
                if (!key.isAcceptable()) {
                    continue;
                }
                SocketChannel socketChannel = serverSocketChannel.accept();
                if (socketChannel == null) {
                    // In non-blocking mode accept() returns null when the pending
                    // connection disappeared between select() and accept().
                    continue;
                }
                socketChannel.configureBlocking(false);
                log.debug("connected...{}",socketChannel.getRemoteAddress());
                // Associate the channel with the worker's selector.
                log.debug("before register..{}",socketChannel.getRemoteAddress());
                worker.register(socketChannel,SelectionKey.OP_READ,null);
                log.debug("after register..{}",socketChannel.getRemoteAddress());
            }
        }
    }
}
worker
package com.yjx23332.netty.test;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.concurrent.ConcurrentLinkedQueue;
import static com.yjx23332.netty.test.ByteBufferUtil.debugAll;
/**
 * Worker role: owns one selector and one thread, and performs the reads for
 * every channel handed to it through {@link #register}.
 *
 * <p>The thread and selector are created lazily on the first call to
 * {@code register}. Registrations are not performed by the caller; they are
 * queued and executed on the worker thread after it has been woken up.
 */
@Slf4j
@Data
class Worker implements Runnable{
    private Selector selector;
    // Written last during initialisation; a non-null value means selector is ready.
    volatile private Thread thread;
    private String name;

    /**
     * @param name name given to the worker thread once it is started
     */
    public Worker(String name){
        this.name = name;
    }

    // Thread-safe queue used as a mailbox of pending registration tasks.
    private ConcurrentLinkedQueue<Runnable> queue = new ConcurrentLinkedQueue<>();

    /**
     * Queues a registration of {@code socketChannel} with this worker's selector
     * and wakes the worker thread so it can carry the registration out.
     *
     * @param socketChannel channel to register (expected to be non-blocking)
     * @param selectionKey  interest-ops bit set, e.g. {@link SelectionKey#OP_READ}
     * @param attachment    object to attach to the resulting key, may be null
     * @throws IOException if the selector cannot be opened on first use
     */
    public void register(SocketChannel socketChannel,int selectionKey,Object attachment) throws IOException {
        // Lazy initialisation of the selector and the worker thread.
        if (this.thread == null) {
            synchronized (this) {
                if (this.thread == null) {
                    // Open the selector first: if this throws, no state has been
                    // published and a later call can retry.
                    this.selector = Selector.open();
                    Thread workerThread = new Thread(this, this.name);
                    workerThread.start();
                    // Publish the volatile field last so that any thread seeing a
                    // non-null thread also sees the assigned selector.
                    this.thread = workerThread;
                }
            }
        }
        // Enqueue the registration so that it runs on the worker thread.
        queue.add(() -> {
            try {
                socketChannel.register(selector,selectionKey,attachment);
            } catch (ClosedChannelException e) {
                throw new RuntimeException(e);
            }
        });
        // Wake up select() so the queued task is picked up promptly.
        selector.wakeup();
    }

    /**
     * Event loop of the worker thread: waits on the selector, runs all queued
     * registration tasks, then handles every selected key that is readable.
     */
    @Override
    public void run() {
        while (true) {
            try {
                selector.select();
                // Drain the whole queue: several register() calls may have been
                // answered by a single wakeup.
                Runnable task;
                while ((task = queue.poll()) != null) {
                    task.run();
                }
                Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
                while (iter.hasNext()) {
                    SelectionKey selectionKey = iter.next();
                    iter.remove();
                    if (selectionKey.isReadable()) {
                        handleRead(selectionKey);
                    }
                }
            } catch (IOException e) {
                // Only select() can end up here; per-channel failures are handled in handleRead.
                log.debug("select failed", e);
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
    }

    /**
     * Reads once from the channel behind {@code selectionKey} and dumps the buffer.
     * On end-of-stream or I/O failure the key is cancelled and the channel closed,
     * so a dead connection cannot keep the selector firing.
     */
    private void handleRead(SelectionKey selectionKey) {
        SocketChannel channel = (SocketChannel) selectionKey.channel();
        try {
            ByteBuffer buffer = ByteBuffer.allocate(16);
            log.debug("read...{}",channel.getRemoteAddress());
            int bytesRead = channel.read(buffer);
            if (bytesRead == -1) {
                // Peer closed the connection normally.
                closeQuietly(selectionKey, channel);
                return;
            }
            buffer.flip();
            debugAll(buffer);
        } catch (IOException e) {
            log.debug("read failed, dropping connection", e);
            closeQuietly(selectionKey, channel);
        }
    }

    /** Cancels the key and closes the channel, logging (not propagating) a close failure. */
    private void closeQuietly(SelectionKey selectionKey, SocketChannel channel) {
        selectionKey.cancel();
        try {
            channel.close();
        } catch (IOException closeFailure) {
            log.debug("failed to close channel", closeFailure);
        }
    }
}
package com.yjx23332.netty.test;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * Boss role: accepts incoming TCP connections on port 8080 and distributes
 * the accepted channels over a fixed pool of {@code Worker}s in round-robin order.
 */
@Slf4j
public class MultiThreadServer {
    /**
     * Runs the accept loop on the calling thread (renamed to "boss").
     *
     * @param args unused
     * @throws IOException if opening, binding, selecting or accepting fails
     */
    public static void main(String[] args) throws IOException {
        Thread.currentThread().setName("boss");
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.configureBlocking(false);
        Selector boss = Selector.open();
        serverSocketChannel.register(boss, SelectionKey.OP_ACCEPT, null);
        serverSocketChannel.bind(new InetSocketAddress(8080));
        // Create a fixed number of workers; the CPU core count is used as the thread count.
        // Caveat: inside a Docker container the JVM may report the host's CPU count
        // rather than the container's allotment, because Docker is not physically isolated.
        // JDK 10 addressed this via the JVM flag UseContainerSupport (enabled by default).
        Worker[] workers = new Worker[Runtime.getRuntime().availableProcessors()];
        for (int i = 0; i < workers.length; i++) {
            workers[i] = new Worker("worker-" + i );
        }
        AtomicInteger atomicIntegers = new AtomicInteger();
        while (true) {
            boss.select();
            Iterator<SelectionKey> iterator = boss.selectedKeys().iterator();
            while (iterator.hasNext()) {
                SelectionKey key = iterator.next();
                // Selected keys must be removed by hand, otherwise they are seen again.
                iterator.remove();
                if (!key.isAcceptable()) {
                    continue;
                }
                SocketChannel socketChannel = serverSocketChannel.accept();
                if (socketChannel == null) {
                    // In non-blocking mode accept() returns null when the pending
                    // connection disappeared between select() and accept().
                    continue;
                }
                socketChannel.configureBlocking(false);
                // Take the current slot and advance the counter, wrapping at workers.length.
                int index = atomicIntegers.getAndUpdate(e -> (e + 1) % workers.length);
                // Round-robin load balancing.
                workers[index].register(socketChannel,SelectionKey.OP_READ,null);
                log.info("当前使用的是:" + index);
            }
        }
    }
}
因为UDP是无连接的,不会建立可靠连接,所以它的传输方式与之前的TCP模式不一样。
package com.yjx23332.netty.test;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.DatagramChannel;
import java.nio.charset.Charset;
/**
 * Minimal UDP client: sends one datagram to localhost:9999 and logs the
 * single datagram it gets back.
 */
@Slf4j
public class TestClient {
    /**
     * Runs the client.
     *
     * @param args unused
     * @throws IOException if opening, connecting, writing or reading fails
     */
    public static void main(String[] args) throws IOException {
        // try-with-resources closes the channel even when write/read throws.
        try (DatagramChannel datagramChannel = DatagramChannel.open()) {
            // connect() only fixes the remote address for write/read on this channel.
            datagramChannel.connect(new InetSocketAddress("localhost",9999));
            datagramChannel.write(Charset.defaultCharset().encode("hello,word!"));
            ByteBuffer byteBuffer = ByteBuffer.allocate(200);
            // The channel is in its default blocking mode, so this waits for the reply.
            datagramChannel.read(byteBuffer);
            byteBuffer.flip();
            log.debug("收到来自服务器消息:{}",Charset.defaultCharset().decode(byteBuffer));
        }
    }
}
package com.yjx23332.netty.test;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.DatagramChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.charset.Charset;
import java.util.Iterator;
@Slf4j
public class UDPServer {
public static void main(String[] args){
try(DatagramChannel datagramChannel = DatagramChannel.open();){
datagramChannel.bind(new InetSocketAddress(9999));
datagramChannel.configureBlocking(false);
ByteBuffer byteBuffer = ByteBuffer.allocate(200);
Selector selector = Selector.open();
datagramChannel.register(selector, SelectionKey.OP_READ);
while (true) {
selector.select();
Iterator<SelectionKey> it = selector.keys().iterator();
while (it.hasNext()) {
SelectionKey selectionKey = it.next();
if(selectionKey.isReadable()){
DatagramChannel channel = (DatagramChannel) selectionKey.channel();
//receive 会返回socket地址
InetSocketAddress inetSocketAddress = (InetSocketAddress) channel.receive(byteBuffer);
byteBuffer.flip();
log.debug("The message received is :{}", Charset.defaultCharset().decode(byteBuffer));
byteBuffer.clear();
// 通过socket返回
datagramChannel.send(Charset.defaultCharset().encode("Server has received the message!"), inetSocketAddress);
}
//移除该事件
selector.selectedKeys().remove(selectionKey);
}
}
}catch(IOException ioException){
ioException.printStackTrace();
}
}
}
当调用一次channel.read或者stream.read后,会切换至操作系统内核态完成真正的数据读取,而读取又分为两个阶段:
等待数据阶段
复制数据阶段
因此我们通常会再加一个循环,反复执行上面的逻辑。
多路复用比之阻塞IO优势?
它前半段异步,后半段阻塞。
将一个文件通过socket写出
/**
 * Sends the whole content of {@code data.txt} to localhost:8080 using the
 * traditional approach: read the file into a heap byte array, then write that
 * array to the socket's output stream.
 *
 * @param args unused
 * @throws IOException if the file cannot be read, is too large for one array,
 *                     or the socket cannot be opened or written
 */
public static void main(String[] args) throws IOException {
    // try-with-resources closes the file and the socket on every exit path.
    try (RandomAccessFile randomAccessFile = new RandomAccessFile("data.txt","r")) {
        long length = randomAccessFile.length();
        if (length > Integer.MAX_VALUE) {
            // The int cast below would otherwise wrap and produce a bogus array size.
            throw new IOException("data.txt is too large to buffer in a single array: " + length + " bytes");
        }
        byte[] buf = new byte[(int) length];
        // read(byte[]) may return after a partial read; readFully fills the whole array.
        randomAccessFile.readFully(buf);
        try (Socket socket = new Socket("localhost",8080)) {
            socket.getOutputStream().write(buf);
        }
    }
}
工作流程
DMA硬件单元。计算机中有一些硬件专门处理一些简单的读写、USB连接等,以减少CPU的占用,只需要最后告知CPU相关结果。
2.内核态切换回用户态,将数据从内核缓冲区读入用户缓冲区(byte[] buf),这期间CPU会参与拷贝,无法利用DMA。
3.调用write方法,这时将数据从用户缓冲区(byte[])写入socket缓冲区,CPU会参与拷贝
4.接下来会访问网卡写数据,此时Java又不具备该能力,因此将从用户态转换至内核态,调用操作系统的写能力,使用DMA将socket缓冲区的数据写入网卡,该过程也不会使用CPU。
通过DirectByteBuf
工作流程
这里DirectByteBuffer将作为内核缓冲区与用户缓冲区的公用区域。但是仍然需要切换回用户态创建。
Linux提供sendfile系统调用,可以将文件数据直接发往目标区域(例如socket缓冲区)。
也即是FileChannel中的transferTo/transferFrom方法底层调用的方法,我们不需要再调用Java创建一个缓存来存储了。
工作流程
3.3.3与3.3.4都可以称作零拷贝。这里零指的是Java中进行的拷贝次数。
优点:
CPU缓存伪共享:
AIO用来解决数据复制阶段的阻塞问题。
异步模型需要底层操作系统(Kernel)提供支持
windows系统通过IOCP实现了真正的异步IO
Linux系统的异步IO在2.6版本引入,但其底层实现还是用多路复用模拟了异步IO,性能上没有优势
可以参考IO模型之AIO代码及其实践详解
package com.yjx23332.netty.test;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousFileChannel;
import java.nio.channels.CompletionHandler;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import static com.yjx23332.netty.test.ByteBufferUtil.debugAll;
@Slf4j
public class AioDemo {
public static void main(String[] args){
try(AsynchronousFileChannel asynchronousFileChannel = AsynchronousFileChannel.open(Paths.get("data.txt"), StandardOpenOption.READ)){
/**
* @param ByteBuffer,start,attachment,callbackfunction
*
*/
ByteBuffer buffer = ByteBuffer.allocate(16);
asynchronousFileChannel.read(buffer, 0, buffer, new CompletionHandler<Integer, ByteBuffer>() {
//一次read操作成功,就调用一次
/**
* result读取的实际字节数,attachment
* */
@Override
public void completed(Integer result, ByteBuffer attachment) {
log.debug("开始读取...");
attachment.flip();
debugAll(attachment);
}
//异常则调用
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
log.info("{}",exc);
}
});
log.debug("执行到最后了...");
System.in.read();
}catch (IOException ioException){
log.error("{}",ioException.getStackTrace());
}
}
}