• 【Hadoop】学习笔记(三)


    二、HDFS

    2.6 、 HDFS的Shell操作

    基本语法:

    # 使用hadoop命令操作hdfs
    #可操作任意文件系统,不仅仅是hdfs文件系统,还能操作本地、谷歌GFS等,使用范围更广
    hadoop fs <具体命令>
    
    # 或者 使用hdfs命令操作hdfs
    #只能操作hdfs文件系统(包括与local fs之间的操作)
    hdfs dfs <具体命令>  # 最后其实也是调用的:hadoop fs <具体命令>
    
    
    # 或者使用hadoop dfs操作hdfs,
    hadoop dfs <具体命令> # 不推荐,已过时
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    很多命令、参数和Linux命令很相似,例如:

    # 列出hdfs上的文件/文件夹列表,类似linux的ls命令
    hadoop fs -ls /
    
    # 创建文件夹,类似Linux的mkdir命令
    hadoop fs -mkdir /wcinput
    
    # 删除hdfs上的文件夹,类似linux的 rm -rf命令
    hadoop fs -rm -f -r /wcoutput
    
    # 统计文件夹的大小信息
    # -s 列出总大小,不加该参数时会列出文件夹下每个文件大小
    # -h 以适当的单位展示
    # 输出结果第一项表示文件大小,第二项表示所有副本加一起的总大小(默认情况下,一个文件有3个副本,所以默认情况下第二项结果等于第一项的3倍)
    hadoop fs -du -s -h /jinguo
    
    # 从本地剪切,粘贴到hdfs
    hadoop fs -moveFromLocal shuguo.txt /sanguo
    # 从本地复制,粘贴到hdfs
    hadoop fs -copyFromLocal weiguo.txt /sanguo
    # 上传,同 copyFromLocal 功能一样
    hadoop fs -put aa.txt /wcinput
    
    
    # 从服务器复制,粘贴到本地
    hadoop fs -copyToLocal /sanguo/shuguo.txt ./
    # 下载,等同于 copyToLocal
    hadoop fs -get /wcinput/aa.txt a.txt
    
    # 将liubei.txt追加到shuguo.txt文件末尾
    hadoop fs -appendToFile liubei.txt /sanguo/shuguo.txt
    
    #设置HDFS中文件副本数量
    hadoop fs -setrep 10 /jinguo/weiguo.txt
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33

    2.7 、 HDFS的JavaAPI操作

    2.7.1、客户端环境准备

    虽然我们编写 java 代码的电脑是作为客户端去连接 hdfs 服务器,但是 hdfs 要求如果要读写hdfs就需要在客户端也安装 hadoop。但是hdfs官方又没有Windows版的安装包:

    • 如果是准备在 Linux / mac 环境编写 java 代码连接HDFS,则只需要在Linux系统上也安装一下 hadoop
      即可(将Hadoop压缩包解压,然后配置环境变量)。
    • 如果准备在 Windows 环境下编写 java 代码连接 hdfs,则需要在windows系统中安装与hadoop服务器对应版本的
      winutils.exe和 hdfs.dll(因为hdfs默认不支持windows安装)。winutils.exe工具可以在 github
      上找到,也可以自己编译hadoop源码得到。
    1. 找到资料包路径下的 Windows 依赖文件夹,拷贝hadoop-3.1.0到非中文路径(比如d:)
    2. 配置HADOOP HOME环境变量
      在这里插入图片描述

    2.7.2、编写Demo

    创建Maven工程,添加依赖

    <dependency>
       <groupId>org.apache.hadoopgroupId>
       <artifactId>hadoop-clientartifactId>
       
       <version>3.2.3version>
    dependency>
    <dependency>
       <groupId>junitgroupId>
       <artifactId>junitartifactId>
       <version>4.12version>
       <scope>testscope>
    dependency>
    <dependency>
       <groupId>org.slf4jgroupId>
       <artifactId>slf4j-log4j12artifactId>
       <version>1.7.30version>
    dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    配置log4j日志:log4j.properties

    log4j.rootLogger=INFO, stdout
    log4j.appender.stdout=org.apache.log4j.ConsoleAppender
    log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
    log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
    log4j.appender.logfile=org.apache.log4j.FileAppender
    log4j.appender.logfile.File=target/hadoop-client.log
    log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
    log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    HdfsClient

    package com.example.bigdatademo.HdfsClient;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.*;
    import org.junit.jupiter.api.AfterEach;
    import org.junit.jupiter.api.BeforeEach;
    import org.junit.jupiter.api.Test;
    
    import java.io.IOException;
    import java.net.URI;
    import java.net.URISyntaxException;
    import java.util.Arrays;
    
    public class HdfsClient {
       private FileSystem fs;
       @BeforeEach
       /**
        * 获取客户端对象
        */
       public void init() throws URISyntaxException, IOException, InterruptedException {
           Configuration configuration = new Configuration();
           String uri = "hdfs://master:9000";  // 连接的集群NN地址
           String user = "root";  // 用户
           fs = FileSystem.get(new URI(uri), configuration, user);
       }
    
       @AfterEach
       /**
        * 关闭资源
        */
       public void close() throws IOException {
           fs.close();
       }
    
       @Test
       /**
        * 测试创建目录
        */
       public void testMkdirs() throws IOException, URISyntaxException, InterruptedException {
           // 在HDFS上创建一个文件夹
           fs.mkdirs(new Path("/xiyouji/huaguoshan"));
       }
       @Test
       /**
        * 测试上传
        */
       public void testPut() throws IOException {
           // 上传完毕是否删除原数据
           boolean delSrc = false;
           // 如果hdfs上已有该文件,是否允许覆盖,(不允许覆盖时,如果目的地已经存在该文件,则抛出异常)
           boolean overwrite = false;
           // 源数据路径
           Path src = new Path("C:/Users/YoRHa/Desktop/word.txt");
           // 目的地路径,可以加上协议写成完整路径 hdfs://hadoop102:8020/xiyouji/huaguoshan/,也可以不加
           Path dst = new Path("/xiyouji/huaguoshan/");
           fs.copyFromLocalFile(delSrc, overwrite, src, dst);
       }
    
       @Test
       /**
        * 测试从hdfs下载
        */
       public void testGet() throws IOException {
           // 下载完毕后,是否删除hdfs上的源文件
           boolean delSrc = false;
           // hdfs源数据路径(文件或文件夹),也可以加上hdfs://hadoop102/写成完整路径
           Path src = new Path("/xiyouji/huaguoshan/word.txt");
           // 目的地路径是一个文件夹,程序会将hdfs中的文件下载到该文件夹
           Path dst = new Path("C:/Users/YoRHa/Desktop/temp/");
           // 是否进行CRC完整性校验。true则不生成crc校验文件,false会生成
           boolean useRawLocalFileSystem = true;
           fs.copyToLocalFile(delSrc, src, dst, useRawLocalFileSystem);
       }
    
       @Test
       /**
        * 测试删除hdfs文件
        */
       public void testRM() throws IOException {
           // 要删除的路径(文件或文件夹)
           Path path = new Path("/xiyouji/huaguoshan/word.txt");
           // 是否递归删除,删除非空文件夹时需要递归删除文件夹下的内容,类似 rm 的 -r 参数。删除文件或空文件夹时可以不递归
           boolean recursive = false;
           fs.delete(path, recursive);
       }
    
       @Test
       /**
        * 测试移动和重命名hdfs文件
        */
       public void testMV() throws IOException {
           //更改前的文件
           //Path src = new Path("/xiyouji/huaguoshan/word.txt");
           //更改后的文件
           //Path dst = new Path("/xiyouji/huaguoshan/ss.txt");
           // 重命名
           //fs.rename(src, dst);
    
           // 文件移动位置并重命名
           //fs.rename(new Path("/xiyouji/huaguoshan/ss.txt"), new Path("/xiyouji/word.txt"));
           //目录更名
           fs.rename(new Path( "/xiyouji/huaguoshan"), new Path(  "/xiyouji/shuliandong"));
       }
    
       @Test
       /**
        * 列举文件夹下文件详情, ls 命令
        */
       public void testLS() throws IOException {
           Path path = new Path("/xiyouji");
           boolean recursive = false; // 是否递归
           // 类似ls命令,返回值是一个迭代器
           RemoteIterator<LocatedFileStatus> listFilesIterator = fs.listFiles(path, recursive);
           while(listFilesIterator.hasNext()) {
               LocatedFileStatus fileStatus = listFilesIterator.next();  // 获取到文件属性
               System.out.println("==================="+fileStatus.getPath()+"===================");
               System.out.println(fileStatus.getPermission());
               System.out.println(fileStatus.getOwner());
               System.out.println(fileStatus.getGroup());
               System.out.println(fileStatus.getLen());
               System.out.println(fileStatus.getModificationTime());
               System.out.println(fileStatus.getReplication());
               System.out.println(fileStatus.getBlockSize());
               System.out.println(fileStatus.getPath().getName());
    
               //获取块信息
               BlockLocation[] blockLocations = fileStatus.getBlockLocations();
               System.out.println(Arrays.toString(blockLocations));
           }
       }
    
       @Test
       /**
        * HDFS 文件和文件夹判断
        */
       public void testListstatus () throws IOException, InterruptedException, URISyntaxException {
           FileStatus[] listStatus = fs.listStatus(new Path( "/"));
           for (FileStatus status : listStatus) {
               if (status.isFile()) {
                   System.out.println("文件: " + status.getPath().getName());
               } else {
                   System.out.println("目录: " + status.getPath().getName());
               }
           }
       }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133
    • 134
    • 135
    • 136
    • 137
    • 138
    • 139
    • 140
    • 141
    • 142
    • 143
    • 144
    • 145
    • 146
    • 147

    参数优先级
    代码里面的配置 > 在项目资源目录下的配置文件 > hdfs-site.xml >hdfs-default.xml

    2.8 、 HDFS的读写流程

    2.8.1、HDFS写数据流程

    2.8.1.1 、剖析文件写入

    在这里插入图片描述
    在第7步中,客户端要向DataNode写数据时,是以一种管道(pipeline)的方式,先向一个DataNode写入,然后由该DataNode继续发给下一个、下一个再发给下下个。不是客户端多线程同时写多个DataNode。

    客户端向DataNode传输数据时,也不是直接串行将整个文件块写入,而是将文件块拆分成多个64k的数据包(packet),多个数据包并行的对同一个DataNode进行写入。

    DataNode写数据完成后会向给自己发送数据的前一级发送应答信号。如果DataNode写数据失败,没有正确发送应答信号,它的前一级会重试重新向该DataNode传输数据。

    写数据流程:

    • HDFS客户端创建对象实例 DistributedFileSystem,该对象中封装了与HDFS文件系统操作的相关方法
    • 调用DistributedFileSystem对象的create()方法,通过 RPC 请求 NameNode创建文件。NameNode执行各种检查判断:目标文件是否存在、父目录是否存在、客户端是否具有创建文件的权限。NameNode就会为本次请求记下一条记录,返回 FSDataOutputStream 输出流对象给客户端用于写数据。
    • 客户端通过 FSDataOutputStream输出流开始写入数据
    • 客户端写入数据时,将数据分成一个个数据包(packeg,默认64k)。内部组件DataStreamer请求NameNode挑选出适合存储数据副本的一组DataNode地址,默认是3副本存储。DataStreamer将数据包流式传输到管道(pipeline)的第一个 DataNode,该DataNode存储数据包并将它发送到 pipeline 的第二个DataNode。同样的,第二个DataNode存储数据包并发送给第三个(也是最后一个)DataNode。
    • 传输的反方向上,会通过ACK机制校验数据包传输是否成功
    • 客户端完成数据写入后,在 FSDataOutputStream 输出流上调用 close() 方法关闭
    • 客户端DsitributedFileSystem联系NameNode,告知NameNode文件写入完成,等待NameNode确认。因为NameNode已经知道文件由哪些块组成(DataStream请求分配数据块),因此仅需等待最小复制块即可成功返回。最小复制是由参数 dfs.namenode.replication.min 指定,默认是1。即只要有1个副本上传成功,NameNode就认为已经上传成功,如果其他DataNode有缺失的块,可以通过这个DataNode继续复制。
    2.8.1.2 、网络拓扑-节点距离计算

    在这里插入图片描述

    2.8.1.3 、机架感知(副本存储节点的选择)

    根据节点选择的官方说明,当使用的默认3个节点副本时,hdfs选择的副本存储节点为:

    • 副本1存储在本机节点
      one replica on the local machine
    • 副本2存储在另一个机架的一个节点
      another replica on a node in a different (remote) rack
    • 副本3存储在和副本2相同机架的另一个节点
      the last on a different node in the same remote rack
      在这里插入图片描述

    2.8.2、HDFS读数据流程

    在这里插入图片描述
    在第1步时,NameNode接到了客户端的请求,会判断客户端的用户是否有权限读,并且判断hdfs中是否有该文件,然后将元数据响应给客户端。

    在第3步时,客户端要从DataNode中读取数据,而一个文件块会有多个副本,客户端会考虑哪个DataNode离自己最近,并且该DataNode的访问量负载不是很高才从这个DataNode上下载。

    在第5步请求第二个文件块blk_2时,是在已经读取完成了blk_1之后才会发出该请求,是串行读,不是多线程并行。最后将读到的blk_2数据追加到blk_1末尾,就可以拼接成一个完整的文件。
    所以,我们在 hadoop 服务器上的data文件夹中找到hadoop存放数据的文件夹$HADOOP_HOME/data/dfs/data/current/BP-xxxxxxx/current/finalized/subdir0/subdir0,在里面将某个文件的几个blk_xxx按顺序拼接,也能恢复出原文件:

    2.9、NameNode和SecondaryNameNode

    2.9.1、NN 和 2NN

    在这里插入图片描述
    在没有 SecondaryNameNode 的集群中,NameNode的工作流程是:

    1. 系统启动后,将 edits 和 fsimage 从硬盘加载到内存中,方便数据的快速增删改。先加载fsimage内容到内存,然后在内存中按顺序把将edits中修改的内容执行一遍。
    2. 元数据发生变动时,edits中记录改变向量,内存中数据也同步修改,但是不写入磁盘的 fsimage
    3. 系统关闭时,将edits 中的改变向量按顺序执行,写入到fsimage中

    有了SecondaryNameNode的集群,SecondaryNameNode会定期询问NameNode是否到达检查点(checkpoint),如果到达了检查点,SecondaryNameNode就辅助NameNode执行edits信息向fsimage中写入。

    NameNode工作流程为:

    1. 系统启动,将edits 和 fsimage 从磁盘加载到内存中
    2. 客户端发出修改元数据的请求给NameNode
    3. NameNode 将修改数据的改变向量写入磁盘的edit_inprogress中,然后同步修改掉内存中的数据

    SecondaryNameNode工作流程为:

    1. 向NameNode询问是否到达检查点(默认如果Edits记录的修改次数达到100万,或者距离上个checkpoint时间间隔了1小时,就到达了检查点)
    2. 如果到达检查点,请求执行Checkpoint
    3. NameNode的edit_inprogress_001中存储的改变向量滚动写入Edits中。在 edit_inporgress_001写入edits过程中,如果客户端向NameNode发出改变元数据的请求,这部分新的改变向量被暂时先写入edit_inprogress_002中。
    4. 将 NameNode 的Edits、fsimage拷贝到SecondaryNameNode
    5. 将 Edits、fsimage信息加载到自己的内存中。在fsimage基础上顺序执行Edits中的改变向量
    6. 将内存中的计算结果写入磁盘fsimage.checkpoint
    7. 将fsimage.checkpoint拷贝给NameNode
    8. NameNode 将 fsimage.checkpoint 重命名为 fsimage,覆盖原有的fsimage

    因为checkpoint拷贝的时候,客户端如果向DataNode发出写元数据的请求,这部分元数据的修改被记录到了NameNode的edit_inprogress中,而SecondaryNameNode没有这个文件。所以如果NameNode宕机,使用SecondaryNameNode临时充当NameNode的话,会丢失掉这部分信息。

    2.9.2、FSImage 和 Edits

    NameNode被格式化之后,将在$HADOOP_HOME/data/dfs/name/current目录下产生如下文件:

    • fsimage_xxxxxx
    • fsimage_xxxxxx.md5
    • seen_txid
    • VERSION

    如果NameNode中存储的元数据信息有修改变动,还会在该文件夹下生成:

    • edits_xxxx
    • edits_inprogress_xxxxx

    FSImage文件:HDFS文件系统元数据的一个永久性的检查点。其中包含HDFS文件系统的所有目录和文件inode的序列化信息

    Edits文件:存放HDFS文件系统的所有更新操作的路径,文件系统客户端执行的所有写操作首先会被记录到Edits文件中。因为它只是记录本次修改的这个操作指令,并不直接去FSImage上修改指定元数据,所以Edits写的速度比较快。

    seen_txid:保存的是一个数字。就是最后一个 edits_xxx 的数字

    VERSION:记录了NameNode的命令空间编号namespaceID,还有集群编号clusterID等信息。

    每次NameNode启动时候都会将FSImage文件读入内存,加载Edis里面的更新操作,保证内存中的元数据信息是最新的、同步的,可以看成NameNode启动时候就将FSImage和Edits文件进行了合并。

    2.9.3、oiv 和 oev命令

    使用cat等命令无法直接查看FSImage和Edits文件,可以借助 hdfs oiv查看FSImage,借助 hdfs oev查看Edits文件。

    使用oiv命令查看FSImage文件:

    hdfs oiv -p <文件类型> -d <FSImage文件> -o <转换后要输出的文件>
    
    • 1

    FSImage中并没有存储每个数据块存储在哪台DataNode上。而是在系统上电启动时,DataNode主动向NameNode汇报自己服务器上存储的数据块信息

    使用oev命令查看Edits文件:

    hdfs oev -p <文件类型> -i <Edits文件> -o <转换后要输出的文件>
    
    • 1

    2.9.4、CheckPoint设置

    默认执行CheckPoint的情况:

    • 通常情况下,SecondaryNameNode每隔一小时执行一次
    • 每分钟检查一次Edits的操作次数,当操作次数达到100万时,SecondaryNameNode执行一次

    间隔时间的配置:hdfs-default.xml

    <property>
        <name>dfs.namenode.checkpoint.periodname>
        <value>3600svalue>
    property>
    
    • 1
    • 2
    • 3
    • 4

    操作次数设置、检查操作次数的时间间隔设置:hdfs-site.xml

    <property>
        <name>dfs.namenode.checkpoint.txnsname>
        <value>1000000value>
        <describe>操作动作次数describe>
    property>
    
    <property>
        <name>dfs.namenode.checkpoint.check.periodname>
        <value>60svalue>
        <describe>1分钟检查一次操作次数describe>
    property>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    2.10、DataNode

    2.10.1、DataNode工作机制

    在这里插入图片描述
    一个数据块在DataNode上以文件形式存储在磁盘上,包括两个文件:一个是数据本身,一个是元数据(包括数据块的长度、块数据的校验和,以及时间戳)。

    DataNode启动后向NameNode注册,通过后,周期性(默认6小时)的向NameNode上报所有的块信息。

    心跳是每3秒一次,心跳返回结果带有NameNode给该DataNode的命令(如复制数据块到另一台机器,或删除某个数据块)。如果超过10分钟 + 30秒(超过10分钟,再给10次机会)还没有收到某个DataNode的心跳,就认为该节点不可用,认为该节点宕机了,不会再向该节点传输信息。

    集群运行中,可以安全的加入或退出一些机器。

    DN向NN汇报当前节点信息的时间间隔,默认6小时:hdfs-site.xml

    <property>
        <name>dfs.blockreport.intervalMsecname>
        <value>21600000value>
        <describe>Determines block reporting interval in millisecondsdescribe>
    property>
    
    • 1
    • 2
    • 3
    • 4
    • 5

    DN扫描自己节点块信息列表的时间(默认也是6小时):hdfs-site.xml

    <property>
        <name>dfs.datanode.directoryscan.intervalname>
        <value>21600svalue>
    property>
    
    • 1
    • 2
    • 3
    • 4

    即:每隔6小时,DataNode扫描自己节点上的所有数据块信息,然后上报给NameNode。

    2.10.2、数据完整性

    DataNode保证数据完整性的做法:

    1. 当DataNode读取Block的时候,它会计算CheckSum
    2. 如果计算后的CheckSum,与Block创建时值不一样,说明Block已经损坏。
    3. Client读取其他DataNode上的Block

    常见的校验算法有:crc(32)、md5(128)、sha1(160)。

    DataNode在其文件创建后周期性验证CheckSum

    2.10.3、掉线时限设置

    在这里插入图片描述
    其中dfs.namenode.heartbeat.recheck-interval (单位毫秒)和 dfs.heartbeat.interval (单位秒)可以在 hdfs-site.xml 中进行配置。

    <property>
        <name>dfs.namenode.heartbeat.recheck-intervalname>
        <value>300000value>
        <describe>默认为300秒,即5分钟describe>
    property>
    
    <property>
        <name>dfs.heartbeat.intervalname>
        <value>3value>
        <describe>心跳时间默认3秒describe>
    property>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    所以,如果是默认配置,最后真正判断的超时时间为 : 2 * 5分钟 + 10 * 3秒 = 10分钟30秒

    每个节点的上次心跳发送时间,可以在浏览器上查看:http://hadoop102:9870,进入 Datanodes,即可看到每个节点的上次发送心跳时间Last contact

  • 相关阅读:
    事件循环的学习、执行上文、this、执行栈和任务队列
    go fmt包详解
    小车PWM调速-模式选择
    Sqlmap(SQL注入自动化工具)
    计算机毕业设计Java超市管理系统(源码+系统+mysql数据库+lw文档)
    win使用git(保姆级教程)
    leetcode做题笔记198. 打家劫舍
    文件共享服务NFS(服务名nfs,端口tcp/2049)
    3.3-DIY一个Base Image
    使用 ORM 与原始 SQL 的性能对比
  • 原文地址:https://blog.csdn.net/lushixuan12345/article/details/126159846