• 实现分片上传、断点续传、秒传 (JS+NodeJS)(TypeScript)


    一、引入及效果

            上传文件是一个很常见的操作,但是当文件很大时,上传花费的时间会非常长,上传的操作就会具有不确定性,如果不小心连接断开,那么文件就需要重新上传,导致浪费时间和网络资源

            所以,当涉及到大文件上传时,分片上传和断点续传就显得很有必要。把文件分成多个分片,一片片上传,最终服务端再进行合并操作。如果遇到网络中断的问题,已上传的分片就无需上传了,实现断点续传

            使用效果:   photo.lhbbb.top/video/slice.mp4 (将分片大小设置为了1*1024B,便于观察)

    二、思路

    1.文件唯一标识

            因为需要进行多个分片的上传、续传等操作,我们需要知道这个文件的唯一标识,这样后端才能知道这些分片属于哪个文件,断点续传的时候也能判断这个文件是否曾经上传过。

            方法一:可以通过计算文件的md5,来得到文件的唯一标识。(文件被修改过 或 不同文件的唯一标识都是不同的)

            优点:文件标识的唯一性可以保证,且有现成的计算md5的库可以调用

            缺点:计算文件的md5是一个非常耗时的操作,文件越大,花费的时间越多。JS又是单线程的,计算md5的过程中无法执行其它操作。

            方法二:大文件计算md5花费的时间很长,就退而求其次, 使用 文件名+文件最后修改时间+文件大小 作为唯一标识。

            优点:计算速度快,因为这些数据在文件的File对象上都有。

            缺点:有极小概率可能导致标识重复。

            综合方法:可以这样:文件大小低于某个阈值,使用方法一,文件过大时,使用方法二

    2.前端

    1. 计算唯一标识:通过input标签,拿到File文件之后,我们就可以计算唯一标识了。

    2. 秒传:计算好唯一标识后,拿着这个唯一标识去请求后端接口,看看该文件是否曾经上传过,如果上传过,就直接显示“上传成功”,也就是实现秒传

    3. 断点续传:如果曾经没有完整上传成功过,就开始对文件进行分片。分好片后,拿着分片的信息和唯一标识,请求后端接口,看看这个文件及其分片是否 “上传了,但没完全上传”,如果符合,就进行断点续传。 (这里我实现的思路是,传给后端分片的总长度,后端去看后端文件夹中有几个分片,返回缺少的分片序号。也可以通过对每个分片进行计算唯一标识,更加稳妥,但是比较费时)

    4. 分片上传:如果这个文件完全没有上传过,就开始走正常的分片上传流程。遍历所有分片,把分片通过请求的方式发送到后端。 (注意:浏览器的请求并发数量一般是6个,如果一次性把所有分片请求都发送了,会导致后续的请求需要等待。对此,我们可以使用Promise并发池进行控制,减缓HTTP压力)

    5. 完整性校验:全部分片上传完毕后,需要进行完整性校验,避免某个分片遇到故障没有成功上传。发送请求给后端,后端来判断分片数量是否正常,若不正常就上传缺少的分片。

    6. 发送合并请求:完整性校验通过后,发送合并请求,后端进行文件合并,返回文件访问路径。

    总结: 计算唯一标识 --> 若上传过,则秒传 -->  分片 --> 若上传过部分,则断点续传 --> 正常分片上传 --> 完整性校验 --> 合并请求

    3.后端

    1. 判断文件是否曾经完整上传过:根据文件md5,查看对应目录下的文件是否存在

    2. 判断文件是否上传过切片:根据文件md5,查看对应目录中缺少哪些切片

    3. 分片上传的接收:把分配存储到一个临时文件夹中,文件夹名字用 唯一标识 命名,方便以后查找。每个分片的命名用分片的序号,便于校验分片是否缺失。

    4. 完整性校验:根据md5和分片总数,去检查对应的临时文件夹下缺少哪些文件序号,返回缺少的序号数组。

    5. 合并文件:按照分片序号顺序,把分片写入一个新文件中,然后返回这个新文件的访问路径

    三、前端实现

    秒传和断点续传放最后讲,就是一个函数执行顺序的问题。

    一些用到的ts类型如下:

    1. /**进度条函数的类型,参数是进度,是一个小数,需要 *100 才是正常进度条 */
    2. type onProgress = (progress: number) => any
    3. /**切片的类型 */
    4. interface sliceType {
    5. /**分片的序号 */
    6. flag: number;
    7. /**分片的二进制数据 */
    8. blob: Blob;
    9. }

    1. 计算文件唯一标识

    这里计算md5使用了一个库,需要npm安装:  npm i spark-md5

    1. /**获得文件的md5,用来作为唯一索引 - 文件过大时,获取md5会很长很长时间,可以考虑用 文件名+文件最后修改时间+文件大小 做“唯一”标识*/
    2. const getMd5 = (file: File) => {
    3. return new Promise<string>(async (resolve, reject) => {
    4. try {
    5. const reader = new FileReader();
    6. reader.readAsArrayBuffer(file);
    7. // 当文件读取完成时,计算文件MD5值
    8. reader.onloadend = function (e) {
    9. if (!e.target?.result) {
    10. return reject('文件读取失败')
    11. }
    12. const spark = new SparkMD5.ArrayBuffer()
    13. spark.append(e.target.result as ArrayBuffer)
    14. const md5 = spark.end()
    15. resolve(md5)
    16. }
    17. } catch (error) {
    18. reject(error)
    19. }
    20. })
    21. }

    2. 对文件分片

    File对象的原型对象(Blob)上有一个slice方法,我们可以通过这个来进行分片

    传入文件对象和需要的单个分片的大小,单位字节  (比如传入 5*1024*1024 就是5MB)

    得到一个分片数组,每个分片包含分片序号和对应的Blob

    1. /**文件切片 */
    2. const fileSlice = (file: File, chunkSize: number) => {
    3. const result: sliceType[] = []
    4. let index = 0
    5. for (let nowSize = 0; nowSize < file.size; nowSize += chunkSize) {
    6. result.push({
    7. flag: index,
    8. blob: file.slice(nowSize, nowSize + chunkSize),
    9. })
    10. index++
    11. }
    12. return result
    13. }

    3. 把分片上传

    1. /**生成上传切片的promise函数 */
    2. const getSliceUploadPromise = (slice: sliceType, md5: string) => {
    3. const formData = new FormData();
    4. formData.append(`file`, slice.blob);
    5. formData.append(`index`, String(slice.flag));
    6. formData.append(`md5`, md5);
    7. return () => request.postByForm('/sliceUpload/upload', formData) //这里填写自己封装的请求函数
    8. }
    9. /**把所有切片上传 */
    10. const sliceUpload = async (sliceList: sliceType[], md5: string, onProgress: onProgress) => {
    11. const taskList: (() => Promise<string>)[] = []
    12. const length = sliceList.length
    13. for (let i = 0; i < length; i++) {
    14. taskList.push(getSliceUploadPromise(sliceList[i], md5))
    15. }
    16. //使用并发池优化,避免堵塞
    17. const res = await promisePool<string, string>(taskList, 5, (count) => onProgress(count / length))
    18. return res
    19. }

    为了避免请求拥塞,这里使用promise并发池进行优化,最大并发数量5。(而不是一次性把所有请求都发出去)

    1. /**Promise并发池,当有大量promise并发时,可以通过这个来限制并发数量
    2. * @param taskList 任务列表
    3. * @param max 最大并发数量
    4. * @param oneFinishCallback 每个完成的回调,参数是当前完成的个数和执行结果,可以用来制作进度条
    5. * @retrun 返回每个promise的结果,顺序和任务列表相同。 目前是成功和失败都会放入该结果
    6. * @template T 泛型T会自动填写,是promise成功的结果
    7. * @template Err 此泛型是promise错误的结果 (因为 成功和失败都会放入res,所以加个泛型可以便于ts判断)
    8. */
    9. const promisePool = async Err>(taskList: (() => Promise)[], max: number, oneFinishCallback?: (count: number, res: T | Err) => any) => {
    10. return new Promise<ArrayErr>>(async (resolve, reject) => {
    11. type resType = T | Err
    12. try {
    13. const length = taskList.length
    14. const pool: Promise[] = []//并发池
    15. let count = 0//当前结束了几个
    16. const res = new Array(length)
    17. for (let i = 0; i < length; i++) {
    18. let task = taskList[i]();
    19. //成功和失败都要执行的函数
    20. const handler = (_res: resType) => {
    21. pool.splice(pool.indexOf(task), 1) //每当并发池跑完一个任务,从并发池删除个任务
    22. res[i] = _res //放入结果数组
    23. count++
    24. oneFinishCallback && oneFinishCallback(count, _res)
    25. if (count === length) {
    26. return resolve(res)
    27. }
    28. }
    29. task.then((data) => {
    30. handler(data)
    31. console.log(`第${i}个任务完成,结果为`, data);
    32. }, (err) => {
    33. handler(err)
    34. console.log(`第${i}个任务失败,原因为`, err);
    35. })
    36. pool.push(task);
    37. if (pool.length === max) {
    38. //Promise.race:返回最快执行的promise
    39. //利用Promise.race来看获得哪个任务完成的信号
    40. //搭配await,一旦发现有任务完成了,就继续for循环,把并发池塞满
    41. await Promise.race(pool)
    42. }
    43. }
    44. } catch (error) {
    45. console.error('promise并发池出错', error);
    46. reject(error)
    47. }
    48. })
    49. }

    4. 完整性校验 (同时适配断点续传)

    分片上传完毕后,就要进行完整性校验,判断分片是否完整。

    getArr函数:发送请求给后端,后端返回缺少的分片序号数组 (返回空数组代表分片完整)
    完整性校验:根据缺少的分片序号,把缺少的分片继续上传 (这里兼容了断点续传)。同时限制了重试次数,当超过5次仍然没有完整上传,就判断此次上传失败。

    1. /**请求后端,获得缺少的分片序号数组*/
    2. const getArr = async () => (await request.post('/sliceUpload/integrityCheck', { count: sliceList.length, md5 })).missingArr
    3. /**完整性校验,缺少的继续上传 (断点续传) */
    4. const integrityCheck = async (sliceList: sliceType[], md5: string, onProgress: onProgress) => {
    5. let maxTest = 5 //最大尝试次数,避免无限尝试
    6. /**缺少的序号数组 */
    7. let missingArr: number[] = await getArr()
    8. /**总分片数量 */
    9. const sliceListLength = sliceList.length
    10. onProgress((sliceListLength - missingArr.length) / sliceListLength)//更新进度条
    11. while (missingArr.length) {
    12. const tasks: (() => Promise<string>)[] = []
    13. for (let i = 0; i < missingArr.length; i++) {
    14. tasks.push(getSliceUploadPromise(sliceList[missingArr[i]], md5))
    15. }
    16. //使用并发池优化请求
    17. await promisePool<string, string>(tasks, 5, (count) => onProgress((sliceListLength - (missingArr.length - count)) / sliceListLength))//这里的count是缺少的部分中,完成的数量,作为进度条。
    18. missingArr = await getArr() //上传完成后,再次进行完整性校验。
    19. maxTest--
    20. if (maxTest === 0) {
    21. return Promise.reject('尝试五次仍未上传成功')
    22. }
    23. }
    24. }

    5. 发送合并请求

    通过完整性校验后,就来到了最后一步,合并切片 (这里主要是后端干活,前端不做过多解释)

    1. /**合并切片,拿到路径 */
    2. const merge = async (file: File, md5: string) => {
    3. const suffix = getSuffix(file.name) //拿到后缀
    4. const path = await request.post('/sliceUpload/merge', { md5, suffix })
    5. return path
    6. }

    6.秒传

    对秒传的判断,在第一步和第二步之间 (计算好唯一标识后就能进行秒传判断了)(这里主要是后端干活,前端不做过多解释)

    1. /**秒传 - 判断文件是否上传过,如果上传过了就直接返回文件路径,不用分片即上传 */
    2. const isUploaded = async (file: File, md5: string) => {
    3. const res: isUploadedRes = await request.post('/sliceUpload/isUploaded', { md5, suffix })
    4. return res
    5. }
    6. // 注:关于 /sliceUpload/isUploaded 接口的返回值:
    7. type isUploadedRes = {
    8. /**是否上传完整了 */
    9. flag: boolean
    10. /**如果上传过,路径是什么 (没上传的话为空) */
    11. path: string
    12. /**(仅在flag为false有效)是否上传过,但没上传完整? 为true就走断点续传 (请调用完整性校验接口) */
    13. noComplete: boolean
    14. }

    7.完整流程

    搭配上面的函数观看。整体流程需要根据后端给的接口返回值来进行修改,但整体思路不变

    1. /**分片上传 - 只支持单文件
    2. * @param file 文件
    3. * @param chunkSize 一个分片的大小
    4. * @param setTip 可以用于文字提示
    5. * @param onProgress 上传进度的回调,参数是进度
    6. * @returns 上传文件的url
    7. */
    8. export async function uploadBySlice(file: File, chunkSize: number, setTip: (tip: string) => any , onProgress: onProgress) {
    9. setTip("正在计算md5");
    10. const md5 = await getMd5(file)
    11. const isUploadedFlag = await isUploaded(file, md5)
    12. if (isUploadedFlag.flag) {//已经上传过,就直接返回路径,实现秒传
    13. onProgress(1)
    14. setTip('文件上传成功')
    15. return isUploadedFlag.path
    16. } else {
    17. setTip('正在进行切片')
    18. const sliceList = fileSlice(file, chunkSize)
    19. console.log('切片', sliceList);
    20. if (!isUploadedFlag.noComplete) { //没传递过的文件,才走完整的上传流程 (断点续传)
    21. setTip('正在进行文件上传')
    22. await sliceUpload(sliceList, md5, onProgress)
    23. } else {
    24. setTip('正在进行断点续传')
    25. }
    26. await integrityCheck(sliceList, md5, onProgress)//不管是断点续传还是正常上传,都走这个函数,进行复用 (正常上传的也要校验完整性,断点续传的也要根据校验的完整性来继续上传)
    27. setTip("正在合并文件")
    28. const path = await merge(file, md5)
    29. setTip("文件上传成功!")
    30. return path
    31. }
    32. }

    四、后端实现

    这里使用 NextJS (NodeJS) 做后端,由于各个框架使用不同,仅写出关键步骤

    下面代码中使用的一些,在nodejs中关于文件操作的函数:

    1. //本文件进行一些I/O操作
    2. import fs from 'fs'
    3. import path from 'path'
    4. import 'server-only'//代表仅服务端可使用
    5. /**写入Buffer文件在指定路径下。路径不存在时将会创建路径 */
    6. export const writeFile = (filePath: string, buffer: Buffer) => {
    7. return new Promise<string>(async (resolve, reject) => {
    8. try {
    9. const directory = path.dirname(filePath);
    10. fs.mkdir(directory, { recursive: true }, (err) => {
    11. if (err) {
    12. reject(err);
    13. } else {
    14. fs.writeFile(filePath, buffer, (err) => {
    15. if (err) {
    16. reject(err);
    17. } else {
    18. resolve(filePath);
    19. }
    20. });
    21. }
    22. });
    23. } catch (error) {
    24. reject(error)
    25. }
    26. })
    27. }
    28. /**删除指定路径的文件 */
    29. export const deleteFile = (path: string) => {
    30. return new Promise<void>(async (resolve, reject) => {
    31. try {
    32. fs.unlink(path, (err) => {
    33. if (err) reject(err)
    34. else resolve()
    35. })
    36. } catch (error) {
    37. reject(error)
    38. }
    39. })
    40. }
    41. /**删除指定文件夹及其所有文件。 */
    42. export const deleteFolderRecursive = (folderPath: string) => {
    43. if (fs.existsSync(folderPath)) {
    44. fs.readdirSync(folderPath).forEach((file) => {
    45. const currentPath = `${folderPath}/${file}`;
    46. if (fs.lstatSync(currentPath).isDirectory()) {
    47. // 递归删除子文件夹
    48. deleteFolderRecursive(currentPath);
    49. } else {
    50. // 删除文件
    51. fs.unlinkSync(currentPath);
    52. }
    53. });
    54. // 删除空文件夹
    55. fs.rmdirSync(folderPath);
    56. }
    57. };
    58. /**在文件末尾追加,不存在的话会新增目录 */
    59. export const appendToFile = (text: string | Buffer, filePath: string, errFn?: (err: NodeJS.ErrnoException | null) => void) => {
    60. return new Promise<void>(async (resolve, reject) => {
    61. try {
    62. const directory = path.dirname(filePath);
    63. fs.mkdir(directory, { recursive: true }, (err) => {
    64. if (err) {
    65. reject(err);
    66. return;
    67. }
    68. fs.appendFile(filePath, text, (err) => {
    69. if (err) {
    70. errFn?.(err);
    71. reject(err);
    72. } else {
    73. resolve();
    74. }
    75. });
    76. });
    77. } catch (error) {
    78. reject(error)
    79. }
    80. })
    81. }
    82. /**获得路径文件夹下的所有文件 */
    83. export const getDir = (directoryPath: string) => {
    84. return new PromiseDirent[]>(async (resolve, reject) => {
    85. try {
    86. fs.readdir(directoryPath, { withFileTypes: true }, (err, files) => {
    87. if (err) {
    88. reject(err);
    89. return;
    90. }
    91. resolve(files)
    92. })
    93. } catch (error) {
    94. reject(error)
    95. }
    96. })
    97. }
    98. /**判断文件(文件夹)是否存在。 */
    99. export const fileExists = (filePath: string): boolean => {
    100. return fs.existsSync(filePath);
    101. };

    1.判断文件是否曾经上传过

    /sliceUpload/isUploaded 接口

    1. const { md5, suffix } = await xxxxx() //获取body或query的参数,进行参数校验
    2. const realPath = `https://xxx.com/xxxxxx/${md5}${suffix}` //外界访问的路径
    3. const targetFilePath = `/aaaa/${md5}${suffix}` ; //这里是文件存放的路径 (如果曾经完整上传过)
    4. /**是否完整的上传过 */
    5. const flag = fileExists(targetFilePath)
    6. /**临时文件夹下是否有这个md5的临时文件夹,有就代表可以进行断点续传 */
    7. const noComplete = fileExists(getAbsPath(`/temp/${md5}`))
    8. return resFn.success({
    9. flag: flag,
    10. path: flag ? realPath : "",
    11. noComplete
    12. });

    2.接收分片上传的文件

    /sliceUpload/upload 接口

    1. const [files, otherData] = await getFormData(request) //从formdata中取出文件和其它数据 (自行根据框架封装)
    2. if (!files[0]) throw '文件不存在'
    3. await writeFile(`/temp/${otherData.md5}/${otherData.index}`, files[0]) //存放在 /temp/{md5}/ 目录下
    4. return resFn.success('操作成功');

    3.完整性校验接口

    /sliceUpload/integrityCheck

    1. const { count, md5 } = await xxxx(request) //获取前端传递来的数据
    2. const files = await getDir(`/temp/${md5}/`) //拿到这个临时文件夹下的所有文件
    3. const judgeSet = new Set(Array.from({ length: count }, (k, i) => i)) //根据分片总数,生成一个set (假设分片总数为10,那么这里就是 0,1,2...,10 的一个set)
    4. //把文件夹中有的文件序号,从set中删除
    5. files.forEach((file) => {
    6. judgeSet.delete(parseInt(file.name))
    7. })
    8. //返回缺少的序号,空数组代表通过完整性校验
    9. return resFn.success({
    10. missingArr: [...judgeSet]
    11. });

    4.合并文件接口

    /sliceUpload/merge

    1. const { md5, suffix } = await xxx() //拿到前端传来的参数
    2. const tempDirName = `/temp/${md5}/` //临时文件夹的路径
    3. const files = await getDir(tempDirName)//拿到临时文件夹下的全部文件
    4. files.sort((a, b) => parseInt(a.name) - parseInt(b.name));// 按照数字顺序排序文件名数组
    5. //按照切片顺序,把切片的文件写入新文件
    6. for (let i = 0; i < files.length; i++) {
    7. const file = files[i];
    8. const content = fs.readFileSync(`/temp/${md5}/${file.name}`);//切片的位置
    9. await appendToFile(content, `/xxxx/${md5}${suffix}`) // 写入在服务器上存放文件的路径
    10. }
    11. deleteFolderRecursive(tempDirName)//删除temp文件夹下的切片文件
    12. const realPath = `http://xxxx.com/xxxxx/${md5}${suffix}`//外界访问的路径
    13. return resFn.success(realPath);

    五、总结

            主要是需要有思路,思路有了就很好写了。上面的思路是自己想的,所以可能有些地方不完善,如果有不对的地方欢迎指出

  • 相关阅读:
    Ngnix优化
    micro-app-源码解析4-数据通信篇-终篇
    (项目笔记)opencv人脸识别
    蓝桥杯备战刷题three(自用)
    知识图谱从入门到应用——知识图谱的知识表示:向量表示方法
    Java ~ Reference ~ SoftReference
    企业im即时通讯软件私有化部署,确保信息安全与高效办公
    艾美捷QuickTiter 逆转录病毒定量试剂盒的制备方案
    自学斯坦福计算机全部课程参考
    手游帧率不稳定的原因是什么?
  • 原文地址:https://blog.csdn.net/m0_64130892/article/details/134089516