• Kafka中Zero-Copy的相关总结


    简介

    Kafka用到了零拷贝(Zero-Copy)技术来提升性能。所谓零拷贝,是指数据直接从磁盘文件复制到网卡设备,而无需经过应用程序(用户空间),从而减少了内核空间和用户空间之间的数据拷贝和上下文切换。

    下面这个过程是不采用零拷贝技术时,从磁盘中读取文件然后通过网卡发送出去的流程,可以看到:经历了 4 次拷贝,4 次上下文切换。

    如果采用零拷贝技术(底层通过 sendfile64 方法实现),流程将变成下面这样。可以看到:只需 3 次拷贝以及 2 次上下文切换,显然性能更高。

    Kafka源码

    在Kafka的源码中,使用zero-copy相关的代码如下:

    org.apache.kafka.common.record.FileRecords.java

    // Writes a slice of this file-backed record set to destChannel.
    //
    // Steps visible in this excerpt:
    //   * newSize is the current readable size: min(channel.size(), end) - start.
    //   * oldSize is the size reported by sizeInBytes().
    //   * If newSize < oldSize, a KafkaException is thrown whose message reports that the
    //     file was truncated during the write (with path, old size and new size).
    //   * position = start + offset is the absolute file position to send from.
    //   * count = min(length, oldSize - offset) caps the request at the remaining bytes.
    //   * The transfer itself is delegated to destChannel.transferFrom(channel, position, count),
    //     and its return value is returned unchanged.
    //
    // Parameters: destChannel - target channel; offset - start offset relative to `start`;
    //             length - maximum number of bytes to send.
    // Throws: IOException (declared); KafkaException when the size check above fails.
    // NOTE(review): `channel`, `start`, `end` and `file` are fields of the enclosing class,
    // which is not shown in this excerpt.
    1. @Override
    2. public long writeTo(TransferableChannel destChannel, long offset, int length) throws IOException {
    3. long newSize = Math.min(channel.size(), end) - start;
    4. int oldSize = sizeInBytes();
    5. if (newSize < oldSize)
    6. throw new KafkaException(String.format(
    7. "Size of FileRecords %s has been truncated during write: old size %d, new size %d",
    8. file.getAbsolutePath(), oldSize, newSize));
    9. long position = start + offset;
    10. long count = Math.min(length, oldSize - offset);
    11. return destChannel.transferFrom(channel, position, count);
    12. }

    进一步跟踪代码到 org.apache.kafka.common.network.PlaintextTransportLayer.java,其调用的方法是 fileChannel.transferTo(position, count, socketChannel)。这也是从写代码的角度最需要关注的地方:通过文件的 channel 直接调用其 transferTo 方法,传入的参数为文件的 position、count 以及 SocketChannel 对象。这样就能直接将文件内容发送到 SocketChannel 对象所对应的网卡,而不需要先将文件内容从内核空间复制到用户空间,再从用户空间复制回内核空间的 Socket 缓冲区,减少了上下文切换和数据的复制,从而提高性能。

    // Transfers bytes from the given file channel to this transport layer's socket channel.
    //
    // The call is a pure delegation: fileChannel.transferTo(position, count, socketChannel)
    // is invoked and its result (the number of bytes reported by transferTo) is returned as-is.
    //
    // Parameters: fileChannel - source file channel; position - file position to start from;
    //             count - maximum number of bytes to transfer.
    // Throws: IOException (declared).
    // NOTE(review): `socketChannel` is a field of the enclosing class, not shown in this excerpt.
    1. @Override
    2. public long transferFrom(FileChannel fileChannel, long position, long count) throws IOException {
    3. return fileChannel.transferTo(position, count, socketChannel);
    4. }

    进一步跟踪 transferTo 方法,可以发现,其最终在 Linux 上会调用到 sun.nio.ch.FileChannelImpl.java 中的 transferTo0 方法。这是一个 native 方法,底层实现是调用 C 函数 sendfile64()。这与通过 NIO 快速拷贝文件非常类似,参考:Kafka中mmap的相关总结

    // Performs a direct transfer from this file channel to the target file descriptor
    // by calling transferTo0(fd, position, icount, targetFD).
    //
    // Flow visible in this excerpt:
    //   * Asserts that, when nd.transferToDirectlyNeedsPositionLock() is true, the current
    //     thread holds positionLock.
    //   * Calls beginBlocking() and registers the thread via threads.add(); the finally block
    //     always undoes this with threads.remove(ti) and end(n > -1).
    //   * Returns -1 immediately if the channel is not open.
    //   * Repeats transferTo0 while it returns IOStatus.INTERRUPTED and the channel is still open.
    //   * On IOStatus.UNSUPPORTED_CASE: clears pipeSupported when the target is a SinkChannelImpl,
    //     clears fileSupported when the target is a FileChannelImpl, and returns UNSUPPORTED_CASE.
    //   * On IOStatus.UNSUPPORTED: clears transferSupported (so it is not retried) and returns
    //     UNSUPPORTED.
    //   * Otherwise returns IOStatus.normalize(n).
    //
    // Parameters: position - file position to transfer from; icount - byte count as an int;
    //             target - destination channel (only inspected for its type here);
    //             targetFD - destination file descriptor handed to transferTo0.
    // Throws: IOException (declared).
    // NOTE(review): transferTo0 is not shown here; the surrounding article states it is a
    // native method backed by sendfile64() on Linux - confirm against the JDK sources.
    1. private long transferToDirectlyInternal(long position, int icount,
    2. WritableByteChannel target,
    3. FileDescriptor targetFD)
    4. throws IOException
    5. {
    6. assert !nd.transferToDirectlyNeedsPositionLock() ||
    7. Thread.holdsLock(positionLock);
    8. long n = -1;
    9. int ti = -1;
    10. try {
    11. beginBlocking();
    12. ti = threads.add();
    13. if (!isOpen())
    14. return -1;
    15. do {
    16. n = transferTo0(fd, position, icount, targetFD);
    17. } while ((n == IOStatus.INTERRUPTED) && isOpen());
    18. if (n == IOStatus.UNSUPPORTED_CASE) {
    19. if (target instanceof SinkChannelImpl)
    20. pipeSupported = false;
    21. if (target instanceof FileChannelImpl)
    22. fileSupported = false;
    23. return IOStatus.UNSUPPORTED_CASE;
    24. }
    25. if (n == IOStatus.UNSUPPORTED) {
    26. // Don't bother trying again
    27. transferSupported = false;
    28. return IOStatus.UNSUPPORTED;
    29. }
    30. return IOStatus.normalize(n);
    31. } finally {
    32. threads.remove(ti);
    33. end (n > -1);
    34. }
    35. }

    Java实例

    下面我们通过一个简单的Java实例来看看如何使用zero-copy技术来直接向socketchannel发送文件内容。

    Server

    1. package com.my.kafka.zerocopy;
    2. import java.io.IOException;
    3. import java.net.InetSocketAddress;
    4. import java.nio.ByteBuffer;
    5. import java.nio.channels.SelectionKey;
    6. import java.nio.channels.Selector;
    7. import java.nio.channels.ServerSocketChannel;
    8. import java.nio.channels.SocketChannel;
    9. import java.util.Iterator;
    10. import java.util.Set;
    /**
     * Minimal non-blocking NIO server for the zero-copy demo.
     *
     * <p>It listens on {@link #PORT}, prints whatever a client sends and answers every
     * received chunk with a fixed response message. A single selector thread handles
     * both accepting connections and reading from them.
     */
    public class Server {
        private static final int PORT = 8888;
        /** Size of the per-read receive buffer (1 MiB). */
        private static final int BUFFER_SIZE = 1024 * 1024;

        private Selector selector;

        /**
         * Opens the selector and the listening channel, then runs the select loop forever.
         * Both resources are closed if the loop is left because of an {@link IOException}.
         */
        private void startServer() {
            try (Selector openedSelector = Selector.open();
                 ServerSocketChannel serverSocketChannel = ServerSocketChannel.open()) {
                selector = openedSelector;
                serverSocketChannel.configureBlocking(false);
                serverSocketChannel.bind(new InetSocketAddress(PORT));
                serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
                System.out.println("SERVER: Server started on port: " + PORT);
                while (true) {
                    int selectCode = selector.select(1000);
                    if (selectCode == 0) {
                        continue;
                    }
                    Set<SelectionKey> keys = selector.selectedKeys();
                    Iterator<SelectionKey> it = keys.iterator();
                    while (it.hasNext()) {
                        SelectionKey key = it.next();
                        // Selected keys must be removed by hand, otherwise they are reported again.
                        it.remove();
                        if (!key.isValid()) {
                            continue;
                        }
                        if (key.isAcceptable()) {
                            acceptClient(key);
                        } else if (key.isReadable()) {
                            readData(key);
                        } else {
                            System.out.println("SERVER: Key is no recognised, key: " + key);
                        }
                    }
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

        /**
         * Accepts a pending connection and registers it for read events.
         *
         * @param key the acceptable key of the listening channel
         * @throws IOException if accepting or configuring the new channel fails
         */
        private void acceptClient(SelectionKey key) throws IOException {
            System.out.println("SERVER: Received a request.");
            ServerSocketChannel listener = (ServerSocketChannel) key.channel();
            SocketChannel socketChannel = listener.accept();
            if (socketChannel == null) {
                // In non-blocking mode accept() returns null when no connection is pending any more.
                return;
            }
            socketChannel.configureBlocking(false);
            socketChannel.register(selector, SelectionKey.OP_READ);
        }

        /**
         * Reads the bytes currently available on the client channel, prints them and sends
         * the fixed response. The connection is closed when the peer has closed its side
         * or when an I/O error occurs, so the key cannot stay read-ready forever.
         *
         * @param key the readable key of a client channel
         */
        private void readData(SelectionKey key) {
            SocketChannel socketChannel = (SocketChannel) key.channel();
            ByteBuffer buffer = ByteBuffer.allocate(BUFFER_SIZE);
            try {
                int bytesRead = socketChannel.read(buffer);
                if (bytesRead < 0) {
                    // End of stream: the client closed the connection.
                    closeQuietly(key, socketChannel);
                    return;
                }
                if (bytesRead == 0) {
                    return;
                }
                buffer.flip();
                byte[] bytes = new byte[buffer.remaining()];
                buffer.get(bytes);
                String message = new String(bytes);
                System.out.println("SERVER: Received message from client: " + message);
                ByteBuffer response = ByteBuffer.wrap("This is response from server side.".getBytes());
                // A non-blocking write may be partial; keep writing until the buffer is drained.
                while (response.hasRemaining()) {
                    socketChannel.write(response);
                }
            } catch (IOException e) {
                e.printStackTrace();
                closeQuietly(key, socketChannel);
            }
        }

        /** Cancels the key and closes the channel, reporting but not propagating close errors. */
        private static void closeQuietly(SelectionKey key, SocketChannel socketChannel) {
            key.cancel();
            try {
                socketChannel.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

        public static void main(String[] args) {
            new Server().startServer();
        }
    }

    Client

    1. package com.my.kafka.zerocopy;
    2. import java.io.FileNotFoundException;
    3. import java.io.IOException;
    4. import java.io.RandomAccessFile;
    5. import java.net.InetSocketAddress;
    6. import java.nio.ByteBuffer;
    7. import java.nio.channels.FileChannel;
    8. import java.nio.channels.SelectionKey;
    9. import java.nio.channels.Selector;
    10. import java.nio.channels.SocketChannel;
    11. import java.util.Iterator;
    /**
     * Minimal non-blocking NIO client for the zero-copy demo.
     *
     * <p>It writes a small test file, connects to the server, pushes the file content to the
     * socket with {@link FileChannel#transferTo(long, long, java.nio.channels.WritableByteChannel)}
     * and prints the server's response.
     */
    public class Client {
        private Selector selector;
        /** Location of the demo file (hard-coded Windows path). */
        private static final String FILE_PATH = "C:\\temp\\test_channel.txt";

        /**
         * Opens a non-blocking socket channel, starts connecting and registers it for
         * connect events.
         *
         * @param ip   server host name or address
         * @param port server port
         * @throws IOException if the channel or selector cannot be opened or the connect fails
         */
        public void initClient(String ip, int port) throws IOException {
            SocketChannel channel = SocketChannel.open();
            channel.configureBlocking(false);
            this.selector = Selector.open();
            channel.connect(new InetSocketAddress(ip, port));
            channel.register(selector, SelectionKey.OP_CONNECT);
        }

        /**
         * Runs the select loop: finishes the connection, sends the file once connected and
         * prints every response. The loop ends, and the selector is closed, once no channel
         * is registered any more, i.e. after the server has closed the connection.
         *
         * @throws IOException if selecting, connecting or reading fails
         */
        public void listen() throws IOException {
            while (true) {
                int selectCode = selector.select(1000);
                if (selector.keys().isEmpty()) {
                    // The only channel was closed in readData(); nothing left to wait for.
                    selector.close();
                    return;
                }
                if (selectCode == 0) {
                    continue;
                }
                Iterator<SelectionKey> it = this.selector.selectedKeys().iterator();
                while (it.hasNext()) {
                    SelectionKey key = it.next();
                    it.remove();
                    if (!key.isValid()) {
                        continue;
                    }
                    if (key.isConnectable()) {
                        SocketChannel channel = (SocketChannel) key.channel();
                        if (channel.isConnectionPending()) {
                            channel.finishConnect();
                        }
                        // Connected: from now on only read events are of interest.
                        key.interestOps(SelectionKey.OP_READ);
                        sendFileContent(channel);
                    } else if (key.isReadable()) {
                        readData(key);
                    }
                }
            }
        }

        /**
         * Key method of the demo: sends the file to the socket based on zero-copy.
         *
         * <p>A single {@code transferTo} call may move fewer bytes than requested, in
         * particular because the target socket channel is non-blocking, so the call is
         * repeated until the whole file has been handed over.
         *
         * @param socketChannel connected channel to send the file to
         */
        private static void sendFileContent(SocketChannel socketChannel) {
            // Read-only access is sufficient for sending.
            try (RandomAccessFile raf = new RandomAccessFile(FILE_PATH, "r");
                 FileChannel channel = raf.getChannel()) {
                long size = channel.size();
                long position = 0;
                while (position < size) {
                    long sent = channel.transferTo(position, size - position, socketChannel);
                    if (sent == 0) {
                        // Socket send buffer is full; give other threads a chance before retrying.
                        Thread.yield();
                    }
                    position += sent;
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

        /**
         * Creates (or overwrites) the demo file with a fixed text. The file is truncated
         * first so that no stale bytes of an older, longer file remain after the new content.
         */
        private static void prepareFile() {
            try (RandomAccessFile raf = new RandomAccessFile(FILE_PATH, "rw");
                 FileChannel channel = raf.getChannel()) {
                channel.truncate(0);
                String str = "This is file content from client.";
                // wrap() yields a buffer that is already positioned for reading.
                ByteBuffer buffer = ByteBuffer.wrap(str.getBytes());
                while (buffer.hasRemaining()) {
                    channel.write(buffer);
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

        /**
         * Reads the bytes currently available from the server and prints them. When the
         * server has closed the connection the key is cancelled and the channel is closed.
         *
         * @param key the readable key of the server connection
         * @throws IOException if reading from or closing the channel fails
         */
        public void readData(SelectionKey key) throws IOException {
            SocketChannel channel = (SocketChannel) key.channel();
            ByteBuffer buffer = ByteBuffer.allocate(1024 * 1024);
            int bytesRead = channel.read(buffer);
            if (bytesRead < 0) {
                // End of stream: without this the key would stay read-ready forever.
                key.cancel();
                channel.close();
                return;
            }
            if (bytesRead == 0) {
                return;
            }
            // Decode only the bytes actually received, not the whole backing array.
            String msg = new String(buffer.array(), 0, bytesRead).trim();
            System.out.println("CLIENT: Received message from server: " + msg);
        }

        public static void main(String[] args) throws Exception {
            prepareFile();
            Client client = new Client();
            client.initClient("localhost", 8888);
            client.listen();
        }
    }

    运行结果

  • 相关阅读:
    华为OD机试真题-剩余银饰的重量-2024年OD统一考试(C卷D卷)
    环路与快速破环
    Python所有常见功能大汇总
    抖音短视频实操:矩阵号之为什么要做矩阵号和如何做矩阵号(下)
    Mac brew 安装与使用
    Pytorch入门实例
    openGauss学习笔记-130 openGauss 数据库管理-参数设置-重设参数
    Linux Vim 进阶教程
    解决跨域问题的FastAPI应用及常见报错解析
    Kafka系列之:Kafka Connect错误报告设置
  • 原文地址:https://blog.csdn.net/funnyrand/article/details/125513774