Skip to content

Commit

Permalink
增加时间窗口功能
Browse files Browse the repository at this point in the history
  • Loading branch information
chengwu(吴承) committed Apr 4, 2018
1 parent a4d80ad commit 34b3788
Show file tree
Hide file tree
Showing 18 changed files with 201 additions and 137 deletions.
13 changes: 9 additions & 4 deletions conf/config.ini
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,9 @@ storageClass=Standard
cosPath=/
# 是否使用HTTPS传输(传输速度较慢,适用于对传输安全要求高的场景), on开启, off关闭
https=off
#临时目录,用于运行过程中,临时文件的存储, 主要用于友商数据迁移到COS, 因为迁移会现将数据下载到临时目录,再进行上传后删除.对于linux绝对路径, 如/a/b/c, 对于windows绝对路径,注意分隔符为两个反斜杠,如E:\\a\\b\\c
tmpFolder=E:\\code\\java\\workspace\\cos_migrate_tool\\tmp
# 临时目录,用于运行过程中,临时文件的存储, 主要用于友商数据迁移到COS, 因为迁移会现将数据下载到临时目录,再进行上传后删除.对于linux绝对路径, 如/a/b/c, 对于windows绝对路径,注意分隔符为两个反斜杠,如E:\\a\\b\\c
# 默认存储在工具下的tmp目录, 请确保磁盘空间充足,取决于要迁移的文件的大小与并发度。
tmpFolder=./tmp
# 小文件阈值的字节,大于等于这个阈值使用分块上传,否则使用简单上传, 默认5MB
smallFileThreshold=5242880
# 小文件(文件小于smallFileThreshold)的并发度,使用简单上传
Expand All @@ -38,6 +39,10 @@ entireFileMd5Attached=on
daemonMode=off
# 表示每一轮同步结束后,多久进行下一轮同步,单位为秒
daemonModeInterVal=60
# 表示任务执行的时间窗口, 单位为小时, 满足部分客户要求在指定时间段内执行,比如3,21, 表示在凌晨03:00到晚上21:59之间执行任务。
# 如果当前时间不在时间窗口内,则会进入睡眠状态,暂停迁移,直到下一个时间窗口内自动再继续执行。
# 但每一个任务都是 先判断时间是否在迁移窗口,然后开始迁移,有可能判断的时候 在时间窗口,但是迁移过程中有可能跨过时间窗口, 即存在少量的迁移在时间窗口外执行。
executeTimeWindow=0,24


# 从本地迁移到COS配置分节
Expand All @@ -50,7 +55,7 @@ exeludes=

## 从阿里迁移到COS的配置分节
[migrateAli]
bucket=mubucket-test
bucket=mybucket-test
accessKeyId=xxxxxxxxxx
accessKeySecret=yyyyyyyyyyy
#友商的地址
Expand All @@ -67,7 +72,7 @@ proxyPort=
[migrateQiniu]
# 从七牛迁移到COS的配置分节
# 七牛的bucket名称
bucket=assda
bucket=mybucket
# 七牛的账户信息
accessKeyId=xxxxxxxxxx
accessKeySecret=yyyyyyyyyyyyyyyy
Expand Down
Binary file modified dep/cos_migrate_tool-1.0-jar-with-dependencies.jar
Binary file not shown.
38 changes: 38 additions & 0 deletions src/main/java/com/qcloud/cos_migrate_tool/config/CommonConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ public class CommonConfig {
private long smallFileThreshold = 5 * 1024 * 1024;
private boolean damonMode = false;
private long damonInterVal = 60;
private int timeWindowBegin = 0;
private int timeWindowEnd = 24;

public String getTempFolderPath() {
return tempFolderPath;
Expand Down Expand Up @@ -255,4 +257,40 @@ public boolean isDamonMode() {
public long getDamonInterVal() {
return damonInterVal;
}

public void setTimeWindowsStr(String timeWindowStr) {
timeWindowStr = timeWindowStr.trim();
String[] timeWindowArray = timeWindowStr.split(",");
if (timeWindowArray.length != 2) {
throw new IllegalArgumentException("executeTimeWindow is invalid, the legal example 3,21");
}
try {
int number = Integer.valueOf(timeWindowArray[0]);
if (number < 0 || number >= 24) {
throw new IllegalArgumentException("executeTimeWindow is invalid, the legal example 3,10");
}
this.timeWindowBegin = number;

number = Integer.valueOf(timeWindowArray[1]);
if (number < 0 || number > 24) {
throw new IllegalArgumentException("executeTimeWindow is invalid, the legal example 3,10");
}
this.timeWindowEnd = number;

if (this.timeWindowEnd < this.timeWindowBegin) {
throw new IllegalArgumentException("executeTimeWindow is invalid, the legal example 3,10");
}

} catch (NumberFormatException e) {
throw new IllegalArgumentException("invalid executeTimeWindow");
}
}

public int getTimeWindowBegin() {
return timeWindowBegin;
}

public int getTimeWindowEnd() {
return timeWindowEnd;
}
}
22 changes: 15 additions & 7 deletions src/main/java/com/qcloud/cos_migrate_tool/config/ConfigParser.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ public class ConfigParser {
private static final String COMMON_ENTIRE_FILE_MD5_ATTACHED = "entireFileMd5Attached";
private static final String COMMON_DAEMON_MODE = "daemonMode";
private static final String COMMON_DAEMON_MODE_INTERVAL = "daemonModeInterVal";
private static final String COMMON_EXECUTE_TIME_WINDOW = "executeTimeWindow";

private static final String LOCAL_SECTION_NAME = "migrateLocal";
private static final String LOCAL_LOCALPATH = "localPath";
Expand Down Expand Up @@ -300,14 +301,17 @@ private boolean checkCommonConfig(Preferences prefs) {
if (!isKeyExist(prefs, COMMON_SECTION_NAME, COMMON_ENTIRE_FILE_MD5_ATTACHED)) {
return false;
}

if (!isKeyExist(prefs, COMMON_SECTION_NAME, COMMON_DAEMON_MODE)) {
return false;
}

if (!isKeyExist(prefs, COMMON_SECTION_NAME, COMMON_DAEMON_MODE_INTERVAL)) {
return false;
}
if (!isKeyExist(prefs, COMMON_SECTION_NAME, COMMON_EXECUTE_TIME_WINDOW)) {
return false;
}
return true;
}

Expand Down Expand Up @@ -404,22 +408,26 @@ private boolean initCommonConfig(Preferences prefs, CommonConfig commonConfig) {
getConfigValue(prefs, COMMON_SECTION_NAME, COMMON_STORAGE_CLASS);
assert (storageClassStr != null);
commonConfig.setStorageClass(storageClassStr);


String entireFileMd5AttachedStr =
getConfigValue(prefs, COMMON_SECTION_NAME, COMMON_ENTIRE_FILE_MD5_ATTACHED);
assert (entireFileMd5AttachedStr != null);
commonConfig.setEntireFileMd5Attached(entireFileMd5AttachedStr);

String daemonModeStr =
getConfigValue(prefs, COMMON_SECTION_NAME, COMMON_DAEMON_MODE);

String daemonModeStr = getConfigValue(prefs, COMMON_SECTION_NAME, COMMON_DAEMON_MODE);
assert (daemonModeStr != null);
commonConfig.setDaemonMode(daemonModeStr);

String daemonModeInterValStr =
getConfigValue(prefs, COMMON_SECTION_NAME, COMMON_DAEMON_MODE_INTERVAL);
assert (daemonModeInterValStr != null);
commonConfig.setDaemonModeInterVal(daemonModeInterValStr);
commonConfig.setDaemonModeInterVal(daemonModeInterValStr);

String timeWindowStr =
getConfigValue(prefs, COMMON_SECTION_NAME, COMMON_EXECUTE_TIME_WINDOW);
assert (timeWindowStr != null);
commonConfig.setTimeWindowsStr(timeWindowStr);

} catch (Exception e) {
System.err.println(e.getMessage());
Expand Down
Original file line number Diff line number Diff line change
@@ -1,20 +1,21 @@
package com.qcloud.cos_migrate_tool.record;

import com.qcloud.cos_migrate_tool.config.MigrateType;
import com.qcloud.cos_migrate_tool.utils.HeadAttr;

public class MigrateUrllistRecordElement extends RecordElement {
private String bucketName;
private String cosPath;
private String url;
private long fileSize;
private HeadAttr headAttr;

public MigrateUrllistRecordElement(MigrateType migrateType, String bucketName, String cosPath,
String url, long fileSize) {
String url, HeadAttr headAttr) {
super(migrateType);
this.bucketName = bucketName;
this.cosPath = cosPath;
this.url = url;
this.fileSize = fileSize;
this.headAttr = headAttr;
}

@Override
Expand All @@ -26,7 +27,8 @@ public String buildKey() {

@Override
public String buildValue() {
String value = String.format("[fileSize: %d]", fileSize);
String value = String.format("[fileSize: %d], [lastModify: %s]", headAttr.fileSize,
headAttr.lastModify);
return value;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@

public class MigrateAliTask extends Task {

private CopyFromAliConfig config;
private OSSClient ossClient;
private String srcKey;
private long fileSize;
Expand All @@ -31,10 +30,7 @@ public class MigrateAliTask extends Task {
public MigrateAliTask(CopyFromAliConfig config, OSSClient ossClient, String srcKey,
long fileSize, String etag, TransferManager smallFileTransfer,
TransferManager bigFileTransfer, RecordDb recordDb, Semaphore semaphore) {
super(semaphore, smallFileTransfer, bigFileTransfer, config.getSmallFileThreshold(),
recordDb);

this.config = config;
super(semaphore, config, smallFileTransfer, bigFileTransfer, recordDb);
this.ossClient = ossClient;
this.srcKey = srcKey;
this.fileSize = fileSize;
Expand All @@ -46,7 +42,7 @@ public MigrateAliTask(CopyFromAliConfig config, OSSClient ossClient, String srcK
}

private String buildCOSPath() {
String srcPrefix = config.getSrcPrefix();
String srcPrefix = ((CopyFromAliConfig)config).getSrcPrefix();
int lastDelimiter = srcPrefix.lastIndexOf("/");
String keyName = srcKey.substring(lastDelimiter + 1);
String cosPrefix = config.getCosPath();
Expand Down Expand Up @@ -143,7 +139,7 @@ public void doTask() {
GetObjectProgressListener downloadProgressListener =
new GetObjectProgressListener(srcKey);
ObjectMetadata objMeta = ossClient.getObject(
new GetObjectRequest(config.getSrcBucket(), srcKey)
new GetObjectRequest(((CopyFromAliConfig)config).getSrcBucket(), srcKey)
.<GetObjectRequest>withProgressListener(downloadProgressListener),
new File(localPath));
if (!downloadProgressListener.isSucceed()) {
Expand Down
23 changes: 12 additions & 11 deletions src/main/java/com/qcloud/cos_migrate_tool/task/MigrateAwsTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

public class MigrateAwsTask extends Task {

private CopyFromAwsConfig config;
private String srcKey;
private long fileSize;
private String etag;
Expand All @@ -29,21 +28,18 @@ public class MigrateAwsTask extends Task {
public MigrateAwsTask(CopyFromAwsConfig config, AmazonS3 s3Client, String srcKey, long fileSize,
String etag, TransferManager smallFileTransfer, TransferManager bigFileTransfer,
RecordDb recordDb, Semaphore semaphore) {
super(semaphore, smallFileTransfer, bigFileTransfer, config.getSmallFileThreshold(),
recordDb);
this.config = config;
super(semaphore, config, smallFileTransfer, bigFileTransfer, recordDb);
this.s3Client = s3Client;
this.srcKey = srcKey;
this.fileSize = fileSize;
this.etag = etag;
if (srcKey.startsWith("/")) {
this.srcKey = srcKey.substring(1);
}

}

private String buildCOSPath() {
String srcPrefix = config.getSrcPrefix();
String srcPrefix = ((CopyFromAwsConfig) config).getSrcPrefix();
int lastDelimiter = srcPrefix.lastIndexOf("/");
String keyName = srcKey.substring(lastDelimiter + 1);
String cosPrefix = config.getCosPath();
Expand Down Expand Up @@ -140,20 +136,25 @@ public void doTask() {
try {
GetObjectProgressListener getObjectProgressListener =
new GetObjectProgressListener(srcKey);
s3Client.getObject(new GetObjectRequest(config.getSrcBucket(), srcKey)
.<GetObjectRequest>withGeneralProgressListener(getObjectProgressListener),
s3Client.getObject(
new GetObjectRequest(((CopyFromAwsConfig) config).getSrcBucket(), srcKey)
.<GetObjectRequest>withGeneralProgressListener(
getObjectProgressListener),
localFile);
if (!localFile.exists()) {
String printMsg = String.format("[fail] [task_info: %s]", awsRecordElement.buildKey());
String printMsg =
String.format("[fail] [task_info: %s]", awsRecordElement.buildKey());
System.out.println(printMsg);
log.error("[fail] [taskInfo: {}] [srcKey: {}] [download localfile failed, localFile {} not exist]",
log.error(
"[fail] [taskInfo: {}] [srcKey: {}] [download localfile failed, localFile {} not exist]",
awsRecordElement.buildKey(), srcKey, localPath);
TaskStatics.instance.addFailCnt();
return;
}

if (localFile.length() != this.fileSize) {
String printMsg = String.format("[fail] [task_info: %s]", awsRecordElement.buildKey());
String printMsg =
String.format("[fail] [task_info: %s]", awsRecordElement.buildKey());
System.out.println(printMsg);
log.error("[fail] [taskInfo: {}] [download size {} not equal meta size {}]",
awsRecordElement.buildKey(), localFile.length(), this.fileSize);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import com.qcloud.cos.region.Region;
import com.qcloud.cos.transfer.Copy;
import com.qcloud.cos.transfer.TransferManager;
import com.qcloud.cos_migrate_tool.config.CopyBucketConfig;
import com.qcloud.cos_migrate_tool.meta.TaskStatics;
import com.qcloud.cos_migrate_tool.record.MigrateCopyBucketRecordElement;
import com.qcloud.cos_migrate_tool.record.RecordDb;
Expand All @@ -22,17 +23,16 @@ public class MigrateCopyBucketTask extends Task {
private final long srcSize;
private final String srcEtag;

public MigrateCopyBucketTask(Semaphore semaphore, TransferManager smallFileTransfer,
TransferManager bigFileTransfer, long smallFileThreshold, RecordDb recordDb, COSClient srcCOSClient,
String destRegion, String destBucketName, String destKey, String srcRegion,
String srcBucketName, String srcKey, long srcSize, String srcEtag) {
super(semaphore, smallFileTransfer, bigFileTransfer, smallFileThreshold, recordDb);
public MigrateCopyBucketTask(Semaphore semaphore, CopyBucketConfig config,
TransferManager smallFileTransfer, TransferManager bigFileTransfer, RecordDb recordDb,
COSClient srcCOSClient, String srcKey, long srcSize, String srcEtag, String destKey) {
super(semaphore, config, smallFileTransfer, bigFileTransfer, recordDb);
this.srcCOSClient = srcCOSClient;
this.destRegion = destRegion;
this.destBucketName = destBucketName;
this.destRegion = config.getRegion();
this.destBucketName = config.getBucketName();
this.destKey = destKey;
this.srcRegion = srcRegion;
this.srcBucketName = srcBucketName;
this.srcRegion = config.getSrcRegion();
this.srcBucketName = config.getSrcBucket();
this.srcKey = srcKey;
this.srcSize = srcSize;
this.srcEtag = srcEtag;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,11 +82,10 @@ public void buildTask() {
String keyName = srcKey.substring(lastDelimiter);
String copyDestKey = config.getCosPath() + keyName;

MigrateCopyBucketTask task = new MigrateCopyBucketTask(semaphore,
smallFileTransferManager, bigFileTransferManager,
config.getSmallFileThreshold(), recordDb, srcCosClient,
config.getRegion(), config.getBucketName(), copyDestKey, srcRegion,
srcBucketName, srcKey, srcSize, srcEtag);
MigrateCopyBucketTask task =
new MigrateCopyBucketTask(semaphore, (CopyBucketConfig) config,
smallFileTransferManager, bigFileTransferManager, recordDb,
srcCosClient, srcKey, srcSize, srcEtag, copyDestKey);
AddTask(task);
}
if (!objectListing.isTruncated()) {
Expand Down
Loading

0 comments on commit 34b3788

Please sign in to comment.