diff --git a/conf/config.ini b/conf/config.ini index 5d6b50d..073c173 100644 --- a/conf/config.ini +++ b/conf/config.ini @@ -23,8 +23,9 @@ storageClass=Standard cosPath=/ # 是否使用HTTPS传输(传输速度较慢,适用于对传输安全要求高的场景), on开启, off关闭 https=off -#临时目录,用于运行过程中,临时文件的存储, 主要用于友商数据迁移到COS, 因为迁移会现将数据下载到临时目录,再进行上传后删除.对于linux绝对路径, 如/a/b/c, 对于windows绝对路径,注意分隔符为两个反斜杠,如E:\\a\\b\\c -tmpFolder=E:\\code\\java\\workspace\\cos_migrate_tool\\tmp +# 临时目录,用于运行过程中,临时文件的存储, 主要用于友商数据迁移到COS, 因为迁移会现将数据下载到临时目录,再进行上传后删除.对于linux绝对路径, 如/a/b/c, 对于windows绝对路径,注意分隔符为两个反斜杠,如E:\\a\\b\\c +# 默认存储在工具下的tmp目录, 请确保磁盘空间充足,取决于要迁移的文件的大小与并发度。 +tmpFolder=./tmp # 小文件阈值的字节,大于等于这个阈值使用分块上传,否则使用简单上传, 默认5MB smallFileThreshold=5242880 # 小文件(文件小于smallFileThreshold)的并发度,使用简单上传 @@ -38,6 +39,10 @@ entireFileMd5Attached=on daemonMode=off # 表示每一轮同步结束后,多久进行下一轮同步,单位为秒 daemonModeInterVal=60 +# 表示任务执行的时间窗口, 单位为小时, 满足部分客户要求在指定时间段内执行,比如3,21, 表示在凌晨03:00到晚上21:59之间执行任务。 +# 如果当前时间不在时间窗口内,则会进入睡眠状态,暂停迁移,直到下一个时间窗口内自动再继续执行。 +# 但每一个任务都是 先判断时间是否在迁移窗口,然后开始迁移,有可能判断的时候 在时间窗口,但是迁移过程中有可能跨过时间窗口, 即存在少量的迁移在时间窗口外执行。 +executeTimeWindow=0,24 # 从本地迁移到COS配置分节 @@ -50,7 +55,7 @@ exeludes= ## 从阿里迁移到COS的配置分节 [migrateAli] -bucket=mubucket-test +bucket=mybucket-test accessKeyId=xxxxxxxxxx accessKeySecret=yyyyyyyyyyy #友商的地址 @@ -67,7 +72,7 @@ proxyPort= [migrateQiniu] # 从七牛迁移到COS的配置分节 # 七牛的bucket名称 -bucket=assda +bucket=mybucket # 七牛的账户信息 accessKeyId=xxxxxxxxxx accessKeySecret=yyyyyyyyyyyyyyyy diff --git a/dep/cos_migrate_tool-1.0-jar-with-dependencies.jar b/dep/cos_migrate_tool-1.0-jar-with-dependencies.jar index 878e1a7..3ba6942 100644 Binary files a/dep/cos_migrate_tool-1.0-jar-with-dependencies.jar and b/dep/cos_migrate_tool-1.0-jar-with-dependencies.jar differ diff --git a/src/main/java/com/qcloud/cos_migrate_tool/config/CommonConfig.java b/src/main/java/com/qcloud/cos_migrate_tool/config/CommonConfig.java index f644d4d..18e5ca1 100644 --- a/src/main/java/com/qcloud/cos_migrate_tool/config/CommonConfig.java +++ b/src/main/java/com/qcloud/cos_migrate_tool/config/CommonConfig.java @@ -27,6 +27,8 @@ public class CommonConfig { private long smallFileThreshold = 5 * 1024 * 1024; private boolean damonMode = false; private long damonInterVal = 60; + private int timeWindowBegin = 0; + private int timeWindowEnd = 24; public String getTempFolderPath() { return tempFolderPath; @@ -255,4 +257,40 @@ public boolean isDamonMode() { public long getDamonInterVal() { return damonInterVal; } + + public void setTimeWindowsStr(String timeWindowStr) { + timeWindowStr = timeWindowStr.trim(); + String[] timeWindowArray = timeWindowStr.split(","); + if (timeWindowArray.length != 2) { + throw new IllegalArgumentException("executeTimeWindow is invalid, the legal example 3,21"); + } + try { + int number = Integer.valueOf(timeWindowArray[0]); + if (number < 0 || number >= 24) { + throw new IllegalArgumentException("executeTimeWindow is invalid, the legal example 3,10"); + } + this.timeWindowBegin = number; + + number = Integer.valueOf(timeWindowArray[1]); + if (number < 0 || number > 24) { + throw new IllegalArgumentException("executeTimeWindow is invalid, the legal example 3,10"); + } + this.timeWindowEnd = number; + + if (this.timeWindowEnd < this.timeWindowBegin) { + throw new IllegalArgumentException("executeTimeWindow is invalid, the legal example 3,10"); + } + + } catch (NumberFormatException e) { + throw new IllegalArgumentException("invalid executeTimeWindow"); + } + } + + public int getTimeWindowBegin() { + return timeWindowBegin; + } + + public int getTimeWindowEnd() { + return timeWindowEnd; + } } diff --git a/src/main/java/com/qcloud/cos_migrate_tool/config/ConfigParser.java b/src/main/java/com/qcloud/cos_migrate_tool/config/ConfigParser.java index a8744a1..708d7bc 100644 --- a/src/main/java/com/qcloud/cos_migrate_tool/config/ConfigParser.java +++ b/src/main/java/com/qcloud/cos_migrate_tool/config/ConfigParser.java @@ -39,6 +39,7 @@ public class ConfigParser { private static final String COMMON_ENTIRE_FILE_MD5_ATTACHED = "entireFileMd5Attached"; private static final String COMMON_DAEMON_MODE = "daemonMode"; private static final String COMMON_DAEMON_MODE_INTERVAL = "daemonModeInterVal"; + private static final String COMMON_EXECUTE_TIME_WINDOW = "executeTimeWindow"; private static final String LOCAL_SECTION_NAME = "migrateLocal"; private static final String LOCAL_LOCALPATH = "localPath"; @@ -300,14 +301,17 @@ private boolean checkCommonConfig(Preferences prefs) { if (!isKeyExist(prefs, COMMON_SECTION_NAME, COMMON_ENTIRE_FILE_MD5_ATTACHED)) { return false; } - + if (!isKeyExist(prefs, COMMON_SECTION_NAME, COMMON_DAEMON_MODE)) { return false; } - + if (!isKeyExist(prefs, COMMON_SECTION_NAME, COMMON_DAEMON_MODE_INTERVAL)) { return false; } + if (!isKeyExist(prefs, COMMON_SECTION_NAME, COMMON_EXECUTE_TIME_WINDOW)) { + return false; + } return true; } @@ -404,22 +408,26 @@ private boolean initCommonConfig(Preferences prefs, CommonConfig commonConfig) { getConfigValue(prefs, COMMON_SECTION_NAME, COMMON_STORAGE_CLASS); assert (storageClassStr != null); commonConfig.setStorageClass(storageClassStr); - + String entireFileMd5AttachedStr = getConfigValue(prefs, COMMON_SECTION_NAME, COMMON_ENTIRE_FILE_MD5_ATTACHED); assert (entireFileMd5AttachedStr != null); commonConfig.setEntireFileMd5Attached(entireFileMd5AttachedStr); - - String daemonModeStr = - getConfigValue(prefs, COMMON_SECTION_NAME, COMMON_DAEMON_MODE); + + String daemonModeStr = getConfigValue(prefs, COMMON_SECTION_NAME, COMMON_DAEMON_MODE); assert (daemonModeStr != null); commonConfig.setDaemonMode(daemonModeStr); String daemonModeInterValStr = getConfigValue(prefs, COMMON_SECTION_NAME, COMMON_DAEMON_MODE_INTERVAL); assert (daemonModeInterValStr != null); - commonConfig.setDaemonModeInterVal(daemonModeInterValStr); + commonConfig.setDaemonModeInterVal(daemonModeInterValStr); + + String timeWindowStr = + getConfigValue(prefs, COMMON_SECTION_NAME, COMMON_EXECUTE_TIME_WINDOW); + assert (timeWindowStr != null); + commonConfig.setTimeWindowsStr(timeWindowStr); } catch (Exception e) { System.err.println(e.getMessage()); diff --git a/src/main/java/com/qcloud/cos_migrate_tool/record/MigrateUrllistRecordElement.java b/src/main/java/com/qcloud/cos_migrate_tool/record/MigrateUrllistRecordElement.java index 3794d21..442ffd1 100644 --- a/src/main/java/com/qcloud/cos_migrate_tool/record/MigrateUrllistRecordElement.java +++ b/src/main/java/com/qcloud/cos_migrate_tool/record/MigrateUrllistRecordElement.java @@ -1,20 +1,21 @@ package com.qcloud.cos_migrate_tool.record; import com.qcloud.cos_migrate_tool.config.MigrateType; +import com.qcloud.cos_migrate_tool.utils.HeadAttr; public class MigrateUrllistRecordElement extends RecordElement { private String bucketName; private String cosPath; private String url; - private long fileSize; + private HeadAttr headAttr; public MigrateUrllistRecordElement(MigrateType migrateType, String bucketName, String cosPath, - String url, long fileSize) { + String url, HeadAttr headAttr) { super(migrateType); this.bucketName = bucketName; this.cosPath = cosPath; this.url = url; - this.fileSize = fileSize; + this.headAttr = headAttr; } @Override @@ -26,7 +27,8 @@ public String buildKey() { @Override public String buildValue() { - String value = String.format("[fileSize: %d]", fileSize); + String value = String.format("[fileSize: %d], [lastModify: %s]", headAttr.fileSize, + headAttr.lastModify); return value; } } diff --git a/src/main/java/com/qcloud/cos_migrate_tool/task/MigrateAliTask.java b/src/main/java/com/qcloud/cos_migrate_tool/task/MigrateAliTask.java index 2cb4b01..52f9136 100644 --- a/src/main/java/com/qcloud/cos_migrate_tool/task/MigrateAliTask.java +++ b/src/main/java/com/qcloud/cos_migrate_tool/task/MigrateAliTask.java @@ -22,7 +22,6 @@ public class MigrateAliTask extends Task { - private CopyFromAliConfig config; private OSSClient ossClient; private String srcKey; private long fileSize; @@ -31,10 +30,7 @@ public class MigrateAliTask extends Task { public MigrateAliTask(CopyFromAliConfig config, OSSClient ossClient, String srcKey, long fileSize, String etag, TransferManager smallFileTransfer, TransferManager bigFileTransfer, RecordDb recordDb, Semaphore semaphore) { - super(semaphore, smallFileTransfer, bigFileTransfer, config.getSmallFileThreshold(), - recordDb); - - this.config = config; + super(semaphore, config, smallFileTransfer, bigFileTransfer, recordDb); this.ossClient = ossClient; this.srcKey = srcKey; this.fileSize = fileSize; @@ -46,7 +42,7 @@ public MigrateAliTask(CopyFromAliConfig config, OSSClient ossClient, String srcK } private String buildCOSPath() { - String srcPrefix = config.getSrcPrefix(); + String srcPrefix = ((CopyFromAliConfig)config).getSrcPrefix(); int lastDelimiter = srcPrefix.lastIndexOf("/"); String keyName = srcKey.substring(lastDelimiter + 1); String cosPrefix = config.getCosPath(); @@ -143,7 +139,7 @@ public void doTask() { GetObjectProgressListener downloadProgressListener = new GetObjectProgressListener(srcKey); ObjectMetadata objMeta = ossClient.getObject( - new GetObjectRequest(config.getSrcBucket(), srcKey) + new GetObjectRequest(((CopyFromAliConfig)config).getSrcBucket(), srcKey) .withProgressListener(downloadProgressListener), new File(localPath)); if (!downloadProgressListener.isSucceed()) { diff --git a/src/main/java/com/qcloud/cos_migrate_tool/task/MigrateAwsTask.java b/src/main/java/com/qcloud/cos_migrate_tool/task/MigrateAwsTask.java index f0a626f..519a9d5 100644 --- a/src/main/java/com/qcloud/cos_migrate_tool/task/MigrateAwsTask.java +++ b/src/main/java/com/qcloud/cos_migrate_tool/task/MigrateAwsTask.java @@ -20,7 +20,6 @@ public class MigrateAwsTask extends Task { - private CopyFromAwsConfig config; private String srcKey; private long fileSize; private String etag; @@ -29,9 +28,7 @@ public class MigrateAwsTask extends Task { public MigrateAwsTask(CopyFromAwsConfig config, AmazonS3 s3Client, String srcKey, long fileSize, String etag, TransferManager smallFileTransfer, TransferManager bigFileTransfer, RecordDb recordDb, Semaphore semaphore) { - super(semaphore, smallFileTransfer, bigFileTransfer, config.getSmallFileThreshold(), - recordDb); - this.config = config; + super(semaphore, config, smallFileTransfer, bigFileTransfer, recordDb); this.s3Client = s3Client; this.srcKey = srcKey; this.fileSize = fileSize; @@ -39,11 +36,10 @@ public MigrateAwsTask(CopyFromAwsConfig config, AmazonS3 s3Client, String srcKey if (srcKey.startsWith("/")) { this.srcKey = srcKey.substring(1); } - } private String buildCOSPath() { - String srcPrefix = config.getSrcPrefix(); + String srcPrefix = ((CopyFromAwsConfig) config).getSrcPrefix(); int lastDelimiter = srcPrefix.lastIndexOf("/"); String keyName = srcKey.substring(lastDelimiter + 1); String cosPrefix = config.getCosPath(); @@ -140,20 +136,25 @@ public void doTask() { try { GetObjectProgressListener getObjectProgressListener = new GetObjectProgressListener(srcKey); - s3Client.getObject(new GetObjectRequest(config.getSrcBucket(), srcKey) - .withGeneralProgressListener(getObjectProgressListener), + s3Client.getObject( + new GetObjectRequest(((CopyFromAwsConfig) config).getSrcBucket(), srcKey) + .withGeneralProgressListener( + getObjectProgressListener), localFile); if (!localFile.exists()) { - String printMsg = String.format("[fail] [task_info: %s]", awsRecordElement.buildKey()); + String printMsg = + String.format("[fail] [task_info: %s]", awsRecordElement.buildKey()); System.out.println(printMsg); - log.error("[fail] [taskInfo: {}] [srcKey: {}] [download localfile failed, localFile {} not exist]", + log.error( + "[fail] [taskInfo: {}] [srcKey: {}] [download localfile failed, localFile {} not exist]", awsRecordElement.buildKey(), srcKey, localPath); TaskStatics.instance.addFailCnt(); return; } if (localFile.length() != this.fileSize) { - String printMsg = String.format("[fail] [task_info: %s]", awsRecordElement.buildKey()); + String printMsg = + String.format("[fail] [task_info: %s]", awsRecordElement.buildKey()); System.out.println(printMsg); log.error("[fail] [taskInfo: {}] [download size {} not equal meta size {}]", awsRecordElement.buildKey(), localFile.length(), this.fileSize); diff --git a/src/main/java/com/qcloud/cos_migrate_tool/task/MigrateCopyBucketTask.java b/src/main/java/com/qcloud/cos_migrate_tool/task/MigrateCopyBucketTask.java index c207cc6..715a426 100644 --- a/src/main/java/com/qcloud/cos_migrate_tool/task/MigrateCopyBucketTask.java +++ b/src/main/java/com/qcloud/cos_migrate_tool/task/MigrateCopyBucketTask.java @@ -7,6 +7,7 @@ import com.qcloud.cos.region.Region; import com.qcloud.cos.transfer.Copy; import com.qcloud.cos.transfer.TransferManager; +import com.qcloud.cos_migrate_tool.config.CopyBucketConfig; import com.qcloud.cos_migrate_tool.meta.TaskStatics; import com.qcloud.cos_migrate_tool.record.MigrateCopyBucketRecordElement; import com.qcloud.cos_migrate_tool.record.RecordDb; @@ -22,17 +23,16 @@ public class MigrateCopyBucketTask extends Task { private final long srcSize; private final String srcEtag; - public MigrateCopyBucketTask(Semaphore semaphore, TransferManager smallFileTransfer, - TransferManager bigFileTransfer, long smallFileThreshold, RecordDb recordDb, COSClient srcCOSClient, - String destRegion, String destBucketName, String destKey, String srcRegion, - String srcBucketName, String srcKey, long srcSize, String srcEtag) { - super(semaphore, smallFileTransfer, bigFileTransfer, smallFileThreshold, recordDb); + public MigrateCopyBucketTask(Semaphore semaphore, CopyBucketConfig config, + TransferManager smallFileTransfer, TransferManager bigFileTransfer, RecordDb recordDb, + COSClient srcCOSClient, String srcKey, long srcSize, String srcEtag, String destKey) { + super(semaphore, config, smallFileTransfer, bigFileTransfer, recordDb); this.srcCOSClient = srcCOSClient; - this.destRegion = destRegion; - this.destBucketName = destBucketName; + this.destRegion = config.getRegion(); + this.destBucketName = config.getBucketName(); this.destKey = destKey; - this.srcRegion = srcRegion; - this.srcBucketName = srcBucketName; + this.srcRegion = config.getSrcRegion(); + this.srcBucketName = config.getSrcBucket(); this.srcKey = srcKey; this.srcSize = srcSize; this.srcEtag = srcEtag; diff --git a/src/main/java/com/qcloud/cos_migrate_tool/task/MigrateCopyBucketTaskExecutor.java b/src/main/java/com/qcloud/cos_migrate_tool/task/MigrateCopyBucketTaskExecutor.java index 4fe7048..18ed603 100644 --- a/src/main/java/com/qcloud/cos_migrate_tool/task/MigrateCopyBucketTaskExecutor.java +++ b/src/main/java/com/qcloud/cos_migrate_tool/task/MigrateCopyBucketTaskExecutor.java @@ -82,11 +82,10 @@ public void buildTask() { String keyName = srcKey.substring(lastDelimiter); String copyDestKey = config.getCosPath() + keyName; - MigrateCopyBucketTask task = new MigrateCopyBucketTask(semaphore, - smallFileTransferManager, bigFileTransferManager, - config.getSmallFileThreshold(), recordDb, srcCosClient, - config.getRegion(), config.getBucketName(), copyDestKey, srcRegion, - srcBucketName, srcKey, srcSize, srcEtag); + MigrateCopyBucketTask task = + new MigrateCopyBucketTask(semaphore, (CopyBucketConfig) config, + smallFileTransferManager, bigFileTransferManager, recordDb, + srcCosClient, srcKey, srcSize, srcEtag, copyDestKey); AddTask(task); } if (!objectListing.isTruncated()) { diff --git a/src/main/java/com/qcloud/cos_migrate_tool/task/MigrateLocalTask.java b/src/main/java/com/qcloud/cos_migrate_tool/task/MigrateLocalTask.java index 768c313..298469c 100644 --- a/src/main/java/com/qcloud/cos_migrate_tool/task/MigrateLocalTask.java +++ b/src/main/java/com/qcloud/cos_migrate_tool/task/MigrateLocalTask.java @@ -8,6 +8,8 @@ import com.qcloud.cos.model.StorageClass; import com.qcloud.cos.transfer.TransferManager; +import com.qcloud.cos_migrate_tool.config.CommonConfig; +import com.qcloud.cos_migrate_tool.config.CopyFromLocalConfig; import com.qcloud.cos_migrate_tool.meta.TaskStatics; import com.qcloud.cos_migrate_tool.record.MigrateLocalRecordElement; import com.qcloud.cos_migrate_tool.record.RecordDb; @@ -24,16 +26,16 @@ public class MigrateLocalTask extends Task { private boolean entireMd5Attached; - public MigrateLocalTask(String bucketName, String localFolder, String cosFolder, File localFile, - StorageClass storageClass, TransferManager smallFileTransfer, - TransferManager bigFileTransfer, long smallFileThreshold, RecordDb recordDb, Semaphore semaphore, boolean entireMd5Attached) { - super(semaphore, smallFileTransfer, bigFileTransfer, smallFileThreshold, recordDb); - this.bucketName = bucketName; - this.localFolder = localFolder; - this.cosFolder = cosFolder; + public MigrateLocalTask(Semaphore semaphore, CommonConfig commonConfig, + TransferManager smallFileTransfer, TransferManager bigFileTransfer, RecordDb recordDb, + File localFile) { + super(semaphore, commonConfig, smallFileTransfer, bigFileTransfer, recordDb); + this.bucketName = commonConfig.getBucketName(); + this.localFolder = ((CopyFromLocalConfig) commonConfig).getLocalPath(); + this.cosFolder = commonConfig.getCosPath(); this.localFile = localFile; - this.storageClass = storageClass; - this.entireMd5Attached = entireMd5Attached; + this.storageClass = commonConfig.getStorageClass(); + this.entireMd5Attached = commonConfig.isEntireFileMd5Attached(); } private String buildCOSPath(String localPath) { @@ -57,37 +59,14 @@ public void doTask() { return; } - try { - uploadFile(bucketName, cosPath, localFile, storageClass, entireMd5Attached); - saveRecord(migrateLocalRecordElement); - TaskStatics.instance.addSuccessCnt(); - String printMsg = String.format("[ok] task_info: %s", migrateLocalRecordElement.buildKey()); - System.out.println(printMsg); - log.info(printMsg); - } catch (Exception e) { - String printMsg = String.format("[fail] task_info: %s", migrateLocalRecordElement.buildKey()); - System.out.println(printMsg); - log.error("fail! task_info: [key: {}], [value: {}], exception: {}", migrateLocalRecordElement.buildKey(), - migrateLocalRecordElement.buildValue(), e.toString()); - TaskStatics.instance.addFailCnt(); - } - - /* - PutObjectRequest putObjectRequest = new PutObjectRequest(bucketName, cosPath, localFile); - putObjectRequest.setStorageClass(storageClass); - Upload upload = null; try { - if (fileSize >= SMALL_FILE_THRESHOLD) { - upload = bigFileTransfer.upload(putObjectRequest); - } else { - upload = smallFileTransfer.upload(putObjectRequest); - } - upload.waitForCompletion(); - recordDb.saveRecord(migrateLocalRecordElement); + uploadFile(bucketName, cosPath, localFile, storageClass, entireMd5Attached); + saveRecord(migrateLocalRecordElement); TaskStatics.instance.addSuccessCnt(); String printMsg = String.format("[ok] task_info: %s", migrateLocalRecordElement.buildKey()); System.out.println(printMsg); + log.info(printMsg); } catch (Exception e) { String printMsg = String.format("[fail] task_info: %s", migrateLocalRecordElement.buildKey()); @@ -95,8 +74,7 @@ public void doTask() { log.error("fail! task_info: [key: {}], [value: {}], exception: {}", migrateLocalRecordElement.buildKey(), migrateLocalRecordElement.buildValue(), e.toString()); - TaskStatics.instance.addSkipCnt(); + TaskStatics.instance.addFailCnt(); } - */ } } diff --git a/src/main/java/com/qcloud/cos_migrate_tool/task/MigrateLocalTaskExecutor.java b/src/main/java/com/qcloud/cos_migrate_tool/task/MigrateLocalTaskExecutor.java index aec9b5b..206a7eb 100644 --- a/src/main/java/com/qcloud/cos_migrate_tool/task/MigrateLocalTaskExecutor.java +++ b/src/main/java/com/qcloud/cos_migrate_tool/task/MigrateLocalTaskExecutor.java @@ -69,20 +69,12 @@ public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) String localPath = SystemUtils.formatLocalPath(file.toString()); String filePath = ""; - if (filePath.contains(".w.")) { - return super.visitFile(file, attrs); - } try { if (!((CopyFromLocalConfig) config).isExcludes(localPath)) { File localFile = new File(file.toString()); - StorageClass storageClass = - ((CopyFromLocalConfig) config).getStorageClass(); - boolean entireFileMd5Attached = config.isEntireFileMd5Attached(); - long smallFileThreshold = config.getSmallFileThreshold(); - MigrateLocalTask migrateLocalTask = new MigrateLocalTask(bucketName, - localFolder, cosFolder, localFile, storageClass, - smallFileTransferManager, bigFileTransferManager, - smallFileThreshold, recordDb, semaphore, entireFileMd5Attached); + MigrateLocalTask migrateLocalTask = + new MigrateLocalTask(semaphore, config, smallFileTransferManager, + bigFileTransferManager, recordDb, localFile); AddTask(migrateLocalTask); } } catch (InterruptedException e) { diff --git a/src/main/java/com/qcloud/cos_migrate_tool/task/MigrateQiniuTask.java b/src/main/java/com/qcloud/cos_migrate_tool/task/MigrateQiniuTask.java index 2afedf4..5d769a2 100644 --- a/src/main/java/com/qcloud/cos_migrate_tool/task/MigrateQiniuTask.java +++ b/src/main/java/com/qcloud/cos_migrate_tool/task/MigrateQiniuTask.java @@ -14,8 +14,6 @@ import com.qiniu.util.Auth; public class MigrateQiniuTask extends Task { - - private CopyFromQiniuConfig config; private Auth auth; private String srcKey; private long fileSize; @@ -24,8 +22,7 @@ public class MigrateQiniuTask extends Task { public MigrateQiniuTask(CopyFromQiniuConfig config, Auth auth, String srcKey, long fileSize, String etag, TransferManager smallFileTransfer, TransferManager bigFileTransfer, RecordDb recordDb, Semaphore semaphore) { - super(semaphore, smallFileTransfer, bigFileTransfer, config.getSmallFileThreshold(), recordDb); - + super(semaphore, config, smallFileTransfer, bigFileTransfer, recordDb); this.config = config; this.srcKey = srcKey; this.fileSize = fileSize; @@ -37,7 +34,7 @@ public MigrateQiniuTask(CopyFromQiniuConfig config, Auth auth, String srcKey, lo } private String buildCOSPath() { - String srcPrefix = config.getSrcPrefix(); + String srcPrefix = ((CopyFromQiniuConfig)config).getSrcPrefix(); int lastDelimiter = srcPrefix.lastIndexOf("/"); String keyName = srcKey.substring(lastDelimiter + 1); String cosPrefix = config.getCosPath(); @@ -63,7 +60,7 @@ public void doTask() { } // generate download url - String baseUrl = "http://" + config.getSrcEndpoint() + "/" + srcKey; + String baseUrl = "http://" + ((CopyFromQiniuConfig)config).getSrcEndpoint() + "/" + srcKey; String url = auth.privateDownloadUrl(baseUrl, 3600); File localFile = new File(localPath); diff --git a/src/main/java/com/qcloud/cos_migrate_tool/task/MigrateUrllistTask.java b/src/main/java/com/qcloud/cos_migrate_tool/task/MigrateUrllistTask.java index 8870a40..7dad041 100644 --- a/src/main/java/com/qcloud/cos_migrate_tool/task/MigrateUrllistTask.java +++ b/src/main/java/com/qcloud/cos_migrate_tool/task/MigrateUrllistTask.java @@ -11,20 +11,17 @@ import com.qcloud.cos_migrate_tool.record.MigrateUrllistRecordElement; import com.qcloud.cos_migrate_tool.record.RecordDb; import com.qcloud.cos_migrate_tool.utils.Downloader; +import com.qcloud.cos_migrate_tool.utils.HeadAttr; public class MigrateUrllistTask extends Task { - private CopyFromUrllistConfig config; private String url; private String srcKey; public MigrateUrllistTask(CopyFromUrllistConfig config, String url, String srcKey, TransferManager smallFileTransfer, TransferManager bigFileTransfer, RecordDb recordDb, Semaphore semaphore) { - super(semaphore, smallFileTransfer, bigFileTransfer, config.getSmallFileThreshold(), - recordDb); - - this.config = config; + super(semaphore, config, smallFileTransfer, bigFileTransfer, recordDb); this.url = url; this.srcKey = srcKey; if (srcKey.startsWith("/")) { @@ -46,15 +43,27 @@ public void doTask() { String cosPath = buildCOSPath(); String localPath = config.getTempFolderPath() + ThreadLocalRandom.current().nextLong(); - long fileSize = -1; + HeadAttr headAttr = null; try { - fileSize = Downloader.instance.headFile(url); + headAttr = Downloader.instance.headFile(url); } catch (Exception e) { - log.error("head file fail, url: {}, msg:{}", url, e.getMessage()); + String printMsg = String.format("head url attr fail, url: %s", url); + System.err.println(printMsg); + log.error(printMsg, e); + TaskStatics.instance.addFailCnt(); + return; + } + + if (headAttr == null) { + String printMsg = String.format("head url attr fail, url: %s", url); + System.err.println(printMsg); + log.error(printMsg); + TaskStatics.instance.addFailCnt(); + return; } MigrateUrllistRecordElement urllistRecordElement = new MigrateUrllistRecordElement( - MigrateType.MIGRATE_FROM_URLLIST, config.getBucketName(), cosPath, url, fileSize); + MigrateType.MIGRATE_FROM_URLLIST, config.getBucketName(), cosPath, url, headAttr); if (isExist(urllistRecordElement)) { TaskStatics.instance.addSkipCnt(); return; diff --git a/src/main/java/com/qcloud/cos_migrate_tool/task/MigrateUrllistTaskExecutor.java b/src/main/java/com/qcloud/cos_migrate_tool/task/MigrateUrllistTaskExecutor.java index d83a1b8..df7309a 100644 --- a/src/main/java/com/qcloud/cos_migrate_tool/task/MigrateUrllistTaskExecutor.java +++ b/src/main/java/com/qcloud/cos_migrate_tool/task/MigrateUrllistTaskExecutor.java @@ -56,7 +56,7 @@ public void buildTask() { SimpleFileVisitor finder = new SimpleFileVisitor() { public FileVisitResult visitFile(Path urllistPath, BasicFileAttributes attrs) throws IOException { - System.out.println(urllistPath.toString()); + log.info("ready to migrate url file {}", urllistPath.toAbsolutePath().toString()); File urllistFile = urllistPath.toFile(); FileInputStream fis = null; try { diff --git a/src/main/java/com/qcloud/cos_migrate_tool/task/Task.java b/src/main/java/com/qcloud/cos_migrate_tool/task/Task.java index 346bfab..9c2d0c7 100644 --- a/src/main/java/com/qcloud/cos_migrate_tool/task/Task.java +++ b/src/main/java/com/qcloud/cos_migrate_tool/task/Task.java @@ -3,6 +3,7 @@ import java.io.File; import java.util.concurrent.Semaphore; +import org.joda.time.DateTime; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -15,6 +16,7 @@ import com.qcloud.cos.transfer.TransferProgress; import com.qcloud.cos.transfer.Upload; import com.qcloud.cos.utils.Md5Utils; +import com.qcloud.cos_migrate_tool.config.CommonConfig; import com.qcloud.cos_migrate_tool.record.RecordDb; import com.qcloud.cos_migrate_tool.record.RecordElement; @@ -27,14 +29,16 @@ public abstract class Task implements Runnable { protected TransferManager bigFileTransfer; protected long smallFileThreshold; private RecordDb recordDb; + protected CommonConfig config; - public Task(Semaphore semaphore, TransferManager smallFileTransfer, - TransferManager bigFileTransfer, long smallFileThreshold, RecordDb recordDb) { + public Task(Semaphore semaphore, CommonConfig config, TransferManager smallFileTransfer, + TransferManager bigFileTransfer, RecordDb recordDb) { super(); this.semaphore = semaphore; + this.config = config; this.smallFileTransfer = smallFileTransfer; this.bigFileTransfer = bigFileTransfer; - this.smallFileThreshold = smallFileThreshold; + this.smallFileThreshold = config.getSmallFileThreshold(); this.recordDb = recordDb; } @@ -171,10 +175,29 @@ public void uploadFile(String bucketName, String cosPath, File localFile, public abstract void doTask(); + private void checkTimeWindows() throws InterruptedException { + int timeWindowBegin = config.getTimeWindowBegin(); + int timeWindowEnd = config.getTimeWindowEnd(); + while (true) { + DateTime dateTime = DateTime.now(); + int hour = dateTime.getHourOfDay(); + if (hour >= timeWindowBegin && hour <= timeWindowEnd) { + return; + } + String printTips = String.format("currentTime %s, wait next time window [%d, %d]", + dateTime.toString("yyyy-MM-dd HH:mm:ss"), timeWindowBegin, timeWindowEnd); + System.out.println(printTips); + log.info(printTips); + Thread.sleep(60000); + } + } public void run() { try { + checkTimeWindows(); doTask(); + } catch (InterruptedException e) { + log.error("task is interrupted", e); } finally { semaphore.release(); } diff --git a/src/main/java/com/qcloud/cos_migrate_tool/utils/Downloader.java b/src/main/java/com/qcloud/cos_migrate_tool/utils/Downloader.java index a375549..0f89e14 100644 --- a/src/main/java/com/qcloud/cos_migrate_tool/utils/Downloader.java +++ b/src/main/java/com/qcloud/cos_migrate_tool/utils/Downloader.java @@ -54,10 +54,11 @@ private Downloader() { this.idleConnectionMonitor.start(); } - public long headFile(String url) { + public HeadAttr headFile(String url) { int retry = 0; int maxRetryCount = 5; + HeadAttr headAttr = new HeadAttr(); while (retry < maxRetryCount) { HttpHead httpHead = null; try { @@ -67,10 +68,11 @@ public long headFile(String url) { httpHead = new HttpHead(uri); } catch (URISyntaxException e) { String errMsg = "Invalid url:" + url; - log.error(errMsg); + log.error(errMsg, e); + return null; } catch (MalformedURLException e) { log.error("headFile url fail,url:{},msg:{}", url, e.getMessage()); - return -1; + return null; } httpHead.setConfig(requestConfig); @@ -88,31 +90,37 @@ public long headFile(String url) { throw new Exception(errMsg); } - if (!httpResponse.containsHeader("content-length")) { - log.info("not find content-length"); - return -1; - } - - Header header = httpResponse.getFirstHeader("content-length"); - - try { - long contentLength = Long.valueOf(header.getValue()); - if (contentLength < 0) { - throw new IllegalArgumentException("The minimum of content-length is 0"); + if (httpResponse.containsHeader("content-length")) { + Header header = httpResponse.getFirstHeader("content-length"); + long contentLength = -1; + try { + contentLength = Long.valueOf(header.getValue()); + if (contentLength < 0) { + log.error("invalid contentlength, url {}, contentLength {}", url, + header.getValue()); + return null; + } + headAttr.fileSize = contentLength; + } catch (NumberFormatException e) { + log.error("invalid contentlength, url {}, contentLength {}", url, + header.getValue()); + return null; } - return contentLength; - } catch (NumberFormatException e) { - throw new IllegalArgumentException("invalid content-length"); } + if (httpResponse.containsHeader("Last-Modified")) { + Header header = httpResponse.getFirstHeader("Last-Modified"); + headAttr.lastModify = header.getValue(); + } + return headAttr; } catch (Exception e) { - log.error(e.toString()); + log.error("head file attr fail, url: {}, retry: {}/{}, exception: {}", url, retry, + maxRetryCount, e.toString()); httpHead.abort(); - return -1; + ++retry; } - } - return -1; + return null; } private void showDownloadProgress(String url, long byteTotal, long byteDownloadSofar) { @@ -146,7 +154,7 @@ public boolean downFile(String url, File localFile) { log.error(errMsg); return false; } catch (MalformedURLException e) { - log.error("downFile url fail,url:{},msg:{}", url, e.getMessage()); + log.error("downFile url fail, url:{}, msg:{}", url, e.getMessage()); return false; } @@ -200,7 +208,8 @@ public boolean downFile(String url, File localFile) { } } } catch (Exception e) { - log.error(e.toString()); + log.error("download file failed, url: {}, retry: {}/{}, exception: {}", url, retry, + maxRetryCount, e.toString()); httpGet.abort(); localFile.delete(); } diff --git a/src/main/java/com/qcloud/cos_migrate_tool/utils/HeadAttr.java b/src/main/java/com/qcloud/cos_migrate_tool/utils/HeadAttr.java new file mode 100644 index 0000000..9494670 --- /dev/null +++ b/src/main/java/com/qcloud/cos_migrate_tool/utils/HeadAttr.java @@ -0,0 +1,7 @@ +package com.qcloud.cos_migrate_tool.utils; + +public class HeadAttr { + public long fileSize = -1; + public String lastModify = ""; + +} diff --git a/tmp/.gitkeep b/tmp/.gitkeep new file mode 100644 index 0000000..e69de29