Skip to content

Commit

Permalink
update upyun
Browse files Browse the repository at this point in the history
1. add retry
2. compare md5
  • Loading branch information
kitmanzheng committed Dec 31, 2019
1 parent 37a3264 commit 1d667e6
Show file tree
Hide file tree
Showing 6 changed files with 170 additions and 118 deletions.
Binary file modified dep/cos_migrate_tool-1.0-jar-with-dependencies.jar
Binary file not shown.
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ public class ConfigParser {
private static final String OSS_PROXY_HOST = "proxyHost";
private static final String OSS_PROXY_PORT = "proxyPort";
private static final String OSS_URL_LIST = "uriList";
private static final String UPYUN_COMPARE_MD5 = "compareMd5";

private static final String QINIU_NEED_SIGN = "needSign";

Expand Down Expand Up @@ -690,7 +691,23 @@ private boolean initCopyFromUpyunConfig(Preferences prefs, CopyFromUpyunConfig c
if (!initCopyFromCompetitorConfig(prefs, copyUpyunConfig)) {
return false;
}




String compareMd5 = getConfigValue(prefs, UPYUN_SECTION_NAME, UPYUN_COMPARE_MD5);
if (compareMd5 != null) {
if (compareMd5.compareToIgnoreCase("off") == 0) {
copyUpyunConfig.setCompareMd5(false);
} else if (compareMd5.compareToIgnoreCase("on") == 0) {
copyUpyunConfig.setCompareMd5(true);
} else {
String errMsg = "upyun section compareMd5 invalid,need to be \"on\" or \"off\".\n";
System.err.println(errMsg);
log.error(errMsg);
return false;
}
}

return true;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,13 @@


public class CopyFromUpyunConfig extends CopyFromCompetitorConfig {

private boolean compareMd5 = true;

public boolean isCompareMd5() {
return compareMd5;
}

public void setCompareMd5(boolean compareMd5) {
this.compareMd5 = compareMd5;
}
}
116 changes: 87 additions & 29 deletions src/main/java/com/qcloud/cos_migrate_tool/task/MigrateUpyunTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,23 @@ public class MigrateUpyunTask extends Task {
private String contentType;

public MigrateUpyunTask(CopyFromUpyunConfig config, UpYun upyun, String srcKey, long fileSize,
Date lastModify, String contentType, TransferManager smallFileTransfer, TransferManager bigFileTransfer,
RecordDb recordDb, Semaphore semaphore) {
Date lastModify, String contentType, TransferManager smallFileTransfer,
TransferManager bigFileTransfer, RecordDb recordDb, Semaphore semaphore) {
super(semaphore, config, smallFileTransfer, bigFileTransfer, recordDb);
this.upyun = upyun;
//this.upyun = upyun;

//又拍云sdk多线程有坑,headers不对
this.upyun = new UpYun(config.getSrcBucket(), config.getSrcAccessKeyId(), config.getSrcAccessKeySecret());
this.upyun.setTimeout(60);
this.upyun.setApiDomain(UpYun.ED_AUTO);

if (!config.getSrcProxyHost().isEmpty() && config.getSrcProxyPort() > 0) {
System.setProperty("java.net.useSystemProxies", "true");
System.setProperty("http.proxyHost", config.getSrcProxyHost());
System.setProperty("http.proxyPort", Integer.toString(config.getSrcProxyPort()));
}


this.srcKey = srcKey;
this.fileSize = fileSize;
this.contentType = contentType;
Expand Down Expand Up @@ -73,7 +86,6 @@ public void doTask() {
String cosPath = buildCOSPath();

this.etag = this.lastModify.toString();
// System.out.println(this.etag);

MigrateCompetitorRecordElement upyunRecordElement = new MigrateCompetitorRecordElement(
MigrateType.MIGRATE_FROM_UPYUN, config.getBucketName(), cosPath, etag, fileSize);
Expand Down Expand Up @@ -118,30 +130,73 @@ public void doTask() {

String localPath = config.getTempFolderPath() + UUID.randomUUID().toString();
File localFile = new File(localPath);
try {
boolean success =
upyun.readFile(UrlEncoderUtils.encodeEscapeDelimiter(this.srcKey), localFile);
if (!success) {
String errMsg = String.format("[fail] taskInfo: %s, No Exception",
upyunRecordElement.buildKey());
System.err.println(errMsg);
log.error(errMsg);
TaskStatics.instance.addFailCnt();
int retry_limit = 5;
boolean download_success = false;
String contentMd5 = "";

do {
try {

if (((CopyFromUpyunConfig) config).isCompareMd5()) {
Map<String, String> headers = this.upyun.getFileInfo(UrlEncoderUtils.encodeEscapeDelimiter(this.srcKey));
if (headers == null || !headers.containsKey("Content-MD5")) {
String errMsg = String
.format("[fail] taskInfo: %s, can't get fileinfo or content-md5", upyunRecordElement.buildKey());
System.err.println(errMsg);
log.error(errMsg);
TaskStatics.instance.addFailCnt();
return;
}

contentMd5 = headers.get("Content-MD5");
}

download_success = this.upyun
.readFile(UrlEncoderUtils.encodeEscapeDelimiter(this.srcKey), localFile);
if (!download_success) {
String errMsg = String.format("[fail] taskInfo: %s, No Exception",
upyunRecordElement.buildKey());
System.err.println(errMsg);
log.error(errMsg);
TaskStatics.instance.addFailCnt();
if (localFile.exists()) {
localFile.delete();
}
}


} catch (Exception e) {
download_success = false;
retry_limit--;

try {
Thread.sleep(300);
} catch (InterruptedException e1) {
// TODO Auto-generated catch block
e1.printStackTrace();
}

if (retry_limit == 0) {
String errMsg =
String.format("[fail] taskInfo: %s, Caught an Exception, error msg: %s",
upyunRecordElement.buildKey(), e.toString());
System.err.println(errMsg);
log.error(errMsg);
TaskStatics.instance.addFailCnt();
if (localFile.exists()) {
localFile.delete();
}
return;
}

if (localFile.exists()) {
localFile.delete();
}

}
} catch (Exception e) {
String errMsg = String.format("[fail] taskInfo: %s, Caught an Exception, error msg: %s",
upyunRecordElement.buildKey(), e.toString());
System.err.println(errMsg);
log.error(errMsg);
TaskStatics.instance.addFailCnt();
if (localFile.exists()) {
localFile.delete();
}
return;
}


} while (!download_success && retry_limit > 0);

// upload

Expand All @@ -166,8 +221,12 @@ public void doTask() {
}

com.qcloud.cos.model.ObjectMetadata cosMetadata = new com.qcloud.cos.model.ObjectMetadata();
cosMetadata.setContentType(this.contentType);

if (((CopyFromUpyunConfig) config).isCompareMd5()) {
cosMetadata.addUserMetadata("upyun-etag", contentMd5);
}

cosMetadata.setContentType(contentType);

try {
String requestId = uploadFile(config.getBucketName(), cosPath, localFile,
config.getStorageClass(), config.isEntireFileMd5Attached(), cosMetadata, null);
Expand All @@ -183,10 +242,9 @@ public void doTask() {
System.out.println(printMsg);
log.info(printMsg);
} catch (Exception e) {
String printMsg = String.format("[fail] task_info: %s", upyunRecordElement.buildKey());
String printMsg = String.format("[fail] task_info: %s exception: %s", upyunRecordElement.buildKey(), e.toString());
System.err.println(printMsg);
log.error("[fail] task_info: {}, exception: {}", upyunRecordElement.buildKey(),
e.toString());
log.error(printMsg);
TaskStatics.instance.addFailCnt();
} finally {
localFile.delete();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public MigrateUpyunTaskExecutor(CopyFromUpyunConfig config) {
this.upyun = new UpYun(this.srcBucket, this.srcAccessKeyId, this.srcAccessKeySecret);
upyun.setTimeout(60);
upyun.setApiDomain(UpYun.ED_AUTO);

if (!config.getSrcProxyHost().isEmpty() && config.getSrcProxyPort() > 0) {
System.out.println("use proxy");
System.setProperty("java.net.useSystemProxies", "true");
Expand Down Expand Up @@ -80,102 +80,60 @@ public String buildTaskDbFolderPath() {

public void buildTask() {

try {
LinkedList<String> dirList = new LinkedList<String>();
dirList.add("");
while (!dirList.isEmpty()) {
String curDir = dirList.removeFirst();
String lastItr = "";

FolderItemIter folderItemIter;
do {
Map<String, String> params = new HashMap<String, String>();

params.put("x-list-iter", lastItr);
params.put("x-list-limit", "1000");

folderItemIter = upyun.readDirIter(curDir, params);
lastItr = folderItemIter.iter;
for (int i = 0; i < folderItemIter.files.size(); ++i) {
if (folderItemIter.files.get(i).type.equals("folder")) {

dirList.add(curDir + "/" + folderItemIter.files.get(i).name);
} else {
MigrateUpyunTask task = new MigrateUpyunTask(config, upyun,
curDir + "/" + folderItemIter.files.get(i).name,
folderItemIter.files.get(i).size,
folderItemIter.files.get(i).date, folderItemIter.files.get(i).type, smallFileTransferManager,
bigFileTransferManager, recordDb, semaphore);

AddTask(task);
}
}
} while (folderItemIter.files.size() > 0);
}

TaskStatics.instance.setListFinished(true);

} catch (Exception e) {
log.error(e.getMessage());
TaskStatics.instance.setListFinished(false);
}



/*
final int maxKeys = 200;
final String keyPrefix = this.srcPrefix;
String nextMarker = "";
ObjectListing objectListing;
int retry_num = 0;
LinkedList<String> dirList = new LinkedList<String>();
dirList.add("");

do {
try {
do {
objectListing = ossClient.listObjects(new ListObjectsRequest(this.srcBucket)
.withPrefix(keyPrefix).withMarker(nextMarker).withMaxKeys(maxKeys)
.withEncodingType("url"));
log.info("list next marker: " + nextMarker);
List<OSSObjectSummary> sums = objectListing.getObjectSummaries();
for (OSSObjectSummary s : sums) {
// AddTask
MigrateAliTask task = new MigrateAliTask(config, ossClient,
com.qcloud.cos.utils.UrlEncoderUtils.urlDecode(s.getKey()),
s.getSize(), s.getETag(), s.getLastModified(), smallFileTransferManager,
bigFileTransferManager, recordDb, semaphore);
AddTask(task);
}
nextMarker = com.qcloud.cos.utils.UrlEncoderUtils
.urlDecode(objectListing.getNextMarker());
} while (objectListing.isTruncated());
while (!dirList.isEmpty()) {
String curDir = dirList.removeFirst();
String lastItr = "";

FolderItemIter folderItemIter;
do {
Map<String, String> params = new HashMap<String, String>();

params.put("x-list-iter", lastItr);
params.put("x-list-limit", "1000");

folderItemIter = upyun.readDirIter(curDir, params);
lastItr = folderItemIter.iter;
for (int i = 0; i < folderItemIter.files.size(); ++i) {
if (folderItemIter.files.get(i).type.equals("folder")) {

dirList.add(curDir + "/" + folderItemIter.files.get(i).name);
} else {
MigrateUpyunTask task = new MigrateUpyunTask(config, null,
curDir + "/" + folderItemIter.files.get(i).name,
folderItemIter.files.get(i).size,
folderItemIter.files.get(i).date,
folderItemIter.files.get(i).type, smallFileTransferManager,
bigFileTransferManager, recordDb, semaphore);

AddTask(task);
}
}
} while (folderItemIter.files.size() > 0);
}

TaskStatics.instance.setListFinished(true);
return;
} catch (OSSException e) {
log.error("list fail msg: {}", e.getMessage());
TaskStatics.instance.setListFinished(false);
if (e.getErrorCode().equalsIgnoreCase("AccessDenied")) {
System.out.println(e.getMessage());
break;
}
} catch (ClientException e) {
log.error("list fail msg: {}", e.getMessage());
TaskStatics.instance.setListFinished(false);
if (e.getErrorCode().equalsIgnoreCase("AccessDenied")) {
System.out.println(e.getMessage());
break;
}

} catch (Exception e) {
log.error(e.getMessage());
log.error("retry_time:{}, Exception:{}", retry_num,e.getMessage());
TaskStatics.instance.setListFinished(false);
}

try {
Thread.sleep(200);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}

retry_num++;
} while (retry_num < 20);
*/
} while (retry_num < 300);

}

Expand Down
11 changes: 11 additions & 0 deletions src/main/java/com/qcloud/cos_migrate_tool/task/Task.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import com.qcloud.cos.transfer.Upload;
import com.qcloud.cos.utils.Md5Utils;
import com.qcloud.cos_migrate_tool.config.CommonConfig;
import com.qcloud.cos_migrate_tool.config.ConfigParser;
import com.qcloud.cos_migrate_tool.config.MigrateType;
import com.qcloud.cos_migrate_tool.record.RecordDb;
import com.qcloud.cos_migrate_tool.record.RecordDb.QUERY_RESULT;
import com.qcloud.cos_migrate_tool.record.RecordElement;
Expand Down Expand Up @@ -213,6 +215,15 @@ public String uploadFile(String bucketName, String cosPath, File localFile,

if (entireMd5Attached) {
String md5 = Md5Utils.md5Hex(localFile);

String upyunTag = objectMetadata.getUserMetaDataOf("upyun-etag");
if (upyunTag != null) {
if (!md5.equalsIgnoreCase(upyunTag)) {
String exceptionMsg = String.format("md5 is not match upyun[%s] local[%s]",
upyunTag, md5);
throw new Exception(exceptionMsg);
}
}
objectMetadata.addUserMetadata("md5", md5);
}

Expand Down

0 comments on commit 1d667e6

Please sign in to comment.