Kafka 使用零拷贝(Zero-Copy)技术来提升性能。所谓零拷贝,并不是指完全没有数据拷贝,而是指数据从磁盘发送到网卡的过程中无需经过应用程序(用户空间):数据始终在内核中传输,从而避免了内核空间与用户空间之间的数据复制,同时减少了上下文切换。
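下面是不采用零拷贝技术时,从磁盘读取文件再通过网卡发送出去的流程(即先 read() 再 write())。可以看到,共经历 4 次拷贝、4 次上下文切换。4 次拷贝依次是:磁盘 → 内核缓冲区(DMA 拷贝)、内核缓冲区 → 用户缓冲区(CPU 拷贝)、用户缓冲区 → socket 缓冲区(CPU 拷贝)、socket 缓冲区 → 网卡(DMA 拷贝)。4 次上下文切换则来自 read() 和 write() 两次系统调用,每次调用都有一次用户态 → 内核态的切换和一次内核态 → 用户态的切换。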
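如果采用零拷贝技术(在 Linux 上底层通过 sendfile64 系统调用实现),流程将变成下面这样。可以看到,只需 3 次拷贝以及 2 次上下文切换(即一次 sendfile 调用的进入与返回),性能显然更高。3 次拷贝依次是:磁盘 → 内核缓冲区(DMA 拷贝)、内核缓冲区 → socket 缓冲区(CPU 拷贝)、socket 缓冲区 → 网卡(DMA 拷贝)。如果网卡支持 scatter-gather DMA,内核还可以省掉内核缓冲区到 socket 缓冲区的那次 CPU 拷贝,只传递数据描述符,整个过程只剩 2 次 DMA 拷贝。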
在Kafka的源码中,使用zero-copy相关的代码如下:
org.apache.kafka.common.record.FileRecords.java
- @Override // Sends up to `length` bytes of this file slice (bounded by start/end), beginning `offset` bytes into the slice, to destChannel.
- public long writeTo(TransferableChannel destChannel, long offset, int length) throws IOException { // returns whatever destChannel.transferFrom reports
- long newSize = Math.min(channel.size(), end) - start; // bytes currently available in the slice: file size capped at `end`, minus `start`
- int oldSize = sizeInBytes();
- if (newSize < oldSize) // fail fast if the underlying file shrank below the size this object believes it has
- throw new KafkaException(String.format(
- "Size of FileRecords %s has been truncated during write: old size %d, new size %d",
- file.getAbsolutePath(), oldSize, newSize));
-
- long position = start + offset; // absolute file position of the requested offset
- long count = Math.min(length, oldSize - offset); // clamp so the transfer never reads past the end of the slice
- return destChannel.transferFrom(channel, position, count); // the destination decides how to copy; e.g. PlaintextTransportLayer forwards to FileChannel.transferTo (zero-copy)
- }
进一步跟踪代码到 org.apache.kafka.common.network.PlaintextTransportLayer.java,可以看到它调用的是 fileChannel.transferTo(position, count, socketChannel)。从写代码的角度,这也是最需要关注的地方:通过文件的 FileChannel 直接调用 transferTo 方法,传入文件中的起始位置 position、要传输的字节数 count 以及目标 SocketChannel 对象。这样就能直接把文件内容发送到该 SocketChannel 对应的网络连接(最终经由网卡发出)。这一过程不必先把文件内容从内核空间复制到用户空间,再从用户空间复制回内核的 socket 缓冲区,因此减少了上下文切换和数据复制,从而提高性能。
- @Override // PlaintextTransportLayer: hands a file region straight to the socket via FileChannel.transferTo instead of copying it through a user-space buffer.
- public long transferFrom(FileChannel fileChannel, long position, long count) throws IOException {
- return fileChannel.transferTo(position, count, socketChannel); // returns the bytes actually transferred, which per the FileChannel.transferTo contract may be fewer than count
- }
进一步跟踪 transferTo 方法可以发现,它在 Linux 上最终会调用 sun.nio.ch.FileChannelImpl 中的 native 方法 transferTo0。下面的 transferToDirectlyInternal 方法就是发起这次调用的地方。transferTo0 的底层实现是 sendfile64() 系统调用,其原理与通过 NIO 快速拷贝文件非常类似,参考:Kafka中mmap的相关总结
- private long transferToDirectlyInternal(long position, int icount, // JDK FileChannelImpl: attempts a direct transfer via the native transferTo0; returns a byte count or an IOStatus code
- WritableByteChannel target,
- FileDescriptor targetFD)
- throws IOException
- {
- assert !nd.transferToDirectlyNeedsPositionLock() || // the caller must hold positionLock whenever the dispatcher requires it
- Thread.holdsLock(positionLock);
-
- long n = -1; // stays -1 unless transferTo0 runs; also drives end(n > -1) in the finally block
- int ti = -1;
- try {
- beginBlocking(); // start of a potentially blocking operation; paired with end(...) in the finally block
- ti = threads.add(); // presumably records the current thread; removed again in the finally block
- if (!isOpen())
- return -1; // channel already closed: nothing transferred
- do {
- n = transferTo0(fd, position, icount, targetFD); // native call; on Linux backed by sendfile64()
- } while ((n == IOStatus.INTERRUPTED) && isOpen()); // retry interrupted calls as long as the channel stays open
- if (n == IOStatus.UNSUPPORTED_CASE) { // this target type cannot be used directly: clear the matching *Supported flag (presumably so later calls avoid this path)
- if (target instanceof SinkChannelImpl)
- pipeSupported = false;
- if (target instanceof FileChannelImpl)
- fileSupported = false;
- return IOStatus.UNSUPPORTED_CASE;
- }
- if (n == IOStatus.UNSUPPORTED) { // direct transfer is not supported at all for this channel
- // Don't bother trying again
- transferSupported = false;
- return IOStatus.UNSUPPORTED;
- }
- return IOStatus.normalize(n); // map any remaining status code to the value callers expect
- } finally {
- threads.remove(ti);
- end (n > -1); // completes the blocking operation; "completed" is true when n is non-negative
- }
- }
下面我们通过一个简单的 Java 示例,看看如何使用零拷贝技术直接向 SocketChannel 发送文件内容。
Server
- package com.my.kafka.zerocopy;
-
- import java.io.IOException;
- import java.net.InetSocketAddress;
- import java.nio.ByteBuffer;
- import java.nio.channels.SelectionKey;
- import java.nio.channels.Selector;
- import java.nio.channels.ServerSocketChannel;
- import java.nio.channels.SocketChannel;
- import java.util.Iterator;
- import java.util.Set;
-
/**
 * Minimal single-threaded NIO echo-style server used to receive the file content that the
 * client pushes with {@code FileChannel.transferTo} (zero-copy).
 *
 * <p>Accepts connections on {@link #PORT}, prints every message it reads and answers with a
 * fixed response. Connections are closed when the peer disconnects or an I/O error occurs.
 */
public class Server {
    private static final int PORT = 8888;
    private Selector selector;

    /**
     * Opens the selector and listening socket, then runs the event loop forever.
     * Both resources are closed if the loop exits because of an {@link IOException}.
     */
    private void startServer() {
        try (Selector openedSelector = Selector.open();
                ServerSocketChannel serverSocketChannel = ServerSocketChannel.open()) {
            selector = openedSelector;
            serverSocketChannel.configureBlocking(false);
            serverSocketChannel.bind(new InetSocketAddress(PORT));
            serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

            System.out.println("SERVER: Server started on port: " + PORT);
            while (true) {
                if (selector.select(1000) == 0) {
                    continue;
                }
                Iterator<SelectionKey> it = selector.selectedKeys().iterator();
                while (it.hasNext()) {
                    SelectionKey key = it.next();
                    it.remove();
                    // A connection closed earlier in this batch leaves a cancelled key behind.
                    if (!key.isValid()) {
                        continue;
                    }
                    if (key.isAcceptable()) {
                        acceptClient(key);
                    } else if (key.isReadable()) {
                        readData(key);
                    } else {
                        System.out.println("SERVER: Key is not recognised, key: " + key);
                    }
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /** Accepts a pending connection and registers it for reads. */
    private void acceptClient(SelectionKey key) throws IOException {
        System.out.println("SERVER: Received a request.");
        ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();
        SocketChannel socketChannel = serverSocketChannel.accept();
        if (socketChannel == null) {
            return; // non-blocking accept() found no pending connection after all
        }
        socketChannel.configureBlocking(false);
        socketChannel.register(selector, SelectionKey.OP_READ);
    }

    /**
     * Reads one chunk from the client, prints it and sends the fixed response.
     * Closes the connection on end-of-stream or I/O failure.
     */
    private void readData(SelectionKey key) {
        SocketChannel socketChannel = (SocketChannel) key.channel();
        ByteBuffer buffer = ByteBuffer.allocate(1024 * 1024);
        try {
            int bytesRead = socketChannel.read(buffer);
            if (bytesRead < 0) {
                // Peer closed its side; left open, the key would stay readable forever (busy loop).
                closeConnection(key);
                return;
            }
            if (bytesRead == 0) {
                return; // nothing arrived, nothing to answer
            }
            String message = decodeMessage(buffer);
            System.out.println("SERVER: Received message from client: " + message);
            ByteBuffer response = ByteBuffer.wrap(
                    "This is response from server side.".getBytes(java.nio.charset.StandardCharsets.UTF_8));
            // Non-blocking writes may be partial; keep writing until the response is fully sent.
            while (response.hasRemaining()) {
                socketChannel.write(response);
            }
        } catch (IOException e) {
            e.printStackTrace();
            closeConnection(key);
        }
    }

    /**
     * Decodes the bytes written into {@code buffer} as UTF-8.
     *
     * @param buffer a buffer in write mode (as left by {@code read}); it is flipped by this call
     * @return the decoded text, empty if nothing was written
     */
    static String decodeMessage(ByteBuffer buffer) {
        buffer.flip();
        return java.nio.charset.StandardCharsets.UTF_8.decode(buffer).toString();
    }

    /** Cancels the key and closes its channel, logging (not propagating) close failures. */
    private static void closeConnection(SelectionKey key) {
        key.cancel();
        try {
            key.channel().close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        new Server().startServer();
    }
}
Client
- package com.my.kafka.zerocopy;
-
- import java.io.FileNotFoundException;
- import java.io.IOException;
- import java.io.RandomAccessFile;
- import java.net.InetSocketAddress;
- import java.nio.ByteBuffer;
- import java.nio.channels.FileChannel;
- import java.nio.channels.SelectionKey;
- import java.nio.channels.Selector;
- import java.nio.channels.SocketChannel;
- import java.util.Iterator;
-
/**
 * Minimal NIO client that writes a small file and then sends it to the server with
 * {@code FileChannel.transferTo} (zero-copy), printing whatever the server answers.
 *
 * <p>The file location defaults to {@code C:\temp\test_channel.txt} and can be overridden
 * with the system property {@code zerocopy.file}, e.g. {@code -Dzerocopy.file=/tmp/test_channel.txt}.
 */
public class Client {

    private Selector selector;

    private static final String FILE_PATH =
            System.getProperty("zerocopy.file", "C:\\temp\\test_channel.txt");

    /** Starts a non-blocking connect to {@code ip:port} and registers for its completion. */
    public void initClient(String ip, int port) throws IOException {
        SocketChannel channel = SocketChannel.open();
        channel.configureBlocking(false);
        this.selector = Selector.open();
        channel.connect(new InetSocketAddress(ip, port));
        channel.register(selector, SelectionKey.OP_CONNECT);
    }

    /**
     * Runs the event loop: sends the file once connected, then prints server responses.
     * Returns (closing the selector) once every registered channel has been closed.
     */
    public void listen() throws IOException {
        while (true) {
            int selectCode = selector.select(1000);
            // Cancelled keys are only dropped during a select, so test this after selecting.
            if (selector.keys().isEmpty()) {
                selector.close();
                return;
            }
            if (selectCode == 0) {
                continue;
            }

            Iterator<SelectionKey> it = this.selector.selectedKeys().iterator();
            while (it.hasNext()) {
                SelectionKey key = it.next();
                it.remove();
                if (!key.isValid()) {
                    continue;
                }
                if (key.isConnectable()) {
                    SocketChannel channel = (SocketChannel) key.channel();
                    if (channel.isConnectionPending() && !channel.finishConnect()) {
                        continue; // connection not established yet; keep waiting for OP_CONNECT
                    }
                    key.interestOps(SelectionKey.OP_READ);
                    sendFileContent(channel);
                } else if (key.isReadable()) {
                    readData(key);
                }
            }
        }
    }

    /*
     * This is key method to send file based on zero-copy.
     * FileChannel.transferTo may move fewer bytes than requested (notably to a non-blocking
     * socket whose send buffer is full), so it is called until the whole file has been sent.
     */
    private static void sendFileContent(SocketChannel socketChannel) {
        // "r": sending must not create or require write access to the file.
        try (RandomAccessFile raf = new RandomAccessFile(FILE_PATH, "r"); FileChannel channel = raf.getChannel()) {
            long size = channel.size();
            long position = 0;
            while (position < size) {
                position += channel.transferTo(position, size - position, socketChannel);
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /** Writes the demo content to FILE_PATH, replacing anything previously stored there. */
    private static void prepareFile() {
        try (RandomAccessFile raf = new RandomAccessFile(FILE_PATH, "rw"); FileChannel channel = raf.getChannel()) {
            String str = "This is file content from client.";
            ByteBuffer buffer = ByteBuffer.wrap(str.getBytes(java.nio.charset.StandardCharsets.UTF_8));
            while (buffer.hasRemaining()) {
                channel.write(buffer);
            }
            // Drop stale bytes left over from a longer previous version of the file.
            channel.truncate(channel.position());
        } catch (IOException e) { // also covers FileNotFoundException
            e.printStackTrace();
        }
    }

    /**
     * Reads one chunk from the server and prints it; closes the channel when the server
     * has closed the connection so that {@link #listen()} can finish.
     */
    public void readData(SelectionKey key) throws IOException {
        SocketChannel channel = (SocketChannel) key.channel();
        ByteBuffer buffer = ByteBuffer.allocate(1024 * 1024);
        int bytesRead = channel.read(buffer);
        if (bytesRead < 0) {
            channel.close(); // end-of-stream; otherwise the key would stay readable forever
            return;
        }
        if (bytesRead == 0) {
            return;
        }
        String msg = decode(buffer).trim();
        System.out.println("CLIENT: Received message from server: " + msg);
    }

    /**
     * Decodes only the bytes actually written into {@code buffer} as UTF-8.
     *
     * @param buffer a buffer in write mode (as left by {@code read}); it is flipped by this call
     * @return the decoded text, empty if nothing was written
     */
    static String decode(ByteBuffer buffer) {
        buffer.flip();
        return java.nio.charset.StandardCharsets.UTF_8.decode(buffer).toString();
    }

    public static void main(String[] args) throws Exception {
        prepareFile();
        Client client = new Client();
        client.initClient("localhost", 8888);
        client.listen();
    }
}
运行结果