diff --git a/conf/config.ini b/conf/config.ini index fd9e579..5d34a5a 100644 --- a/conf/config.ini +++ b/conf/config.ini @@ -52,6 +52,9 @@ executeTimeWindow=00:00,24:00 # 迁移成功的结果,按日期归档此目录,为空即不输出。格式每一行为:绝对路径\t文件大小\t最后修改时间,该目录需要存在。 outputFinishedFileFolder=./result +# 是否接着最后一次运行的结果,继续往下遍历源的文件列表 +resume=false + # 从本地迁移到COS配置分节 [migrateLocal] # 本地路径, 表示将该路径下的数据都迁移到COS, 对于linux绝对路径, 如/a/b/c, 对于windows绝对路径,注意分隔符为两个反斜杠,如E:\\a\\b\\c diff --git a/dep/cos_migrate_tool-1.0-jar-with-dependencies.jar b/dep/cos_migrate_tool-1.0-jar-with-dependencies.jar index 516386b..d4c93e7 100644 Binary files a/dep/cos_migrate_tool-1.0-jar-with-dependencies.jar and b/dep/cos_migrate_tool-1.0-jar-with-dependencies.jar differ diff --git a/src/main/java/com/qcloud/cos_migrate_tool/config/CommonConfig.java b/src/main/java/com/qcloud/cos_migrate_tool/config/CommonConfig.java index 51aa042..e60380f 100644 --- a/src/main/java/com/qcloud/cos_migrate_tool/config/CommonConfig.java +++ b/src/main/java/com/qcloud/cos_migrate_tool/config/CommonConfig.java @@ -36,6 +36,19 @@ public class CommonConfig { private String batchTaskPath = ""; private boolean realTimeCompare = false; private String outputFinishedFilePath = ""; + private boolean isResume = false; + + public void setResume(String isResume) { + if (isResume.compareToIgnoreCase("true") == 0) { + this.isResume = true; + } else if (isResume.compareToIgnoreCase("false") != 0) { + throw new IllegalArgumentException("resume invalid.should be true or false"); + } + } + + public boolean isResume() { + return this.isResume; + } public void setRealTimeCompare(String realTimeCompare) throws IllegalArgumentException { if (realTimeCompare.equalsIgnoreCase("on")) { diff --git a/src/main/java/com/qcloud/cos_migrate_tool/config/ConfigParser.java b/src/main/java/com/qcloud/cos_migrate_tool/config/ConfigParser.java index 998ab3d..156a050 100644 --- a/src/main/java/com/qcloud/cos_migrate_tool/config/ConfigParser.java +++ b/src/main/java/com/qcloud/cos_migrate_tool/config/ConfigParser.java @@ -52,6 +52,7 @@ public class ConfigParser { private static final String COMMON_BATCH_TASK_PATH = "batchTaskPath"; private static final String COMMON_REAL_TIME_COMPARE = "realTimeCompare"; private static final String COMMON_OUTPUT_FINISHED_FILE = "outputFinishedFileFolder"; + private static final String COMMON_RESUME = "resume"; private static final String LOCAL_SECTION_NAME = "migrateLocal"; private static final String LOCAL_LOCALPATH = "localPath"; @@ -546,7 +547,11 @@ private boolean initCommonConfig(Preferences prefs, CommonConfig commonConfig) { commonConfig.setOutputFinishedFilePath(finishedFileFolder); } - + String resume = getConfigValue(prefs, COMMON_SECTION_NAME, COMMON_RESUME); + if (resume!=null && !resume.isEmpty()) { + commonConfig.setResume(resume); + } + } catch (Exception e) { System.err.println(e.getMessage()); log.error(e.getMessage()); diff --git a/src/main/java/com/qcloud/cos_migrate_tool/record/RecordDb.java b/src/main/java/com/qcloud/cos_migrate_tool/record/RecordDb.java index 5b0e882..defe394 100644 --- a/src/main/java/com/qcloud/cos_migrate_tool/record/RecordDb.java +++ b/src/main/java/com/qcloud/cos_migrate_tool/record/RecordDb.java @@ -1,10 +1,14 @@ package com.qcloud.cos_migrate_tool.record; import java.io.BufferedOutputStream; +import java.io.BufferedReader; +import java.io.FileInputStream; import java.io.FileNotFoundException; import java.io.FileOutputStream; import java.io.IOException; +import java.io.InputStreamReader; import java.io.UnsupportedEncodingException; +import java.util.LinkedList; import org.rocksdb.FlushOptions; import org.rocksdb.Options; @@ -38,12 +42,14 @@ public enum QUERY_RESULT { private RocksDB db; private Options options; private final String requestIdPrefix = "x-cos-requestId-"; + private String dbFolder; + public RecordDb() {} public boolean init(String historyDbFolder, String comment) { - try { + dbFolder = historyDbFolder; options = new Options(); options.setCreateIfMissing(true); options.setWriteBufferSize(16 * SizeUnit.MB).setMaxWriteBufferNumber(4) @@ -70,6 +76,86 @@ public boolean init(String historyDbFolder, String comment) { } return true; } + + public boolean saveListProgress(String prefix, String marker) { + String value = prefix + "|" + marker; + return saveKV("listProgress", value); + } + + + public String[] getListProgress() { + String value = queryKV("listProgress"); + if (value == null) { + return null; + } + + int i = value.lastIndexOf("|"); + if (i < 0) { + return null; + } + + String prefix = value.substring(0, i); + String marker = ""; + if (i != value.length()-1) { + marker = value.substring(i+1, value.length()-1); + } + + String[] arr1 = new String[] {prefix, marker}; + return arr1; + + } + + public boolean saveDirProgress(String curDir, String lastItr, LinkedList dirList) { + + String progressFile = this.dbFolder + "/PROGRESS"; + + try { + BufferedOutputStream bos = + new BufferedOutputStream(new FileOutputStream(progressFile, false)); + bos.write(lastItr.getBytes(ENCODING_TYPE)); + bos.write("\n".getBytes(ENCODING_TYPE)); + bos.write(curDir.getBytes(ENCODING_TYPE)); + bos.write("\n".getBytes(ENCODING_TYPE)); + for (String x: dirList) { + bos.write(x.getBytes(ENCODING_TYPE)); + bos.write("\n".getBytes(ENCODING_TYPE)); + } + + bos.close(); + } catch (FileNotFoundException e) { + log.error(e.toString()); + return false; + } catch (IOException e) { + log.error(e.toString()); + return false; + } + + return true; + } + + public LinkedList getDirProgress() { + String progressFile = this.dbFolder + "/PROGRESS"; + LinkedList result = new LinkedList(); + try { + BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(new FileInputStream(progressFile))); + String line; + while ((line = bufferedReader.readLine()) != null) { + result.addLast(line); + log.info("[{}]", line); + } + } catch (FileNotFoundException e) { + // TODO Auto-generated catch block + //log.error(e.toString()); + //e.printStackTrace(); + return null; + } catch (IOException e) { + // TODO Auto-generated catch block + log.error(e.toString()); + e.printStackTrace(); + return null; + } + return result; + } // 保存记录 public boolean saveRecord(RecordElement recordElement) { diff --git a/src/main/java/com/qcloud/cos_migrate_tool/task/MigrateAliTaskExecutor.java b/src/main/java/com/qcloud/cos_migrate_tool/task/MigrateAliTaskExecutor.java index 92c087a..2677d36 100644 --- a/src/main/java/com/qcloud/cos_migrate_tool/task/MigrateAliTaskExecutor.java +++ b/src/main/java/com/qcloud/cos_migrate_tool/task/MigrateAliTaskExecutor.java @@ -84,13 +84,18 @@ public String buildTaskDbFolderPath() { } public void buildTask() { - final int maxKeys = 200; + final int maxKeys = 1000; final String keyPrefix = this.srcPrefix; String nextMarker = ""; ObjectListing objectListing; int retry_num = 0; + String[] progress = this.recordDb.getListProgress(); + if (config.isResume() && progress != null) { + nextMarker = progress[1]; + } + do { try { do { @@ -103,13 +108,17 @@ public void buildTask() { // AddTask MigrateAliTask task = new MigrateAliTask(config, ossClient, com.qcloud.cos.utils.UrlEncoderUtils.urlDecode(s.getKey()), - s.getSize(), s.getETag(), s.getLastModified(), smallFileTransferManager, - bigFileTransferManager, recordDb, semaphore); + s.getSize(), s.getETag(), s.getLastModified(), + smallFileTransferManager, bigFileTransferManager, recordDb, + semaphore); - AddTask(task); + AddTask(task); } nextMarker = com.qcloud.cos.utils.UrlEncoderUtils .urlDecode(objectListing.getNextMarker()); + if (nextMarker != null) { + this.recordDb.saveListProgress(keyPrefix, nextMarker); + } } while (objectListing.isTruncated()); TaskStatics.instance.setListFinished(true); diff --git a/src/main/java/com/qcloud/cos_migrate_tool/task/MigrateAwsTaskExecutor.java b/src/main/java/com/qcloud/cos_migrate_tool/task/MigrateAwsTaskExecutor.java index 31dc00b..64fbf7e 100644 --- a/src/main/java/com/qcloud/cos_migrate_tool/task/MigrateAwsTaskExecutor.java +++ b/src/main/java/com/qcloud/cos_migrate_tool/task/MigrateAwsTaskExecutor.java @@ -90,13 +90,19 @@ public String buildTaskDbFolderPath() { } public void buildTask() { - + String nextMarker = ""; + String[] progress = this.recordDb.getListProgress(); + if (config.isResume() && progress != null) { + nextMarker = progress[1]; + } try { ListObjectsRequest listObjectsRequest = new ListObjectsRequest(); listObjectsRequest.setBucketName(srcBucket); listObjectsRequest.setPrefix(srcPrefix); + listObjectsRequest.setMarker(nextMarker); ObjectListing objectListing = null; do { + objectListing = s3Client.listObjects(listObjectsRequest); for (S3ObjectSummary objectSummary : objectListing.getObjectSummaries()) { // AddTask @@ -104,15 +110,20 @@ public void buildTask() { objectSummary.getKey(), objectSummary.getSize(), objectSummary.getETag(), smallFileTransferManager, bigFileTransferManager, recordDb, semaphore); - log.info("list key: {}, size: {}, etag: {}", objectSummary.getKey(), objectSummary.getSize(), objectSummary.getETag()); + log.info("list key: {}, size: {}, etag: {}", objectSummary.getKey(), + objectSummary.getSize(), objectSummary.getETag()); - AddTask(task); + AddTask(task); + } + nextMarker = objectListing.getNextMarker(); + listObjectsRequest.setMarker(nextMarker); + if (nextMarker != null) { + this.recordDb.saveListProgress(srcPrefix, nextMarker); } - listObjectsRequest.setMarker(objectListing.getNextMarker()); } while (objectListing.isTruncated()); - + TaskStatics.instance.setListFinished(true); - + } catch (AmazonServiceException ase) { log.error("list fail AmazonServiceException errorcode: {}, msg: {}", ase.getErrorCode(), ase.getMessage()); @@ -122,7 +133,7 @@ public void buildTask() { TaskStatics.instance.setListFinished(false); } catch (Exception e) { log.error(e.getMessage()); - TaskStatics.instance.setListFinished(false); + TaskStatics.instance.setListFinished(false); } } diff --git a/src/main/java/com/qcloud/cos_migrate_tool/task/MigrateCopyBucketTaskExecutor.java b/src/main/java/com/qcloud/cos_migrate_tool/task/MigrateCopyBucketTaskExecutor.java index 8732d53..adfd556 100644 --- a/src/main/java/com/qcloud/cos_migrate_tool/task/MigrateCopyBucketTaskExecutor.java +++ b/src/main/java/com/qcloud/cos_migrate_tool/task/MigrateCopyBucketTaskExecutor.java @@ -100,13 +100,13 @@ public void buildTask() { e1.printStackTrace(); return; } - + BufferedReader bufferedReader = new BufferedReader(read); String srcKey = null; try { - while ((srcKey = bufferedReader.readLine()) != null){ - + while ((srcKey = bufferedReader.readLine()) != null) { + srcKey = UrlEncoderUtils.urlDecode(srcKey); String copyDestKey = null; @@ -116,18 +116,18 @@ public void buildTask() { } else { copyDestKey = config.getCosPath() + srcKey; } - - MigrateCopyBucketTask task = new MigrateCopyBucketTask(semaphore, - (CopyBucketConfig) config, smallFileTransferManager, - bigFileTransferManager, recordDb, srcCosClient, srcKey, 0, - "", copyDestKey); - - AddTask(task); - + + MigrateCopyBucketTask task = + new MigrateCopyBucketTask(semaphore, (CopyBucketConfig) config, + smallFileTransferManager, bigFileTransferManager, recordDb, + srcCosClient, srcKey, 0, "", copyDestKey); + + AddTask(task); + } - + TaskStatics.instance.setListFinished(true); - + } catch (IOException e) { log.error(e.toString()); TaskStatics.instance.setListFinished(false); @@ -137,7 +137,7 @@ public void buildTask() { e.printStackTrace(); TaskStatics.instance.setListFinished(false); } - + try { bufferedReader.close(); } catch (IOException e) { @@ -148,12 +148,17 @@ public void buildTask() { } catch (IOException e) { e.printStackTrace(); } - + } else { + String nextMarker = ""; + String[] progress = this.recordDb.getListProgress(); + if (config.isResume() && progress != null) { + nextMarker = progress[1]; + } ListObjectsRequest listObjectsRequest = - new ListObjectsRequest(srcBucketName, srcCosPath, null, null, 1000); + new ListObjectsRequest(srcBucketName, srcCosPath, nextMarker, null, 1000); ObjectListing objectListing; int retry_num = 0; @@ -161,30 +166,33 @@ public void buildTask() { do { try { while (true) { + listObjectsRequest.setMarker(nextMarker); objectListing = srcCosClient.listObjects(listObjectsRequest); List cosObjectSummaries = objectListing.getObjectSummaries(); - + for (COSObjectSummary cosObjectSummary : cosObjectSummaries) { String srcKey = cosObjectSummary.getKey(); String srcEtag = cosObjectSummary.getETag(); long srcSize = cosObjectSummary.getSize(); String keyName = srcKey.substring(lastDelimiter); String copyDestKey = config.getCosPath() + keyName; - + MigrateCopyBucketTask task = new MigrateCopyBucketTask(semaphore, (CopyBucketConfig) config, smallFileTransferManager, bigFileTransferManager, recordDb, srcCosClient, srcKey, srcSize, srcEtag, copyDestKey); - + AddTask(task); } - + nextMarker = objectListing.getNextMarker(); + if (nextMarker != null) { + this.recordDb.saveListProgress(srcCosPath, nextMarker); + } if (!objectListing.isTruncated()) { break; } - - listObjectsRequest.setMarker(objectListing.getNextMarker()); + } TaskStatics.instance.setListFinished(true); diff --git a/src/main/java/com/qcloud/cos_migrate_tool/task/MigrateUpyunTask.java b/src/main/java/com/qcloud/cos_migrate_tool/task/MigrateUpyunTask.java index d256b1e..5d7e044 100644 --- a/src/main/java/com/qcloud/cos_migrate_tool/task/MigrateUpyunTask.java +++ b/src/main/java/com/qcloud/cos_migrate_tool/task/MigrateUpyunTask.java @@ -82,11 +82,12 @@ private String buildCOSPath() { @Override public void doTask() { + String cosPath = buildCOSPath(); this.etag = this.lastModify.toString(); - + MigrateCompetitorRecordElement upyunRecordElement = new MigrateCompetitorRecordElement( MigrateType.MIGRATE_FROM_UPYUN, config.getBucketName(), cosPath, etag, fileSize); diff --git a/src/main/java/com/qcloud/cos_migrate_tool/task/MigrateUpyunTaskExecutor.java b/src/main/java/com/qcloud/cos_migrate_tool/task/MigrateUpyunTaskExecutor.java index dcd697d..90921c5 100644 --- a/src/main/java/com/qcloud/cos_migrate_tool/task/MigrateUpyunTaskExecutor.java +++ b/src/main/java/com/qcloud/cos_migrate_tool/task/MigrateUpyunTaskExecutor.java @@ -58,8 +58,8 @@ public MigrateUpyunTaskExecutor(CopyFromUpyunConfig config) { System.setProperty("http.proxyHost", config.getSrcProxyHost()); System.setProperty("http.proxyPort", Integer.toString(config.getSrcProxyPort())); } - - + + } @Override @@ -85,12 +85,21 @@ public void buildTask() { int retry_num = 0; LinkedList dirList = new LinkedList(); LinkedList itrList = new LinkedList(); - if (this.srcPrefix.isEmpty()) { - dirList.add("/"); + + LinkedList progress = this.recordDb.getDirProgress(); + + if (config.isResume() && progress!=null) { + itrList.add(progress.removeFirst()); + dirList = progress; + } else { - dirList.add(this.srcPrefix); + if (this.srcPrefix.isEmpty()) { + dirList.add("/"); + } else { + dirList.add(this.srcPrefix); + } + itrList.add(""); } - itrList.add(""); do { String curDir = ""; @@ -112,13 +121,13 @@ public void buildTask() { if (!config.isAscendingOrder()) { params.put("x-list-order", "desc"); } - + folderItemIter = upyun.readDirIter(curDir, params); lastItr = folderItemIter.iter; - for (int i = 0; i < folderItemIter.files.size(); ++i) { - if (folderItemIter.files.get(i).type.equals("folder")) { + for (int i = 0; i < folderItemIter.files.size(); ++i) { + if (folderItemIter.files.get(i).type.equals("folder")) { dirList.add(curDir + folderItemIter.files.get(i).name + "/"); } else { MigrateUpyunTask task = new MigrateUpyunTask(config, null, @@ -131,6 +140,9 @@ public void buildTask() { AddTask(task); } } + + this.recordDb.saveDirProgress(curDir, lastItr, dirList); + } while (folderItemIter.files.size() > 0); }