Skip to content

Commit

Permalink
list resume
Browse files Browse the repository at this point in the history
增加功能:接着最后一次运行的结果,继续往下遍历源的文件列表
  • Loading branch information
kitmanzheng committed Jan 21, 2020
1 parent eba04fa commit dfdec5d
Show file tree
Hide file tree
Showing 10 changed files with 193 additions and 45 deletions.
3 changes: 3 additions & 0 deletions conf/config.ini
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@ executeTimeWindow=00:00,24:00
# 迁移成功的结果,按日期归档此目录,为空即不输出。格式每一行为:绝对路径\t文件大小\t最后修改时间,该目录需要存在。
outputFinishedFileFolder=./result

# 是否接着最后一次运行的结果,继续往下遍历源的文件列表
resume=false

# 从本地迁移到COS配置分节
[migrateLocal]
# 本地路径, 表示将该路径下的数据都迁移到COS, 对于linux绝对路径, 如/a/b/c, 对于windows绝对路径,注意分隔符为两个反斜杠,如E:\\a\\b\\c
Expand Down
Binary file modified dep/cos_migrate_tool-1.0-jar-with-dependencies.jar
Binary file not shown.
13 changes: 13 additions & 0 deletions src/main/java/com/qcloud/cos_migrate_tool/config/CommonConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,19 @@ public class CommonConfig {
private String batchTaskPath = "";
private boolean realTimeCompare = false;
private String outputFinishedFilePath = "";
private boolean isResume = false;

public void setResume(String isResume) {
if (isResume.compareToIgnoreCase("true") == 0) {
this.isResume = true;
} else if (isResume.compareToIgnoreCase("false") != 0) {
throw new IllegalArgumentException("resume invalid.should be true or false");
}
}

public boolean isResume() {
return this.isResume;
}

public void setRealTimeCompare(String realTimeCompare) throws IllegalArgumentException {
if (realTimeCompare.equalsIgnoreCase("on")) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ public class ConfigParser {
private static final String COMMON_BATCH_TASK_PATH = "batchTaskPath";
private static final String COMMON_REAL_TIME_COMPARE = "realTimeCompare";
private static final String COMMON_OUTPUT_FINISHED_FILE = "outputFinishedFileFolder";
private static final String COMMON_RESUME = "resume";

private static final String LOCAL_SECTION_NAME = "migrateLocal";
private static final String LOCAL_LOCALPATH = "localPath";
Expand Down Expand Up @@ -546,7 +547,11 @@ private boolean initCommonConfig(Preferences prefs, CommonConfig commonConfig) {
commonConfig.setOutputFinishedFilePath(finishedFileFolder);
}


String resume = getConfigValue(prefs, COMMON_SECTION_NAME, COMMON_RESUME);
if (resume!=null && !resume.isEmpty()) {
commonConfig.setResume(resume);
}

} catch (Exception e) {
System.err.println(e.getMessage());
log.error(e.getMessage());
Expand Down
88 changes: 87 additions & 1 deletion src/main/java/com/qcloud/cos_migrate_tool/record/RecordDb.java
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
package com.qcloud.cos_migrate_tool.record;

import java.io.BufferedOutputStream;
import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.UnsupportedEncodingException;
import java.util.LinkedList;

import org.rocksdb.FlushOptions;
import org.rocksdb.Options;
Expand Down Expand Up @@ -38,12 +42,14 @@ public enum QUERY_RESULT {
private RocksDB db;
private Options options;
private final String requestIdPrefix = "x-cos-requestId-";
private String dbFolder;


public RecordDb() {}

public boolean init(String historyDbFolder, String comment) {

try {
dbFolder = historyDbFolder;
options = new Options();
options.setCreateIfMissing(true);
options.setWriteBufferSize(16 * SizeUnit.MB).setMaxWriteBufferNumber(4)
Expand All @@ -70,6 +76,86 @@ public boolean init(String historyDbFolder, String comment) {
}
return true;
}

public boolean saveListProgress(String prefix, String marker) {
String value = prefix + "|" + marker;
return saveKV("listProgress", value);
}


public String[] getListProgress() {
String value = queryKV("listProgress");
if (value == null) {
return null;
}

int i = value.lastIndexOf("|");
if (i < 0) {
return null;
}

String prefix = value.substring(0, i);
String marker = "";
if (i != value.length()-1) {
marker = value.substring(i+1, value.length()-1);
}

String[] arr1 = new String[] {prefix, marker};
return arr1;

}

public boolean saveDirProgress(String curDir, String lastItr, LinkedList<String> dirList) {

String progressFile = this.dbFolder + "/PROGRESS";

try {
BufferedOutputStream bos =
new BufferedOutputStream(new FileOutputStream(progressFile, false));
bos.write(lastItr.getBytes(ENCODING_TYPE));
bos.write("\n".getBytes(ENCODING_TYPE));
bos.write(curDir.getBytes(ENCODING_TYPE));
bos.write("\n".getBytes(ENCODING_TYPE));
for (String x: dirList) {
bos.write(x.getBytes(ENCODING_TYPE));
bos.write("\n".getBytes(ENCODING_TYPE));
}

bos.close();
} catch (FileNotFoundException e) {
log.error(e.toString());
return false;
} catch (IOException e) {
log.error(e.toString());
return false;
}

return true;
}

public LinkedList<String> getDirProgress() {
String progressFile = this.dbFolder + "/PROGRESS";
LinkedList<String> result = new LinkedList<String>();
try {
BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(new FileInputStream(progressFile)));
String line;
while ((line = bufferedReader.readLine()) != null) {
result.addLast(line);
log.info("[{}]", line);
}
} catch (FileNotFoundException e) {
// TODO Auto-generated catch block
//log.error(e.toString());
//e.printStackTrace();
return null;
} catch (IOException e) {
// TODO Auto-generated catch block
log.error(e.toString());
e.printStackTrace();
return null;
}
return result;
}

// 保存记录
public boolean saveRecord(RecordElement recordElement) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,13 +84,18 @@ public String buildTaskDbFolderPath() {
}

public void buildTask() {
final int maxKeys = 200;
final int maxKeys = 1000;
final String keyPrefix = this.srcPrefix;
String nextMarker = "";
ObjectListing objectListing;

int retry_num = 0;

String[] progress = this.recordDb.getListProgress();
if (config.isResume() && progress != null) {
nextMarker = progress[1];
}

do {
try {
do {
Expand All @@ -103,13 +108,17 @@ public void buildTask() {
// AddTask
MigrateAliTask task = new MigrateAliTask(config, ossClient,
com.qcloud.cos.utils.UrlEncoderUtils.urlDecode(s.getKey()),
s.getSize(), s.getETag(), s.getLastModified(), smallFileTransferManager,
bigFileTransferManager, recordDb, semaphore);
s.getSize(), s.getETag(), s.getLastModified(),
smallFileTransferManager, bigFileTransferManager, recordDb,
semaphore);

AddTask(task);
AddTask(task);
}
nextMarker = com.qcloud.cos.utils.UrlEncoderUtils
.urlDecode(objectListing.getNextMarker());
if (nextMarker != null) {
this.recordDb.saveListProgress(keyPrefix, nextMarker);
}
} while (objectListing.isTruncated());

TaskStatics.instance.setListFinished(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,29 +90,40 @@ public String buildTaskDbFolderPath() {
}

public void buildTask() {

String nextMarker = "";
String[] progress = this.recordDb.getListProgress();
if (config.isResume() && progress != null) {
nextMarker = progress[1];
}
try {
ListObjectsRequest listObjectsRequest = new ListObjectsRequest();
listObjectsRequest.setBucketName(srcBucket);
listObjectsRequest.setPrefix(srcPrefix);
listObjectsRequest.setMarker(nextMarker);
ObjectListing objectListing = null;
do {

objectListing = s3Client.listObjects(listObjectsRequest);
for (S3ObjectSummary objectSummary : objectListing.getObjectSummaries()) {
// AddTask
MigrateAwsTask task = new MigrateAwsTask(config, s3Client,
objectSummary.getKey(), objectSummary.getSize(),
objectSummary.getETag(), smallFileTransferManager,
bigFileTransferManager, recordDb, semaphore);
log.info("list key: {}, size: {}, etag: {}", objectSummary.getKey(), objectSummary.getSize(), objectSummary.getETag());
log.info("list key: {}, size: {}, etag: {}", objectSummary.getKey(),
objectSummary.getSize(), objectSummary.getETag());

AddTask(task);
AddTask(task);
}
nextMarker = objectListing.getNextMarker();
listObjectsRequest.setMarker(nextMarker);
if (nextMarker != null) {
this.recordDb.saveListProgress(srcPrefix, nextMarker);
}
listObjectsRequest.setMarker(objectListing.getNextMarker());
} while (objectListing.isTruncated());

TaskStatics.instance.setListFinished(true);

} catch (AmazonServiceException ase) {
log.error("list fail AmazonServiceException errorcode: {}, msg: {}", ase.getErrorCode(),
ase.getMessage());
Expand All @@ -122,7 +133,7 @@ public void buildTask() {
TaskStatics.instance.setListFinished(false);
} catch (Exception e) {
log.error(e.getMessage());
TaskStatics.instance.setListFinished(false);
TaskStatics.instance.setListFinished(false);
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,13 +100,13 @@ public void buildTask() {
e1.printStackTrace();
return;
}

BufferedReader bufferedReader = new BufferedReader(read);
String srcKey = null;

try {
while ((srcKey = bufferedReader.readLine()) != null){
while ((srcKey = bufferedReader.readLine()) != null) {

srcKey = UrlEncoderUtils.urlDecode(srcKey);

String copyDestKey = null;
Expand All @@ -116,18 +116,18 @@ public void buildTask() {
} else {
copyDestKey = config.getCosPath() + srcKey;
}
MigrateCopyBucketTask task = new MigrateCopyBucketTask(semaphore,
(CopyBucketConfig) config, smallFileTransferManager,
bigFileTransferManager, recordDb, srcCosClient, srcKey, 0,
"", copyDestKey);
AddTask(task);

MigrateCopyBucketTask task =
new MigrateCopyBucketTask(semaphore, (CopyBucketConfig) config,
smallFileTransferManager, bigFileTransferManager, recordDb,
srcCosClient, srcKey, 0, "", copyDestKey);

AddTask(task);

}

TaskStatics.instance.setListFinished(true);

} catch (IOException e) {
log.error(e.toString());
TaskStatics.instance.setListFinished(false);
Expand All @@ -137,7 +137,7 @@ public void buildTask() {
e.printStackTrace();
TaskStatics.instance.setListFinished(false);
}

try {
bufferedReader.close();
} catch (IOException e) {
Expand All @@ -148,43 +148,51 @@ public void buildTask() {
} catch (IOException e) {
e.printStackTrace();
}


} else {
String nextMarker = "";
String[] progress = this.recordDb.getListProgress();
if (config.isResume() && progress != null) {
nextMarker = progress[1];
}

ListObjectsRequest listObjectsRequest =
new ListObjectsRequest(srcBucketName, srcCosPath, null, null, 1000);
new ListObjectsRequest(srcBucketName, srcCosPath, nextMarker, null, 1000);

ObjectListing objectListing;
int retry_num = 0;

do {
try {
while (true) {
listObjectsRequest.setMarker(nextMarker);
objectListing = srcCosClient.listObjects(listObjectsRequest);
List<COSObjectSummary> cosObjectSummaries =
objectListing.getObjectSummaries();

for (COSObjectSummary cosObjectSummary : cosObjectSummaries) {
String srcKey = cosObjectSummary.getKey();
String srcEtag = cosObjectSummary.getETag();
long srcSize = cosObjectSummary.getSize();
String keyName = srcKey.substring(lastDelimiter);
String copyDestKey = config.getCosPath() + keyName;

MigrateCopyBucketTask task = new MigrateCopyBucketTask(semaphore,
(CopyBucketConfig) config, smallFileTransferManager,
bigFileTransferManager, recordDb, srcCosClient, srcKey, srcSize,
srcEtag, copyDestKey);

AddTask(task);
}

nextMarker = objectListing.getNextMarker();
if (nextMarker != null) {
this.recordDb.saveListProgress(srcCosPath, nextMarker);
}
if (!objectListing.isTruncated()) {
break;
}

listObjectsRequest.setMarker(objectListing.getNextMarker());

}

TaskStatics.instance.setListFinished(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,11 +82,12 @@ private String buildCOSPath() {

@Override
public void doTask() {


String cosPath = buildCOSPath();

this.etag = this.lastModify.toString();

MigrateCompetitorRecordElement upyunRecordElement = new MigrateCompetitorRecordElement(
MigrateType.MIGRATE_FROM_UPYUN, config.getBucketName(), cosPath, etag, fileSize);

Expand Down
Loading

0 comments on commit dfdec5d

Please sign in to comment.