diff --git a/README b/README new file mode 100644 index 0000000..8bc24d7 --- /dev/null +++ b/README @@ -0,0 +1,29 @@ +# 迁移工具 + +## 功能说明 + +迁移工具集成了有关COS数据迁移的功能, 目前支持以下四大类迁移 +- 本地数据迁移到COS, 功能同之前的本地同步工具 +- 友商数据迁移到COS, 目前支持aws s3, 阿里云oss, 七牛存储 +- 根据url下载列表进行下载迁移 +- COS的bucket数据相互复制, 支持跨账号跨地域的数据复制 + +## 运行依赖 +- JDK1.7或以上, 有关JDK的安装请参考[JAVA安装与配置](https://cloud.tencent.com/document/product/436/10865) +- linux或windows环境, 推荐linux + +# 使用范例 +1 配置全部通过配置文件读入 +sh start_migrate.sh +2 指定部分配置项以命令行为主. +sh start_migrate.sh -DmigrateLocal.localPath=/test_data/aaa/ -Dcommon.cosPath=/aaa +sh start_migrate.sh -DmigrateAws.prefix=/test_data/bbb/ -Dcommon.cosPath=/bbb + +## 迁移机制 + +迁移工具是有状态的,已经迁移成功的会记录在db目录下,以KV的形式存储在leveldb文件中. +每次迁移前对要迁移的路径, 先查找下DB中是否存在, 如果存在,且属性和db中存在的一致, 则跳过迁移, 否则进行迁移。这里的属性根据迁移类型的不同而不同,对于本地迁移,会判断mtime。对于友商与bucket复制,会判断源文件的etag和长度是否与db一致。 +因此,我们参照的db中是否有过迁移成功的记录,而不是查找COS,如果绕过了迁移工具,通过别的方式(比如coscmd或者控制台)删除修改了文件,那么运行迁移工具由于不会察觉到这种变化,是不会重新迁移的。 + +## 其他 +请参照COS迁移工具官网文档 \ No newline at end of file diff --git a/conf/config.ini b/conf/config.ini new file mode 100644 index 0000000..5d6b50d --- /dev/null +++ b/conf/config.ini @@ -0,0 +1,117 @@ +# 配置迁移类型 +# 目前支持四大类, 这里的存储类型和之后的分节名称一致 +# 1 从本地迁移, migrateLocal(本地迁移工具, 同之前的本地同步工具) +# 2 从友商迁移, migrateAws(从aws迁移), migrateAli(从阿里迁移), migrateQiniu(从七牛迁移), +# 3 从url列表迁移, migrateUrl(这些url都是可以直接下载的,将要迁移的url放到一个文件或者多个文件里) +# 4 COS的bucket复制. migrateBucketCopy(将COS一个bucket下的数据复制到另外一个bucket, 支持跨账号跨地域,前提是账户需要对源bucket源bucket有可读权限,对目的bucket有putObjectCopy权限) +[migrateType] +type=migrateLocal + +# 迁移工具的公共配置分节,包含了要迁移到得目的COS的账户信息 +[common] +# 用户的秘钥 secret_id (可在 https://console.qcloud.com/capi 查看) +secretId=AKIDXXXXXXXXXXXXXXXXX +# 用户的秘钥 secret_key (可在 https://console.qcloud.com/capi 查看) +secretKey=gcYYYYYYYYYYYYYYYYYYYYYY +# 目的Bucket的名称, 命名规则为{name}-{appid},即bucket名必须包含appid, 例如movie-1251000000 +bucketName=mybucket-1251600000 +# 目的bucket的region信息. COS地域的简称请参照 https://www.qcloud.com/document/product/436/6224 +region=ap-guangzhou +# 存储类型, 标准(Standard), 低频(Standard_IA) +storageClass=Standard +# 要迁移到的cos路径, /表示迁移到bucket的根路径下, /aaa/bbb/表示要迁移到bucket的/aaa/bbb/下面, 如果/aaa/bbb/不存在,则会自动建立 +cosPath=/ +# 是否使用HTTPS传输(传输速度较慢,适用于对传输安全要求高的场景), on开启, off关闭 +https=off +#临时目录,用于运行过程中,临时文件的存储, 主要用于友商数据迁移到COS, 因为迁移会现将数据下载到临时目录,再进行上传后删除.对于linux绝对路径, 如/a/b/c, 对于windows绝对路径,注意分隔符为两个反斜杠,如E:\\a\\b\\c +tmpFolder=E:\\code\\java\\workspace\\cos_migrate_tool\\tmp +# 小文件阈值的字节,大于等于这个阈值使用分块上传,否则使用简单上传, 默认5MB +smallFileThreshold=5242880 +# 小文件(文件小于smallFileThreshold)的并发度,使用简单上传 +smallFileExecutorNum=64 +# 大文件(文件大于等于smallFileThreshold)的并发度,使用分块上传 +bigFileExecutorNum=8 +# 表示迁移工具将全文的MD5计算后,存入文件的自定义头部x-cos-meta-md5中, 用于后续的校验,因为COS的分块上传的大文件的etag不是全文的md5 +entireFileMd5Attached=on +# 表示是否启用damon模式,damon表示程序会循环不停的去执行同步,每一轮同步的间隔由damonModeInterVal参数设置 +# 如果启用damon模式, 则设置为on, 否则为off +daemonMode=off +# 表示每一轮同步结束后,多久进行下一轮同步,单位为秒 +daemonModeInterVal=60 + + +# 从本地迁移到COS配置分节 +[migrateLocal] +# 本地路径, 表示将该路径下的数据都迁移到COS, 对于linux绝对路径, 如/a/b/c, 对于windows绝对路径,注意分隔符为两个反斜杠,如E:\\a\\b\\c +localPath=E:\\code\\java\\workspace\\cos_migrate_tool\\test_data +# 要排除的目录或者文件的绝对路径, 表示将localPath下面某些目录或者文件不进行迁移,多个绝对路径之前用分号分割,不填表示localpath下面的全部迁移 +exeludes= + + +## 从阿里迁移到COS的配置分节 +[migrateAli] +bucket=mubucket-test +accessKeyId=xxxxxxxxxx +accessKeySecret=yyyyyyyyyyy +#友商的地址 +endPoint=oss-cn-shenzhen.aliyuncs.com +# 要迁移的路径的前缀, 如果是迁移bucket下所有的数据, 则prefix为空 +prefix= +# 如果要使用代理进行访问,则填写代理IP地址 +proxyHost= +# 代理的端口 +proxyPort= + + +# 从七牛迁移到COS +[migrateQiniu] +# 从七牛迁移到COS的配置分节 +# 七牛的bucket名称 +bucket=assda +# 七牛的账户信息 +accessKeyId=xxxxxxxxxx +accessKeySecret=yyyyyyyyyyyyyyyy +# 七牛的下载地址, 对应downloadDomain +endPoint=wwww.bkt.clouddn.com +# 要迁移的路径的前缀, 如果是迁移bucket下所有的数据, 则prefix为空 +prefix= +# 如果要使用代理进行访问,则填写代理地址 +proxyHost= +# 代理端口 +proxyPort= + +# 从通过URL列表进行迁移,URL列表里面填写的源文件的下载路径,该路径可以直接下载下来,迁移工具会将其再上传到COS上去 +[migrateUrl] +# 使用url列表迁移,如果urllistPath填的是目录,那么就会把这个目录下所有文件都当作urllist文件去扫描迁移 +# 对于linux绝对路径, 如/a/b/c, 对于windows绝对路径,注意分隔符为两个反斜杠,如E:\\a\\b\\c +urllistPath=/data/mydata/url + +## 从AWS迁移到COS的配置分节 +[migrateAws] +# aws的bucket +bucket=aws-emr-test +accessKeyId=xxxxxxxx +accessKeySecret=yyyyyyyyyy +# aws的endpoint地址 +endPoint=s3.us-east-1.amazonaws.com +# 要迁移的路径的前缀, 如果是迁移所有的,则prefix为空 +prefix= +# 迁移的代理地址 +proxyHost= +# 代理端口 +proxyPort= + +# bucket copy +[migrateBucketCopy] +# 源 bucket的region信息. COS地域的简称请参照 https://www.qcloud.com/document/product/436/6224 +srcRegion=ap-shanghai +# 源 Bucket的名称, 命名规则为{name}-{appid},即bucket名必须包含appid, 例如movie-1251000000 +srcBucketName=mysrcbucket-1251668555 +# 源bucket隶属的用户的秘钥 secret_id (可在 https://console.qcloud.com/capi 查看) +# 因为bucket copy支持跨账号,所以如果是另外一个账户的数据,则srcSecretId和common中的secretId不同 +# 如果是同一客户的数据, 则srcSecretId和common中的secretId相同 +srcSecretId=xxxxxxxxxxx +# 源bucket隶属的用户的秘钥 secret_key(可在 https://console.qcloud.com/capi 查看) +srcSecretKey=yyyyyyyyyyyyyyyy +# 要迁移的cos路径的 +srcCosPath=/ diff --git a/dep/cos_migrate_tool-1.0-jar-with-dependencies.jar b/dep/cos_migrate_tool-1.0-jar-with-dependencies.jar new file mode 100644 index 0000000..878e1a7 Binary files /dev/null and b/dep/cos_migrate_tool-1.0-jar-with-dependencies.jar differ diff --git a/opbin/rebuild.bat b/opbin/rebuild.bat new file mode 100644 index 0000000..4587904 --- /dev/null +++ b/opbin/rebuild.bat @@ -0,0 +1,12 @@ +:: 双击运行脚本, 生成的可执行程序在上一层目录的dep下 +:: 此脚本用于重新编译生成迁移工具的JAR包 +:: 依赖mvn, 请确保网络畅通, 可以连接mvn仓库 +@echo off +set cur_dir=%CD% +cd %cur_dir% +cd .. +call mvn clean compile assembly:single +move /Y target\*.jar dep\ +rd /s /q target +echo "rebuild over!" +pause>nul diff --git a/opbin/rebuild.sh b/opbin/rebuild.sh new file mode 100644 index 0000000..d07dbed --- /dev/null +++ b/opbin/rebuild.sh @@ -0,0 +1,10 @@ +#!/bin/bash +# 此脚本用于重新编译生成同步的JAR包 +# 依赖mvn, 请确保网络畅通, 可以连接mvn仓库 +cur_dir=$(cd `dirname $0`;pwd) +cd ${cur_dir} +cd .. +mvn clean compile assembly:single +mv target/*.jar dep/ +rm -rf target +cd - diff --git a/pom.xml b/pom.xml new file mode 100644 index 0000000..b4881f2 --- /dev/null +++ b/pom.xml @@ -0,0 +1,119 @@ + + 4.0.0 + com.qcloud + cos_migrate_tool + 1.0 + jar + + UTF-8 + + + + + + com.qiniu + qiniu-java-sdk + 7.2.11 + + + com.squareup.okhttp3 + okhttp + 3.3.1 + compile + + + com.google.code.gson + gson + 2.6.2 + compile + + + com.qiniu + happy-dns-java + 0.1.4 + compile + + + + com.qcloud + cos_api + 5.4.2 + + + + org.ini4j + ini4j + 0.5.4 + + + + org.slf4j + slf4j-log4j12 + 1.7.25 + + + + org.iq80.leveldb + leveldb + 0.9 + + + + + com.fasterxml.jackson.core + jackson-databind + 2.9.4 + + + com.fasterxml.jackson.core + jackson-core + + + + + + com.aliyun.oss + aliyun-sdk-oss + 2.8.3 + + + org.apache.httpcomponents + httpclient + + + + + + com.amazonaws + aws-java-sdk-s3 + 1.11.271 + + + com.fasterxml.jackson.core + jackson-databind + + + + + + + + + + + maven-assembly-plugin + + + + com.qcloud.cos_migrate_tool.app.App + + + + jar-with-dependencies + + + + + + \ No newline at end of file diff --git a/src/main/java/com/qcloud/cos_migrate_tool/app/App.java b/src/main/java/com/qcloud/cos_migrate_tool/app/App.java new file mode 100644 index 0000000..6a10ec8 --- /dev/null +++ b/src/main/java/com/qcloud/cos_migrate_tool/app/App.java @@ -0,0 +1,74 @@ +package com.qcloud.cos_migrate_tool.app; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.qcloud.cos_migrate_tool.config.CommonConfig; +import com.qcloud.cos_migrate_tool.config.ConfigParser; +import com.qcloud.cos_migrate_tool.config.CopyBucketConfig; +import com.qcloud.cos_migrate_tool.config.CopyFromAliConfig; +import com.qcloud.cos_migrate_tool.config.CopyFromAwsConfig; +import com.qcloud.cos_migrate_tool.config.CopyFromLocalConfig; +import com.qcloud.cos_migrate_tool.config.CopyFromQiniuConfig; +import com.qcloud.cos_migrate_tool.config.CopyFromUrllistConfig; +import com.qcloud.cos_migrate_tool.config.MigrateType; +import com.qcloud.cos_migrate_tool.meta.TaskStatics; +import com.qcloud.cos_migrate_tool.task.MigrateAliTaskExecutor; +import com.qcloud.cos_migrate_tool.task.MigrateAwsTaskExecutor; +import com.qcloud.cos_migrate_tool.task.MigrateCopyBucketTaskExecutor; +import com.qcloud.cos_migrate_tool.task.MigrateLocalTaskExecutor; +import com.qcloud.cos_migrate_tool.task.MigrateQiniuTaskExecutor; +import com.qcloud.cos_migrate_tool.task.MigrateUrllistTaskExecutor; +import com.qcloud.cos_migrate_tool.task.TaskExecutor; + +public class App { + + private static final Logger log = LoggerFactory.getLogger(App.class); + + private static TaskExecutor buildTaskExecutor(CommonConfig config) { + if (ConfigParser.instance.getMigrateType().equals(MigrateType.MIGRATE_FROM_LOCAL)) { + return new MigrateLocalTaskExecutor((CopyFromLocalConfig) config); + } else if (ConfigParser.instance.getMigrateType().equals(MigrateType.MIGRATE_FROM_ALI)) { + return new MigrateAliTaskExecutor((CopyFromAliConfig) config); + } else if (ConfigParser.instance.getMigrateType().equals(MigrateType.MIGRATE_FROM_AWS)) { + return new MigrateAwsTaskExecutor((CopyFromAwsConfig) config); + } else if (ConfigParser.instance.getMigrateType() + .equals(MigrateType.MIGRATE_FROM_COS_BUCKET_COPY)) { + return new MigrateCopyBucketTaskExecutor((CopyBucketConfig) config); + } else if (ConfigParser.instance.getMigrateType() + .equals(MigrateType.MIGRATE_FROM_URLLIST)) { + return new MigrateUrllistTaskExecutor((CopyFromUrllistConfig) config); + } else if (ConfigParser.instance.getMigrateType().equals(MigrateType.MIGRATE_FROM_QINIU)) { + return new MigrateQiniuTaskExecutor((CopyFromQiniuConfig) config); + } else { + System.out.println("unknown migrate type"); + } + + return null; + } + + public static void main(String[] args) { + if (!ConfigParser.instance.parse()) { + return; + } + + CommonConfig config = ConfigParser.instance.getConfig(); + while (true) { + TaskStatics.instance.reset(); + + TaskExecutor taskExecutor = buildTaskExecutor(config); + taskExecutor.run(); + taskExecutor.waitTaskOver(); + + if (!config.isDamonMode()) + break; + + try { + Thread.sleep(config.getDamonInterVal() * 1000); + } catch (InterruptedException e) { + log.error("the program is interrupted!", e); + break; + } + } + } +} diff --git a/src/main/java/com/qcloud/cos_migrate_tool/config/CommonConfig.java b/src/main/java/com/qcloud/cos_migrate_tool/config/CommonConfig.java new file mode 100644 index 0000000..f644d4d --- /dev/null +++ b/src/main/java/com/qcloud/cos_migrate_tool/config/CommonConfig.java @@ -0,0 +1,258 @@ +package com.qcloud.cos_migrate_tool.config; + +import java.io.File; +import java.util.regex.Pattern; + +import com.qcloud.cos.model.StorageClass; +import com.qcloud.cos_migrate_tool.utils.PathUtils; +import com.qcloud.cos_migrate_tool.utils.SystemUtils; + +/** + * @author chengwu 定义common配置, 比如tempfile, COS的账户信息 + */ +public class CommonConfig { + + private String tempFolderPath; + private String bucketName; + private String region; + private String ak; + private String sk; + private String cosPath; + private boolean enableHttps; + private boolean entireFileMd5Attached; + private int taskExecutorNumber = 64; + private StorageClass storageClass = StorageClass.Standard; + private int smallFileExecutorNumber = 64; + private int bigFileExecutorNum = 4; + private long smallFileThreshold = 5 * 1024 * 1024; + private boolean damonMode = false; + private long damonInterVal = 60; + + public String getTempFolderPath() { + return tempFolderPath; + } + + public void setTempFileFolder(String tempFolderPath) { + tempFolderPath = tempFolderPath.trim(); + File tempFolder = new File(tempFolderPath); + if (!tempFolder.exists()) { + throw new IllegalArgumentException("tempFolderPath " + tempFolderPath + " not exist!"); + } + + if (tempFolderPath.endsWith("/") || tempFolderPath.endsWith("\\")) { + this.tempFolderPath = tempFolderPath; + } else { + this.tempFolderPath = tempFolderPath + "/"; + } + this.tempFolderPath = SystemUtils.formatLocalPath(this.tempFolderPath); + } + + public String getBucketName() { + return bucketName; + } + + public void setBucketName(String bucketName) { + bucketName = bucketName.trim(); + String parrtern = ".*-(125|100|20)[0-9]{3,}$"; + if (!Pattern.matches(parrtern, bucketName)) { + throw new IllegalArgumentException( + "bucketName must contain appid. example: test-1250001000"); + } + this.bucketName = bucketName; + } + + public String getRegion() { + return region; + } + + public void setRegion(String region) throws IllegalArgumentException { + if (region.trim().isEmpty()) { + throw new IllegalArgumentException("region value is missing"); + } + this.region = region.trim(); + } + + public String getAk() { + return ak; + } + + public void setAk(String ak) throws IllegalArgumentException { + ak = ak.trim(); + if (ak.isEmpty()) { + throw new IllegalArgumentException("secretId is missing"); + } + this.ak = ak; + } + + public String getSk() { + return sk; + } + + public void setSk(String sk) { + sk = sk.trim(); + if (sk.isEmpty()) { + throw new IllegalArgumentException("secretValue is missing"); + } + this.sk = sk; + } + + public String getCosPath() { + return cosPath; + } + + public void setCosPath(String cosPath) throws IllegalArgumentException { + if (!cosPath.startsWith("/")) { + throw new IllegalArgumentException("cospath must start with /"); + } + this.cosPath = PathUtils.formatCosFolderPath(cosPath); + } + + public boolean isEnableHttps() { + return enableHttps; + } + + public void setEnableHttps(String enableHttpsStr) throws IllegalArgumentException { + if (enableHttpsStr.equalsIgnoreCase("on")) { + this.enableHttps = true; + } else if (enableHttpsStr.equalsIgnoreCase("off")) { + this.enableHttps = false; + } else { + throw new IllegalArgumentException("invalid https config. only support on/off"); + } + } + + public void setTaskExecutorNumberStr(String taskExecutorNumberStr) + throws IllegalArgumentException { + taskExecutorNumberStr = taskExecutorNumberStr.trim(); + try { + int number = Integer.valueOf(taskExecutorNumberStr); + if (number <= 0) { + throw new IllegalArgumentException("taskExecutorNumber must be greater than 0"); + } + this.taskExecutorNumber = number; + } catch (NumberFormatException e) { + throw new IllegalArgumentException("invalid taskExecutorNumber"); + } + } + + public int getTaskExecutorNumber() { + return taskExecutorNumber; + } + + public StorageClass getStorageClass() { + return storageClass; + } + + public void setStorageClass(String storageClassStr) throws IllegalArgumentException { + try { + this.storageClass = StorageClass.valueOf(storageClassStr); + } catch (IllegalArgumentException e) { + throw new IllegalArgumentException(storageClassStr + " is invalid storage class!"); + } + } + + + public int getSmallFileExecutorNumber() { + return smallFileExecutorNumber; + } + + public void setSmallFileUploadExecutorNum(String smallFileExecutorNumStr) + throws IllegalArgumentException { + smallFileExecutorNumStr = smallFileExecutorNumStr.trim(); + try { + int number = Integer.valueOf(smallFileExecutorNumStr); + if (number <= 0 && number > 1024) { + throw new IllegalArgumentException("legal smallFileExecutorNum is [1, 1024] "); + } + this.smallFileExecutorNumber = number; + this.taskExecutorNumber = this.smallFileExecutorNumber + this.bigFileExecutorNum; + } catch (NumberFormatException e) { + throw new IllegalArgumentException("invalid smallFileExecutorNum"); + } + } + + public int getBigFileExecutorNum() { + return bigFileExecutorNum; + } + + public void setBigFileUploadExecutorNum(String bigFileExecutorNumStr) { + bigFileExecutorNumStr = bigFileExecutorNumStr.trim(); + try { + int number = Integer.valueOf(bigFileExecutorNumStr); + if (number <= 0 && number > 64) { + throw new IllegalArgumentException("legal bigFileExecutorNum is [1, 64] "); + } + this.bigFileExecutorNum = number; + this.taskExecutorNumber = this.smallFileExecutorNumber + this.bigFileExecutorNum; + } catch (NumberFormatException e) { + throw new IllegalArgumentException("invalid bigFileExecutorNum"); + } + } + + public void setSmallFileThreshold(String smallFileThresholdStr) { + smallFileThresholdStr = smallFileThresholdStr.trim(); + try { + int number = Integer.valueOf(smallFileThresholdStr); + final long minSmallFile = 5 * 1024 * 1024; // 最小5MB + final long maxSmallFile = 5 * 1024 * 1024 * 1024; // 最大5GB + if (number < minSmallFile && number > maxSmallFile) { + throw new IllegalArgumentException(String.format( + "legal smallFileThreshold is [%d, %d], 5MB ~ 5GB", minSmallFile, maxSmallFile)); + } + this.smallFileThreshold = number; + } catch (NumberFormatException e) { + throw new IllegalArgumentException("invalid smallFileThreshold"); + } + } + + public long getSmallFileThreshold() { + return smallFileThreshold; + } + + public boolean isEntireFileMd5Attached() { + return entireFileMd5Attached; + } + + public void setEntireFileMd5Attached(String entireFileMd5AttachedStr) { + if (entireFileMd5AttachedStr.equalsIgnoreCase("on")) { + this.entireFileMd5Attached = true; + } else if (entireFileMd5AttachedStr.equalsIgnoreCase("off")) { + this.entireFileMd5Attached = false; + } else { + throw new IllegalArgumentException( + "invalid entireFileMd5Attached config. only support on/off"); + } + } + + public void setDaemonMode(String daemonModeStr) { + if (daemonModeStr.equalsIgnoreCase("on")) { + this.damonMode = true; + } else if (daemonModeStr.equalsIgnoreCase("off")) { + this.damonMode = false; + } else { + throw new IllegalArgumentException( + "invalid daemonMode config. only support on/off"); + } + } + + public void setDaemonModeInterVal(String daemonModeStr) { + daemonModeStr = daemonModeStr.trim(); + try { + int number = Integer.valueOf(daemonModeStr); + if (number <= 0) { + throw new IllegalArgumentException("damonInterVal must be greater than 0"); + } + this.damonInterVal = number; + } catch (NumberFormatException e) { + throw new IllegalArgumentException("invalid damonInterVal"); + } + } + + public boolean isDamonMode() { + return damonMode; + } + + public long getDamonInterVal() { + return damonInterVal; + } +} diff --git a/src/main/java/com/qcloud/cos_migrate_tool/config/ConfigParser.java b/src/main/java/com/qcloud/cos_migrate_tool/config/ConfigParser.java new file mode 100644 index 0000000..a8744a1 --- /dev/null +++ b/src/main/java/com/qcloud/cos_migrate_tool/config/ConfigParser.java @@ -0,0 +1,610 @@ +package com.qcloud.cos_migrate_tool.config; + +import java.io.File; +import java.io.IOException; +import java.util.prefs.BackingStoreException; +import java.util.prefs.Preferences; + +import org.ini4j.Ini; +import org.ini4j.IniPreferences; +import org.ini4j.InvalidFileFormatException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ConfigParser { + public static final ConfigParser instance = new ConfigParser(); + + private static final Logger log = LoggerFactory.getLogger(ConfigParser.class); + + private MigrateType migrateType = null; + + private Ini ini; + private static final String configFilePath = "conf/config.ini"; + + private static final String MIGRATE_TYPE_SECTION_NAME = "migrateType"; + private static final String MIGRATE_TYPE = "type"; + + private static final String COMMON_SECTION_NAME = "common"; + private static final String COMMON_REGION = "region"; + private static final String COMMON_BUCKETNAME = "bucketName"; + private static final String COMMON_AK = "secretId"; + private static final String COMMON_SK = "secretKey"; + private static final String COMMON_COSPATH = "cosPath"; + private static final String COMMON_HTTPS = "https"; + private static final String COMMON_TMP = "tmpFolder"; + private static final String COMMON_SMALL_FILE_ThRESHOLD = "smallFileThreshold"; + private static final String COMMON_STORAGE_CLASS = "storageClass"; + private static final String COMMON_SMALL_FILE_EXECUTOR_NUM = "smallFileExecutorNum"; + private static final String COMMON_BIG_FILE_EXECUTOR_NUM = "bigFileExecutorNum"; + private static final String COMMON_ENTIRE_FILE_MD5_ATTACHED = "entireFileMd5Attached"; + private static final String COMMON_DAEMON_MODE = "daemonMode"; + private static final String COMMON_DAEMON_MODE_INTERVAL = "daemonModeInterVal"; + + private static final String LOCAL_SECTION_NAME = "migrateLocal"; + private static final String LOCAL_LOCALPATH = "localPath"; + private static final String LOCAL_EXECLUDE = "exeludes"; + + private static final String ALI_SECTION_NAME = "migrateAli"; + private static final String AWS_SECTION_NAME = "migrateAws"; + private static final String QINIU_SECTION_NAME = "migrateQiniu"; + private static final String OSS_BUCKET = "bucket"; + private static final String OSS_AK = "accessKeyId"; + private static final String OSS_SK = "accessKeySecret"; + private static final String OSS_PREFIX = "prefix"; + private static final String OSS_END_POINT = "endPoint"; + private static final String OSS_PROXY_HOST = "proxyHost"; + private static final String OSS_PROXY_PORT = "proxyPort"; + + private static final String COPY_BUCKET_SECTION_NAME = "migrateBucketCopy"; + private static final String COPY_SRC_REGION = "srcRegion"; + private static final String COPY_SRC_BUCKETNAME = "srcBucketName"; + private static final String COPY_SRC_SECRETID = "srcSecretId"; + private static final String COPY_SRC_SECRETKEY = "srcSecretKey"; + private static final String COPY_SRC_COSPATH = "srcCosPath"; + + private static final String URLLIST_SECTION_NAME = "migrateUrl"; + private static final String URLLIST_PATH = "urllistPath"; + + private CommonConfig config; + + private ConfigParser() {} + + public MigrateType getMigrateType() { + return migrateType; + } + + public CommonConfig getConfig() { + return config; + } + + private Preferences buildConfigPrefs() { + File configFile = new File(configFilePath); + if (!configFile.exists()) { + String errMsg = String.format("config %s not exist or readable!", configFilePath); + System.err.println(errMsg); + log.error(errMsg); + return null; + } + + try { + ini = new Ini(configFile); + } catch (InvalidFileFormatException e) { + String errMsg = String.format("invalid config format. config %s", configFilePath); + System.err.println(errMsg); + log.error(errMsg, e); + return null; + } catch (IOException e) { + String errMsg = String.format("read config failed. config %s", configFilePath); + System.err.println(errMsg); + log.error(errMsg, e); + return null; + } + + return new IniPreferences(ini); + } + + private boolean isNodeExist(Preferences preferences, String nodeName) { + try { + if (!preferences.nodeExists(MIGRATE_TYPE_SECTION_NAME)) { + String errMsg = String.format("section %s miss! you mustn't delete it", + MIGRATE_TYPE_SECTION_NAME); + System.err.println(errMsg); + log.error(errMsg); + return false; + } + } catch (BackingStoreException e) { + String errMsg = String.format("check section %s failed!", MIGRATE_TYPE_SECTION_NAME); + System.err.println(errMsg); + log.error(errMsg, e); + return false; + } + return true; + } + + private boolean isKeyExist(Preferences preferences, String sectionName, String key) { + if (!isNodeExist(preferences, sectionName) && !isPropExist(sectionName, key)) { + return false; + } + String configValue = getConfigValue(preferences, sectionName, key); + if (configValue == null) { + String errMsg = + String.format("config key miss. section: %s, key: %s", sectionName, key); + System.err.println(errMsg); + log.error(errMsg); + return false; + } + return true; + } + + private boolean isPropExist(String sectionName, String key) { + String cmdKey = sectionName + "." + key; + return System.getProperty(cmdKey) != null; + } + + private String getConfigValue(Preferences preferences, String sectionName, String key) { + String configFileValue = preferences.node(sectionName).get(key, null); + String configCmdValue = System.getProperty(sectionName + "." + key); + if (configCmdValue != null) { + return configCmdValue.trim(); + } + if (configFileValue != null) { + return configFileValue.trim(); + } + return null; + } + + public boolean parse() { + + Preferences prefs = buildConfigPrefs(); + if (prefs == null) { + return false; + } + + if (!checkMigrateTypeConfig(prefs)) { + return false; + } + + if (!initMigrateType(prefs)) { + return false; + } + + if (!checkCommonConfig(prefs)) { + return false; + } + + if (migrateType.equals(MigrateType.MIGRATE_FROM_LOCAL)) { + if (!checkMigrateLocalConfig(prefs)) { + return false; + } + config = new CopyFromLocalConfig(); + if (!initCopyFromLocalConfig(prefs, (CopyFromLocalConfig) config)) { + return false; + } + + } else if (migrateType.equals(MigrateType.MIGRATE_FROM_ALI)) { + // ali copy + if (!checkMigrateCompetitorConfig(prefs, MigrateType.MIGRATE_FROM_ALI)) { + return false; + } + config = new CopyFromAliConfig(); + if (!initCopyFromAliConfig(prefs, (CopyFromAliConfig) config)) { + return false; + } + + } else if (migrateType.equals(MigrateType.MIGRATE_FROM_AWS)) { + // aws copy + if (!checkMigrateCompetitorConfig(prefs, MigrateType.MIGRATE_FROM_AWS)) { + return false; + } + config = new CopyFromAwsConfig(); + if (!initCopyFromAwsConfig(prefs, (CopyFromAwsConfig) config)) { + return false; + } + + } else if (migrateType.equals(MigrateType.MIGRATE_FROM_QINIU)) { + // qiniu copy + if (!checkMigrateCompetitorConfig(prefs, MigrateType.MIGRATE_FROM_QINIU)) { + return false; + } + config = new CopyFromQiniuConfig(); + if (!initCopyFromQiniuConfig(prefs, (CopyFromQiniuConfig) config)) { + return false; + } + + } else if (migrateType.equals(MigrateType.MIGRATE_FROM_COS_BUCKET_COPY)) { + // bucket copy + if (!checkMigrateCopyBucketConfig(prefs)) { + return false; + } + config = new CopyBucketConfig(); + if (!initCopyBucketConfig(prefs, (CopyBucketConfig) config)) { + return false; + } + } else if (migrateType.equals(MigrateType.MIGRATE_FROM_URLLIST)) { + // url copy + config = new CopyFromUrllistConfig(); + if (!initCopyFromUrllistConfig(prefs, (CopyFromUrllistConfig) config)) { + return false; + } + } + return true; + } + + private boolean checkMigrateTypeConfig(Preferences prefs) { + if (!isKeyExist(prefs, MIGRATE_TYPE_SECTION_NAME, MIGRATE_TYPE)) { + return false; + } + return true; + } + + private boolean initMigrateType(Preferences prefs) { + try { + String migrateTypeStr = getConfigValue(prefs, MIGRATE_TYPE_SECTION_NAME, MIGRATE_TYPE); + assert (migrateTypeStr != null); + migrateType = MigrateType.fromValue(migrateTypeStr); + } catch (IllegalArgumentException e) { + String errMsg = String.format("invalid config. section:%s, key:%s", + MIGRATE_TYPE_SECTION_NAME, MIGRATE_TYPE); + System.err.println(errMsg); + log.error(errMsg); + return false; + } + return true; + } + + private boolean checkCommonConfig(Preferences prefs) { + if (!isKeyExist(prefs, COMMON_SECTION_NAME, COMMON_REGION)) { + return false; + } + + if (!isKeyExist(prefs, COMMON_SECTION_NAME, COMMON_BUCKETNAME)) { + return false; + } + + if (!isKeyExist(prefs, COMMON_SECTION_NAME, COMMON_AK)) { + return false; + } + + if (!isKeyExist(prefs, COMMON_SECTION_NAME, COMMON_SK)) { + return false; + } + + if (!isKeyExist(prefs, COMMON_SECTION_NAME, COMMON_COSPATH)) { + return false; + } + + if (!isKeyExist(prefs, COMMON_SECTION_NAME, COMMON_HTTPS)) { + return false; + } + + if (!isKeyExist(prefs, COMMON_SECTION_NAME, COMMON_TMP)) { + return false; + } + + if (!isKeyExist(prefs, COMMON_SECTION_NAME, COMMON_SMALL_FILE_ThRESHOLD)) { + return false; + } + + if (!isKeyExist(prefs, COMMON_SECTION_NAME, COMMON_SMALL_FILE_EXECUTOR_NUM)) { + return false; + } + + if (!isKeyExist(prefs, COMMON_SECTION_NAME, COMMON_BIG_FILE_EXECUTOR_NUM)) { + return false; + } + + if (!isKeyExist(prefs, COMMON_SECTION_NAME, COMMON_STORAGE_CLASS)) { + return false; + } + + if (!isKeyExist(prefs, COMMON_SECTION_NAME, COMMON_ENTIRE_FILE_MD5_ATTACHED)) { + return false; + } + + if (!isKeyExist(prefs, COMMON_SECTION_NAME, COMMON_DAEMON_MODE)) { + return false; + } + + if (!isKeyExist(prefs, COMMON_SECTION_NAME, COMMON_DAEMON_MODE_INTERVAL)) { + return false; + } + return true; + } + + private boolean checkMigrateLocalConfig(Preferences prefs) { + if (!isKeyExist(prefs, LOCAL_SECTION_NAME, LOCAL_LOCALPATH)) { + return false; + } + return true; + } + + private boolean checkMigrateCopyBucketConfig(Preferences prefs) { + if (!isKeyExist(prefs, COPY_BUCKET_SECTION_NAME, COPY_SRC_REGION)) { + return false; + } + if (!isKeyExist(prefs, COPY_BUCKET_SECTION_NAME, COPY_SRC_BUCKETNAME)) { + return false; + } + if (!isKeyExist(prefs, COPY_BUCKET_SECTION_NAME, COPY_SRC_SECRETID)) { + return false; + } + if (!isKeyExist(prefs, COPY_BUCKET_SECTION_NAME, COPY_SRC_SECRETKEY)) { + return false; + } + if (!isKeyExist(prefs, COPY_BUCKET_SECTION_NAME, COPY_SRC_COSPATH)) { + return false; + } + return true; + } + + private boolean checkMigrateCompetitorConfig(Preferences prefs, MigrateType migrateType) { + String sectionName = migrateType.toString(); + if (!isKeyExist(prefs, sectionName, OSS_BUCKET)) { + return false; + } + if (!isKeyExist(prefs, sectionName, OSS_AK)) { + return false; + } + if (!isKeyExist(prefs, sectionName, OSS_SK)) { + return false; + } + if (!isKeyExist(prefs, sectionName, OSS_END_POINT)) { + return false; + } + return true; + } + + private boolean initCommonConfig(Preferences prefs, CommonConfig commonConfig) { + + try { + String region = getConfigValue(prefs, COMMON_SECTION_NAME, COMMON_REGION); + assert (region != null); + commonConfig.setRegion(region); + + String bucketName = getConfigValue(prefs, COMMON_SECTION_NAME, COMMON_BUCKETNAME); + assert (bucketName != null); + commonConfig.setBucketName(bucketName); + + String ak = getConfigValue(prefs, COMMON_SECTION_NAME, COMMON_AK); + assert (ak != null); + commonConfig.setAk(ak); + + String sk = getConfigValue(prefs, COMMON_SECTION_NAME, COMMON_SK); + assert (sk != null); + commonConfig.setSk(sk); + + String cosPathConfig = getConfigValue(prefs, COMMON_SECTION_NAME, COMMON_COSPATH); + assert (cosPathConfig != null); + commonConfig.setCosPath(cosPathConfig); + + String enableHttpsStr = getConfigValue(prefs, COMMON_SECTION_NAME, COMMON_HTTPS); + assert (enableHttpsStr != null); + commonConfig.setEnableHttps(enableHttpsStr); + + String tmpFolder = getConfigValue(prefs, COMMON_SECTION_NAME, COMMON_TMP); + assert (tmpFolder != null); + commonConfig.setTempFileFolder(tmpFolder); + + String smallFileThreshold = + getConfigValue(prefs, COMMON_SECTION_NAME, COMMON_SMALL_FILE_ThRESHOLD); + assert (smallFileThreshold != null); + commonConfig.setSmallFileThreshold(smallFileThreshold); + + String smallFileExecutorNumberStr = + getConfigValue(prefs, COMMON_SECTION_NAME, COMMON_SMALL_FILE_EXECUTOR_NUM); + assert (smallFileExecutorNumberStr != null); + commonConfig.setSmallFileUploadExecutorNum(smallFileExecutorNumberStr); + + String bigFileExecutorNumberStr = + getConfigValue(prefs, COMMON_SECTION_NAME, COMMON_BIG_FILE_EXECUTOR_NUM); + assert (bigFileExecutorNumberStr != null); + commonConfig.setBigFileUploadExecutorNum(bigFileExecutorNumberStr); + + String storageClassStr = + getConfigValue(prefs, COMMON_SECTION_NAME, COMMON_STORAGE_CLASS); + assert (storageClassStr != null); + commonConfig.setStorageClass(storageClassStr); + + + String entireFileMd5AttachedStr = + getConfigValue(prefs, COMMON_SECTION_NAME, COMMON_ENTIRE_FILE_MD5_ATTACHED); + assert (entireFileMd5AttachedStr != null); + commonConfig.setEntireFileMd5Attached(entireFileMd5AttachedStr); + + String daemonModeStr = + getConfigValue(prefs, COMMON_SECTION_NAME, COMMON_DAEMON_MODE); + assert (daemonModeStr != null); + commonConfig.setDaemonMode(daemonModeStr); + + String daemonModeInterValStr = + getConfigValue(prefs, COMMON_SECTION_NAME, COMMON_DAEMON_MODE_INTERVAL); + assert (daemonModeInterValStr != null); + commonConfig.setDaemonModeInterVal(daemonModeInterValStr); + + } catch (Exception e) { + System.err.println(e.getMessage()); + log.error(e.getMessage()); + return false; + } + return true; + + } + + private boolean initCopyFromLocalConfig(Preferences prefs, + CopyFromLocalConfig copyLocalConfig) { + if (!initCommonConfig(prefs, copyLocalConfig)) { + return false; + } + try { + + String localPathConfig = getConfigValue(prefs, LOCAL_SECTION_NAME, LOCAL_LOCALPATH); + assert (localPathConfig != null); + copyLocalConfig.setLocalPath(localPathConfig); + + String excludes = getConfigValue(prefs, LOCAL_SECTION_NAME, LOCAL_EXECLUDE); + if (excludes != null && !excludes.trim().isEmpty()) { + copyLocalConfig.setExcludes(excludes); + } + + } catch (Exception e) { + System.err.println(e.getMessage()); + log.error(e.getMessage()); + return false; + } + return true; + } + + private boolean initCopyFromUrllistConfig(Preferences prefs, + CopyFromUrllistConfig copyUrllistConfig) { + if (!initCommonConfig(prefs, copyUrllistConfig)) { + return false; + } + try { + + String urllistPath = getConfigValue(prefs, URLLIST_SECTION_NAME, URLLIST_PATH); + assert (urllistPath != null); + copyUrllistConfig.setUrllistPath(urllistPath); + } catch (Exception e) { + System.err.println(e.getMessage()); + log.error(e.getMessage()); + return false; + } + return true; + } + + private boolean initCopyFromQiniuConfig(Preferences prefs, + CopyFromQiniuConfig copyQiniuConfig) { + if (!initCommonConfig(prefs, copyQiniuConfig)) { + return false; + } + + if (!initCopyFromCompetitorConfig(prefs, copyQiniuConfig)) { + return false; + } + + return true; + } + + private boolean initCopyFromAwsConfig(Preferences prefs, CopyFromAwsConfig copyAwsConfig) { + if (!initCommonConfig(prefs, copyAwsConfig)) { + return false; + } + + if (!initCopyFromCompetitorConfig(prefs, copyAwsConfig)) { + return false; + } + + return true; + } + + private boolean initCopyFromAliConfig(Preferences prefs, CopyFromAliConfig copyAliConfig) { + if (!initCommonConfig(prefs, copyAliConfig)) { + return false; + } + + if (!initCopyFromCompetitorConfig(prefs, copyAliConfig)) { + return false; + } + + return true; + } + + private boolean initCopyFromCompetitorConfig(Preferences prefs, + CopyFromCompetitorConfig copyOssConfig) { + + try { + String sectionName; + if (this.migrateType == MigrateType.MIGRATE_FROM_ALI) { + sectionName = ALI_SECTION_NAME; + } else if (this.migrateType == MigrateType.MIGRATE_FROM_AWS) { + sectionName = AWS_SECTION_NAME; + } else if (this.migrateType == MigrateType.MIGRATE_FROM_QINIU) { + sectionName = QINIU_SECTION_NAME; + } else { + log.error("unknow migrate type %s", migrateType.toString()); + return false; + } + + String prefixConfig = getConfigValue(prefs, sectionName, OSS_PREFIX); + if (prefixConfig != null) { + copyOssConfig.setSrcPrefix(prefixConfig); + } + + String bucket = getConfigValue(prefs, sectionName, OSS_BUCKET); + assert (bucket != null); + copyOssConfig.setSrcBucket(bucket); + + String accessKeyId = getConfigValue(prefs, sectionName, OSS_AK); + if (accessKeyId != null) { + copyOssConfig.setSrcAccessKeyId(accessKeyId); + } + + String accessKeySecret = getConfigValue(prefs, sectionName, OSS_SK); + if (accessKeySecret != null) { + copyOssConfig.setSrcAccessKeySecret(accessKeySecret); + } + + String endPoint = getConfigValue(prefs, sectionName, OSS_END_POINT); + // assert (endPoint != null); + if (endPoint != null) { + copyOssConfig.setEndpoint(endPoint); + } + + String proxyHost = getConfigValue(prefs, sectionName, OSS_PROXY_HOST); + copyOssConfig.setSrcProxyHost(proxyHost); + + int port = -1; + String portStr = getConfigValue(prefs, sectionName, OSS_PROXY_PORT); + + if (!portStr.isEmpty()) { + port = Integer.valueOf(portStr); + if (port > 0) { + copyOssConfig.setSrcProxyPort(port); + } else { + throw new Exception("invalid proxy port"); + } + } + + } catch (Exception e) { + System.err.println(e.getMessage()); + log.error(e.getMessage()); + return false; + } + return true; + } + + private boolean initCopyBucketConfig(Preferences prefs, CopyBucketConfig copyBucketConfig) { + if (!initCommonConfig(prefs, copyBucketConfig)) { + return false; + } + try { + String srcRegion = getConfigValue(prefs, COPY_BUCKET_SECTION_NAME, COPY_SRC_REGION); + assert (srcRegion != null); + copyBucketConfig.setSrcRegion(srcRegion); + + String srcBucketName = + getConfigValue(prefs, COPY_BUCKET_SECTION_NAME, COPY_SRC_BUCKETNAME); + assert (srcBucketName != null); + copyBucketConfig.setSrcBucket(srcBucketName); + + String srcSecretId = getConfigValue(prefs, COPY_BUCKET_SECTION_NAME, COPY_SRC_SECRETID); + assert (srcSecretId != null); + copyBucketConfig.setSrcAk(srcSecretId); + + String srcSecretKey = + getConfigValue(prefs, COPY_BUCKET_SECTION_NAME, COPY_SRC_SECRETKEY); + assert (srcSecretKey != null); + copyBucketConfig.setSrcSk(srcSecretKey); + + String srcCosPath = getConfigValue(prefs, COPY_BUCKET_SECTION_NAME, COPY_SRC_COSPATH); + assert (srcCosPath != null); + copyBucketConfig.setSrcCosPath(srcCosPath); + + } catch (Exception e) { + System.err.println(e.getMessage()); + log.error(e.getMessage()); + return false; + } + return true; + } +} diff --git a/src/main/java/com/qcloud/cos_migrate_tool/config/CopyBucketConfig.java b/src/main/java/com/qcloud/cos_migrate_tool/config/CopyBucketConfig.java new file mode 100644 index 0000000..4d45fa3 --- /dev/null +++ b/src/main/java/com/qcloud/cos_migrate_tool/config/CopyBucketConfig.java @@ -0,0 +1,72 @@ +package com.qcloud.cos_migrate_tool.config; + +import java.util.regex.Pattern; + +import com.qcloud.cos_migrate_tool.utils.PathUtils; + +public class CopyBucketConfig extends CommonConfig { + private String srcRegion; + private String srcBucket; + private String srcAk; + private String srcSk; + private String srcCosPath; + + public String getSrcBucket() { + return srcBucket; + } + + public void setSrcBucket(String srcBucket) { + srcBucket = srcBucket.trim(); + String parrtern = ".*-(125|100)[0-9]{3,}$"; + if (!Pattern.matches(parrtern, srcBucket)) { + throw new IllegalArgumentException( + "SrcBucketName must contain appid. example: test-1250001000"); + } + this.srcBucket = srcBucket; + } + + public String getSrcRegion() { + return srcRegion; + } + + public void setSrcRegion(String srcRegion) { + if (srcRegion.trim().isEmpty()) { + throw new IllegalArgumentException("srcRegion value is missing"); + } + this.srcRegion = srcRegion.trim(); + } + + public String getSrcAk() { + return srcAk; + } + + public void setSrcAk(String srcAk) { + if (srcAk.trim().isEmpty()) { + throw new IllegalArgumentException("srcSecretId value is missing"); + } + this.srcAk = srcAk.trim(); + } + + public String getSrcSk() { + return srcSk; + } + + public void setSrcSk(String srcSk) { + if (srcSk.trim().isEmpty()) { + throw new IllegalArgumentException("srcSecretKey value is missing"); + } + this.srcSk = srcSk.trim(); + } + + public String getSrcCosPath() { + return srcCosPath; + } + + public void setSrcCosPath(String srcCosPath) { + if (!srcCosPath.startsWith("/")) { + throw new IllegalArgumentException("srcCosPath must start with /"); + } + this.srcCosPath = PathUtils.formatCosFolderPath(srcCosPath); + } + +} diff --git a/src/main/java/com/qcloud/cos_migrate_tool/config/CopyFromAliConfig.java b/src/main/java/com/qcloud/cos_migrate_tool/config/CopyFromAliConfig.java new file mode 100644 index 0000000..ed9fd18 --- /dev/null +++ b/src/main/java/com/qcloud/cos_migrate_tool/config/CopyFromAliConfig.java @@ -0,0 +1,6 @@ +package com.qcloud.cos_migrate_tool.config; + + +public class CopyFromAliConfig extends CopyFromCompetitorConfig { + +} diff --git a/src/main/java/com/qcloud/cos_migrate_tool/config/CopyFromAwsConfig.java b/src/main/java/com/qcloud/cos_migrate_tool/config/CopyFromAwsConfig.java new file mode 100644 index 0000000..4814a40 --- /dev/null +++ b/src/main/java/com/qcloud/cos_migrate_tool/config/CopyFromAwsConfig.java @@ -0,0 +1,6 @@ +package com.qcloud.cos_migrate_tool.config; + + +public class CopyFromAwsConfig extends CopyFromCompetitorConfig { + +} diff --git a/src/main/java/com/qcloud/cos_migrate_tool/config/CopyFromCompetitorConfig.java b/src/main/java/com/qcloud/cos_migrate_tool/config/CopyFromCompetitorConfig.java new file mode 100644 index 0000000..27ac583 --- /dev/null +++ b/src/main/java/com/qcloud/cos_migrate_tool/config/CopyFromCompetitorConfig.java @@ -0,0 +1,89 @@ +package com.qcloud.cos_migrate_tool.config; + + +public class CopyFromCompetitorConfig extends CommonConfig { + private String srcPrefix = ""; + private String srcBucket; + private String srcEndpoint; + private String srcAccessKeyId; + private String srcAccessKeySecret; + + private String srcProxyHost = ""; + private int srcProxyPort = -1; + + public String getSrcProxyHost() { + return srcProxyHost; + } + + public void setSrcProxyHost(String proxyHost) { + this.srcProxyHost = proxyHost; + } + + public int getSrcProxyPort() { + return srcProxyPort; + } + + public void setSrcProxyPort(int proxyPort) { + this.srcProxyPort = proxyPort; + } + + public String getSrcPrefix() { + return srcPrefix; + } + + public void setSrcPrefix(String prefix) { + this.srcPrefix = prefix; + } + + + public String getSrcBucket() { + return srcBucket; + } + + public void setSrcBucket(String bucket) throws IllegalArgumentException { + if (bucket.isEmpty()) { + throw new IllegalArgumentException("bucket is empty"); + } + + this.srcBucket = bucket; + } + + public String getSrcEndpoint() { + return srcEndpoint; + } + + public void setEndpoint(String endpoint) throws IllegalArgumentException { + if (endpoint.isEmpty()) { + throw new IllegalArgumentException("endPoint is empty"); + } + + this.srcEndpoint = endpoint; + } + + + public String getSrcAccessKeyId() { + return srcAccessKeyId; + } + + public void setSrcAccessKeyId(String accessKeyId) throws IllegalArgumentException { + if (accessKeyId.isEmpty()) { + throw new IllegalArgumentException("accessKeyId is empty"); + } + + this.srcAccessKeyId = accessKeyId; + } + + public String getSrcAccessKeySecret() { + return srcAccessKeySecret; + } + + public void setSrcAccessKeySecret(String accessKeySecret) throws IllegalArgumentException { + if (accessKeySecret.isEmpty()) { + throw new IllegalArgumentException("accessKeySecret is empty"); + } + + this.srcAccessKeySecret = accessKeySecret; + } + + +} diff --git a/src/main/java/com/qcloud/cos_migrate_tool/config/CopyFromLocalConfig.java b/src/main/java/com/qcloud/cos_migrate_tool/config/CopyFromLocalConfig.java new file mode 100644 index 0000000..c3c2c7a --- /dev/null +++ b/src/main/java/com/qcloud/cos_migrate_tool/config/CopyFromLocalConfig.java @@ -0,0 +1,41 @@ +package com.qcloud.cos_migrate_tool.config; + +import java.io.File; +import java.util.HashSet; +import java.util.Set; + +import com.qcloud.cos_migrate_tool.utils.SystemUtils; + +public class CopyFromLocalConfig extends CommonConfig { + private String localPath; + private Set excludes = new HashSet(); + + public String getLocalPath() { + return localPath; + } + + public void setLocalPath(String localPath) throws IllegalArgumentException { + File localPathFile = new File(localPath); + if (!localPathFile.exists()) { + throw new IllegalArgumentException("local path not exist!"); + } + this.localPath = SystemUtils.formatLocalPath(localPath); + } + + public void setExcludes(String excludePath) throws IllegalArgumentException { + excludePath.trim(); + String[] exludePathArray = excludePath.split(";"); + for (String excludePathElement : exludePathArray) { + File tempFile = new File(excludePathElement); + if (!tempFile.exists()) { + throw new IllegalArgumentException("excludePath " + excludePath + " not exist"); + } + this.excludes.add(SystemUtils.formatLocalPath(tempFile.getAbsolutePath())); + } + } + + public boolean isExcludes(String excludePath) { + return this.excludes.contains(excludePath); + } + +} diff --git a/src/main/java/com/qcloud/cos_migrate_tool/config/CopyFromQiniuConfig.java b/src/main/java/com/qcloud/cos_migrate_tool/config/CopyFromQiniuConfig.java new file mode 100644 index 0000000..67eea38 --- /dev/null +++ b/src/main/java/com/qcloud/cos_migrate_tool/config/CopyFromQiniuConfig.java @@ -0,0 +1,5 @@ +package com.qcloud.cos_migrate_tool.config; + + +public class CopyFromQiniuConfig extends CopyFromCompetitorConfig { +} diff --git a/src/main/java/com/qcloud/cos_migrate_tool/config/CopyFromUrllistConfig.java b/src/main/java/com/qcloud/cos_migrate_tool/config/CopyFromUrllistConfig.java new file mode 100644 index 0000000..5355894 --- /dev/null +++ b/src/main/java/com/qcloud/cos_migrate_tool/config/CopyFromUrllistConfig.java @@ -0,0 +1,24 @@ +package com.qcloud.cos_migrate_tool.config; + +import java.io.File; + +import com.qcloud.cos_migrate_tool.utils.SystemUtils; + +public class CopyFromUrllistConfig extends CommonConfig { + private String urllistPath; + + + public String getUrllistPath() { + return urllistPath; + } + + public void setUrllistPath(String urllistPath) throws IllegalArgumentException { + File localPathFile = new File(urllistPath); + if (!localPathFile.exists()) { + throw new IllegalArgumentException("local path not exist!"); + } + this.urllistPath = SystemUtils.formatLocalPath(urllistPath); + } + + +} diff --git a/src/main/java/com/qcloud/cos_migrate_tool/config/MigrateType.java b/src/main/java/com/qcloud/cos_migrate_tool/config/MigrateType.java new file mode 100644 index 0000000..35ee769 --- /dev/null +++ b/src/main/java/com/qcloud/cos_migrate_tool/config/MigrateType.java @@ -0,0 +1,30 @@ +package com.qcloud.cos_migrate_tool.config; + +public enum MigrateType { + MIGRATE_FROM_LOCAL("migrateLocal"), + MIGRATE_FROM_ALI("migrateAli"), + MIGRATE_FROM_QINIU("migrateQiniu"), + MIGRATE_FROM_AWS("migrateAws"), + MIGRATE_FROM_URLLIST("migrateUrl"), + MIGRATE_FROM_COS_BUCKET_COPY("migrateBucketCopy"); + + private String migrateType; + + private MigrateType(String migrateType) { + this.migrateType = migrateType; + } + + public static MigrateType fromValue(String value) throws IllegalArgumentException{ + for (MigrateType elementType : MigrateType.values()) { + if (elementType.toString().equalsIgnoreCase(value)) { + return elementType; + } + } + throw new IllegalArgumentException("invalid migrate_type: " + value); + } + + @Override + public String toString() { + return this.migrateType; + } +} diff --git a/src/main/java/com/qcloud/cos_migrate_tool/meta/TaskStatics.java b/src/main/java/com/qcloud/cos_migrate_tool/meta/TaskStatics.java new file mode 100644 index 0000000..b529cea --- /dev/null +++ b/src/main/java/com/qcloud/cos_migrate_tool/meta/TaskStatics.java @@ -0,0 +1,65 @@ +package com.qcloud.cos_migrate_tool.meta; + +import java.util.concurrent.atomic.AtomicLong; + +import org.joda.time.DateTime; +import org.joda.time.Seconds; + +/** + * 用于统计各类任务的数量,并将每次执行的任务统计结果打印到本机, 并写入数据库 + * + * @author chengwu + * + */ +public class TaskStatics { + public static final TaskStatics instance = new TaskStatics(); + private DateTime startTime = DateTime.now(); + + // 上传文件的总量, 成功量, 失败量 + private AtomicLong successCnt = new AtomicLong(0L); + private AtomicLong failCnt = new AtomicLong(0L); + private AtomicLong skipCnt = new AtomicLong(0L); + + private TaskStatics() {} + + public void addSuccessCnt() { + this.successCnt.incrementAndGet(); + } + + public long getSuccessCnt() { + return this.successCnt.get(); + } + + public void addFailCnt() { + this.failCnt.incrementAndGet(); + } + + public long getFailCnt() { + return this.failCnt.get(); + } + + public void addSkipCnt() { + this.skipCnt.incrementAndGet(); + } + + public long getSkipCnt() { + return this.skipCnt.get(); + } + + public String getStartTimeStr() { + return this.startTime.toString("yyyy-MM-dd HH:mm:ss"); + } + + public int getUsedTimeSeconds() { + DateTime enDateTime = DateTime.now(); + Seconds seconds = Seconds.secondsBetween(this.startTime, enDateTime); + return seconds.getSeconds(); + } + + public void reset() { + this.startTime =DateTime.now(); + this.successCnt.set(0); + this.failCnt.set(0); + this.skipCnt.set(0); + } +} diff --git a/src/main/java/com/qcloud/cos_migrate_tool/record/MigrateCompetitorRecordElement.java b/src/main/java/com/qcloud/cos_migrate_tool/record/MigrateCompetitorRecordElement.java new file mode 100644 index 0000000..803daa3 --- /dev/null +++ b/src/main/java/com/qcloud/cos_migrate_tool/record/MigrateCompetitorRecordElement.java @@ -0,0 +1,33 @@ +package com.qcloud.cos_migrate_tool.record; + +import com.qcloud.cos_migrate_tool.config.MigrateType; + +public class MigrateCompetitorRecordElement extends RecordElement { + private String bucketName; + private String etag; + private String cosPath; + private long fileSize; + + public MigrateCompetitorRecordElement(MigrateType migrateType, String bucketName, + String cosPath, String etag, long fileSize) { + super(migrateType); + this.bucketName = bucketName; + this.etag = etag; + this.cosPath = cosPath; + this.fileSize = fileSize; + } + + @Override + public String buildKey() { + String key = + String.format("[taskType: %s] [bucket: %s] [cosPath: %s]", + recordType.toString(), bucketName, cosPath); + return key; + } + + @Override + public String buildValue() { + String value = String.format("[fileSize: %d] [etag: %s]", fileSize, etag); + return value; + } +} diff --git a/src/main/java/com/qcloud/cos_migrate_tool/record/MigrateCopyBucketRecordElement.java b/src/main/java/com/qcloud/cos_migrate_tool/record/MigrateCopyBucketRecordElement.java new file mode 100644 index 0000000..3ce38d2 --- /dev/null +++ b/src/main/java/com/qcloud/cos_migrate_tool/record/MigrateCopyBucketRecordElement.java @@ -0,0 +1,44 @@ +package com.qcloud.cos_migrate_tool.record; + +import com.qcloud.cos_migrate_tool.config.MigrateType; + +public class MigrateCopyBucketRecordElement extends RecordElement { + private String destRegion; + private String destBucketName; + private String destKey; + private String srcRegion; + private String srcBucketName; + private String srcKey; + private long srcSize; + private String srcEtag; + + + + public MigrateCopyBucketRecordElement(String destRegion, String destBucketName, String destKey, + String srcRegion, String srcBucketName, String srcKey, long srcSize, String srcEtag) { + super(MigrateType.MIGRATE_FROM_COS_BUCKET_COPY); + this.destRegion = destRegion; + this.destBucketName = destBucketName; + this.destKey = destKey; + this.srcRegion = srcRegion; + this.srcBucketName = srcBucketName; + this.srcKey = srcKey; + this.srcSize = srcSize; + this.srcEtag = srcEtag; + } + + @Override + public String buildKey() { + String key = String.format( + "[taskType: %s] [destRegion: %s], [destBucketName: %s], [destKey: %s], [srcRegion: %s], [srcBucketName: %s], [srcKey: %s]", + recordType.toString(), destRegion, destBucketName, destKey, srcRegion, srcBucketName, srcKey); + return key; + } + + @Override + public String buildValue() { + String value = String.format("[srcSize: %d], [srcEtag: %s]", srcSize, srcEtag); + return value; + } + +} diff --git a/src/main/java/com/qcloud/cos_migrate_tool/record/MigrateLocalRecordElement.java b/src/main/java/com/qcloud/cos_migrate_tool/record/MigrateLocalRecordElement.java new file mode 100644 index 0000000..43c22de --- /dev/null +++ b/src/main/java/com/qcloud/cos_migrate_tool/record/MigrateLocalRecordElement.java @@ -0,0 +1,36 @@ +package com.qcloud.cos_migrate_tool.record; + +import com.qcloud.cos_migrate_tool.config.MigrateType; + +public class MigrateLocalRecordElement extends RecordElement { + private String bucketName; + private String localPath; + private String cosPath; + private long mtime; + private long fileSize; + + public MigrateLocalRecordElement(String bucketName, + String localPath, String cosPath, long mtime, long fileSize) { + super(MigrateType.MIGRATE_FROM_LOCAL); + this.bucketName = bucketName; + this.localPath = localPath; + this.cosPath = cosPath; + this.mtime = mtime; + this.fileSize = fileSize; + } + + + + @Override + public String buildKey() { + String key = String.format("[taskType: %s] [bucket: %s], [localPath: %s], [cosPath: %s]", recordType.toString(), bucketName, + localPath, cosPath); + return key; + } + + @Override + public String buildValue() { + String value = String.format("[mtime: %d], [fileSize: %d]", mtime, fileSize); + return value; + } +} diff --git a/src/main/java/com/qcloud/cos_migrate_tool/record/MigrateUrllistRecordElement.java b/src/main/java/com/qcloud/cos_migrate_tool/record/MigrateUrllistRecordElement.java new file mode 100644 index 0000000..3794d21 --- /dev/null +++ b/src/main/java/com/qcloud/cos_migrate_tool/record/MigrateUrllistRecordElement.java @@ -0,0 +1,32 @@ +package com.qcloud.cos_migrate_tool.record; + +import com.qcloud.cos_migrate_tool.config.MigrateType; + +public class MigrateUrllistRecordElement extends RecordElement { + private String bucketName; + private String cosPath; + private String url; + private long fileSize; + + public MigrateUrllistRecordElement(MigrateType migrateType, String bucketName, String cosPath, + String url, long fileSize) { + super(migrateType); + this.bucketName = bucketName; + this.cosPath = cosPath; + this.url = url; + this.fileSize = fileSize; + } + + @Override + public String buildKey() { + String key = String.format("[taskType: %s] [bucket: %s] [cosPath: %s] [url: %s]", + recordType.toString(), bucketName, cosPath, url); + return key; + } + + @Override + public String buildValue() { + String value = String.format("[fileSize: %d]", fileSize); + return value; + } +} diff --git a/src/main/java/com/qcloud/cos_migrate_tool/record/RecordDb.java b/src/main/java/com/qcloud/cos_migrate_tool/record/RecordDb.java new file mode 100644 index 0000000..76676aa --- /dev/null +++ b/src/main/java/com/qcloud/cos_migrate_tool/record/RecordDb.java @@ -0,0 +1,167 @@ +package com.qcloud.cos_migrate_tool.record; + +import org.iq80.leveldb.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.iq80.leveldb.impl.Iq80DBFactory.*; +import java.io.*; + +/** + * HistoryRecordDb 里存储已经上传的记录, key是Element的key, vaule是Element的value + * + * @author chengwu + * + */ + +public class RecordDb { + + public static final Logger log = LoggerFactory.getLogger(RecordDb.class); + + private static final String ENCODING_TYPE = "UTF-8"; + private static final int CACHE_SIZE = 128 << 20; + + private DB db; + + public RecordDb() {} + + public boolean init(String historyDbFolder, String comment) { + Options options = new Options(); + options.cacheSize(CACHE_SIZE); + options.createIfMissing(true); + + try { + db = factory.open(new File(historyDbFolder), options); + } catch (IOException e) { + log.error(e.toString()); + return false; + } + + String commentFile = historyDbFolder + "/README"; + try { + BufferedOutputStream bos = + new BufferedOutputStream(new FileOutputStream(commentFile, true)); + bos.write(comment.getBytes(ENCODING_TYPE)); + bos.close(); + } catch (FileNotFoundException e) { + log.error(e.toString()); + return false; + } catch (IOException e) { + log.error(e.toString()); + return false; + } + + return true; + } + + // 保存记录 + public boolean saveRecord(RecordElement recordElement) { + String key = recordElement.buildKey(); + String value = recordElement.buildValue(); + return saveKV(key, value); + } + + public String buildMultipartUploadSavePointKey(String bucketName, String cosKey, + String localFilePath, long mtime, long partSize, long mutlipartUploadThreshold) { + String key = String.format( + "[task_kind: upload_savepoint] [bucket: %s], [key: %s], [localPath: %s], [mtime: %d], [partSize: %d], [mutlipartUploadThreshold: %d]", + bucketName, cosKey, localFilePath, mtime, partSize, mutlipartUploadThreshold); + return key; + } + + public String queryMultipartUploadSavePoint(String bucketName, String cosKey, + String localFilePath, long mtime, long partSize, long mutlipartUploadThreshold) { + String key = buildMultipartUploadSavePointKey(bucketName, cosKey, localFilePath, mtime, + partSize, mutlipartUploadThreshold); + return queryKV(key); + } + + public boolean updateMultipartUploadSavePoint(String bucketName, String cosKey, + String localFilePath, long mtime, long partSize, long mutlipartUploadThreshold, + String multipartUploadId) { + String key = buildMultipartUploadSavePointKey(bucketName, cosKey, localFilePath, mtime, + partSize, mutlipartUploadThreshold); + return saveKV(key, multipartUploadId); + } + + public boolean deleteMultipartUploadSavePoint(String bucketName, String cosKey, + String localFilePath, long mtime, long partSize, long mutlipartUploadThreshold) { + String key = buildMultipartUploadSavePointKey(bucketName, cosKey, localFilePath, mtime, + partSize, mutlipartUploadThreshold); + return deleteKey(key); + + } + + public boolean queryRecord(RecordElement recordElement) { + String key = recordElement.buildKey(); + String value = queryKV(key); + if (value == null) { + return false; + } + String recordElementValue = recordElement.buildValue(); + return recordElementValue.equals(value); + } + + private String queryKV(String key) { + byte[] valueByte; + try { + valueByte = db.get(key.getBytes(ENCODING_TYPE)); + } catch (DBException e) { + log.error("query db failed, key:{}, exception: {}", key, e.toString()); + return null; + } catch (UnsupportedEncodingException e) { + log.error("query db failed, key:{}, exception: {}", key, e.toString()); + return null; + } + + if (valueByte == null) { + return null; + } + String value; + try { + value = new String(valueByte, ENCODING_TYPE); + } catch (UnsupportedEncodingException e) { + log.error("query db failed, key:{}, exception: {}", key, e.toString()); + return null; + } + return value; + } + + private boolean saveKV(String key, String value) { + try { + db.put(key.getBytes(ENCODING_TYPE), value.getBytes(ENCODING_TYPE)); + return true; + } catch (DBException e) { + log.error("update db failed, key:{}, value:{}, exception: {}", key, value, + e.toString()); + return false; + } catch (UnsupportedEncodingException e) { + log.error("update db failed, key:{}, value:{}, exception: {}", key, value, + e.toString()); + return false; + } + } + + private boolean deleteKey(String key) { + try { + db.delete(key.getBytes(ENCODING_TYPE)); + return true; + } catch (DBException e) { + log.error("update db failed, key:{}, exception: {}", key, e.toString()); + return false; + } catch (UnsupportedEncodingException e) { + log.error("update db failed, key:{}, exception: {}", key, e.toString()); + return false; + } + } + + public void shutdown() { + if (db != null) { + try { + db.close(); + } catch (IOException e) { + log.error("close db occur a exception: " + e.toString()); + } + } + } +} diff --git a/src/main/java/com/qcloud/cos_migrate_tool/record/RecordElement.java b/src/main/java/com/qcloud/cos_migrate_tool/record/RecordElement.java new file mode 100644 index 0000000..1c3f8bb --- /dev/null +++ b/src/main/java/com/qcloud/cos_migrate_tool/record/RecordElement.java @@ -0,0 +1,27 @@ +package com.qcloud.cos_migrate_tool.record; + +import com.qcloud.cos_migrate_tool.config.MigrateType; + +public abstract class RecordElement { + protected MigrateType recordType; + + public RecordElement(MigrateType recordType) { + super(); + this.recordType = recordType; + } + + public abstract String buildKey(); + + public abstract String buildValue(); + + public static RecordElement parseRecord(String key, String value) { + return null; + } + + @Override + public String toString() { + String str = String.format("[record_type: %s], [key: %s], [value: %s]", + recordType.toString(), buildKey(), buildValue()); + return str; + } +} diff --git a/src/main/java/com/qcloud/cos_migrate_tool/task/MigrateAliTask.java b/src/main/java/com/qcloud/cos_migrate_tool/task/MigrateAliTask.java new file mode 100644 index 0000000..2cb4b01 --- /dev/null +++ b/src/main/java/com/qcloud/cos_migrate_tool/task/MigrateAliTask.java @@ -0,0 +1,232 @@ +package com.qcloud.cos_migrate_tool.task; + +import java.io.File; +import java.io.FileInputStream; +import java.math.BigInteger; +import java.util.concurrent.Semaphore; +import java.util.concurrent.ThreadLocalRandom; + +import com.aliyun.oss.OSSClient; +import com.aliyun.oss.common.utils.CRC64; +import com.aliyun.oss.event.ProgressEvent; +import com.aliyun.oss.event.ProgressEventType; +import com.aliyun.oss.event.ProgressListener; +import com.aliyun.oss.model.GetObjectRequest; +import com.aliyun.oss.model.ObjectMetadata; +import com.qcloud.cos.transfer.TransferManager; +import com.qcloud.cos_migrate_tool.config.CopyFromAliConfig; +import com.qcloud.cos_migrate_tool.config.MigrateType; +import com.qcloud.cos_migrate_tool.meta.TaskStatics; +import com.qcloud.cos_migrate_tool.record.MigrateCompetitorRecordElement; +import com.qcloud.cos_migrate_tool.record.RecordDb; + +public class MigrateAliTask extends Task { + + private CopyFromAliConfig config; + private OSSClient ossClient; + private String srcKey; + private long fileSize; + private String etag; + + public MigrateAliTask(CopyFromAliConfig config, OSSClient ossClient, String srcKey, + long fileSize, String etag, TransferManager smallFileTransfer, + TransferManager bigFileTransfer, RecordDb recordDb, Semaphore semaphore) { + super(semaphore, smallFileTransfer, bigFileTransfer, config.getSmallFileThreshold(), + recordDb); + + this.config = config; + this.ossClient = ossClient; + this.srcKey = srcKey; + this.fileSize = fileSize; + this.etag = etag; + if (srcKey.startsWith("/")) { + this.srcKey = srcKey.substring(1); + } + + } + + private String buildCOSPath() { + String srcPrefix = config.getSrcPrefix(); + int lastDelimiter = srcPrefix.lastIndexOf("/"); + String keyName = srcKey.substring(lastDelimiter + 1); + String cosPrefix = config.getCosPath(); + if (cosPrefix.endsWith("/")) { + return cosPrefix + keyName; + } else { + return cosPrefix + "/" + keyName; + } + } + + /** + * The downloading progress listener. Its progressChanged API is called by the SDK when there's + * an update. + */ + private static class GetObjectProgressListener implements ProgressListener { + + private String srcKey; + + private long bytesRead = 0; + private long totalBytes = -1; + private boolean succeed = false; + private long lastPrintTimeStamp = 0; + private long byteReadLastPrint = 0; + + public GetObjectProgressListener(String srcKey) { + super(); + this.srcKey = srcKey; + } + + private void showDownloadProgress(String key, long byteTotal, long byteDownloadSofar) { + double pct = 100.0; + if (byteTotal > 0) { + pct = byteDownloadSofar * 1.0 / byteTotal * 100; + } + String status = "DownloadInProgress"; + if (byteTotal == byteDownloadSofar) { + status = "DownloadOk"; + } + String printMsg = String.format( + "[%s] [key: %s] [byteDownload/ byteTotal/ percentage: %d/ %d/ %.2f%%]", status, + key, byteDownloadSofar, byteTotal, pct); + System.out.println(printMsg); + log.info(printMsg); + } + + public void progressChanged(ProgressEvent progressEvent) { + long bytes = progressEvent.getBytes(); + ProgressEventType eventType = progressEvent.getEventType(); + switch (eventType) { + case RESPONSE_CONTENT_LENGTH_EVENT: + this.totalBytes = bytes; + break; + case RESPONSE_BYTE_TRANSFER_EVENT: + this.bytesRead += bytes; + if (this.bytesRead - this.byteReadLastPrint >= 1024) { + long currentTimeStamp = System.currentTimeMillis(); + if (currentTimeStamp - lastPrintTimeStamp >= 2000) { + showDownloadProgress(srcKey, totalBytes, bytesRead); + byteReadLastPrint = bytesRead; + lastPrintTimeStamp = currentTimeStamp; + } + } + break; + case TRANSFER_COMPLETED_EVENT: + this.succeed = true; + showDownloadProgress(srcKey, totalBytes, bytesRead); + break; + default: + break; + } + } + + public boolean isSucceed() { + return succeed; + } + + } + + @Override + public void doTask() { + + String cosPath = buildCOSPath(); + String localPath = config.getTempFolderPath() + ThreadLocalRandom.current().nextLong(); + + MigrateCompetitorRecordElement ossRecordElement = new MigrateCompetitorRecordElement( + MigrateType.MIGRATE_FROM_ALI, config.getBucketName(), cosPath, etag, fileSize); + if (isExist(ossRecordElement)) { + TaskStatics.instance.addSkipCnt(); + return; + } + try { + // download + // 下载object到文件 + GetObjectProgressListener downloadProgressListener = + new GetObjectProgressListener(srcKey); + ObjectMetadata objMeta = ossClient.getObject( + new GetObjectRequest(config.getSrcBucket(), srcKey) + .withProgressListener(downloadProgressListener), + new File(localPath)); + if (!downloadProgressListener.isSucceed()) { + throw new Exception("download from ali failed"); + } + String serverChecksum = + objMeta.getRawMetadata().get("x-oss-hash-crc64ecma").toString().trim(); + + if ((serverChecksum != null) && !serverChecksum.isEmpty()) { + FileInputStream stream = new FileInputStream(new File(localPath)); + CRC64 crc = new CRC64(); + byte[] b = new byte[65536]; + int len = 0; + + while ((len = stream.read(b)) != -1) { + crc.update(b, len); + } + stream.close(); + BigInteger serverCrcNum = new BigInteger(serverChecksum); + BigInteger localCrcNum = new BigInteger(1, crc.getBytes()); + + if (!localCrcNum.equals(serverCrcNum)) { + String errMsg = String.format( + "[fail] taskInfo: %s, crc check fail, local crc: %s, server crc: %s", + ossRecordElement.buildKey(), localCrcNum.toString(), + serverCrcNum.toString()); + System.err.println(errMsg); + log.error(errMsg); + TaskStatics.instance.addFailCnt(); + return; + } + } + + } catch (Exception e) { + String errMsg = String.format("[fail] taskInfo: %s, Caught an Exception, error msg: %s", + ossRecordElement.buildKey(), e.toString()); + System.err.println(errMsg); + log.error(errMsg); + TaskStatics.instance.addFailCnt(); + return; + } + + // upload + File localFile; + localFile = new File(localPath); + + if (!localFile.exists()) { + String errMsg = String.format("[fail] taskInfo: %s. file: %s not exist, srcKey: %s", + ossRecordElement.buildKey(), localPath, srcKey); + System.err.println(errMsg); + log.error(errMsg); + TaskStatics.instance.addFailCnt(); + return; + } + + if (localFile.length() != this.fileSize) { + String errMsg = String.format( + "[fail] taskInfo: %s, download file size %d not equal meta size %d", + ossRecordElement.buildKey(), localFile.length(), this.fileSize); + System.err.println(errMsg); + log.error(errMsg); + TaskStatics.instance.addFailCnt(); + localFile.delete(); + return; + } + + try { + uploadFile(config.getBucketName(), cosPath, localFile, config.getStorageClass(), + config.isEntireFileMd5Attached()); + saveRecord(ossRecordElement); + TaskStatics.instance.addSuccessCnt(); + String printMsg = String.format("[ok] task_info: %s", ossRecordElement.buildKey()); + System.out.println(printMsg); + log.info(printMsg); + } catch (Exception e) { + String printMsg = String.format("[fail] task_info: %s", ossRecordElement.buildKey()); + System.err.println(printMsg); + log.error("[fail] task_info: {}, exception: {}", ossRecordElement.buildKey(), + e.toString()); + TaskStatics.instance.addFailCnt(); + } finally { + localFile.delete(); + } + + } +} diff --git a/src/main/java/com/qcloud/cos_migrate_tool/task/MigrateAliTaskExecutor.java b/src/main/java/com/qcloud/cos_migrate_tool/task/MigrateAliTaskExecutor.java new file mode 100644 index 0000000..06d4e1f --- /dev/null +++ b/src/main/java/com/qcloud/cos_migrate_tool/task/MigrateAliTaskExecutor.java @@ -0,0 +1,121 @@ +package com.qcloud.cos_migrate_tool.task; + +import java.util.List; + +import org.apache.commons.codec.digest.DigestUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.qcloud.cos_migrate_tool.config.CopyFromAliConfig; +import com.qcloud.cos_migrate_tool.config.MigrateType; +import com.qcloud.cos_migrate_tool.utils.SystemUtils; + +import com.aliyun.oss.*; +import com.aliyun.oss.common.comm.Protocol; +import com.aliyun.oss.model.ListObjectsRequest; +import com.aliyun.oss.model.OSSObjectSummary; +import com.aliyun.oss.model.ObjectListing; + +public class MigrateAliTaskExecutor extends TaskExecutor { + + private static final Logger log = LoggerFactory.getLogger(MigrateLocalTaskExecutor.class); + + private String bucketName; + private String cosFolder; + + private OSSClient ossClient; + private String srcBucket; + private String srcPrefix; + private String srcAccessKeyId; + private String srcAccessKeySecret; + private String srcEndpoint; + + private CopyFromAliConfig config; + + public MigrateAliTaskExecutor(CopyFromAliConfig config) { + super(MigrateType.MIGRATE_FROM_ALI, config); + this.bucketName = config.getBucketName(); + + this.cosFolder = config.getCosPath(); + + this.srcAccessKeyId = config.getSrcAccessKeyId(); + this.srcAccessKeySecret = config.getSrcAccessKeySecret(); + this.srcBucket = config.getSrcBucket(); + this.srcEndpoint = config.getSrcEndpoint(); + this.srcPrefix = config.getSrcPrefix(); + this.config = config; + + ClientConfiguration ossConf = new ClientConfiguration(); + ossConf.setConnectionTimeout(5000); + ossConf.setMaxErrorRetry(5); + ossConf.setSocketTimeout(10000); + ossConf.setMaxConnections(1024); + ossConf.setProtocol(Protocol.HTTP); + + if (!config.getSrcProxyHost().isEmpty()) { + ossConf.setProxyHost(config.getSrcProxyHost()); + } + + if (config.getSrcProxyPort() > 0) { + ossConf.setProxyPort(config.getSrcProxyPort()); + } + + this.ossClient = new OSSClient(this.srcEndpoint, this.srcAccessKeyId, + this.srcAccessKeySecret, ossConf); + } + + @Override + public String buildTaskDbComment() { + String comment = String.format( + "[time: %s], [srcBucketName: %s] [cosFolder: %s], [srcEndPoint: %s], [srcPrefix: %s]", + SystemUtils.getCurrentDateTime(), config.getSrcBucket(), cosFolder, + config.getSrcEndpoint(), config.getSrcPrefix()); + return comment; + } + + @Override + public String buildTaskDbFolderPath() { + String temp = String.format("[srcEndpoint: %s] [prefix: %s] [cosFolder: %s] ", srcEndpoint, + srcPrefix, cosFolder); + String dbFolderPath = + String.format("db/migrate_from_ali/%s/%s", bucketName, DigestUtils.md5Hex(temp)); + return dbFolderPath; + } + + public void buildTask() { + final int maxKeys = 200; + final String keyPrefix = this.srcPrefix; + String nextMarker = ""; + ObjectListing objectListing; + + try { + do { + objectListing = ossClient.listObjects(new ListObjectsRequest(this.srcBucket) + .withPrefix(keyPrefix).withMarker(nextMarker).withMaxKeys(maxKeys)); + log.info("list next marker: " + nextMarker); + List sums = objectListing.getObjectSummaries(); + for (OSSObjectSummary s : sums) { + // AddTask + MigrateAliTask task = new MigrateAliTask(config, ossClient, s.getKey(), + s.getSize(), s.getETag(), smallFileTransferManager, + bigFileTransferManager, recordDb, semaphore); + try { + AddTask(task); + } catch (InterruptedException e) { + log.error(e.getMessage()); + } + } + nextMarker = objectListing.getNextMarker(); + } while (objectListing.isTruncated()); + } catch (OSSException e) { + log.error("list fail msg: {}", e.getMessage()); + } + } + + @Override + public void waitTaskOver() { + super.waitTaskOver(); + this.ossClient.shutdown(); + } + +} diff --git a/src/main/java/com/qcloud/cos_migrate_tool/task/MigrateAwsTask.java b/src/main/java/com/qcloud/cos_migrate_tool/task/MigrateAwsTask.java new file mode 100644 index 0000000..f0a626f --- /dev/null +++ b/src/main/java/com/qcloud/cos_migrate_tool/task/MigrateAwsTask.java @@ -0,0 +1,203 @@ +package com.qcloud.cos_migrate_tool.task; + +import java.io.File; +import java.util.UUID; +import java.util.concurrent.Semaphore; + +import com.amazonaws.AmazonClientException; +import com.amazonaws.AmazonServiceException; +import com.amazonaws.event.ProgressEvent; +import com.amazonaws.event.ProgressEventType; +import com.amazonaws.event.ProgressListener; +import com.amazonaws.services.s3.AmazonS3; +import com.amazonaws.services.s3.model.GetObjectRequest; +import com.qcloud.cos.transfer.TransferManager; +import com.qcloud.cos_migrate_tool.config.CopyFromAwsConfig; +import com.qcloud.cos_migrate_tool.config.MigrateType; +import com.qcloud.cos_migrate_tool.meta.TaskStatics; +import com.qcloud.cos_migrate_tool.record.MigrateCompetitorRecordElement; +import com.qcloud.cos_migrate_tool.record.RecordDb; + +public class MigrateAwsTask extends Task { + + private CopyFromAwsConfig config; + private String srcKey; + private long fileSize; + private String etag; + private AmazonS3 s3Client; + + public MigrateAwsTask(CopyFromAwsConfig config, AmazonS3 s3Client, String srcKey, long fileSize, + String etag, TransferManager smallFileTransfer, TransferManager bigFileTransfer, + RecordDb recordDb, Semaphore semaphore) { + super(semaphore, smallFileTransfer, bigFileTransfer, config.getSmallFileThreshold(), + recordDb); + this.config = config; + this.s3Client = s3Client; + this.srcKey = srcKey; + this.fileSize = fileSize; + this.etag = etag; + if (srcKey.startsWith("/")) { + this.srcKey = srcKey.substring(1); + } + + } + + private String buildCOSPath() { + String srcPrefix = config.getSrcPrefix(); + int lastDelimiter = srcPrefix.lastIndexOf("/"); + String keyName = srcKey.substring(lastDelimiter + 1); + String cosPrefix = config.getCosPath(); + if (cosPrefix.endsWith("/")) { + return cosPrefix + keyName; + } else { + return cosPrefix + "/" + keyName; + } + } + + /** + * The downloading progress listener. Its progressChanged API is called by the SDK when there's + * an update. + */ + private static class GetObjectProgressListener implements ProgressListener { + + private String srcKey; + + private long bytesRead = 0; + private long totalBytes = -1; + private boolean succeed = false; + private long lastPrintTimeStamp = 0; + private long byteReadLastPrint = 0; + + public GetObjectProgressListener(String srcKey) { + super(); + this.srcKey = srcKey; + } + + private void showDownloadProgress(String key, long byteTotal, long byteDownloadSofar) { + double pct = 100.0; + if (byteTotal > 0) { + pct = byteDownloadSofar * 1.0 / byteTotal * 100; + } + String status = "DownloadInProgress"; + if (byteTotal == byteDownloadSofar) { + status = "DownloadOk"; + } + String printMsg = String.format( + "[%s] [key: %s] [byteDownload/ byteTotal/ percentage: %d/ %d/ %.2f%%]", status, + key, byteDownloadSofar, byteTotal, pct); + System.out.println(printMsg); + log.info(printMsg); + } + + public void progressChanged(ProgressEvent progressEvent) { + long bytes = progressEvent.getBytes(); + ProgressEventType eventType = progressEvent.getEventType(); + switch (eventType) { + case RESPONSE_CONTENT_LENGTH_EVENT: + this.totalBytes = bytes; + break; + case RESPONSE_BYTE_TRANSFER_EVENT: + this.bytesRead += bytes; + if (this.bytesRead - this.byteReadLastPrint >= 1024) { + long currentTimeStamp = System.currentTimeMillis(); + if (currentTimeStamp - lastPrintTimeStamp >= 2000) { + showDownloadProgress(srcKey, totalBytes, bytesRead); + byteReadLastPrint = bytesRead; + lastPrintTimeStamp = currentTimeStamp; + } + } + break; + case TRANSFER_COMPLETED_EVENT: + this.succeed = true; + showDownloadProgress(srcKey, totalBytes, bytesRead); + break; + default: + break; + } + } + + public boolean isSucceed() { + return succeed; + } + + } + + @Override + public void doTask() { + String cosPath = buildCOSPath(); + + MigrateCompetitorRecordElement awsRecordElement = new MigrateCompetitorRecordElement( + MigrateType.MIGRATE_FROM_AWS, config.getBucketName(), cosPath, etag, fileSize); + if (isExist(awsRecordElement)) { + TaskStatics.instance.addSkipCnt(); + return; + } + + // download + // 下载object到文件 + String localPath = config.getTempFolderPath() + UUID.randomUUID().toString(); + File localFile = new File(localPath); + try { + GetObjectProgressListener getObjectProgressListener = + new GetObjectProgressListener(srcKey); + s3Client.getObject(new GetObjectRequest(config.getSrcBucket(), srcKey) + .withGeneralProgressListener(getObjectProgressListener), + localFile); + if (!localFile.exists()) { + String printMsg = String.format("[fail] [task_info: %s]", awsRecordElement.buildKey()); + System.out.println(printMsg); + log.error("[fail] [taskInfo: {}] [srcKey: {}] [download localfile failed, localFile {} not exist]", + awsRecordElement.buildKey(), srcKey, localPath); + TaskStatics.instance.addFailCnt(); + return; + } + + if (localFile.length() != this.fileSize) { + String printMsg = String.format("[fail] [task_info: %s]", awsRecordElement.buildKey()); + System.out.println(printMsg); + log.error("[fail] [taskInfo: {}] [download size {} not equal meta size {}]", + awsRecordElement.buildKey(), localFile.length(), this.fileSize); + TaskStatics.instance.addFailCnt(); + localFile.delete(); + return; + } + + } catch (AmazonServiceException e) { + String printMsg = String.format("[fail] [task_info: %s]", awsRecordElement.buildKey()); + System.out.println(printMsg); + log.error("[fail] [taskInfo: {}] [AmazonServiceException occur, msg {}]", + awsRecordElement.buildKey(), e.getMessage().toString()); + TaskStatics.instance.addFailCnt(); + localFile.delete(); + return; + } catch (AmazonClientException e) { + String printMsg = String.format("[fail] [task_info: %s]", awsRecordElement.buildKey()); + System.out.println(printMsg); + log.error("[fail] [taskInfo: {}] [AmazonClientException occur, msg {}]", + awsRecordElement.buildKey(), e.getMessage().toString()); + TaskStatics.instance.addFailCnt(); + localFile.delete(); + return; + } + + // upload file + try { + uploadFile(config.getBucketName(), cosPath, localFile, config.getStorageClass(), + config.isEntireFileMd5Attached()); + saveRecord(awsRecordElement); + TaskStatics.instance.addSuccessCnt(); + String printMsg = String.format("[ok] task_info: %s", awsRecordElement.buildKey()); + System.out.println(printMsg); + log.info(printMsg); + } catch (Exception e) { + String printMsg = String.format("[fail] [task_info: %s]", awsRecordElement.buildKey()); + System.out.println(printMsg); + log.error("[fail] task_info: [key: {}], [value: {}], [exception: {}]", + awsRecordElement.buildKey(), awsRecordElement.buildValue(), e.toString()); + TaskStatics.instance.addFailCnt(); + } finally { + localFile.delete(); + } + + } +} diff --git a/src/main/java/com/qcloud/cos_migrate_tool/task/MigrateAwsTaskExecutor.java b/src/main/java/com/qcloud/cos_migrate_tool/task/MigrateAwsTaskExecutor.java new file mode 100644 index 0000000..5f40d1f --- /dev/null +++ b/src/main/java/com/qcloud/cos_migrate_tool/task/MigrateAwsTaskExecutor.java @@ -0,0 +1,130 @@ +package com.qcloud.cos_migrate_tool.task; + +import org.apache.commons.codec.digest.DigestUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.qcloud.cos_migrate_tool.config.CopyFromAwsConfig; +import com.qcloud.cos_migrate_tool.config.MigrateType; +import com.qcloud.cos_migrate_tool.utils.SystemUtils; +import com.amazonaws.AmazonClientException; +import com.amazonaws.AmazonServiceException; +import com.amazonaws.Protocol; +import com.amazonaws.auth.AWSCredentials; +import com.amazonaws.auth.AWSStaticCredentialsProvider; +import com.amazonaws.auth.BasicAWSCredentials; +import com.amazonaws.client.builder.AwsClientBuilder.EndpointConfiguration; +import com.amazonaws.services.s3.AmazonS3; +import com.amazonaws.services.s3.AmazonS3ClientBuilder; +import com.amazonaws.services.s3.model.ListObjectsV2Request; +import com.amazonaws.services.s3.model.ListObjectsV2Result; +import com.amazonaws.services.s3.model.S3ObjectSummary; + +public class MigrateAwsTaskExecutor extends TaskExecutor { + + private static final Logger log = LoggerFactory.getLogger(MigrateLocalTaskExecutor.class); + + + private String bucketName; + private String cosFolder; + + private AmazonS3 s3Client; + private String srcBucket; + private String srcPrefix; + private String srcAccessKeyId; + private String srcAccessKeySecret; + private String srcEndpoint; + + private CopyFromAwsConfig config; + + public MigrateAwsTaskExecutor(CopyFromAwsConfig config) { + super(MigrateType.MIGRATE_FROM_AWS, config); + this.bucketName = config.getBucketName(); + this.cosFolder = config.getCosPath(); + + this.srcAccessKeyId = config.getSrcAccessKeyId(); + this.srcAccessKeySecret = config.getSrcAccessKeySecret(); + this.srcBucket = config.getSrcBucket(); + this.srcEndpoint = config.getSrcEndpoint(); + this.srcPrefix = config.getSrcPrefix(); + this.config = config; + + com.amazonaws.ClientConfiguration awsConf = new com.amazonaws.ClientConfiguration(); + awsConf.setConnectionTimeout(5000); + awsConf.setMaxErrorRetry(5); + awsConf.setSocketTimeout(10000); + awsConf.setMaxConnections(1024); + awsConf.setProtocol(Protocol.HTTPS); + + if (!config.getSrcProxyHost().isEmpty()) { + awsConf.setProxyHost(config.getSrcProxyHost()); + } + + if (config.getSrcProxyPort() > 0) { + awsConf.setProxyPort(config.getSrcProxyPort()); + } + + AWSCredentials credentials = new BasicAWSCredentials(srcAccessKeyId, srcAccessKeySecret); + this.s3Client = AmazonS3ClientBuilder.standard().withClientConfiguration(awsConf) + .withCredentials(new AWSStaticCredentialsProvider(credentials)) + .withEndpointConfiguration(new EndpointConfiguration(srcEndpoint, null)).build(); + + } + + @Override + public String buildTaskDbComment() { + String comment = String.format( + "[time: %s], [srcBucketName: %s] [cosFolder: %s], [srcEndPoint: %s], [srcPrefix: %s]", + SystemUtils.getCurrentDateTime(), config.getSrcBucket(), cosFolder, + config.getSrcEndpoint(), config.getSrcPrefix()); + return comment; + } + + @Override + public String buildTaskDbFolderPath() { + String temp = String.format("[srcPrefix: %s], [cosFolder: %s]", srcPrefix, cosFolder); + String dbFolderPath = + String.format("db/migrate_from_aws/%s/%s", bucketName, DigestUtils.md5Hex(temp)); + return dbFolderPath; + } + + public void buildTask() { + + try { + + ListObjectsV2Request req = + new ListObjectsV2Request().withBucketName(srcBucket).withPrefix(srcPrefix); + ListObjectsV2Result result; + do { + result = s3Client.listObjectsV2(req); + for (S3ObjectSummary objectSummary : result.getObjectSummaries()) { + // AddTask + MigrateAwsTask task = new MigrateAwsTask(config, s3Client, + objectSummary.getKey(), objectSummary.getSize(), + objectSummary.getETag(), smallFileTransferManager, + bigFileTransferManager, recordDb, semaphore); + + try { + AddTask(task); + } catch (InterruptedException e) { + log.error(e.getMessage()); + } + + } + req.setContinuationToken(result.getNextContinuationToken()); + } while (result.isTruncated()); + } catch (AmazonServiceException ase) { + log.error("list fail AmazonServiceException errorcode: {}, msg: {}", ase.getErrorCode(), + ase.getMessage()); + } catch (AmazonClientException ace) { + log.error("list fail AmazonClientException msg: {}", ace.getMessage().toString()); + } + + } + + @Override + public void waitTaskOver() { + super.waitTaskOver(); + this.s3Client.shutdown(); + } +} diff --git a/src/main/java/com/qcloud/cos_migrate_tool/task/MigrateCopyBucketTask.java b/src/main/java/com/qcloud/cos_migrate_tool/task/MigrateCopyBucketTask.java new file mode 100644 index 0000000..c207cc6 --- /dev/null +++ b/src/main/java/com/qcloud/cos_migrate_tool/task/MigrateCopyBucketTask.java @@ -0,0 +1,75 @@ +package com.qcloud.cos_migrate_tool.task; + +import java.util.concurrent.Semaphore; + +import com.qcloud.cos.COSClient; +import com.qcloud.cos.model.CopyObjectRequest; +import com.qcloud.cos.region.Region; +import com.qcloud.cos.transfer.Copy; +import com.qcloud.cos.transfer.TransferManager; +import com.qcloud.cos_migrate_tool.meta.TaskStatics; +import com.qcloud.cos_migrate_tool.record.MigrateCopyBucketRecordElement; +import com.qcloud.cos_migrate_tool.record.RecordDb; + +public class MigrateCopyBucketTask extends Task { + private final COSClient srcCOSClient; + private final String destRegion; + private final String destBucketName; + private final String destKey; + private final String srcRegion; + private final String srcBucketName; + private final String srcKey; + private final long srcSize; + private final String srcEtag; + + public MigrateCopyBucketTask(Semaphore semaphore, TransferManager smallFileTransfer, + TransferManager bigFileTransfer, long smallFileThreshold, RecordDb recordDb, COSClient srcCOSClient, + String destRegion, String destBucketName, String destKey, String srcRegion, + String srcBucketName, String srcKey, long srcSize, String srcEtag) { + super(semaphore, smallFileTransfer, bigFileTransfer, smallFileThreshold, recordDb); + this.srcCOSClient = srcCOSClient; + this.destRegion = destRegion; + this.destBucketName = destBucketName; + this.destKey = destKey; + this.srcRegion = srcRegion; + this.srcBucketName = srcBucketName; + this.srcKey = srcKey; + this.srcSize = srcSize; + this.srcEtag = srcEtag; + } + + + + @Override + public void doTask() { + MigrateCopyBucketRecordElement migrateCopyBucketRecordElement = + new MigrateCopyBucketRecordElement(destRegion, destBucketName, destKey, srcRegion, + srcBucketName, srcKey, srcSize, srcEtag); + if (isExist(migrateCopyBucketRecordElement)) { + TaskStatics.instance.addSkipCnt(); + return; + } + CopyObjectRequest copyObjectRequest = new CopyObjectRequest(new Region(srcRegion), + srcBucketName, srcKey, destBucketName, destKey); + try { + Copy copy = smallFileTransfer.copy(copyObjectRequest, srcCOSClient, null); + copy.waitForCompletion(); + saveRecord(migrateCopyBucketRecordElement); + TaskStatics.instance.addSuccessCnt(); + String printMsg = + String.format("[ok] task_info: %s", migrateCopyBucketRecordElement.buildKey()); + System.out.println(printMsg); + log.info(printMsg); + } catch (Exception e) { + String printMsg = String.format("[fail] task_info: %s", + migrateCopyBucketRecordElement.buildKey()); + System.out.println(printMsg); + log.error("fail! task_info: [key: {}], [value: {}], exception: {}", + migrateCopyBucketRecordElement.buildKey(), + migrateCopyBucketRecordElement.buildValue(), e.toString()); + TaskStatics.instance.addFailCnt(); + } + } + + +} diff --git a/src/main/java/com/qcloud/cos_migrate_tool/task/MigrateCopyBucketTaskExecutor.java b/src/main/java/com/qcloud/cos_migrate_tool/task/MigrateCopyBucketTaskExecutor.java new file mode 100644 index 0000000..4fe7048 --- /dev/null +++ b/src/main/java/com/qcloud/cos_migrate_tool/task/MigrateCopyBucketTaskExecutor.java @@ -0,0 +1,108 @@ +package com.qcloud.cos_migrate_tool.task; + +import java.util.List; + +import org.apache.commons.codec.digest.DigestUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.qcloud.cos.COSClient; +import com.qcloud.cos.ClientConfig; +import com.qcloud.cos.auth.BasicCOSCredentials; +import com.qcloud.cos.auth.COSCredentials; +import com.qcloud.cos.http.HttpProtocol; +import com.qcloud.cos.model.COSObjectSummary; +import com.qcloud.cos.model.ListObjectsRequest; +import com.qcloud.cos.model.ObjectListing; +import com.qcloud.cos.region.Region; +import com.qcloud.cos_migrate_tool.config.CopyBucketConfig; +import com.qcloud.cos_migrate_tool.config.MigrateType; +import com.qcloud.cos_migrate_tool.utils.SystemUtils; + +public class MigrateCopyBucketTaskExecutor extends TaskExecutor { + private static final Logger log = LoggerFactory.getLogger(MigrateCopyBucketTaskExecutor.class); + + private COSClient srcCosClient; + private String srcRegion; + private String srcBucketName; + private String srcCosPath; + + public MigrateCopyBucketTaskExecutor(CopyBucketConfig config) { + super(MigrateType.MIGRATE_FROM_COS_BUCKET_COPY, config); + + COSCredentials srcCred = new BasicCOSCredentials(config.getSrcAk(), config.getSrcSk()); + ClientConfig clientConfig = new ClientConfig(new Region(config.getSrcRegion())); + if (config.isEnableHttps()) { + clientConfig.setHttpProtocol(HttpProtocol.https); + } + clientConfig.setUserAgent("cos-migrate-tool-v1.0"); + this.srcCosClient = new COSClient(srcCred, clientConfig); + this.srcRegion = config.getSrcRegion(); + this.srcBucketName = config.getSrcBucket(); + this.srcCosPath = config.getSrcCosPath(); + } + + @Override + protected String buildTaskDbComment() { + String comment = String.format( + "[time: %s], [destRegion: %s], [destBucketName: %s], [destCosFolder: %s], [srcRegion: %s], [srcBucketName: %s], [srcFolder: %s], [smallTaskExecutor: %d]\n", + SystemUtils.getCurrentDateTime(), config.getRegion(), config.getBucketName(), + config.getCosPath(), srcRegion, srcBucketName, srcCosPath, + this.smallFileUploadExecutorNum); + return comment; + } + + @Override + protected String buildTaskDbFolderPath() { + String temp = String.format( + "[destCosFolder: %s], [srcRegion: %s], [srcBucket: %s], [srcCosFolder: %s]", + config.getCosPath(), ((CopyBucketConfig) config).getSrcRegion(), + ((CopyBucketConfig) config).getSrcBucket(), + ((CopyBucketConfig) config).getSrcCosPath()); + String dbFolderPath = String.format("db/migrate_copy_bucket/%s/%s", config.getBucketName(), + DigestUtils.md5Hex(temp)); + return dbFolderPath; + } + + @Override + public void buildTask() { + ListObjectsRequest listObjectsRequest = + new ListObjectsRequest(srcBucketName, srcCosPath, null, null, 1000); + + int lastDelimiter = srcCosPath.lastIndexOf("/"); + + try { + while (true) { + ObjectListing objectListing = srcCosClient.listObjects(listObjectsRequest); + List cosObjectSummaries = objectListing.getObjectSummaries(); + for (COSObjectSummary cosObjectSummary : cosObjectSummaries) { + String srcKey = cosObjectSummary.getKey(); + String srcEtag = cosObjectSummary.getETag(); + long srcSize = cosObjectSummary.getSize(); + String keyName = srcKey.substring(lastDelimiter); + String copyDestKey = config.getCosPath() + keyName; + + MigrateCopyBucketTask task = new MigrateCopyBucketTask(semaphore, + smallFileTransferManager, bigFileTransferManager, + config.getSmallFileThreshold(), recordDb, srcCosClient, + config.getRegion(), config.getBucketName(), copyDestKey, srcRegion, + srcBucketName, srcKey, srcSize, srcEtag); + AddTask(task); + } + if (!objectListing.isTruncated()) { + break; + } + listObjectsRequest.setMarker(objectListing.getNextMarker()); + } + } catch (Exception e) { + log.error("List cos bucket occur a exception", e); + } + } + + @Override + public void waitTaskOver() { + super.waitTaskOver(); + srcCosClient.shutdown(); + } + +} diff --git a/src/main/java/com/qcloud/cos_migrate_tool/task/MigrateLocalTask.java b/src/main/java/com/qcloud/cos_migrate_tool/task/MigrateLocalTask.java new file mode 100644 index 0000000..768c313 --- /dev/null +++ b/src/main/java/com/qcloud/cos_migrate_tool/task/MigrateLocalTask.java @@ -0,0 +1,102 @@ +package com.qcloud.cos_migrate_tool.task; + +import java.io.File; +import java.util.concurrent.Semaphore; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.qcloud.cos.model.StorageClass; +import com.qcloud.cos.transfer.TransferManager; +import com.qcloud.cos_migrate_tool.meta.TaskStatics; +import com.qcloud.cos_migrate_tool.record.MigrateLocalRecordElement; +import com.qcloud.cos_migrate_tool.record.RecordDb; +import com.qcloud.cos_migrate_tool.utils.SystemUtils; + +public class MigrateLocalTask extends Task { + private static final Logger log = LoggerFactory.getLogger(MigrateLocalTask.class); + + private String bucketName; + private String localFolder; + private String cosFolder; + private File localFile; + private StorageClass storageClass; + private boolean entireMd5Attached; + + + public MigrateLocalTask(String bucketName, String localFolder, String cosFolder, File localFile, + StorageClass storageClass, TransferManager smallFileTransfer, + TransferManager bigFileTransfer, long smallFileThreshold, RecordDb recordDb, Semaphore semaphore, boolean entireMd5Attached) { + super(semaphore, smallFileTransfer, bigFileTransfer, smallFileThreshold, recordDb); + this.bucketName = bucketName; + this.localFolder = localFolder; + this.cosFolder = cosFolder; + this.localFile = localFile; + this.storageClass = storageClass; + this.entireMd5Attached = entireMd5Attached; + } + + private String buildCOSPath(String localPath) { + String cosPath = cosFolder + localPath.substring(localFolder.length()); + return cosPath; + } + + + @Override + public void doTask() { + String localPath = SystemUtils.formatLocalPath(localFile.getPath()); + String cosPath = buildCOSPath(localPath); + long mtime = localFile.lastModified(); + long fileSize = localFile.length(); + + MigrateLocalRecordElement migrateLocalRecordElement = + new MigrateLocalRecordElement(bucketName, localPath, cosPath, mtime, fileSize); + // 如果记录存在 + if (isExist(migrateLocalRecordElement)) { + TaskStatics.instance.addSkipCnt(); + return; + } + + try { + uploadFile(bucketName, cosPath, localFile, storageClass, entireMd5Attached); + saveRecord(migrateLocalRecordElement); + TaskStatics.instance.addSuccessCnt(); + String printMsg = String.format("[ok] task_info: %s", migrateLocalRecordElement.buildKey()); + System.out.println(printMsg); + log.info(printMsg); + } catch (Exception e) { + String printMsg = String.format("[fail] task_info: %s", migrateLocalRecordElement.buildKey()); + System.out.println(printMsg); + log.error("fail! task_info: [key: {}], [value: {}], exception: {}", migrateLocalRecordElement.buildKey(), + migrateLocalRecordElement.buildValue(), e.toString()); + TaskStatics.instance.addFailCnt(); + } + + /* + PutObjectRequest putObjectRequest = new PutObjectRequest(bucketName, cosPath, localFile); + putObjectRequest.setStorageClass(storageClass); + Upload upload = null; + try { + if (fileSize >= SMALL_FILE_THRESHOLD) { + upload = bigFileTransfer.upload(putObjectRequest); + } else { + upload = smallFileTransfer.upload(putObjectRequest); + } + upload.waitForCompletion(); + recordDb.saveRecord(migrateLocalRecordElement); + TaskStatics.instance.addSuccessCnt(); + String printMsg = + String.format("[ok] task_info: %s", migrateLocalRecordElement.buildKey()); + System.out.println(printMsg); + } catch (Exception e) { + String printMsg = + String.format("[fail] task_info: %s", migrateLocalRecordElement.buildKey()); + System.out.println(printMsg); + log.error("fail! task_info: [key: {}], [value: {}], exception: {}", + migrateLocalRecordElement.buildKey(), migrateLocalRecordElement.buildValue(), + e.toString()); + TaskStatics.instance.addSkipCnt(); + } + */ + } +} diff --git a/src/main/java/com/qcloud/cos_migrate_tool/task/MigrateLocalTaskExecutor.java b/src/main/java/com/qcloud/cos_migrate_tool/task/MigrateLocalTaskExecutor.java new file mode 100644 index 0000000..aec9b5b --- /dev/null +++ b/src/main/java/com/qcloud/cos_migrate_tool/task/MigrateLocalTaskExecutor.java @@ -0,0 +1,106 @@ +package com.qcloud.cos_migrate_tool.task; + +import java.io.File; +import java.io.IOException; +import java.nio.file.FileVisitResult; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.nio.file.SimpleFileVisitor; +import java.nio.file.attribute.BasicFileAttributes; + +import org.apache.commons.codec.digest.DigestUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.qcloud.cos.model.StorageClass; +import com.qcloud.cos_migrate_tool.config.CopyFromLocalConfig; +import com.qcloud.cos_migrate_tool.config.MigrateType; +import com.qcloud.cos_migrate_tool.utils.SystemUtils; + +public class MigrateLocalTaskExecutor extends TaskExecutor { + + private static final Logger log = LoggerFactory.getLogger(MigrateLocalTaskExecutor.class); + + private String bucketName; + private String localFolder; + private String cosFolder; + + public MigrateLocalTaskExecutor(CopyFromLocalConfig config) { + super(MigrateType.MIGRATE_FROM_LOCAL, config); + this.bucketName = config.getBucketName(); + this.localFolder = config.getLocalPath(); + this.cosFolder = config.getCosPath(); + } + + @Override + public String buildTaskDbComment() { + String comment = String.format( + "[time: %s], [bucketName: %s], [localFolder: %s], [cosFolder: %s], [smallfile_exector_number: %d], [bigfile_executor_number: %d]\n", + SystemUtils.getCurrentDateTime(), bucketName, localFolder, cosFolder, + this.smallFileUploadExecutorNum, this.bigFileUploadExecutorNum); + return comment; + } + + @Override + public String buildTaskDbFolderPath() { + String temp = String.format("[local: %s], [cosFolder: %s]", localFolder, cosFolder); + String dbFolderPath = + String.format("db/migrate_from_local/%s/%s", bucketName, DigestUtils.md5Hex(temp)); + return dbFolderPath; + } + + public void buildTask() { + SimpleFileVisitor finder = new SimpleFileVisitor() { + + @Override + public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs) + throws IOException { + String dirPath = SystemUtils.formatLocalPath(dir.toString()); + if (((CopyFromLocalConfig) config).isExcludes(dirPath)) { + return FileVisitResult.SKIP_SUBTREE; + } else { + return super.preVisitDirectory(dir, attrs); + } + } + + @Override + public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) + throws IOException { + + String localPath = SystemUtils.formatLocalPath(file.toString()); + String filePath = ""; + if (filePath.contains(".w.")) { + return super.visitFile(file, attrs); + } + try { + if (!((CopyFromLocalConfig) config).isExcludes(localPath)) { + File localFile = new File(file.toString()); + StorageClass storageClass = + ((CopyFromLocalConfig) config).getStorageClass(); + boolean entireFileMd5Attached = config.isEntireFileMd5Attached(); + long smallFileThreshold = config.getSmallFileThreshold(); + MigrateLocalTask migrateLocalTask = new MigrateLocalTask(bucketName, + localFolder, cosFolder, localFile, storageClass, + smallFileTransferManager, bigFileTransferManager, + smallFileThreshold, recordDb, semaphore, entireFileMd5Attached); + AddTask(migrateLocalTask); + } + } catch (InterruptedException e) { + throw new IOException(e.getMessage()); + } + return super.visitFile(file, attrs); + } + }; + + try { + java.nio.file.Files.walkFileTree(Paths.get(localFolder), finder); + } catch (IOException e) { + log.error("walk file tree error", e); + } + } + + @Override + public void waitTaskOver() { + super.waitTaskOver(); + } +} diff --git a/src/main/java/com/qcloud/cos_migrate_tool/task/MigrateQiniuTask.java b/src/main/java/com/qcloud/cos_migrate_tool/task/MigrateQiniuTask.java new file mode 100644 index 0000000..2afedf4 --- /dev/null +++ b/src/main/java/com/qcloud/cos_migrate_tool/task/MigrateQiniuTask.java @@ -0,0 +1,121 @@ +package com.qcloud.cos_migrate_tool.task; + +import java.io.File; +import java.util.concurrent.Semaphore; +import java.util.concurrent.ThreadLocalRandom; + +import com.qcloud.cos.transfer.TransferManager; +import com.qcloud.cos_migrate_tool.config.CopyFromQiniuConfig; +import com.qcloud.cos_migrate_tool.config.MigrateType; +import com.qcloud.cos_migrate_tool.meta.TaskStatics; +import com.qcloud.cos_migrate_tool.record.MigrateCompetitorRecordElement; +import com.qcloud.cos_migrate_tool.record.RecordDb; +import com.qcloud.cos_migrate_tool.utils.Downloader; +import com.qiniu.util.Auth; + +public class MigrateQiniuTask extends Task { + + private CopyFromQiniuConfig config; + private Auth auth; + private String srcKey; + private long fileSize; + private String etag; + + public MigrateQiniuTask(CopyFromQiniuConfig config, Auth auth, String srcKey, long fileSize, String etag, + TransferManager smallFileTransfer, TransferManager bigFileTransfer, RecordDb recordDb, + Semaphore semaphore) { + super(semaphore, smallFileTransfer, bigFileTransfer, config.getSmallFileThreshold(), recordDb); + + this.config = config; + this.srcKey = srcKey; + this.fileSize = fileSize; + this.etag = etag; + this.auth = auth; + if (srcKey.startsWith("/")) { + this.srcKey = srcKey.substring(1); + } + } + + private String buildCOSPath() { + String srcPrefix = config.getSrcPrefix(); + int lastDelimiter = srcPrefix.lastIndexOf("/"); + String keyName = srcKey.substring(lastDelimiter + 1); + String cosPrefix = config.getCosPath(); + if (cosPrefix.endsWith("/")) { + return cosPrefix + keyName; + } else { + return cosPrefix + "/" + keyName; + } + } + + @Override + public void doTask() { + + String cosPath = buildCOSPath(); + + String localPath = config.getTempFolderPath() + ThreadLocalRandom.current().nextLong(); + + MigrateCompetitorRecordElement qiniuRecordElement = new MigrateCompetitorRecordElement( + MigrateType.MIGRATE_FROM_QINIU, config.getBucketName(), cosPath, etag, fileSize); + if (isExist(qiniuRecordElement)) { + TaskStatics.instance.addSkipCnt(); + return; + } + + // generate download url + String baseUrl = "http://" + config.getSrcEndpoint() + "/" + srcKey; + String url = auth.privateDownloadUrl(baseUrl, 3600); + + File localFile = new File(localPath); + + // download + boolean downloadSucc = false; + try { + downloadSucc = Downloader.instance.downFile(url, localFile); + } catch (Exception e) { + TaskStatics.instance.addFailCnt(); + log.error("download fail url:{} msg:{}", url, e.getMessage()); + localFile.deleteOnExit(); + return; + } + + if (!downloadSucc) { + log.error("download fail url:{}", url); + TaskStatics.instance.addFailCnt(); + return; + } + + // upload + if (!localFile.exists()) { + String errMsg = String.format("[fail] taskInfo: %s. file: %s not exist, srcKey: %s", + qiniuRecordElement.buildKey(), localPath, srcKey); + System.err.println(errMsg); + log.error(errMsg); + TaskStatics.instance.addFailCnt(); + return; + } + + if (localFile.length() != this.fileSize) { + log.error("download size[{}] != list size[{}]", localFile.length(), this.fileSize); + TaskStatics.instance.addFailCnt(); + return; + } + + try { + uploadFile(config.getBucketName(), cosPath, localFile, config.getStorageClass(), + config.isEntireFileMd5Attached()); + saveRecord(qiniuRecordElement); + TaskStatics.instance.addSuccessCnt(); + String printMsg = String.format("[ok] task_info: %s", qiniuRecordElement.buildKey()); + System.out.println(printMsg); + log.info(printMsg); + } catch (Exception e) { + String printMsg = String.format("[fail] task_info: %s", qiniuRecordElement.buildKey()); + System.err.println(printMsg); + log.error("[fail] task_info: {}, exception: {}", qiniuRecordElement.buildKey(), e.toString()); + TaskStatics.instance.addFailCnt(); + } finally { + localFile.delete(); + } + } +} diff --git a/src/main/java/com/qcloud/cos_migrate_tool/task/MigrateQiniuTaskExecutor.java b/src/main/java/com/qcloud/cos_migrate_tool/task/MigrateQiniuTaskExecutor.java new file mode 100644 index 0000000..3ec34c5 --- /dev/null +++ b/src/main/java/com/qcloud/cos_migrate_tool/task/MigrateQiniuTaskExecutor.java @@ -0,0 +1,107 @@ +package com.qcloud.cos_migrate_tool.task; + +import org.apache.commons.codec.digest.DigestUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import com.qcloud.cos_migrate_tool.config.CopyFromQiniuConfig; +import com.qcloud.cos_migrate_tool.config.MigrateType; +import com.qcloud.cos_migrate_tool.utils.SystemUtils; +import com.qiniu.common.Zone; +import com.qiniu.http.ProxyConfiguration; +import com.qiniu.storage.BucketManager; +import com.qiniu.storage.Configuration; +import com.qiniu.storage.model.FileInfo; +import com.qiniu.util.Auth; + +public class MigrateQiniuTaskExecutor extends TaskExecutor { + + private static final Logger log = LoggerFactory.getLogger(MigrateLocalTaskExecutor.class); + + private String bucketName; + private String cosFolder; + + private BucketManager bucketManager; + private Auth auth; + private String srcBucket; + private String srcPrefix; + private String srcAccessKeyId; + private String srcAccessKeySecret; + + private CopyFromQiniuConfig config; + + public MigrateQiniuTaskExecutor(CopyFromQiniuConfig config) { + super(MigrateType.MIGRATE_FROM_QINIU, config); + this.bucketName = config.getBucketName(); + + this.cosFolder = config.getCosPath(); + + this.srcAccessKeyId = config.getSrcAccessKeyId(); + this.srcAccessKeySecret = config.getSrcAccessKeySecret(); + this.srcBucket = config.getSrcBucket(); + this.srcPrefix = config.getSrcPrefix(); + this.config = config; + + Configuration cfg = new Configuration(); + if (!config.getSrcProxyHost().isEmpty() && config.getSrcProxyPort() > 0) { + cfg.proxy = new ProxyConfiguration(config.getSrcProxyHost(), config.getSrcProxyPort()); + } + cfg.useHttpsDomains = false; + + cfg.zone = Zone.autoZone(); // 七牛使用autozone后,list性能不太好,不过一次获取1000,平摊起来耗能接受 + auth = Auth.create(srcAccessKeyId, srcAccessKeySecret); + + bucketManager = new BucketManager(auth, cfg); + } + + @Override + public String buildTaskDbComment() { + String comment = String.format( + "[time: %s], [srcBucketName: %s] [cosFolder: %s], [endPoint: %s], [srcPrefix: %s]", + SystemUtils.getCurrentDateTime(), config.getSrcBucket(), cosFolder, + config.getSrcEndpoint(), config.getSrcEndpoint(), config.getSrcPrefix()); + return comment; + } + + @Override + public String buildTaskDbFolderPath() { + String temp = String.format("[srcPrefix: %s] [srcBucket: %s] [cosFolder: %s]", srcPrefix, + srcBucket, cosFolder); + String dbFolderPath = + String.format("db/migrate_from_qiniu/%s/%s", bucketName, DigestUtils.md5Hex(temp)); + return dbFolderPath; + } + + public void buildTask() { + + // 每次迭代的长度限制,最大1000,推荐值 1000 + int limit = 1000; + String delimiter = ""; + try { + BucketManager.FileListIterator fileListIterator = + bucketManager.createFileListIterator(srcBucket, srcPrefix, limit, delimiter); + while (fileListIterator.hasNext()) { + // 处理获取的file list结果 + FileInfo[] items = fileListIterator.next(); + for (FileInfo item : items) { + MigrateQiniuTask task = new MigrateQiniuTask(config, auth, item.key, item.fsize, + item.hash, smallFileTransferManager, bigFileTransferManager, recordDb, + semaphore); + + try { + AddTask(task); + } catch (InterruptedException e) { + log.error("add task fail,msg:{}", e.getMessage()); + } + } + } + } catch (Exception e) { + log.error("list fail msg:{}", e.getMessage()); + } + } + + @Override + public void waitTaskOver() { + super.waitTaskOver(); + } + +} diff --git a/src/main/java/com/qcloud/cos_migrate_tool/task/MigrateUrllistTask.java b/src/main/java/com/qcloud/cos_migrate_tool/task/MigrateUrllistTask.java new file mode 100644 index 0000000..8870a40 --- /dev/null +++ b/src/main/java/com/qcloud/cos_migrate_tool/task/MigrateUrllistTask.java @@ -0,0 +1,117 @@ +package com.qcloud.cos_migrate_tool.task; + +import java.io.File; +import java.util.concurrent.Semaphore; +import java.util.concurrent.ThreadLocalRandom; + +import com.qcloud.cos.transfer.TransferManager; +import com.qcloud.cos_migrate_tool.config.CopyFromUrllistConfig; +import com.qcloud.cos_migrate_tool.config.MigrateType; +import com.qcloud.cos_migrate_tool.meta.TaskStatics; +import com.qcloud.cos_migrate_tool.record.MigrateUrllistRecordElement; +import com.qcloud.cos_migrate_tool.record.RecordDb; +import com.qcloud.cos_migrate_tool.utils.Downloader; + +public class MigrateUrllistTask extends Task { + + private CopyFromUrllistConfig config; + private String url; + private String srcKey; + + public MigrateUrllistTask(CopyFromUrllistConfig config, String url, String srcKey, + TransferManager smallFileTransfer, TransferManager bigFileTransfer, RecordDb recordDb, + Semaphore semaphore) { + super(semaphore, smallFileTransfer, bigFileTransfer, config.getSmallFileThreshold(), + recordDb); + + this.config = config; + this.url = url; + this.srcKey = srcKey; + if (srcKey.startsWith("/")) { + this.srcKey = srcKey.substring(1); + } + } + + private String buildCOSPath() { + String cosPrefix = config.getCosPath(); + if (cosPrefix.endsWith("/")) { + return cosPrefix + srcKey; + } else { + return cosPrefix + "/" + srcKey; + } + } + + @Override + public void doTask() { + + String cosPath = buildCOSPath(); + String localPath = config.getTempFolderPath() + ThreadLocalRandom.current().nextLong(); + long fileSize = -1; + try { + fileSize = Downloader.instance.headFile(url); + } catch (Exception e) { + log.error("head file fail, url: {}, msg:{}", url, e.getMessage()); + } + + MigrateUrllistRecordElement urllistRecordElement = new MigrateUrllistRecordElement( + MigrateType.MIGRATE_FROM_URLLIST, config.getBucketName(), cosPath, url, fileSize); + if (isExist(urllistRecordElement)) { + TaskStatics.instance.addSkipCnt(); + return; + } + + File localFile = new File(localPath); + boolean downloadSucc = false; + try { + downloadSucc = Downloader.instance.downFile(url, localFile); + } catch (Exception e) { + String printMsg = + String.format("[fail] task_info: %s", urllistRecordElement.buildKey()); + System.err.println(printMsg); + TaskStatics.instance.addFailCnt(); + log.error("download fail task_info: %s, [msg:{}]", url, e.getMessage()); + localFile.delete(); + return; + } + + if (!downloadSucc) { + String printMsg = + String.format("[fail] task_info: %s", urllistRecordElement.buildKey()); + System.err.println(printMsg); + log.error(printMsg); + TaskStatics.instance.addFailCnt(); + localFile.deleteOnExit(); + return; + } + + // upload + if (!localFile.exists()) { + String errMsg = String.format("[fail] taskInfo: %s. tmpfile: %s not exist", + urllistRecordElement.buildKey(), localPath); + System.err.println(errMsg); + log.error(errMsg); + TaskStatics.instance.addFailCnt(); + return; + } + + try { + uploadFile(config.getBucketName(), cosPath, localFile, config.getStorageClass(), + config.isEntireFileMd5Attached()); + saveRecord(urllistRecordElement); + TaskStatics.instance.addSuccessCnt(); + String printMsg = String.format("[ok] task_info: %s", urllistRecordElement.buildKey()); + System.out.println(printMsg); + log.info(printMsg); + } catch (Exception e) { + String printMsg = + String.format("[fail] task_info: %s", urllistRecordElement.buildKey()); + System.err.println(printMsg); + log.error("[fail] task_info: {}, exception: {}", urllistRecordElement.buildKey(), + e.toString()); + TaskStatics.instance.addFailCnt(); + } finally { + localFile.delete(); + } + + } +} diff --git a/src/main/java/com/qcloud/cos_migrate_tool/task/MigrateUrllistTaskExecutor.java b/src/main/java/com/qcloud/cos_migrate_tool/task/MigrateUrllistTaskExecutor.java new file mode 100644 index 0000000..d83a1b8 --- /dev/null +++ b/src/main/java/com/qcloud/cos_migrate_tool/task/MigrateUrllistTaskExecutor.java @@ -0,0 +1,124 @@ +package com.qcloud.cos_migrate_tool.task; + +import java.io.BufferedReader; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.InputStreamReader; +import java.net.URL; +import java.nio.file.FileVisitResult; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.nio.file.SimpleFileVisitor; +import java.nio.file.attribute.BasicFileAttributes; + +import org.apache.commons.codec.digest.DigestUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.qcloud.cos_migrate_tool.config.CopyFromUrllistConfig; +import com.qcloud.cos_migrate_tool.config.MigrateType; +import com.qcloud.cos_migrate_tool.utils.SystemUtils; + +public class MigrateUrllistTaskExecutor extends TaskExecutor { + + private static final Logger log = LoggerFactory.getLogger(MigrateLocalTaskExecutor.class); + + private String bucketName; + private String cosFolder; + + private CopyFromUrllistConfig config; + + public MigrateUrllistTaskExecutor(CopyFromUrllistConfig config) { + super(MigrateType.MIGRATE_FROM_URLLIST, config); + this.bucketName = config.getBucketName(); + this.cosFolder = config.getCosPath(); + + this.config = config; + } + + @Override + public String buildTaskDbComment() { + String comment = String.format("[time: %s], [cosFolder: %s]", SystemUtils.getCurrentDateTime(), cosFolder); + return comment; + } + + @Override + public String buildTaskDbFolderPath() { + String temp = String.format("[urlPath: %s], [cosFolder: %s]", config.getUrllistPath(), cosFolder); + String dbFolderPath = String.format("db/migrate_from_urllist/%s/%s", bucketName, DigestUtils.md5Hex(temp)); + return dbFolderPath; + } + + public void buildTask() { + + SimpleFileVisitor finder = new SimpleFileVisitor() { + public FileVisitResult visitFile(Path urllistPath, BasicFileAttributes attrs) throws IOException { + + System.out.println(urllistPath.toString()); + File urllistFile = urllistPath.toFile(); + FileInputStream fis = null; + try { + fis = new FileInputStream(urllistFile); + } catch (FileNotFoundException e) { + log.error("file not find,msg:{}", e.getMessage()); + return FileVisitResult.CONTINUE; + } + + BufferedReader br = new BufferedReader(new InputStreamReader(fis)); + + String line = null; + try { + while ((line = br.readLine()) != null) { + line = line.trim(); + if (line.isEmpty()) { + continue; + } + + String url_path; + try { + URL url = new URL(line); + url_path = url.getPath(); + } catch (Exception e) { + log.error("parse url fail,line:{} msg:{}", line, e.getMessage()); + continue; + } + + MigrateUrllistTask task = new MigrateUrllistTask(config, line, url_path, + smallFileTransferManager, bigFileTransferManager, recordDb, semaphore); + try { + AddTask(task); + } catch (InterruptedException e) { + log.error("add task fail,msg:{}", e.getMessage()); + } + } + } catch (IOException e) { + log.error("read line fail,msg:{}", e.getMessage()); + } + + try { + br.close(); + } catch (IOException e) { + log.error("close bufferedreader fail,msg:{}", e.getMessage()); + } + + return super.visitFile(urllistPath, attrs); + } + + }; + + try { + java.nio.file.Files.walkFileTree(Paths.get(config.getUrllistPath()), finder); + } catch (IOException e) { + log.error("walk file tree error", e); + } + + } + + @Override + public void waitTaskOver() { + super.waitTaskOver(); + } + +} diff --git a/src/main/java/com/qcloud/cos_migrate_tool/task/Task.java b/src/main/java/com/qcloud/cos_migrate_tool/task/Task.java new file mode 100644 index 0000000..346bfab --- /dev/null +++ b/src/main/java/com/qcloud/cos_migrate_tool/task/Task.java @@ -0,0 +1,182 @@ +package com.qcloud.cos_migrate_tool.task; + +import java.io.File; +import java.util.concurrent.Semaphore; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.qcloud.cos.model.ObjectMetadata; +import com.qcloud.cos.model.PutObjectRequest; +import com.qcloud.cos.model.StorageClass; +import com.qcloud.cos.transfer.PersistableUpload; +import com.qcloud.cos.transfer.Transfer.TransferState; +import com.qcloud.cos.transfer.TransferManager; +import com.qcloud.cos.transfer.TransferProgress; +import com.qcloud.cos.transfer.Upload; +import com.qcloud.cos.utils.Md5Utils; +import com.qcloud.cos_migrate_tool.record.RecordDb; +import com.qcloud.cos_migrate_tool.record.RecordElement; + +public abstract class Task implements Runnable { + private Semaphore semaphore; + public static final Logger log = LoggerFactory.getLogger(MigrateLocalTask.class); + + + protected TransferManager smallFileTransfer; + protected TransferManager bigFileTransfer; + protected long smallFileThreshold; + private RecordDb recordDb; + + public Task(Semaphore semaphore, TransferManager smallFileTransfer, + TransferManager bigFileTransfer, long smallFileThreshold, RecordDb recordDb) { + super(); + this.semaphore = semaphore; + this.smallFileTransfer = smallFileTransfer; + this.bigFileTransfer = bigFileTransfer; + this.smallFileThreshold = smallFileThreshold; + this.recordDb = recordDb; + } + + public boolean isExist(RecordElement recordElement) { + if (recordDb.queryRecord(recordElement)) { + String printMsg = String.format("[skip] task_info: %s", recordElement.buildKey()); + System.out.println(printMsg); + log.info("skip! task_info: [key: {}], [value: {}]", recordElement.buildKey(), + recordElement.buildValue()); + return true; + } + return false; + } + + public void saveRecord(RecordElement recordElement) { + recordDb.saveRecord(recordElement); + } + + public void showTransferProgress(Upload upload, boolean multipart, String key, long mtime) + throws InterruptedException { + boolean pointSaveFlag = false; + do { + try { + Thread.sleep(2000); + } catch (InterruptedException e) { + return; + } + TransferProgress progress = upload.getProgress(); + long byteSent = progress.getBytesTransferred(); + long byteTotal = progress.getTotalBytesToTransfer(); + double pct = 100.0; + if (byteTotal != 0) { + pct = progress.getPercentTransferred(); + } + String printMsg = String.format( + "[UploadInProgress] [key: %s] [byteSent/ byteTotal/ percentage: %d/ %d/ %.2f%%]", + key, byteSent, byteTotal, pct); + log.info(printMsg); + System.out.println(printMsg); + if (multipart && byteSent > 0 && !pointSaveFlag) { + + PersistableUpload persistableUploadInfo = upload.getResumeableMultipartUploadId(); + String multipartUploadId = null; + if (persistableUploadInfo != null) { + multipartUploadId = persistableUploadInfo.getMultipartUploadId(); + if (multipartUploadId != null) { + pointSaveFlag = this.recordDb.updateMultipartUploadSavePoint( + persistableUploadInfo.getBucketName(), + persistableUploadInfo.getKey(), persistableUploadInfo.getFile(), + mtime, persistableUploadInfo.getPartSize(), + persistableUploadInfo.getMutlipartUploadThreshold(), + persistableUploadInfo.getMultipartUploadId()); + if (pointSaveFlag) { + log.info("save point success for multipart upload, key: {}", key); + } else { + log.error("save point failed for multipart upload, key: {}", key); + } + } + + } + } + + } while (upload.isDone() == false); + // 传输完成, 删除断点信息 + if (upload.getState() == TransferState.Completed && pointSaveFlag) { + PersistableUpload persistableUploadInfo = upload.getResumeableMultipartUploadId(); + String multipartUploadId = null; + if (persistableUploadInfo != null) { + multipartUploadId = persistableUploadInfo.getMultipartUploadId(); + if (multipartUploadId != null) { + boolean deleteFlag = this.recordDb.deleteMultipartUploadSavePoint( + persistableUploadInfo.getBucketName(), persistableUploadInfo.getKey(), + persistableUploadInfo.getFile(), mtime, + persistableUploadInfo.getPartSize(), + persistableUploadInfo.getMutlipartUploadThreshold()); + if (deleteFlag) { + log.info("delete point success for multipart upload, key: {}", key); + } else { + log.info("delete point failed for multipart upload, key: {}", key); + } + } + } + } + upload.waitForException(); + } + + private void uploadBigFile(PutObjectRequest putObjectRequest) throws InterruptedException { + String bucketName = putObjectRequest.getBucketName(); + String cosKey = putObjectRequest.getKey(); + String localPath = putObjectRequest.getFile().getAbsolutePath(); + long mtime = putObjectRequest.getFile().lastModified(); + long partSize = this.bigFileTransfer.getConfiguration().getMinimumUploadPartSize(); + long mutlipartUploadThreshold = + this.bigFileTransfer.getConfiguration().getMultipartUploadThreshold(); + + String multipartId = this.recordDb.queryMultipartUploadSavePoint(bucketName, cosKey, + localPath, mtime, partSize, mutlipartUploadThreshold); + Upload upload = null; + // 如果multipartId不为Null, 则表示存在断点, 使用续传. + if (multipartId != null) { + PersistableUpload persistableUpload = new PersistableUpload(bucketName, cosKey, + localPath, multipartId, partSize, mutlipartUploadThreshold); + upload = this.bigFileTransfer.resumeUpload(persistableUpload); + } else { + upload = this.bigFileTransfer.upload(putObjectRequest); + } + showTransferProgress(upload, true, cosKey, mtime); + } + + private void uploadSmallFile(PutObjectRequest putObjectRequest) throws InterruptedException { + Upload upload = smallFileTransfer.upload(putObjectRequest); + showTransferProgress(upload, false, putObjectRequest.getKey(), + putObjectRequest.getFile().lastModified()); + } + + + public void uploadFile(String bucketName, String cosPath, File localFile, + StorageClass storageClass, boolean entireMd5Attached) throws Exception { + PutObjectRequest putObjectRequest = new PutObjectRequest(bucketName, cosPath, localFile); + putObjectRequest.setStorageClass(storageClass); + if (entireMd5Attached) { + String md5 = Md5Utils.md5Hex(localFile); + ObjectMetadata objectMetadata = new ObjectMetadata(); + objectMetadata.addUserMetadata("md5", md5); + putObjectRequest.setMetadata(objectMetadata); + } + + if (localFile.length() >= smallFileThreshold) { + uploadBigFile(putObjectRequest); + } else { + uploadSmallFile(putObjectRequest); + } + } + + public abstract void doTask(); + + + public void run() { + try { + doTask(); + } finally { + semaphore.release(); + } + } +} diff --git a/src/main/java/com/qcloud/cos_migrate_tool/task/TaskExecutor.java b/src/main/java/com/qcloud/cos_migrate_tool/task/TaskExecutor.java new file mode 100644 index 0000000..196e973 --- /dev/null +++ b/src/main/java/com/qcloud/cos_migrate_tool/task/TaskExecutor.java @@ -0,0 +1,159 @@ +package com.qcloud.cos_migrate_tool.task; + +import java.io.File; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.qcloud.cos.COSClient; +import com.qcloud.cos.ClientConfig; +import com.qcloud.cos.auth.BasicCOSCredentials; +import com.qcloud.cos.auth.COSCredentials; +import com.qcloud.cos.http.HttpProtocol; +import com.qcloud.cos.region.Region; +import com.qcloud.cos.transfer.TransferManager; +import com.qcloud.cos_migrate_tool.config.CommonConfig; +import com.qcloud.cos_migrate_tool.config.MigrateType; +import com.qcloud.cos_migrate_tool.meta.TaskStatics; +import com.qcloud.cos_migrate_tool.record.RecordDb; +import com.qcloud.cos_migrate_tool.utils.SystemUtils; + +public abstract class TaskExecutor { + private static final Logger log = LoggerFactory.getLogger(TaskExecutor.class); + + protected MigrateType migrateType; + protected final int smallFileUploadExecutorNum; + protected final int bigFileUploadExecutorNum; + protected RecordDb recordDb = new RecordDb(); + protected Semaphore semaphore = new Semaphore(1024); // 控制最大添加到任务队列里的任务数 + protected ExecutorService threadPool; + protected CommonConfig config; + + protected COSClient cosClient; + protected TransferManager smallFileTransferManager; + protected TransferManager bigFileTransferManager; + + public TaskExecutor(MigrateType migrateType, CommonConfig config) { + this.migrateType = migrateType; + this.config = config; + this.threadPool = Executors.newFixedThreadPool(config.getTaskExecutorNumber()); + this.smallFileUploadExecutorNum = config.getSmallFileExecutorNumber(); + this.bigFileUploadExecutorNum = config.getBigFileExecutorNum(); + + COSCredentials cred = new BasicCOSCredentials(config.getAk(), config.getSk()); + ClientConfig clientConfig = new ClientConfig(new Region(config.getRegion())); + if (config.isEnableHttps()) { + clientConfig.setHttpProtocol(HttpProtocol.https); + } + clientConfig.setUserAgent("cos-migrate-tool-v1.0"); + this.cosClient = new COSClient(cred, clientConfig); + this.smallFileTransferManager = new TransferManager(this.cosClient, + Executors.newFixedThreadPool(config.getSmallFileExecutorNumber())); + this.smallFileTransferManager.getConfiguration() + .setMultipartUploadThreshold(config.getSmallFileThreshold()); + this.bigFileTransferManager = new TransferManager(this.cosClient, + Executors.newFixedThreadPool(config.getBigFileExecutorNum())); + this.bigFileTransferManager.getConfiguration() + .setMultipartUploadThreshold(config.getSmallFileThreshold()); + } + + protected abstract String buildTaskDbComment(); + + protected abstract String buildTaskDbFolderPath(); + + // 初始化Record db + protected boolean initRecord() { + String comment = buildTaskDbComment(); + String dbFolderPath = buildTaskDbFolderPath(); + File dbFolder = new File(dbFolderPath); + if (!dbFolder.isDirectory()) { + boolean mkdirRet = dbFolder.mkdirs(); + if (!mkdirRet) { + log.error("make db folder fail! [db_folder_path: %s]", dbFolderPath); + return false; + } + } + return recordDb.init(dbFolderPath, comment); + } + + protected void AddTask(Task task) throws InterruptedException { + try { + semaphore.acquire(); + threadPool.submit(task); + } catch (InterruptedException e) { + log.error("add task is interrupted", e); + throw e; + } + } + + + // 用于产生任务 + public abstract void buildTask(); + + public void run() { + if (!initRecord()) { + String errMsg = "init db error, may be another process with same config is running "; + log.error(errMsg); + System.err.println(errMsg); + return; + } + buildTask(); + } + + public void waitTaskOver() { + this.threadPool.shutdown(); + try { + this.threadPool.awaitTermination(1000, TimeUnit.DAYS); + this.recordDb.shutdown(); + this.smallFileTransferManager.shutdownNow(); + this.bigFileTransferManager.shutdownNow(); + this.cosClient.shutdown(); + printTaskStaticsInfo(); + } catch (InterruptedException e) { + log.error("waitTaskOver is interrupted!", e); + System.err.println("waitTaskOver is interrupted!"); + } + } + + public void printTaskStaticsInfo() { + String opStatus = ""; + if (TaskStatics.instance.getFailCnt() == 0) { + opStatus = "ALL_OK"; + } else if (TaskStatics.instance.getSuccessCnt() == 0) { + opStatus = "ALL_FAIL"; + } else { + opStatus = "PART_OK"; + } + + String printStr = String.format("\n\n%s over! op statistics:", migrateType.toString()); + System.out.println(printStr); + log.info(printStr); + printStr = String.format("%30s : %s", "op_status", opStatus); + System.out.println(printStr); + log.info(printStr); + printStr = String.format("%30s : %d", "migrate_ok", TaskStatics.instance.getSuccessCnt()); + System.out.println(printStr); + log.info(printStr); + printStr = String.format("%30s : %d", "migrate_fail", TaskStatics.instance.getFailCnt()); + System.out.println(printStr); + log.info(printStr); + printStr = String.format("%30s : %d", "migrate_skip", TaskStatics.instance.getSkipCnt()); + System.out.println(printStr); + log.info(printStr); + printStr = String.format("%30s : %s", "start_time", TaskStatics.instance.getStartTimeStr()); + System.out.println(printStr); + log.info(printStr); + printStr = String.format("%30s : %s", "end_time", SystemUtils.getCurrentDateTime()); + System.out.println(printStr); + log.info(printStr); + printStr = String.format("%30s : %d s", "used_time", + TaskStatics.instance.getUsedTimeSeconds()); + System.out.println(printStr); + log.info(printStr); + } + +} diff --git a/src/main/java/com/qcloud/cos_migrate_tool/utils/Downloader.java b/src/main/java/com/qcloud/cos_migrate_tool/utils/Downloader.java new file mode 100644 index 0000000..a375549 --- /dev/null +++ b/src/main/java/com/qcloud/cos_migrate_tool/utils/Downloader.java @@ -0,0 +1,215 @@ +package com.qcloud.cos_migrate_tool.utils; + +import java.io.BufferedInputStream; +import java.io.BufferedOutputStream; +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.net.MalformedURLException; +import java.net.URI; +import java.net.URISyntaxException; +import java.net.URL; + +import org.apache.http.Header; +import org.apache.http.HttpEntity; +import org.apache.http.HttpResponse; +import org.apache.http.client.HttpClient; +import org.apache.http.client.config.RequestConfig; +import org.apache.http.client.methods.HttpGet; +import org.apache.http.client.methods.HttpHead; +import org.apache.http.impl.client.HttpClientBuilder; +import org.apache.http.impl.client.HttpClients; +import org.apache.http.impl.conn.PoolingHttpClientConnectionManager; + +import com.qcloud.cos.http.IdleConnectionMonitorThread; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class Downloader { + public static final Downloader instance = new Downloader(); + private static final Logger log = LoggerFactory.getLogger(Downloader.class); + + protected HttpClient httpClient; + + protected PoolingHttpClientConnectionManager connectionManager; + protected IdleConnectionMonitorThread idleConnectionMonitor; + + protected RequestConfig requestConfig; + + private Downloader() { + super(); + this.connectionManager = new PoolingHttpClientConnectionManager(); + this.connectionManager.setMaxTotal(2048); + this.connectionManager.setDefaultMaxPerRoute(2048); + this.connectionManager.setValidateAfterInactivity(1); + HttpClientBuilder httpClientBuilder = + HttpClients.custom().setConnectionManager(connectionManager); + this.httpClient = httpClientBuilder.build(); + this.requestConfig = RequestConfig.custom().setConnectionRequestTimeout(30 * 1000) + .setConnectTimeout(30 * 1000).setSocketTimeout(30 * 1000).build(); + this.idleConnectionMonitor = new IdleConnectionMonitorThread(this.connectionManager); + this.idleConnectionMonitor.setDaemon(true); + this.idleConnectionMonitor.start(); + } + + public long headFile(String url) { + + int retry = 0; + int maxRetryCount = 5; + while (retry < maxRetryCount) { + HttpHead httpHead = null; + try { + URL encodeUrl = new URL(url); + URI uri = new URI(encodeUrl.getProtocol(), encodeUrl.getHost(), encodeUrl.getPath(), + encodeUrl.getQuery(), null); + httpHead = new HttpHead(uri); + } catch (URISyntaxException e) { + String errMsg = "Invalid url:" + url; + log.error(errMsg); + } catch (MalformedURLException e) { + log.error("headFile url fail,url:{},msg:{}", url, e.getMessage()); + return -1; + } + + httpHead.setConfig(requestConfig); + httpHead.setHeader("Accept", "*/*"); + httpHead.setHeader("Connection", "Keep-Alive"); + httpHead.setHeader("User-Agent", "cos-migrate-tool"); + + try { + HttpResponse httpResponse = httpClient.execute(httpHead); + int http_statuscode = httpResponse.getStatusLine().getStatusCode(); + if (http_statuscode < 200 || http_statuscode > 500) { + String errMsg = String.format( + "head failed, url: %s, httpResponse: %s, response_statuscode: %d", url, + httpResponse.toString(), http_statuscode); + throw new Exception(errMsg); + } + + if (!httpResponse.containsHeader("content-length")) { + log.info("not find content-length"); + return -1; + } + + Header header = httpResponse.getFirstHeader("content-length"); + + try { + long contentLength = Long.valueOf(header.getValue()); + if (contentLength < 0) { + throw new IllegalArgumentException("The minimum of content-length is 0"); + } + return contentLength; + } catch (NumberFormatException e) { + throw new IllegalArgumentException("invalid content-length"); + } + + } catch (Exception e) { + log.error(e.toString()); + httpHead.abort(); + return -1; + } + + } + return -1; + } + + private void showDownloadProgress(String url, long byteTotal, long byteDownloadSofar) { + double pct = 100.0; + if (byteTotal != 0) { + pct = byteDownloadSofar * 1.0 / byteTotal * 100; + } + String status = "DownloadInProgress"; + if (byteTotal == byteDownloadSofar) { + status = "DownloadOk"; + } + String printMsg = String.format( + "[%s] [url: %s] [byteDownload/ byteTotal/ percentage: %d/ %d/ %.2f%%]", status, url, + byteDownloadSofar, byteTotal, pct); + System.out.println(printMsg); + log.info(printMsg); + } + + public boolean downFile(String url, File localFile) { + int retry = 0; + int maxRetryCount = 5; + while (retry < maxRetryCount) { + HttpGet httpGet = null; + try { + URL encodeUrl = new URL(url); + URI uri = new URI(encodeUrl.getProtocol(), encodeUrl.getHost(), encodeUrl.getPath(), + encodeUrl.getQuery(), null); + httpGet = new HttpGet(uri); + } catch (URISyntaxException e) { + String errMsg = "Invalid url:" + url; + log.error(errMsg); + return false; + } catch (MalformedURLException e) { + log.error("downFile url fail,url:{},msg:{}", url, e.getMessage()); + return false; + } + + httpGet.setConfig(requestConfig); + httpGet.setHeader("Accept", "*/*"); + httpGet.setHeader("Connection", "Keep-Alive"); + httpGet.setHeader("User-Agent", "cos-migrate-tool-v1.0"); + try { + HttpResponse httpResponse = httpClient.execute(httpGet); + int http_statuscode = httpResponse.getStatusLine().getStatusCode(); + if (http_statuscode < 200 || http_statuscode > 299) { + String errMsg = String.format( + "getFileinputstream failed, url: %s, httpResponse: %s, response_statuscode: %d", + url, httpResponse.toString(), http_statuscode); + log.error(errMsg); + throw new Exception(errMsg); + } + HttpEntity entity = httpResponse.getEntity(); + long contentLength = entity.getContentLength(); + long byteDownloadSoFar = 0; + long byteDownloadLastPrint = 0; + long lastPrintTimeStamp = 0; + + BufferedInputStream bis = new BufferedInputStream(entity.getContent()); + OutputStream out = null; + BufferedOutputStream bos = null; + try { + out = new FileOutputStream(localFile); + bos = new BufferedOutputStream(out); + int inByte; + while ((inByte = bis.read()) != -1) { + bos.write(inByte); + ++byteDownloadSoFar; + if (byteDownloadSoFar - byteDownloadLastPrint >= 1024) { + long currentTimeStamp = System.currentTimeMillis(); + if (currentTimeStamp - lastPrintTimeStamp >= 2000) { + showDownloadProgress(url, contentLength, byteDownloadSoFar); + byteDownloadLastPrint = byteDownloadSoFar; + lastPrintTimeStamp = currentTimeStamp; + } + } + } + showDownloadProgress(url, contentLength, byteDownloadSoFar); + httpGet.releaseConnection(); + return true; + } finally { + try { + bis.close(); + bos.close(); + } catch (IOException e) { + } + } + } catch (Exception e) { + log.error(e.toString()); + httpGet.abort(); + localFile.delete(); + } + ++retry; + } + return false; + } + + public void shutdown() { + this.idleConnectionMonitor.shutdown(); + } +} diff --git a/src/main/java/com/qcloud/cos_migrate_tool/utils/PathUtils.java b/src/main/java/com/qcloud/cos_migrate_tool/utils/PathUtils.java new file mode 100644 index 0000000..2881b9f --- /dev/null +++ b/src/main/java/com/qcloud/cos_migrate_tool/utils/PathUtils.java @@ -0,0 +1,20 @@ +package com.qcloud.cos_migrate_tool.utils; + +public class PathUtils { + private static final String PATH_DELIMITER = "/"; + + public static String formatCosFolderPath(String cosPath) { + cosPath = cosPath.trim(); + if (!cosPath.startsWith(PATH_DELIMITER)) { + cosPath = PATH_DELIMITER + cosPath; + } + + cosPath = cosPath.replaceAll("//", PATH_DELIMITER); + + if (cosPath.endsWith(PATH_DELIMITER)) { + return cosPath; + } else { + return cosPath + PATH_DELIMITER; + } + } +} diff --git a/src/main/java/com/qcloud/cos_migrate_tool/utils/SystemUtils.java b/src/main/java/com/qcloud/cos_migrate_tool/utils/SystemUtils.java new file mode 100644 index 0000000..e1b85ff --- /dev/null +++ b/src/main/java/com/qcloud/cos_migrate_tool/utils/SystemUtils.java @@ -0,0 +1,32 @@ +package com.qcloud.cos_migrate_tool.utils; + +import java.io.File; + +import org.joda.time.DateTime; + +public class SystemUtils { + public static boolean isWindowsSystem() { + String osSystemName = System.getProperty("os.name").toLowerCase(); + return osSystemName.startsWith("win"); + } + + public static String getCurrentDateTime() { + DateTime dateTime = new DateTime(); + return dateTime.toString("yyyy-MM-dd HH:mm:ss"); + } + + public static String formatLocalPath(String localPath) throws IllegalArgumentException{ + File localFile = new File(localPath); + if (!localFile.exists()) { + throw new IllegalArgumentException("localpath " + localPath + " not exist!"); + } + String absolutePath = localFile.getAbsolutePath(); + if (SystemUtils.isWindowsSystem()) { + absolutePath = absolutePath.replace('\\', '/'); + } + if (localFile.isDirectory()) { + absolutePath += "/"; + } + return absolutePath; + } +} diff --git a/src/main/resources/log4j.properties b/src/main/resources/log4j.properties new file mode 100644 index 0000000..acbebd8 --- /dev/null +++ b/src/main/resources/log4j.properties @@ -0,0 +1,41 @@ +### log level +log4j.rootLogger = INFO, I, W, E + +###stdout +log4j.appender.stdout = org.apache.log4j.ConsoleAppender +log4j.appender.stdout.Target = System.out +log4j.appender.stdout.Threshold = ERROR +log4j.appender.stdout.layout = org.apache.log4j.PatternLayout +log4j.appender.stdout.layout.ConversionPattern = %-d{yyyy-MM-dd HH:mm:ss} [%-5p] [%c:%x] [%F:%L] %n %m%n + +###DEBUG +log4j.appender.D = org.apache.log4j.DailyRollingFileAppender +log4j.appender.D.File = ./log/debug.log +log4j.appender.D.Append = true +log4j.appender.D.Threshold = DEBUG +log4j.appender.D.layout = org.apache.log4j.PatternLayout +log4j.appender.D.layout.ConversionPattern = %-d{yyyy-MM-dd HH:mm:ss} [%-5p] [%c:%x] [%F:%L] %n %m%n + +###INFO +log4j.appender.I = org.apache.log4j.DailyRollingFileAppender +log4j.appender.I.File = ./log/info.log +log4j.appender.I.Append = true +log4j.appender.I.Threshold = INFO +log4j.appender.I.layout = org.apache.log4j.PatternLayout +log4j.appender.I.layout.ConversionPattern = %-d{yyyy-MM-dd HH:mm:ss} [%-5p] [%c:%x] [%F:%L] %n %m%n + +###WARN +log4j.appender.W = org.apache.log4j.DailyRollingFileAppender +log4j.appender.W.File = ./log/warn.log +log4j.appender.W.Append = true +log4j.appender.W.Threshold = WARN +log4j.appender.W.layout = org.apache.log4j.PatternLayout +log4j.appender.W.layout.ConversionPattern = %-d{yyyy-MM-dd HH:mm:ss} [%-5p] [%c:%x] [%F:%L] %n %m%n + +###ERROR +log4j.appender.E = org.apache.log4j.DailyRollingFileAppender +log4j.appender.E.File = ./log/error.log +log4j.appender.E.Append = true +log4j.appender.E.Threshold = ERROR +log4j.appender.E.layout = org.apache.log4j.PatternLayout +log4j.appender.E.layout.ConversionPattern = %-d{yyyy-MM-dd HH:mm:ss} [%-5p] [%c:%x] [%F:%L] %n %m%n diff --git a/start_migrate.bat b/start_migrate.bat new file mode 100644 index 0000000..18e267a --- /dev/null +++ b/start_migrate.bat @@ -0,0 +1,6 @@ +@echo off +set cur_dir=%CD% +cd %cur_dir% +set my_java_cp=.;%cur_dir%\dep\*;%cur_dir%\src\main\resources +java -Dfile.encoding=UTF-8 -cp "%my_java_cp%" com.qcloud.cos_migrate_tool.app.App +pause>nul diff --git a/start_migrate.sh b/start_migrate.sh new file mode 100644 index 0000000..0dc994d --- /dev/null +++ b/start_migrate.sh @@ -0,0 +1,8 @@ +#!/bin/bash +export LANG=en_US.utf8 + +cur_dir=$(cd `dirname $0`;pwd) +cd ${cur_dir} +cp_path=${cur_dir}/src/main/resources:${cur_dir}/dep/* + +java -Dfile.encoding=UTF-8 $@ -cp "$cp_path" com.qcloud.cos_migrate_tool.app.App