断点续传是指在文件传输过程中,当传输中断或失败时,能够恢复传输并继续从上次中断的位置继续传输。
RandomAccessFile是Java提供的一个用于文件读写的类,它可以对文件进行随机访问,即可以直接跳转到文件的任意位置进行读写操作。
seek()方法 | 将文件指针移动到指定位置。这样可以在文件中随机访问不同的位置。(绝对定位) |
skipBytes()方法 | 用于将文件指针向后移动指定的字节数。这个方法可以用于跳过一定数量的字节,以便在文件中定位到需要读取或写入的位置。(相对定位) |
read()方法 | 从当前指针处读取数据,也可以使用readFully()方法读取一定长度的数据。 |
write()方法 | 向文件中指定位置写入数据,也可以使用writeBytes()方法将字节数组写入文件。 |
RandomAccessFile的seek()方法是断点续传的关键.我们将文件资源分成若干块,每个线程负责完成每块资源的传输。
实现断点续传可以通过以下步骤来完成:
File
类的length()
方法获取。RandomAccessFile
类来进行文件读写操作,并设置以读或写方式打开文件。seek(long position)
方法将文件指针定位到起始传输位置。实现代码
- import java.io.*;
- import java.nio.charset.StandardCharsets;
- import java.util.ArrayList;
- import java.util.List;
- import java.util.Map;
- import java.util.StringJoiner;
- import java.util.concurrent.ConcurrentHashMap;
-
- public class FileUtils {
- /**
- * 支持断点续传
- * @src 拷贝的原文件
- * @desc 拷贝的位置
- * @threadNum 开启的线程数
- */
- public static void transportFile(File src, File desc, int threadNum) throws Exception {
- // 每一个线程读取的大小
- Long part = (long)Math.ceil(src.length() / threadNum);
- // 存储多个线程、用于阻塞主线程
- List
list = new ArrayList<>(); -
- // 定义一个基于多线程 的 hashmap 高并发map
- final Map
map = new ConcurrentHashMap<>(); - // 读取 日志文件中的数据
- String[] $data = null ;
-
- String logName = desc.getCanonicalPath() + ".log";
-
- File fl = new File(logName);
-
- if (fl.exists()) {
- BufferedReader reader = new BufferedReader(new FileReader(fl));
- String data = reader.readLine();
- // 拆分 字符串
- $data = data.split(",");
- reader.close();
- }
-
- final String[] _data = $data ;
-
- for (int i = 0; i < threadNum; i++) {
- final int k = i ;
- Thread thread = new Thread(() -> {
- // 线程具体要做的事情
- RandomAccessFile log = null ;
- try {
- RandomAccessFile in = new RandomAccessFile(src, "r");
- RandomAccessFile out = new RandomAccessFile(desc, "rw");
-
- log = new RandomAccessFile(logName, "rw");
- // 从指定位置读
- in.seek(_data ==null ?k * part : Long.parseLong(_data[k]) );
- out.seek(_data ==null ?k * part : Long.parseLong(_data[k]) );
-
- byte[] bytes = new byte[1024 * 2];
- int len = -1, plen = 0;
-
- while (true) {
- len = in.read(bytes);
-
- if (len == -1) {
- break;
- }
- // 如果不等于 -1 , 则 累加求和
- plen += len;
-
- // 将读取的字节数,放入 到 map 中
- map.put(k, plen + (_data ==null ?k * part : Long.parseLong(_data[k])) );
-
- // 将读取到的数据、进行写入
- out.write(bytes, 0, len);
- // 将 map 中的数据进行写入文件中
- log.seek(0); // 直接覆盖全部文件
- StringJoiner joiner = new StringJoiner(",");
- map.forEach((key, val)-> joiner.add(String.valueOf(val)));
- log.write(joiner.toString().getBytes(StandardCharsets.UTF_8));
-
- if (plen + (_data ==null ? k * part : Long.parseLong(_data[k])) >= (k+1) * part ) {
- break;
- }
- }
- } catch (Exception e) {
- e.printStackTrace();
- }finally {
- try {
- if (log !=null) log.close();
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
-
- });
- thread.start();
- // 把这5个线程保存到集合中
- list.add(thread);
- }
- for(Thread t : list) {
- t.join(); // 将线程加入,并阻塞主线程
- }
- // 读取完成后、将日志文件删除即可
- new File(logName).delete();
- }
-
- /**
- * 支持断点续传
- * @src 拷贝的原文件
- * @desc 拷贝的位置
- */
- public static void transportFile(File src, File desc) throws Exception {
- transportFile(src, desc, 5);
- }
-
- public static void transportFile(String src, String desc) throws Exception {
- transportFile(new File(src), new File(desc));
- }
- }
需要上传的文件资源
将iso资源文件上传到指定目录
在上传文件过程中中断程序,模拟文件资源因为网络原因中断了
当前文件资源还未上传完全,并且有一个日志去标志文件上传进度
日志文件标志了每个线程进度
重新执行程序,模拟重新上传文件
剩余文件上传完毕,进度日志也被删除