diff --git a/dep/cos_migrate_tool-1.0-jar-with-dependencies.jar b/dep/cos_migrate_tool-1.0-jar-with-dependencies.jar index 0092019..7140be0 100644 Binary files a/dep/cos_migrate_tool-1.0-jar-with-dependencies.jar and b/dep/cos_migrate_tool-1.0-jar-with-dependencies.jar differ diff --git a/opbin/dump_requestid.sh b/opbin/dump_requestid.sh new file mode 100644 index 0000000..28c29d6 --- /dev/null +++ b/opbin/dump_requestid.sh @@ -0,0 +1,12 @@ +#!/bin/bash +export LANG=en_US.utf8 + +cur_dir=$(cd `dirname $0`; cd ..; pwd) +echo ${cur_dir} +cd ${cur_dir} +cp_path=${cur_dir}/src/main/resources:${cur_dir}/dep/* + +export RUN_MODE='DUMP_REQUESTID' +export DUMP_REQUESTID_FILE="${cur_dir}/dump-requestid.txt" +echo "try to dump requestid to ${DUMP_REQUESTID_FILE}" +java -Dfile.encoding=UTF-8 $@ -cp "$cp_path" com.qcloud.cos_migrate_tool.app.App diff --git a/opbin/query_requestid.sh b/opbin/query_requestid.sh new file mode 100644 index 0000000..813b2ee --- /dev/null +++ b/opbin/query_requestid.sh @@ -0,0 +1,11 @@ +#!/bin/bash +export LANG=en_US.utf8 + +cur_dir=$(cd `dirname $0`; cd ..; pwd) +cd ${cur_dir} +cp_path=${cur_dir}/src/main/resources:${cur_dir}/dep/* + +export RUN_MODE='QUERY_REQUESTID' +export QUERY_REQUESTID_KEY="/create_test_data.sh" +echo "try to query requestid ${QUERY_REQUESID_KEY}" +java -Dfile.encoding=UTF-8 $@ -cp "$cp_path" com.qcloud.cos_migrate_tool.app.App diff --git a/pom.xml b/pom.xml index bb22ab6..3666750 100644 --- a/pom.xml +++ b/pom.xml @@ -53,10 +53,16 @@ 1.7.25 + + + + + + - org.iq80.leveldb - leveldb - 0.9 + org.rocksdb + rocksdbjni + 5.14.2 diff --git a/src/main/java/com/qcloud/cos_migrate_tool/config/CommonConfig.java b/src/main/java/com/qcloud/cos_migrate_tool/config/CommonConfig.java index 44604e3..8538bdb 100644 --- a/src/main/java/com/qcloud/cos_migrate_tool/config/CommonConfig.java +++ b/src/main/java/com/qcloud/cos_migrate_tool/config/CommonConfig.java @@ -30,6 +30,8 @@ public class CommonConfig { private int timeWindowBegin = 0; private int timeWindowEnd = 24; private String endpointSuffix = null; + private String cosProxyHost = ""; + private int cosProxyPort = -1; public String getTempFolderPath() { return tempFolderPath; @@ -314,4 +316,20 @@ public int getTimeWindowBegin() { public int getTimeWindowEnd() { return timeWindowEnd; } + + public void setProxyHost(String host) { + this.cosProxyHost = host; + } + + public void setProxyPort(int port) { + this.cosProxyPort = port; + } + + public String getProxyHost() { + return this.cosProxyHost; + } + + public int getProxyPort() { + return this.cosProxyPort; + } } diff --git a/src/main/java/com/qcloud/cos_migrate_tool/config/ConfigParser.java b/src/main/java/com/qcloud/cos_migrate_tool/config/ConfigParser.java index f43f240..e2b5d34 100644 --- a/src/main/java/com/qcloud/cos_migrate_tool/config/ConfigParser.java +++ b/src/main/java/com/qcloud/cos_migrate_tool/config/ConfigParser.java @@ -41,6 +41,8 @@ public class ConfigParser { private static final String COMMON_DAEMON_MODE = "daemonMode"; private static final String COMMON_DAEMON_MODE_INTERVAL = "daemonModeInterVal"; private static final String COMMON_EXECUTE_TIME_WINDOW = "executeTimeWindow"; + private static final String COMMON_PROXY_HOST = "proxyHost"; + private static final String COMMON_PROXY_PORT = "proxyPort"; private static final String LOCAL_SECTION_NAME = "migrateLocal"; private static final String LOCAL_LOCALPATH = "localPath"; @@ -438,6 +440,24 @@ private boolean initCommonConfig(Preferences prefs, CommonConfig commonConfig) { if (endPointSuffixStr != null && !endPointSuffixStr.trim().isEmpty()) { commonConfig.setEndpointSuffix(endPointSuffixStr); } + + String proxyHost = getConfigValue(prefs, COMMON_SECTION_NAME, COMMON_PROXY_HOST); + if (proxyHost != null && !proxyHost.trim().isEmpty()) { + commonConfig.setProxyHost(proxyHost); + } + + int port = -1; + String portStr = getConfigValue(prefs, COMMON_SECTION_NAME, COMMON_PROXY_PORT); + + if (portStr != null && !portStr.isEmpty()) { + port = Integer.valueOf(portStr); + if (port > 0) { + commonConfig.setProxyPort(port); + } else { + throw new Exception("invalid cos proxy port"); + } + } + } catch (Exception e) { System.err.println(e.getMessage()); diff --git a/src/main/java/com/qcloud/cos_migrate_tool/record/RecordDb.java b/src/main/java/com/qcloud/cos_migrate_tool/record/RecordDb.java index 6b5c2b2..2eb1475 100644 --- a/src/main/java/com/qcloud/cos_migrate_tool/record/RecordDb.java +++ b/src/main/java/com/qcloud/cos_migrate_tool/record/RecordDb.java @@ -1,11 +1,22 @@ package com.qcloud.cos_migrate_tool.record; -import org.iq80.leveldb.*; +import java.io.BufferedOutputStream; +import java.io.FileNotFoundException; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.UnsupportedEncodingException; + +import org.rocksdb.FlushOptions; +import org.rocksdb.Options; +import org.rocksdb.ReadOptions; +import org.rocksdb.RocksDB; +import org.rocksdb.RocksDBException; +import org.rocksdb.RocksIterator; +import org.rocksdb.util.SizeUnit; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import static org.iq80.leveldb.impl.Iq80DBFactory.*; -import java.io.*; +import com.qcloud.cos.utils.IOUtils; /** * HistoryRecordDb 里存储已经上传的记录, key是Element的key, vaule是Element的value @@ -19,24 +30,27 @@ public class RecordDb { public static final Logger log = LoggerFactory.getLogger(RecordDb.class); private static final String ENCODING_TYPE = "UTF-8"; - private static final int CACHE_SIZE = 128 << 20; - private DB db; + private RocksDB db; + private Options options; + private final String requestIdPrefix = "x-cos-requestId-"; public RecordDb() {} public boolean init(String historyDbFolder, String comment) { - Options options = new Options(); - options.cacheSize(CACHE_SIZE); - options.createIfMissing(true); try { - db = factory.open(new File(historyDbFolder), options); - } catch (IOException e) { + options = new Options(); + options.setCreateIfMissing(true); + options.setWriteBufferSize(16 * SizeUnit.MB).setMaxWriteBufferNumber(4) + .setMaxBackgroundCompactions(4); + db = RocksDB.open(options, historyDbFolder); + } catch (RocksDBException e) { log.error(e.toString()); return false; } + String commentFile = historyDbFolder + "/README"; try { BufferedOutputStream bos = @@ -50,7 +64,6 @@ public boolean init(String historyDbFolder, String comment) { log.error(e.toString()); return false; } - return true; } @@ -61,6 +74,62 @@ public boolean saveRecord(RecordElement recordElement) { return saveKV(key, value); } + public boolean saveRequestId(String cosKey, String requestId) { + String dbKey = requestIdPrefix + cosKey; + if (requestId == null) { + log.warn("requestId is null for cosKey " + cosKey); + return saveKV(dbKey, "Null"); + } else { + return saveKV(dbKey, requestId); + } + } + + public void dumpRequestId(String saveFilePath) { + ReadOptions readOptions = null; + RocksIterator rocksIterator = null; + BufferedOutputStream bos = null; + try { + bos = new BufferedOutputStream(new FileOutputStream(saveFilePath)); + readOptions = new ReadOptions(); + rocksIterator = db.newIterator(readOptions); + rocksIterator.seek(requestIdPrefix.getBytes(ENCODING_TYPE)); + while (rocksIterator.isValid()) { + String key = new String(rocksIterator.key(), ENCODING_TYPE).trim(); + key = key.substring(requestIdPrefix.length()); + String value = new String(rocksIterator.value(), ENCODING_TYPE).trim(); + String content = String.format("%s \t %s\n", key, value); + bos.write(content.getBytes(ENCODING_TYPE)); + rocksIterator.next(); + } + } catch (Exception e) { + final String errMsg = "dumpRequestId error."; + System.err.println(errMsg); + log.error(errMsg, e); + } finally { + if (readOptions != null) { + readOptions.close(); + } + if (rocksIterator != null) { + rocksIterator.close(); + } + if (bos != null) { + IOUtils.closeQuietly(bos, log); + } + } + } + + public void queryRequestId(String cosKey) { + String dbKey = requestIdPrefix + cosKey; + String requestIdValue = queryKV(dbKey); + if (requestIdValue == null) { + requestIdValue = "Null"; + } + String infoMsg = String.format("query requestid, [key: %s], [requestid: %s]", cosKey, + requestIdValue); + System.out.println(infoMsg); + log.info(infoMsg); + } + public String buildMultipartUploadSavePointKey(String bucketName, String cosKey, String localFilePath, long mtime, long partSize, long mutlipartUploadThreshold) { String key = String.format( @@ -112,7 +181,7 @@ private String queryKV(String key) { byte[] valueByte; try { valueByte = db.get(key.getBytes(ENCODING_TYPE)); - } catch (DBException e) { + } catch (RocksDBException e) { log.error("query db failed, key:{}, exception: {}", key, e.toString()); return null; } catch (UnsupportedEncodingException e) { @@ -137,7 +206,7 @@ private boolean saveKV(String key, String value) { try { db.put(key.getBytes(ENCODING_TYPE), value.getBytes(ENCODING_TYPE)); return true; - } catch (DBException e) { + } catch (RocksDBException e) { log.error("update db failed, key:{}, value:{}, exception: {}", key, value, e.toString()); return false; @@ -147,12 +216,12 @@ private boolean saveKV(String key, String value) { return false; } } - + private boolean deleteKey(String key) { try { db.delete(key.getBytes(ENCODING_TYPE)); return true; - } catch (DBException e) { + } catch (RocksDBException e) { log.error("update db failed, key:{}, exception: {}", key, e.toString()); return false; } catch (UnsupportedEncodingException e) { @@ -164,8 +233,15 @@ private boolean deleteKey(String key) { public void shutdown() { if (db != null) { try { + FlushOptions flushOptions = new FlushOptions(); + flushOptions.setWaitForFlush(true); + db.flush(flushOptions); + flushOptions.close(); db.close(); - } catch (IOException e) { + if (options != null) { + options.close(); + } + } catch (RocksDBException e) { log.error("close db occur a exception: " + e.toString()); } } diff --git a/src/main/java/com/qcloud/cos_migrate_tool/task/.MigrateAliTaskExecutor.java.swp b/src/main/java/com/qcloud/cos_migrate_tool/task/.MigrateAliTaskExecutor.java.swp new file mode 100644 index 0000000..6e30529 Binary files /dev/null and b/src/main/java/com/qcloud/cos_migrate_tool/task/.MigrateAliTaskExecutor.java.swp differ diff --git a/src/main/java/com/qcloud/cos_migrate_tool/task/.MigrateAwsTaskExecutor.java.swp b/src/main/java/com/qcloud/cos_migrate_tool/task/.MigrateAwsTaskExecutor.java.swp new file mode 100644 index 0000000..d612dbd Binary files /dev/null and b/src/main/java/com/qcloud/cos_migrate_tool/task/.MigrateAwsTaskExecutor.java.swp differ diff --git a/src/main/java/com/qcloud/cos_migrate_tool/task/.MigrateQiniuTaskExecutor.java.swp b/src/main/java/com/qcloud/cos_migrate_tool/task/.MigrateQiniuTaskExecutor.java.swp new file mode 100644 index 0000000..aa0ad06 Binary files /dev/null and b/src/main/java/com/qcloud/cos_migrate_tool/task/.MigrateQiniuTaskExecutor.java.swp differ diff --git a/src/main/java/com/qcloud/cos_migrate_tool/task/.MigrateUrllistTaskExecutor.java.swp b/src/main/java/com/qcloud/cos_migrate_tool/task/.MigrateUrllistTaskExecutor.java.swp new file mode 100644 index 0000000..274c7fe Binary files /dev/null and b/src/main/java/com/qcloud/cos_migrate_tool/task/.MigrateUrllistTaskExecutor.java.swp differ diff --git a/src/main/java/com/qcloud/cos_migrate_tool/task/MigrateAliTask.java b/src/main/java/com/qcloud/cos_migrate_tool/task/MigrateAliTask.java index 29a13fb..5546e97 100644 --- a/src/main/java/com/qcloud/cos_migrate_tool/task/MigrateAliTask.java +++ b/src/main/java/com/qcloud/cos_migrate_tool/task/MigrateAliTask.java @@ -43,7 +43,7 @@ public MigrateAliTask(CopyFromAliConfig config, OSSClient ossClient, String srcK } private String buildCOSPath() { - String srcPrefix = ((CopyFromAliConfig)config).getSrcPrefix(); + String srcPrefix = ((CopyFromAliConfig) config).getSrcPrefix(); int lastDelimiter = srcPrefix.lastIndexOf("/"); String keyName = srcKey.substring(lastDelimiter + 1); String cosPrefix = config.getCosPath(); @@ -134,7 +134,7 @@ public void doTask() { TaskStatics.instance.addSkipCnt(); return; } - + Map userMetaMap = null; try { // download @@ -142,7 +142,7 @@ public void doTask() { GetObjectProgressListener downloadProgressListener = new GetObjectProgressListener(srcKey); ObjectMetadata objMeta = ossClient.getObject( - new GetObjectRequest(((CopyFromAliConfig)config).getSrcBucket(), srcKey) + new GetObjectRequest(((CopyFromAliConfig) config).getSrcBucket(), srcKey) .withProgressListener(downloadProgressListener), new File(localPath)); if (!downloadProgressListener.isSucceed()) { @@ -211,11 +211,12 @@ public void doTask() { } try { - uploadFile(config.getBucketName(), cosPath, localFile, config.getStorageClass(), - config.isEntireFileMd5Attached(), userMetaMap); + String requestId = uploadFile(config.getBucketName(), cosPath, localFile, + config.getStorageClass(), config.isEntireFileMd5Attached(), userMetaMap); saveRecord(ossRecordElement); + saveRequestId(cosPath, requestId); TaskStatics.instance.addSuccessCnt(); - String printMsg = String.format("[ok] task_info: %s", ossRecordElement.buildKey()); + String printMsg = String.format("[ok] [requestid: %s], task_info: %s", requestId == null ? "NULL" : requestId, ossRecordElement.buildKey()); System.out.println(printMsg); log.info(printMsg); } catch (Exception e) { diff --git a/src/main/java/com/qcloud/cos_migrate_tool/task/MigrateAwsTask.java b/src/main/java/com/qcloud/cos_migrate_tool/task/MigrateAwsTask.java index 51798b9..4cac86d 100644 --- a/src/main/java/com/qcloud/cos_migrate_tool/task/MigrateAwsTask.java +++ b/src/main/java/com/qcloud/cos_migrate_tool/task/MigrateAwsTask.java @@ -135,7 +135,7 @@ public void doTask() { // 下载object到文件 String localPath = config.getTempFolderPath() + UUID.randomUUID().toString(); File localFile = new File(localPath); - Map userMetaMap; + Map userMetaMap; try { GetObjectProgressListener getObjectProgressListener = new GetObjectProgressListener(srcKey); @@ -154,7 +154,7 @@ public void doTask() { TaskStatics.instance.addFailCnt(); return; } - + userMetaMap = objectMetadata.getUserMetadata(); if (localFile.length() != this.fileSize) { String printMsg = @@ -187,11 +187,12 @@ public void doTask() { // upload file try { - uploadFile(config.getBucketName(), cosPath, localFile, config.getStorageClass(), - config.isEntireFileMd5Attached(), userMetaMap); + String requestId = uploadFile(config.getBucketName(), cosPath, localFile, + config.getStorageClass(), config.isEntireFileMd5Attached(), userMetaMap); saveRecord(awsRecordElement); + saveRequestId(cosPath, requestId); TaskStatics.instance.addSuccessCnt(); - String printMsg = String.format("[ok] task_info: %s", awsRecordElement.buildKey()); + String printMsg = String.format("[ok] [requestid: %s], task_info: %s", requestId == null ? "NULL" : requestId, awsRecordElement.buildKey()); System.out.println(printMsg); log.info(printMsg); } catch (Exception e) { diff --git a/src/main/java/com/qcloud/cos_migrate_tool/task/MigrateCopyBucketTask.java b/src/main/java/com/qcloud/cos_migrate_tool/task/MigrateCopyBucketTask.java index e1d5b13..005fbee 100644 --- a/src/main/java/com/qcloud/cos_migrate_tool/task/MigrateCopyBucketTask.java +++ b/src/main/java/com/qcloud/cos_migrate_tool/task/MigrateCopyBucketTask.java @@ -4,6 +4,8 @@ import com.qcloud.cos.COSClient; import com.qcloud.cos.model.CopyObjectRequest; +import com.qcloud.cos.model.CopyObjectResult; +import com.qcloud.cos.model.CopyResult; import com.qcloud.cos.region.Region; import com.qcloud.cos.transfer.Copy; import com.qcloud.cos.transfer.TransferManager; @@ -53,14 +55,16 @@ public void doTask() { } CopyObjectRequest copyObjectRequest = new CopyObjectRequest(new Region(srcRegion), srcBucketName, srcKey, destBucketName, destKey); - copyObjectRequest.setSourceEndpointSuffix(srcEndpointSuffx); + // copyObjectRequest.setSourceEndpointSuffix(srcEndpointSuffx); try { Copy copy = smallFileTransfer.copy(copyObjectRequest, srcCOSClient, null); - copy.waitForCompletion(); + CopyResult copyResult = copy.waitForCopyResult(); + String requestId = copyResult.getRequestId(); saveRecord(migrateCopyBucketRecordElement); + saveRequestId(destKey, requestId); TaskStatics.instance.addSuccessCnt(); String printMsg = - String.format("[ok] task_info: %s", migrateCopyBucketRecordElement.buildKey()); + String.format("[ok] [requestid: %s], task_info: %s", requestId == null ? "NULL" : requestId, migrateCopyBucketRecordElement.buildKey()); System.out.println(printMsg); log.info(printMsg); } catch (Exception e) { diff --git a/src/main/java/com/qcloud/cos_migrate_tool/task/MigrateLocalTask.java b/src/main/java/com/qcloud/cos_migrate_tool/task/MigrateLocalTask.java index 072acb2..e8a1926 100644 --- a/src/main/java/com/qcloud/cos_migrate_tool/task/MigrateLocalTask.java +++ b/src/main/java/com/qcloud/cos_migrate_tool/task/MigrateLocalTask.java @@ -73,11 +73,12 @@ public void doTask() { } try { - uploadFile(bucketName, cosPath, localFile, storageClass, entireMd5Attached, null); + String requestId = uploadFile(bucketName, cosPath, localFile, storageClass, entireMd5Attached, null); saveRecord(migrateLocalRecordElement); + saveRequestId(cosPath, requestId); TaskStatics.instance.addSuccessCnt(); String printMsg = - String.format("[ok] task_info: %s", migrateLocalRecordElement.buildKey()); + String.format("[ok] [requestid: %s], task_info: %s", requestId == null ? "NULL" : requestId, migrateLocalRecordElement.buildKey()); System.out.println(printMsg); log.info(printMsg); } catch (Exception e) { diff --git a/src/main/java/com/qcloud/cos_migrate_tool/task/MigrateQiniuTask.java b/src/main/java/com/qcloud/cos_migrate_tool/task/MigrateQiniuTask.java index ae27bcb..a7625e5 100644 --- a/src/main/java/com/qcloud/cos_migrate_tool/task/MigrateQiniuTask.java +++ b/src/main/java/com/qcloud/cos_migrate_tool/task/MigrateQiniuTask.java @@ -99,11 +99,12 @@ public void doTask() { try { - uploadFile(config.getBucketName(), cosPath, localFile, config.getStorageClass(), + String requestId = uploadFile(config.getBucketName(), cosPath, localFile, config.getStorageClass(), config.isEntireFileMd5Attached(), null); saveRecord(qiniuRecordElement); + saveRequestId(cosPath, requestId); TaskStatics.instance.addSuccessCnt(); - String printMsg = String.format("[ok] task_info: %s", qiniuRecordElement.buildKey()); + String printMsg = String.format("[ok] [requestid: %s], task_info: %s", requestId == null ? "NULL" : requestId, qiniuRecordElement.buildKey()); System.out.println(printMsg); log.info(printMsg); } catch (Exception e) { diff --git a/src/main/java/com/qcloud/cos_migrate_tool/task/MigrateUrllistTask.java b/src/main/java/com/qcloud/cos_migrate_tool/task/MigrateUrllistTask.java index fa380e7..c764006 100644 --- a/src/main/java/com/qcloud/cos_migrate_tool/task/MigrateUrllistTask.java +++ b/src/main/java/com/qcloud/cos_migrate_tool/task/MigrateUrllistTask.java @@ -102,11 +102,12 @@ public void doTask() { } try { - uploadFile(config.getBucketName(), cosPath, localFile, config.getStorageClass(), + String requestId = uploadFile(config.getBucketName(), cosPath, localFile, config.getStorageClass(), config.isEntireFileMd5Attached(), headAttr.userMetaMap); saveRecord(urllistRecordElement); + saveRequestId(cosPath, requestId); TaskStatics.instance.addSuccessCnt(); - String printMsg = String.format("[ok] task_info: %s", urllistRecordElement.buildKey()); + String printMsg = String.format("[ok] [requestid: %s], task_info: %s", requestId == null ? "NULL" : requestId, urllistRecordElement.buildKey()); System.out.println(printMsg); log.info(printMsg); } catch (Exception e) { diff --git a/src/main/java/com/qcloud/cos_migrate_tool/task/Task.java b/src/main/java/com/qcloud/cos_migrate_tool/task/Task.java index d00cc6e..d6b84a6 100644 --- a/src/main/java/com/qcloud/cos_migrate_tool/task/Task.java +++ b/src/main/java/com/qcloud/cos_migrate_tool/task/Task.java @@ -9,13 +9,12 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.qcloud.cos.COSClient; import com.qcloud.cos.exception.CosServiceException; -import com.qcloud.cos.model.ListMultipartUploadsRequest; import com.qcloud.cos.model.ListPartsRequest; import com.qcloud.cos.model.ObjectMetadata; import com.qcloud.cos.model.PutObjectRequest; import com.qcloud.cos.model.StorageClass; +import com.qcloud.cos.model.UploadResult; import com.qcloud.cos.transfer.PersistableUpload; import com.qcloud.cos.transfer.Transfer.TransferState; import com.qcloud.cos.transfer.TransferManager; @@ -36,6 +35,7 @@ public abstract class Task implements Runnable { protected long smallFileThreshold; private RecordDb recordDb; protected CommonConfig config; + public Task(Semaphore semaphore, CommonConfig config, TransferManager smallFileTransfer, TransferManager bigFileTransfer, RecordDb recordDb) { @@ -62,6 +62,10 @@ public boolean isExist(RecordElement recordElement) { public void saveRecord(RecordElement recordElement) { recordDb.saveRecord(recordElement); } + + public void saveRequestId(String key, String requestId) { + recordDb.saveRequestId(key, requestId); + } private void printTransferProgress(TransferProgress progress, String key) { long byteSent = progress.getBytesTransferred(); @@ -77,18 +81,14 @@ private void printTransferProgress(TransferProgress progress, String key) { System.out.println(printMsg); } - public void showTransferProgress(Upload upload, boolean multipart, String key, long mtime) - throws InterruptedException { + public String showTransferProgressAndGetRequestId(Upload upload, boolean multipart, String key, + long mtime) throws InterruptedException { boolean pointSaveFlag = false; long printCount = 0; TransferProgress progress = upload.getProgress(); do { ++printCount; - try { - Thread.sleep(100); - } catch (InterruptedException e) { - return; - } + Thread.sleep(100); long byteSent = progress.getBytesTransferred(); if (printCount % 20 == 0) { @@ -140,7 +140,8 @@ public void showTransferProgress(Upload upload, boolean multipart, String key, l } } } - upload.waitForUploadResult(); + UploadResult uploadResult = upload.waitForUploadResult(); + return uploadResult.getRequestId(); } private boolean isMultipartUploadIdValid(String bucketName, String cosKey, String uploadId) { @@ -154,7 +155,7 @@ private boolean isMultipartUploadIdValid(String bucketName, String cosKey, Strin } - private void uploadBigFile(PutObjectRequest putObjectRequest) throws InterruptedException { + private String uploadBigFile(PutObjectRequest putObjectRequest) throws InterruptedException { String bucketName = putObjectRequest.getBucketName(); String cosKey = putObjectRequest.getKey(); String localPath = putObjectRequest.getFile().getAbsolutePath(); @@ -174,19 +175,19 @@ private void uploadBigFile(PutObjectRequest putObjectRequest) throws Interrupted } else { upload = this.bigFileTransfer.upload(putObjectRequest); } - showTransferProgress(upload, true, cosKey, mtime); + return showTransferProgressAndGetRequestId(upload, true, cosKey, mtime); } - private void uploadSmallFile(PutObjectRequest putObjectRequest) throws InterruptedException { + private String uploadSmallFile(PutObjectRequest putObjectRequest) throws InterruptedException { Upload upload = smallFileTransfer.upload(putObjectRequest); - showTransferProgress(upload, false, putObjectRequest.getKey(), + return showTransferProgressAndGetRequestId(upload, false, putObjectRequest.getKey(), putObjectRequest.getFile().lastModified()); } - public void uploadFile(String bucketName, String cosPath, File localFile, + public String uploadFile(String bucketName, String cosPath, File localFile, StorageClass storageClass, boolean entireMd5Attached, Map userMetaMap) throws Exception { PutObjectRequest putObjectRequest = new PutObjectRequest(bucketName, cosPath, localFile); @@ -209,11 +210,10 @@ public void uploadFile(String bucketName, String cosPath, File localFile, while (retryTime < maxRetry) { try { if (localFile.length() >= smallFileThreshold) { - uploadBigFile(putObjectRequest); + return uploadBigFile(putObjectRequest); } else { - uploadSmallFile(putObjectRequest); + return uploadSmallFile(putObjectRequest); } - return; } catch (Exception e) { log.warn("upload failed, ready to retry. retryTime:" + retryTime, e); ++retryTime; @@ -224,6 +224,7 @@ public void uploadFile(String bucketName, String cosPath, File localFile, } } } + return null; } public abstract void doTask(); diff --git a/src/main/java/com/qcloud/cos_migrate_tool/task/TaskExecutor.java b/src/main/java/com/qcloud/cos_migrate_tool/task/TaskExecutor.java index 405501e..8e97568 100644 --- a/src/main/java/com/qcloud/cos_migrate_tool/task/TaskExecutor.java +++ b/src/main/java/com/qcloud/cos_migrate_tool/task/TaskExecutor.java @@ -37,6 +37,10 @@ public abstract class TaskExecutor { protected TransferManager smallFileTransferManager; protected TransferManager bigFileTransferManager; + enum RUN_MODE { + NORMAL, DUMP_REQUESTID, QUERY_REQUESTID; + } + public TaskExecutor(MigrateType migrateType, CommonConfig config) { this.migrateType = migrateType; this.config = config; @@ -53,7 +57,18 @@ public TaskExecutor(MigrateType migrateType, CommonConfig config) { clientConfig.setEndPointSuffix(config.getEndpointSuffix()); } clientConfig.setUserAgent("cos-migrate-tool-v1.0"); + System.out.println("ppppppppppppppppppppppppppppppp"); + System.out.println(config.getProxyHost()); + System.out.println(config.getProxyPort()); + System.out.println("ppppppppppppppppppppppppppppppp"); + if (!config.getProxyHost().isEmpty() && config.getProxyPort() > 0) { + System.out.println(config.getProxyHost()); + clientConfig.setHttpProxyIp(config.getProxyHost()); + clientConfig.setHttpProxyPort(config.getProxyPort()); + } + this.cosClient = new COSClient(cred, clientConfig); + this.smallFileTransferManager = new TransferManager(this.cosClient, Executors.newFixedThreadPool(config.getSmallFileExecutorNumber())); this.smallFileTransferManager.getConfiguration() @@ -93,10 +108,44 @@ protected void AddTask(Task task) throws InterruptedException { } } - // 用于产生任务 public abstract void buildTask(); + private RUN_MODE getRunMode() { + final String runMode = "RUN_MODE"; + String debugModeValue = System.getenv(runMode); + if (debugModeValue == null || debugModeValue.equalsIgnoreCase("NORMAL")) { + return RUN_MODE.NORMAL; + } else if (debugModeValue.equalsIgnoreCase("QUERY_REQUESTID")) { + return RUN_MODE.QUERY_REQUESTID; + } else if (debugModeValue.equalsIgnoreCase("DUMP_REQUESTID")) { + return RUN_MODE.DUMP_REQUESTID; + } + return RUN_MODE.NORMAL; + } + + private String getDumpRequestIdFilePath() { + final String dumpRequestIdFile = "DUMP_REQUESTID_FILE"; + String dumpFilePath = System.getenv(dumpRequestIdFile); + if (dumpFilePath == null) { + String errMsg = "env " + dumpRequestIdFile + " is null"; + System.err.println(errMsg); + log.error(errMsg); + } + return dumpFilePath; + } + + private String getQueryKey() { + final String queryKey = "QUERY_REQUESTID_KEY"; + String queryKeyValue = System.getenv(queryKey); + if (queryKeyValue == null) { + String errMsg = "env " + queryKey + " is null"; + System.err.println(errMsg); + log.error(errMsg); + } + return queryKeyValue; + } + public void run() { if (!initRecord()) { String errMsg = "init db error, may be another process with same config is running "; @@ -104,7 +153,21 @@ public void run() { System.err.println(errMsg); return; } - buildTask(); + + RUN_MODE runMode = getRunMode(); + if (runMode.equals(RUN_MODE.NORMAL)) { + buildTask(); + } else if (runMode.equals(RUN_MODE.DUMP_REQUESTID)) { + String dumpFilePath = getDumpRequestIdFilePath(); + if (dumpFilePath != null) { + recordDb.dumpRequestId(dumpFilePath); + } + } else if (runMode.equals(RUN_MODE.QUERY_REQUESTID)) { + String queryKey = getQueryKey(); + if (queryKey != null) { + recordDb.queryRequestId(queryKey); + } + } } public void waitTaskOver() { @@ -115,7 +178,9 @@ public void waitTaskOver() { this.smallFileTransferManager.shutdownNow(); this.bigFileTransferManager.shutdownNow(); this.cosClient.shutdown(); - printTaskStaticsInfo(); + if (getRunMode().equals(RUN_MODE.NORMAL)) { + printTaskStaticsInfo(); + } } catch (InterruptedException e) { log.error("waitTaskOver is interrupted!", e); System.err.println("waitTaskOver is interrupted!"); @@ -147,7 +212,8 @@ public void printTaskStaticsInfo() { printStr = String.format("%30s : %d", "migrate_skip", TaskStatics.instance.getSkipCnt()); System.out.println(printStr); log.info(printStr); - printStr = String.format("%30s : %d", "migrate_condition_not_match", TaskStatics.instance.getConditionNotMatchCnt()); + printStr = String.format("%30s : %d", "migrate_condition_not_match", + TaskStatics.instance.getConditionNotMatchCnt()); System.out.println(printStr); log.info(printStr); printStr = String.format("%30s : %s", "start_time", TaskStatics.instance.getStartTimeStr()); diff --git a/src/main/java/com/qcloud/cos_migrate_tool/utils/Downloader.java b/src/main/java/com/qcloud/cos_migrate_tool/utils/Downloader.java index c5f4980..83da2ea 100644 --- a/src/main/java/com/qcloud/cos_migrate_tool/utils/Downloader.java +++ b/src/main/java/com/qcloud/cos_migrate_tool/utils/Downloader.java @@ -13,6 +13,7 @@ import org.apache.http.Header; import org.apache.http.HttpEntity; +import org.apache.http.HttpHost; import org.apache.http.HttpResponse; import org.apache.http.client.HttpClient; import org.apache.http.client.config.RequestConfig; @@ -47,6 +48,7 @@ private Downloader() { this.connectionManager.setValidateAfterInactivity(1); HttpClientBuilder httpClientBuilder = HttpClients.custom().setConnectionManager(connectionManager); + this.httpClient = httpClientBuilder.build(); this.requestConfig = RequestConfig.custom().setConnectionRequestTimeout(30 * 1000) .setConnectTimeout(30 * 1000).setSocketTimeout(30 * 1000).build(); @@ -63,30 +65,29 @@ public HeadAttr headFile(String url) { while (retry < maxRetryCount) { HttpHead httpHead = null; try { - StringBuffer urlBuffer = new StringBuffer(); + StringBuffer urlBuffer = new StringBuffer(); URL encodeUrl = new URL(url); - + urlBuffer.append(encodeUrl.getProtocol()).append("://").append(encodeUrl.getHost()); if (encodeUrl.getPath().startsWith("/")) { - urlBuffer.append("/").append(UrlEncoderUtils.encodeEscapeDelimiter(encodeUrl.getPath()).substring(1).replaceAll("/", "%2f")); + urlBuffer.append("/") + .append(UrlEncoderUtils.encodeEscapeDelimiter(encodeUrl.getPath()) + .substring(1).replaceAll("/", "%2f")); } else { - urlBuffer.append("/").append(UrlEncoderUtils.encodeEscapeDelimiter(encodeUrl.getPath()).replaceAll("/", "%2f")); + urlBuffer.append("/").append(UrlEncoderUtils + .encodeEscapeDelimiter(encodeUrl.getPath()).replaceAll("/", "%2f")); } - + if (encodeUrl.getQuery() != null) { - urlBuffer.append("?").append(URLEncoder.encode(encodeUrl.getQuery(),"UTF-8")); + urlBuffer.append("?").append(encodeUrl.getQuery()); } - - + httpHead = new HttpHead(urlBuffer.toString()); } catch (MalformedURLException e) { log.error("headFile url fail,url:{},msg:{}", url, e.getMessage()); return null; - } catch (UnsupportedEncodingException e) { - log.error("urlencode fail str:{}", url); - return null; - } + } httpHead.setConfig(requestConfig); httpHead.setHeader("Accept", "*/*"); @@ -125,20 +126,24 @@ public HeadAttr headFile(String url) { Header header = httpResponse.getFirstHeader("Last-Modified"); headAttr.lastModify = header.getValue(); } - + Header[] allHeaders = httpResponse.getAllHeaders(); final String ossUserMetaPrefix = "x-oss-meta-"; final String awsUserMetaPrefix = "x-amz-meta-"; for (Header headerElement : allHeaders) { String headerName = headerElement.getName(); String headerValue = headerElement.getValue(); - if (headerName.startsWith(ossUserMetaPrefix) && !headerName.equals(ossUserMetaPrefix)) { - headAttr.userMetaMap.put(headerName.substring(ossUserMetaPrefix.length()), headerValue); - } else if (headerName.startsWith(awsUserMetaPrefix) && !headerName.equals(awsUserMetaPrefix)) { - headAttr.userMetaMap.put(headerName.substring(awsUserMetaPrefix.length()), headerValue); + if (headerName.startsWith(ossUserMetaPrefix) + && !headerName.equals(ossUserMetaPrefix)) { + headAttr.userMetaMap.put(headerName.substring(ossUserMetaPrefix.length()), + headerValue); + } else if (headerName.startsWith(awsUserMetaPrefix) + && !headerName.equals(awsUserMetaPrefix)) { + headAttr.userMetaMap.put(headerName.substring(awsUserMetaPrefix.length()), + headerValue); } } - + return headAttr; } catch (Exception e) { log.error("head file attr fail, url: {}, retry: {}/{}, exception: {}", url, retry, @@ -172,30 +177,30 @@ public boolean downFile(String url, File localFile) { while (retry < maxRetryCount) { HttpGet httpGet = null; try { - StringBuffer urlBuffer = new StringBuffer(); + StringBuffer urlBuffer = new StringBuffer(); URL encodeUrl = new URL(url); - + urlBuffer.append(encodeUrl.getProtocol()).append("://").append(encodeUrl.getHost()); if (encodeUrl.getPath().startsWith("/")) { - urlBuffer.append("/").append(UrlEncoderUtils.encodeEscapeDelimiter(encodeUrl.getPath()).substring(1).replaceAll("/", "%2f")); + urlBuffer.append("/") + .append(UrlEncoderUtils.encodeEscapeDelimiter(encodeUrl.getPath()) + .substring(1).replaceAll("/", "%2f")); } else { - urlBuffer.append("/").append(UrlEncoderUtils.encodeEscapeDelimiter(encodeUrl.getPath()).replaceAll("/", "%2f")); + urlBuffer.append("/").append(UrlEncoderUtils + .encodeEscapeDelimiter(encodeUrl.getPath()).replaceAll("/", "%2f")); } - + if (encodeUrl.getQuery() != null) { - urlBuffer.append("?").append(URLEncoder.encode(encodeUrl.getQuery(),"UTF-8")); + urlBuffer.append("?").append(encodeUrl.getQuery()); } - + httpGet = new HttpGet(urlBuffer.toString()); - + } catch (MalformedURLException e) { log.error("downFile url fail, url:{}, msg:{}", url, e.getMessage()); return false; - } catch (UnsupportedEncodingException e) { - log.error("urlencode fail str:{}", url); - return false; - } + } httpGet.setConfig(requestConfig); httpGet.setHeader("Accept", "*/*"); diff --git a/start_cos_sync.bat b/start_cos_sync.bat new file mode 100644 index 0000000..18e267a --- /dev/null +++ b/start_cos_sync.bat @@ -0,0 +1,6 @@ +@echo off +set cur_dir=%CD% +cd %cur_dir% +set my_java_cp=.;%cur_dir%\dep\*;%cur_dir%\src\main\resources +java -Dfile.encoding=UTF-8 -cp "%my_java_cp%" com.qcloud.cos_migrate_tool.app.App +pause>nul diff --git a/start_cos_sync.sh b/start_cos_sync.sh new file mode 100644 index 0000000..0dc994d --- /dev/null +++ b/start_cos_sync.sh @@ -0,0 +1,8 @@ +#!/bin/bash +export LANG=en_US.utf8 + +cur_dir=$(cd `dirname $0`;pwd) +cd ${cur_dir} +cp_path=${cur_dir}/src/main/resources:${cur_dir}/dep/* + +java -Dfile.encoding=UTF-8 $@ -cp "$cp_path" com.qcloud.cos_migrate_tool.app.App