• 分布式文件存储系统minio、大文件分片传输


    上传大文件

    1、Promise对象

    Promise 对象代表一个异步操作,有三种状态:

    • pending: 初始状态,不是成功或失败状态。
    • fulfilled: 意味着操作成功完成。
    • rejected: 意味着操作失败。

    只有异步操作的结果,可以决定当前是哪一种状态,任何其他操作都无法改变这个状态

    有了 Promise 对象,就可以将异步操作以同步操作的流程表达出来,避免了层层嵌套的回调函数

    1. //定义异步操作
    2. var promise = new Promise (function (resolve,reject)) {
    3. // ... some code
    4. if(/* 异步操作成功 */){
    5. resolve(value)
    6. } else {
    7. reject(error)
    8. }
    9. }
    10. //异步操作的回调(写法一)
    11. promise.then( function( result ) {
    12. // result ==上面的value
    13. console.log("fulfilled:",val)
    14. }).catch(function (error){
    15. // error==上面的error
    16. console.log('rejected',err)
    17. });
    18. //异步操作的回调(写法二)
    19. promise.then((result) => console.log("fulfilled:",result)).catch((error) =>console.log('rejected',error));

    Promise.all()

    除了串行执行若干异步任务外,Promise还可以并行执行异步任务。

    Promise.all() 可以将多个Promise实例包装成一个新的Promise实例;

    const p = Promise.all([p1, p2, p3]);

    p的状态由p1p2p3决定,分成两种情况。

    (1)只有p1p2p3的状态都变成fulfilledp的状态才会变成fulfilled,此时p1p2p3的返回值组成一个数组,传递给p回调函数

    (2)只要p1p2p3之中有一个被rejectedp的状态就变成rejected,此时第一个被reject的实例的返回值,会传递给p的回调函数。

    曾经困扰我的一个问题

    <button @click="bbb">异步调用button>

    异步的写法

    1. //异步的
    2. bbb() {
    3. let arr = [];
    4. for (let index = 0; index < 15; index++) {
    5. arr.push(this.aaa(index));
    6. }
    7. //使用之前不希望调用
    8. console.log(arr);
    9. },
    10. aaa(i) {
    11. return new Promise((resolve, reject) => {
    12. setTimeout(() => {
    13. console.log("i", i);
    14. resolve(i);
    15. this.a++;
    16. }, 200 - 5 * i);
    17. });
    18. },

    同步的写法

    1. bbb() {
    2. let arr = [];
    3. let p = undefined;
    4. for (let index = 0; index < 15; index++) {
    5. if (index == 0) {
    6. p = this.aaa(index);
    7. } else {
    8. //获取上一个
    9. p = this.aaa(index, p);
    10. }
    11. }
    12. //使用之前不希望调用
    13. console.log(p);
    14. },
    15. aaa(i, promise) {
    16. if (i == 0) {
    17. return new Promise((resolve, reject) => {
    18. setTimeout(() => {
    19. console.log("i", i);
    20. resolve(i);
    21. this.a++;
    22. }, 200 - 5 * i);
    23. });
    24. } else {
    25. return promise.then((value) => {
    26. return new Promise((resolve, reject) => {
    27. setTimeout(() => {
    28. console.log("i", i);
    29. resolve(i);
    30. this.a++;
    31. }, 200 - 5 * i);
    32. });
    33. });
    34. }
    35. },

    2、Blob对象和File 对象

    Blob对象

    Blob对象有一个slice方法,返回一个新的Blob对象,包含了源Blob对象中制定范围内的数据。

    var blob = instanceOfBlob.slice([start [, end [, contentType]]]};

    参数说明:

    start: 可选,代表 Blob 里的下标,表示第一个会被会被拷贝进新的 Blob 的字节的起始位置。如果传入的是一个负数,那么这个偏移量将会从数据的末尾从后到前开始计算。

    end: 可选,代表的是 Blob 的一个下标,这个下标-1的对应的字节将会是被拷贝进新的Blob 的最后一个字节。如果你传入了一个负数,那么这个偏移量将会从数据的末尾从后到前开始计算。

    contentType 可选

    给新的 Blob 赋予一个新的文档类型。这将会把它的 type 属性设为被传入的值。它的默认值是一个空的字符串。

    通过slice方法,从blob1中创建出一个新的blob对象,size等于3。

    1. var data = "abcdef";
    2. var blob1 = new Blob([data]);
    3. var blob2 = blob1.slice(0,3);
    4. console.log(blob1); //输出:Blob {size: 6, type: ""}
    5. console.log(blob2); //输出:Blob {size: 3, type: ""}

    File继承字Blob,因此我们可以调用slice方法对大文件进行分片上传。


    3、FileReader函数

    FileReader的使用方式非常简单分为三个步骤:

    1.创建FileReader对象 2.读取文件 3.处理文件读取事件(完成,中断,错误等...)

    创建FileReader对象

    let reader = new FileReader();

    读取文件

    FileReader 的实例拥有 4 个方法,其中 3 个用以读取文件,另一个用来中断读取。

    方法名参数描述
    abort中断读取
    readAsBinaryStringfile将文件读取为二进制码
    readAsDataURLfile将文件读取为 DataURL
    readAsTextfile, [文件编码]将文件读取为文本
    上面的表格列出了这些方法以及他们的参数和功能,需要注意的是 ,无论读取成功或失败,方法并不会返回读取结果, 这一结果存储在 result属性中
    1. //1.将文件读取为二进制数据
    2. reader.readAsBinaryString(file);
    3. //2.将文件读取为文本数据
    4. reader.readAsText(file,"UTF-8")

    读取事件

    FileReader 包含了一套完整的事件模型,用于捕获读取文件时的状态,下面这个表格归纳了这些事件。

    事件描述
    onabort中断时触发
    onerror出错时触发
    onload文件读取成功完成时触发
    onloadend读取完成触发,无论成功或失败
    onloadstart读取开始时触发
    onprogress读取中

    完整读取示例

    1. //1.获取文件对象
    2. let file = data.file;
    3. //2.创建FileReader对象,用于读取文件
    4. let reader = new FileReader();
    5. //3.将文件读取为二进制码
    6. reader.readAsBinaryString(file);
    7. //4.处理读取事件
    8. //4.1读取完成事件,获取数据
    9. reader.onload = (e) => {
    10. //获取数据
    11. const data = e.currentTarget.result;
    12. };
    13. //4.2 //读取中断事件
    14. reader.onabort = () => {
    15. console.log('读取中断了');
    16. };

    4、前端文件转MD5

    MD5计算将整个文件或者字符串,通过其不可逆的字符串变换计算,产生文件或字符串的MD5散列值。

    因此MD5常用于校验文件,以防止文件被“篡改”。因为如果文件、字符串的MD5散列值不一样,说明文件内容也是不一样的

    安装依赖

    npm install spark-md5 --save

    导包

    import SparkMD5 from 'spark-md5'

    要配合js的 FileReader 函数来使用 SparkMD5

    1. const getFileMD5 = (file:File) => {
    2. return new Promise((resolve, reject) => {
    3. const spark = new SparkMD5.ArrayBuffer()
    4. const fileReader = new FileReader()
    5. fileReader.onload = (e:any) => {
    6. spark.append(e.target.result)
    7. resolve(spark.end())
    8. }
    9. fileReader.onerror = () => {
    10. reject('')
    11. }
    12. fileReader.readAsArrayBuffer(file)
    13. })
    14. }

    6、


    Minio基本知识

    minio下载启动

    minio:对象存储服务。一个对象文件可以是任意大小,从几kb到最大5T不等

    中文网站: MinIO | 高性能分布式存储,私有云存储

    下载后:新建一个minioData文件夹用来存储上传的文件

    在minio.exe文件夹的路径处输入cmd进入命令行界面(该exe文件不能双击运行)
    输入命令:minio.exe server E:\server\minio\install\minioData

    接着访问:http://localhost:9000/

    用户名和密码都是minioadmin


    基础概念

    Object:存储到minio中的基本对象,如文件、字节流

    Bucket:存放文件的顶层目录,起到一个隔离的作用

    Drive:存储数据的磁盘


    mc客户端使用

    MinIO 提供客户端工具访问和操作服务端。MinIO 客户端工具 mc(minio client)

    提供了类似 unix 的命令去操作服务端。mc 相关命令列表如下所示(请查看英文官网):

    1. ls 列出文件和文件夹。
    2. mb 创建一个存储桶或一个文件夹。
    3. cat 显示文件和对象内容。
    4. pipe 将一个STDIN重定向到一个对象或者文件或者STDOUT。
    5. share 生成用于共享的URL。
    6. cp 拷贝文件和对象。
    7. mirror 给存储桶和文件夹做镜像。
    8. find 基于参数查找文件。
    9. diff 对两个文件夹或者存储桶比较差异。
    10. rm 删除文件和对象。
    11. events 管理对象通知。
    12. watch 监听文件和对象的事件。
    13. policy 管理访问策略。
    14. session 为cp命令管理保存的会话。
    15. config 管理mc配置文件。
    16. update 检查软件更新。
    17. version 输出版本信息。

    查看 minio服务端

    mc config host ls

    添加服务端

    mc config host add xiayumao_minio http://127.0.0.1:9000 minioadmin minioadmin

    再次查看

    删除服务端

    mc config host remove xiayumao_minio

    mc config host add minio-server http://127.0.0.1:9000 minioadmin minioadmin

    查看

    ls命令列出文件、对象和存储桶。

    -----------------

    创建桶

    mc mb minio-server/sringcloud

    递归创建桶,对吗?

    mc mb minio-server/nacos/2023/9/23


    springboot整合minio

    依赖和配置

    1. <dependency>
    2. <groupId>io.miniogroupId>
    3. <artifactId>minioartifactId>
    4. <version>8.2.1version>
    5. dependency>

    application.yml 配置信息

    1. minio:
    2. endpoint: http://127.0.0.1:9000 #Minio服务所在地址
    3. accessKey: minioadmin #账号
    4. secretKey: minioadmin #密码

    注入MinioClient 

    关于 MinIO 的一切操作都得通过 MinioClient 对象来进行

    1. @Data
    2. @Configuration
    3. @ConfigurationProperties(prefix = "minio")
    4. public class MinioConfig {
    5. private String endpoint;
    6. private String accessKey;
    7. private String secretKey;
    8. @Bean
    9. public MinioClient minioClient() {
    10. return MinioClient.builder()
    11. .endpoint(endpoint)//服务地址
    12. .credentials(accessKey, secretKey)//账号 密码
    13. .build();
    14. }
    15. }

    MinioUtil封装桶操作

    1. @Component
    2. @Slf4j
    3. public class MinioUtil {
    4. @Resource
    5. private MinioClient minioClient;
    6. /**
    7. * 查看存储bucket是否存在
    8. *
    9. * @return boolean
    10. */
    11. public Boolean bucketExists(String bucketName) {
    12. Boolean found;
    13. try {
    14. found = minioClient.bucketExists(BucketExistsArgs.builder().bucket(bucketName).build());
    15. } catch (Exception e) {
    16. e.printStackTrace();
    17. return false;
    18. }
    19. return found;
    20. }
    21. /**
    22. * 创建存储bucket
    23. *
    24. * @return Boolean
    25. */
    26. public Boolean makeBucket(String bucketName) {
    27. try {
    28. minioClient.makeBucket(MakeBucketArgs.builder()
    29. .bucket(bucketName)
    30. .build());
    31. } catch (Exception e) {
    32. e.printStackTrace();
    33. return false;
    34. }
    35. return true;
    36. }
    37. /**
    38. * 删除存储bucket
    39. *
    40. * @return Boolean
    41. */
    42. public Boolean removeBucket(String bucketName) {
    43. try {
    44. //删除一个空桶
    45. minioClient.removeBucket(RemoveBucketArgs.builder()
    46. .bucket(bucketName)
    47. .build());
    48. } catch (Exception e) {
    49. e.printStackTrace();
    50. return false;
    51. }
    52. return true;
    53. }
    54. /**
    55. * 获取全部bucket
    56. */
    57. public List getAllBuckets() {
    58. try {
    59. return minioClient.listBuckets();
    60. } catch (Exception e) {
    61. e.printStackTrace();
    62. }
    63. return null;
    64. }
    65. /**
    66. * 列出存储桶中的所有对象
    67. *
    68. * @param bucketName 存储桶名称
    69. * @return
    70. * @throws Exception
    71. */
    72. public Iterable> listObjects(String bucketName) {
    73. boolean flag = bucketExists(bucketName);
    74. if (flag) {
    75. return minioClient.listObjects(ListObjectsArgs.builder().bucket(bucketName).build());
    76. }
    77. return null;
    78. }
    79. /**
    80. * 递归查询桶下对象
    81. *
    82. * @param bucketName 存储桶名称
    83. * @return
    84. * @throws Exception
    85. */
    86. public Iterable> recursiveListObjects(String bucketName) {
    87. boolean flag = bucketExists(bucketName);
    88. if (flag) {
    89. return minioClient.listObjects(ListObjectsArgs.builder().bucket(bucketName).recursive(true).build());
    90. }
    91. return null;
    92. }
    93. /**
    94. * 列出某个桶中的所有文件名
    95. * 文件夹名为空时,则直接查询桶下面的数据,否则就查询当前桶下对于文件夹里面的数据
    96. *
    97. * @param bucketName 桶名称
    98. * @param folderName 文件夹名
    99. * @param isDeep 是否递归查询
    100. */
    101. public Iterable> getBucketAllFile(String bucketName, String folderName, Boolean isDeep) {
    102. if (!StringUtils.hasLength(folderName)) {
    103. folderName = "";
    104. }
    105. System.out.println(folderName);
    106. Iterable> listObjects = minioClient.listObjects(
    107. ListObjectsArgs
    108. .builder()
    109. .bucket(bucketName)
    110. .prefix(folderName + "/")
    111. .recursive(isDeep)
    112. .build());
    113. return listObjects;
    114. }
    115. /**
    116. * 创建文件夹
    117. *
    118. * @param bucketName 桶名
    119. * @param folderName 文件夹名称
    120. * @return
    121. * @throws Exception
    122. */
    123. public ObjectWriteResponse createBucketFolder(String bucketName, String folderName) throws Exception {
    124. if (!bucketExists(bucketName)) {
    125. throw new RuntimeException("必须在桶存在的情况下才能创建文件夹");
    126. }
    127. if (!StringUtils.hasLength(folderName)) {
    128. throw new RuntimeException("创建的文件夹名不能为空");
    129. }
    130. PutObjectArgs putObjectArgs = PutObjectArgs.builder()
    131. .bucket(bucketName)
    132. .object(folderName + "/")
    133. .stream(new ByteArrayInputStream(new byte[0]), 0, 0)
    134. .build();
    135. ObjectWriteResponse objectWriteResponse = minioClient.putObject(putObjectArgs);
    136. return objectWriteResponse;
    137. }
    138. }

    创建桶和文件夹

    通过MinioClient 对象,指定 Bucket 即可进行相应的操作。

    桶操作接口测试

    1. @Slf4j
    2. @RestController
    3. @RequestMapping(value = "product/file")
    4. public class FileController {
    5. @Autowired
    6. private MinioUtil minioUtil;
    7. @Autowired
    8. private MinioConfig prop;
    9. //查看存储bucket是否存在
    10. @GetMapping("/bucketExists")
    11. public Boolean bucketExists(@RequestParam("bucketName") String bucketName) {
    12. return minioUtil.bucketExists(bucketName);
    13. }
    14. //创建存储bucket
    15. @GetMapping("/makeBucket")
    16. public Boolean makeBucket(String bucketName) {
    17. return minioUtil.makeBucket(bucketName);
    18. }
    19. //删除存储bucket
    20. @GetMapping("/removeBucket")
    21. public Boolean removeBucket(String bucketName) {
    22. return minioUtil.removeBucket(bucketName);
    23. }
    24. }

    新创建桶ipadmini6下,创建文件夹

    1. //创建文件夹
    2. @GetMapping("/createBucketFolder")
    3. public void createBucketFolder(String bucketName) throws Exception {
    4. String folderName = "aa/bb";
    5. ObjectWriteResponse response = minioUtil.createBucketFolder(bucketName, folderName);
    6. }

    递归查询桶ipadmini6下所有对象


    桶中对象信息查询

    查询所有的桶

    1. //获取全部bucket
    2. @GetMapping("/getAllBuckets")
    3. public List getAllBuckets() {
    4. List allBuckets = minioUtil.getAllBuckets();
    5. allBuckets.forEach(bucket -> {
    6. System.out.printf("存储桶名:%s,创建时间:%s \n", bucket.name(), bucket.creationDate());
    7. });
    8. return allBuckets;
    9. }

    查询桶下所有对象

    1. //查询桶下所有对象
    2. @GetMapping("/listObjects")
    3. public void listObjects(@RequestParam("bucketName") String bucketName) throws Exception {
    4. Iterable> listObjects = minioUtil.listObjects(bucketName);
    5. for (Result result : listObjects) {
    6. Item item = result.get();
    7. System.out.println(item.objectName() + "\t" + item.size());
    8. }
    9. }

    控制台打印:

    递归查询桶下所有对象

    1. //递归查询桶下所有对象
    2. @GetMapping("/recursiveListObjects")
    3. public void recursiveListObjects(@RequestParam("bucketName") String bucketName) throws Exception {
    4. Iterable> listObjects = minioUtil.recursiveListObjects(bucketName);
    5. for (Result result : listObjects) {
    6. Item item = result.get();
    7. System.out.println(item.objectName() + "\t" + item.size());
    8. }
    9. }

    控制台打印:

    指定前缀查询桶下所有对象

    1. //指定前缀查询
    2. @GetMapping("/getBucketAllFile")
    3. public void getBucketAllFile(@RequestParam("bucketName") String bucketName) throws Exception {
    4. Iterable> listObjects = minioUtil.getBucketAllFile(bucketName,"兰兰/img",true);
    5. for (Result result : listObjects) {
    6. Item item = result.get();
    7. System.out.println(item.objectName() + "\t" + item.size());
    8. }
    9. }


    文件上传下载

    上传本地文件(path)

    OSS没有文件夹的概念,所有资源都是以文件来存储,但您可以通过创建一个以正斜线(/)结尾,大小为0的Object来创建模拟文件夹。

    MinioUtil中继续新增

    1. /**
    2. * 上传本地文件,根据路径上传
    3. * minio 采用文件内容上传,可以换成上面的流上传
    4. *
    5. * @param filePath 上传本地文件路径
    6. * @Param bucketName 上传至服务器的桶名称
    7. */
    8. public boolean uploadPath(String filePath, String bucketName) throws Exception {
    9. File file = new File(filePath);
    10. if (!file.isFile()) {
    11. throw new RuntimeException("上传文件为空,请重新上传");
    12. }
    13. if (!StringUtils.hasLength(bucketName)) {
    14. throw new RuntimeException("传入桶名为空,请重新上传");
    15. }
    16. if (!this.bucketExists(bucketName)) {
    17. throw new RuntimeException("当前操作的桶不存在!");
    18. }
    19. String minioFilename = UUID.randomUUID().toString() + "_" + file.getName();//获取文件名称
    20. String fileType = minioFilename.substring(minioFilename.lastIndexOf(".") + 1);
    21. minioClient.uploadObject(
    22. UploadObjectArgs.builder()
    23. .bucket(bucketName)
    24. .object(minioFilename)//文件存储在minio中的名字
    25. .filename(filePath)//上传本地文件存储的路径
    26. .contentType(fileType)//文件类型
    27. .build());
    28. return this.getBucketFileExist(minioFilename, bucketName);
    29. }

    上传后,结果只能在桶下吗

    • objectName,是指文件的路径,即存储桶下文件的相对路径

    略微修改一下objectName

    再上传,就能看到


    上传文件(MultipartFile)

    MinioUtil中继续新增

    使用的是建造者模式,创建PutObjectArgs参数对象

    1. /**
    2. * 根据MultipartFile file上传文件
    3. * minio 采用文件流上传,可以换成下面的文件上传
    4. *
    5. * @param file 上传的文件
    6. * @param bucketName 上传至服务器的桶名称
    7. */
    8. public boolean uploadFile(MultipartFile file, String bucketName) throws Exception {
    9. if (file == null || file.getSize() == 0 || file.isEmpty()) {
    10. throw new RuntimeException("上传文件为空,请重新上传");
    11. }
    12. if (!this.bucketExists(bucketName)) {
    13. throw new RuntimeException("当前操作的桶不存在!");
    14. }
    15. // 获取上传的文件名
    16. String filename = file.getOriginalFilename();
    17. assert filename != null;
    18. //可以选择生成一个minio中存储的文件名称
    19. String minioFilename = UUID.randomUUID().toString() + "_" + filename;
    20. InputStream inputStream = file.getInputStream();
    21. long size = file.getSize();
    22. String contentType = file.getContentType();
    23. // Upload known sized input stream.
    24. minioClient.putObject(
    25. PutObjectArgs.builder()
    26. .bucket(bucketName) //上传到指定桶里面
    27. .object(minioFilename)//文件在minio中存储的名字
    28. //p1:上传的文件流;p2:上传文件总大小;p3:上传的分片大小
    29. .stream(inputStream, size, -1) //上传分片文件流大小,如果分文件上传可以采用这种形式
    30. .contentType(contentType) //文件的类型
    31. .build());
    32. return this.getBucketFileExist(minioFilename, bucketName);
    33. }

    再去尝试上传一个很大的(433MB)文件,报错

    报错的原因是: springBoot项目自带的tomcat对上传的文件大小有默认的限制,SpringBoot官方文档中展示:每个文件的配置最大为1Mb,单次请求的文件的总数不能大于10Mb。


    文件大小限制配置

    重新指定最大限制

    1. spring:
    2. servlet:
    3. multipart:
    4. enabled: true
    5. max-file-size: 500MB
    6. max-request-size: 500MB
    • spring.servlet.multipart.enabled:表示是否开启文件上传支持,默认为 true
    • spring.servlet.multipart.file-size-threshold:表示文件写入磁盘的阀值,默认为 0
    • spring.servlet.multipart.location:表示上传文件的临时保存位置
    • spring.servlet.multipart.max-file-size:表示上传的单个文件的最大大小,默认为 1MB
    • spring.servlet.multipart.max-request-size:表示多文件上传时文件的总大小,默认为 10MB
    • spring.servlet.multipart.resolve-lazily:表示文件是否延迟解析,默认为 false

    重新调用 上传文件(MultipartFile)


    文件下载(path)

    MinioUtil中继续新增

    1. /**
    2. * 文件下载到指定路径
    3. *
    4. * @param downloadPath 下载到本地路径
    5. * @param bucketName 下载指定服务器的桶名称
    6. * @param objectName 下载的文件名称
    7. */
    8. public void downloadPath(String downloadPath, String bucketName, String objectName) throws Exception {
    9. if (downloadPath.isEmpty() || !StringUtils.hasLength(bucketName) || !StringUtils.hasLength(objectName)) {
    10. throw new RuntimeException("下载文件参数不全!");
    11. }
    12. if (!new File(downloadPath).isDirectory()) {
    13. throw new RuntimeException("本地下载路径必须是一个文件夹或者文件路径!");
    14. }
    15. if (!this.bucketExists(bucketName)) {
    16. throw new RuntimeException("当前操作的桶不存在!");
    17. }
    18. downloadPath += objectName;
    19. minioClient.downloadObject(
    20. DownloadObjectArgs.builder()
    21. .bucket(bucketName) //指定是在哪一个桶下载
    22. .object(objectName)//是minio中文件存储的名字;本地上传的文件是user.xlsx到minio中存储的是user-minio,那么这里就是user-minio
    23. .filename(downloadPath)//需要下载到本地的路径,一定是带上保存的文件名;如 d:\\minio\\user.xlsx
    24. .build());
    25. }

    文件预览

    在分片上传的时候如果没有设置分片类型,会默认设置分片类型为application/octet-stream。这样的话不能在线观看,只能够进行下载再查看资源,从而不利于前端直接引用地址。


    大文件分片(前端传)

    CustomMinioClient

    思路:

    1. 前端对文件进行切片,并且记录切片总数
    2. 访问后端预上传接口,该接口仅仅处理目标文件上传url并且返回给前端,没有太多的资源占用。
    3. 前端获取到返回的预上传url后,循环分片进行上传。
    4. 在前端上传完分片后,请求后端文件合并接口对目标分片进行合并。

    这样可以最大限度的利用前端的性能,而不是只发请求到后端,并且实现了前端直接对接minio服务器的功能。

    1. public class CustomMinioClient extends MinioClient {
    2. public CustomMinioClient(MinioClient client) {
    3. super(client);
    4. }
    5. /**
    6. * 创建分片上传请求
    7. *
    8. * @param bucket 存储桶
    9. * @param region 区域
    10. * @param object 对象名
    11. * @param headers 消息头
    12. * @param extraQueryParams 额外查询参数
    13. */
    14. public String initMultiPartUpload(String bucket, String region, String object, Multimap headers, Multimap extraQueryParams) throws Exception {
    15. CreateMultipartUploadResponse response = this.createMultipartUpload(bucket, region, object, headers, extraQueryParams);
    16. return response.result().uploadId();
    17. }
    18. /**
    19. * 完成分片上传,执行合并文件
    20. *
    21. * @param bucketName 存储桶
    22. * @param region 区域
    23. * @param objectName 对象名
    24. * @param uploadId 上传ID
    25. * @param parts 分片
    26. * @param extraHeaders 额外消息头
    27. * @param extraQueryParams 额外查询参数
    28. */
    29. public ObjectWriteResponse mergeMultipartUpload(String bucketName, String region, String objectName, String uploadId, Part[] parts, Multimap extraHeaders, Multimap extraQueryParams) throws Exception {
    30. return this.completeMultipartUpload(bucketName, region, objectName, uploadId, parts, extraHeaders, extraQueryParams);
    31. }
    32. /**
    33. * 查询分片数据
    34. *
    35. * @param bucketName 存储桶
    36. * @param region 区域
    37. * @param objectName 对象名
    38. * @param uploadId 上传ID
    39. * @param extraHeaders 额外消息头
    40. * @param extraQueryParams 额外查询参数
    41. */
    42. public ListPartsResponse listMultipart(String bucketName, String region, String objectName, Integer maxParts, Integer partNumberMarker, String uploadId, Multimap extraHeaders, Multimap extraQueryParams) throws Exception {
    43. return this.listParts(bucketName, region, objectName, maxParts, partNumberMarker, uploadId, extraHeaders, extraQueryParams);
    44. }
    45. }

    分片规则

    在构建InputStream时,会进行分片操作,我们可以了解到上传文件大小的一些限制:

    • 分片大小不能小于5MB,大于5GB
    • 对象大小不能超过5TiB
    • partSize传入-1,默认按照5MB进行分割
    • 分片数量不能超过10000

    分片规则如下:

    1. // 参数为 文件大小objectSize、分片大小partSize,分片数我们传入的是-1,表示使用默认配置
    2. protected long[] getPartInfo(long objectSize, long partSize) {
    3. // 1. 校验大小,如果设置的分片大小 小于5M或者大于5GB,报错不支持
    4. // 对象大小超过5TiB,报错不支持
    5. this.validateSizes(objectSize, partSize);
    6. if (objectSize < 0L) {
    7. return new long[]{partSize, -1L};
    8. } else {
    9. // 2. 没有设置分片数据大小,怎按照默认的5M进行分割
    10. if (partSize <= 0L) {
    11. double dPartSize = Math.ceil((double)objectSize / 10000.0D);
    12. dPartSize = Math.ceil(dPartSize / 5242880.0D) * 5242880.0D;
    13. partSize = (long)dPartSize;
    14. }
    15. if (partSize > objectSize) {
    16. partSize = objectSize;
    17. }
    18. long partCount = partSize > 0L ? (long)Math.ceil((double)objectSize / (double)partSize) : 1L;
    19. // 3. 分片数量不能超过10000
    20. if (partCount > 10000L) {
    21. throw new IllegalArgumentException("object size " + objectSize + " and part size " + partSize + " make more than " + 10000 + "parts for upload");
    22. } else {
    23. // 4. 返回一个数组,第一个值为分片数据大小,第二个为分片数量
    24. return new long[]{partSize, partCount};
    25. }
    26. }
    27. }

    创建分片请求(获取uploadId)

    createMultipartUpload方法会创建分块请求,根据对象名和存储桶名去Minio获取上传当前对象的uploadId。

    uploadId在循环中使用的都是同一个,说明分片上传的时候都会使用同一个uploadId,最后合并同一个uploadId的文件。

    用户调用初始化接口,后端调用minio初始化,得到uploadId,生成每个分片的minio上传url

    1. /**
    2. * 初始化分片上传
    3. *
    4. * @param bucket 桶
    5. * @param objectName 文件全路径名称
    6. * @param partCount 分片数量
    7. * @param contentType 类型,如果类型使用默认流会导致无法预览
    8. * @return /
    9. */
    10. public Map initMultiPartUpload(String bucket, String objectName, int partCount, String contentType) {
    11. Map result = new HashMap<>();
    12. try {
    13. if (StrUtil.isBlank(contentType)) {
    14. contentType = "application/octet-stream";
    15. }
    16. HashMultimap headers = HashMultimap.create();
    17. headers.put("Content-Type", contentType);
    18. String uploadId = customMinioClient.initMultiPartUpload(bucket, null, objectName, headers, null);
    19. result.put("uploadId", uploadId);
    20. List partList = new ArrayList<>();
    21. Map reqParams = new HashMap<>();
    22. //reqParams.put("response-content-type", "application/json");
    23. reqParams.put("uploadId", uploadId);
    24. for (int i = 1; i <= partCount; i++) {
    25. reqParams.put("partNumber", String.valueOf(i));
    26. String uploadUrl = customMinioClient.getPresignedObjectUrl(
    27. GetPresignedObjectUrlArgs.builder()
    28. .method(Method.PUT)
    29. .bucket(bucket)
    30. .object(objectName)
    31. .expiry(1, TimeUnit.DAYS)
    32. .extraQueryParams(reqParams)
    33. .build());
    34. partList.add(uploadUrl);
    35. }
    36. result.put("uploadUrls", partList);
    37. } catch (Exception e) {
    38. e.printStackTrace();
    39. return null;
    40. }
    41. return result;
    42. }

    测试,我们把partCount给成4


    上传分片

    获取到了uploadId以后,就会执行上传操作,调用uploadPart方法,uploadPart最终也是调用execute,可以看到该方法,是调用的OkHttpClient 去执行的。


    合并文件

    MinioUtil中继续新增

    1. /**
    2. * 分片上传完后合并
    3. *
    4. * @param objectName 文件全路径名称
    5. * @param uploadId 返回的uploadId
    6. * @return /
    7. */
    8. public boolean mergeMultipartUpload(String bucket, String objectName, String uploadId) {
    9. try {
    10. //TODO::目前仅做了最大1000分片
    11. Part[] parts = new Part[1000];
    12. ListPartsResponse partResult = customMinioClient.listMultipart(bucket, null, objectName, 1000, 0, uploadId, null, null);
    13. int partNumber = 1;
    14. for (Part part : partResult.result().partList()) {
    15. parts[partNumber - 1] = new Part(partNumber, part.etag());
    16. partNumber++;
    17. }
    18. customMinioClient.mergeMultipartUpload(bucket, null, objectName, uploadId, parts, null, null);
    19. } catch (Exception e) {
    20. e.printStackTrace();
    21. return false;
    22. }
    23. return true;
    24. }

    大文件分片(后端传)

    思路:

    1. 前端上传文件到web后台。
    2. 后端对文件进行切割,并且记录切割段数。
    3. 后端调用minio上传api。
    4. 等待分片全部上传后再调用合并文件

    分片上传和断点续传的实现过程中,需要在Minio内部记录已上传的分片文件。

    这些分片文件将以文件md5作为父目录,分片文件的名字按照01,02,...的顺序进行命名。同时,还必须知道当前文件的分片总数,这样就能够根据总数来判断文件是否上传完毕了。

    比如,一个文件被分成了10片,所以总数是10。当前端发起上传请求时,把一个个文件分片依次上传,Minio 服务器中存储的临时文件依次是01、02、03 等等。

    假设前端把05分片上传完毕了之后断开了连接,由于 Minio 服务器仍然存储着01~05的分片文件,因此前端再次上传文件时,只需从06序号开始上传分片,而不用从头开始传输。这就是所谓的断点续传

  • 相关阅读:
    Langchain使用介绍之-文档加载
    文本分类之DPCNN的原理(Pytorch实现)
    sqlmap防御以及文件读写
    机器学习:在线学习和离线学习区别
    基于开放共享的自主研发—MaxCompute 持续增强生态与开放性建设
    搭建个人网站大体流程
    as86 - 8086..80386 处理器的汇编器
    python教程:if else和where true 流程控制正确使用教程
    16:00面试,16:08就出来了,问的问题有点变态。。。
    typeScript--[接口属性interface]
  • 原文地址:https://blog.csdn.net/m0_56799642/article/details/133189431