Skip to content

Commit

Permalink
1. rocksdb instead of leveldb. 2. add cos proxy. 3.record requestid
Browse files Browse the repository at this point in the history
1. rocksdb instead of leveldb. 2. add cos proxy. 3.record requestid
  • Loading branch information
kitmanzheng committed Aug 14, 2018
1 parent f00e4b0 commit 354fd6f
Show file tree
Hide file tree
Showing 22 changed files with 329 additions and 91 deletions.
Binary file modified dep/cos_migrate_tool-1.0-jar-with-dependencies.jar
Binary file not shown.
12 changes: 12 additions & 0 deletions opbin/dump_requestid.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
#!/bin/bash
export LANG=en_US.utf8

cur_dir=$(cd `dirname $0`; cd ..; pwd)
echo ${cur_dir}
cd ${cur_dir}
cp_path=${cur_dir}/src/main/resources:${cur_dir}/dep/*

export RUN_MODE='DUMP_REQUESTID'
export DUMP_REQUESTID_FILE="${cur_dir}/dump-requestid.txt"
echo "try to dump requestid to ${DUMP_REQUESTID_FILE}"
java -Dfile.encoding=UTF-8 $@ -cp "$cp_path" com.qcloud.cos_migrate_tool.app.App
11 changes: 11 additions & 0 deletions opbin/query_requestid.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
#!/bin/bash
export LANG=en_US.utf8

cur_dir=$(cd `dirname $0`; cd ..; pwd)
cd ${cur_dir}
cp_path=${cur_dir}/src/main/resources:${cur_dir}/dep/*

export RUN_MODE='QUERY_REQUESTID'
export QUERY_REQUESTID_KEY="/create_test_data.sh"
echo "try to query requestid ${QUERY_REQUESID_KEY}"
java -Dfile.encoding=UTF-8 $@ -cp "$cp_path" com.qcloud.cos_migrate_tool.app.App
12 changes: 9 additions & 3 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,16 @@
<version>1.7.25</version>
</dependency>

<!-- <dependency> -->
<!-- <groupId>org.iq80.leveldb</groupId> -->
<!-- <artifactId>leveldb</artifactId> -->
<!-- <version>0.9</version> -->
<!-- </dependency> -->

<dependency>
<groupId>org.iq80.leveldb</groupId>
<artifactId>leveldb</artifactId>
<version>0.9</version>
<groupId>org.rocksdb</groupId>
<artifactId>rocksdbjni</artifactId>
<version>5.14.2</version>
</dependency>


Expand Down
18 changes: 18 additions & 0 deletions src/main/java/com/qcloud/cos_migrate_tool/config/CommonConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ public class CommonConfig {
private int timeWindowBegin = 0;
private int timeWindowEnd = 24;
private String endpointSuffix = null;
private String cosProxyHost = "";
private int cosProxyPort = -1;

public String getTempFolderPath() {
return tempFolderPath;
Expand Down Expand Up @@ -314,4 +316,20 @@ public int getTimeWindowBegin() {
public int getTimeWindowEnd() {
return timeWindowEnd;
}

public void setProxyHost(String host) {
this.cosProxyHost = host;
}

public void setProxyPort(int port) {
this.cosProxyPort = port;
}

public String getProxyHost() {
return this.cosProxyHost;
}

public int getProxyPort() {
return this.cosProxyPort;
}
}
20 changes: 20 additions & 0 deletions src/main/java/com/qcloud/cos_migrate_tool/config/ConfigParser.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ public class ConfigParser {
private static final String COMMON_DAEMON_MODE = "daemonMode";
private static final String COMMON_DAEMON_MODE_INTERVAL = "daemonModeInterVal";
private static final String COMMON_EXECUTE_TIME_WINDOW = "executeTimeWindow";
private static final String COMMON_PROXY_HOST = "proxyHost";
private static final String COMMON_PROXY_PORT = "proxyPort";

private static final String LOCAL_SECTION_NAME = "migrateLocal";
private static final String LOCAL_LOCALPATH = "localPath";
Expand Down Expand Up @@ -438,6 +440,24 @@ private boolean initCommonConfig(Preferences prefs, CommonConfig commonConfig) {
if (endPointSuffixStr != null && !endPointSuffixStr.trim().isEmpty()) {
commonConfig.setEndpointSuffix(endPointSuffixStr);
}

String proxyHost = getConfigValue(prefs, COMMON_SECTION_NAME, COMMON_PROXY_HOST);
if (proxyHost != null && !proxyHost.trim().isEmpty()) {
commonConfig.setProxyHost(proxyHost);
}

int port = -1;
String portStr = getConfigValue(prefs, COMMON_SECTION_NAME, COMMON_PROXY_PORT);

if (portStr != null && !portStr.isEmpty()) {
port = Integer.valueOf(portStr);
if (port > 0) {
commonConfig.setProxyPort(port);
} else {
throw new Exception("invalid cos proxy port");
}
}


} catch (Exception e) {
System.err.println(e.getMessage());
Expand Down
108 changes: 92 additions & 16 deletions src/main/java/com/qcloud/cos_migrate_tool/record/RecordDb.java
Original file line number Diff line number Diff line change
@@ -1,11 +1,22 @@
package com.qcloud.cos_migrate_tool.record;

import org.iq80.leveldb.*;
import java.io.BufferedOutputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.UnsupportedEncodingException;

import org.rocksdb.FlushOptions;
import org.rocksdb.Options;
import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.RocksIterator;
import org.rocksdb.util.SizeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.iq80.leveldb.impl.Iq80DBFactory.*;
import java.io.*;
import com.qcloud.cos.utils.IOUtils;

/**
* HistoryRecordDb 里存储已经上传的记录, key是Element的key, vaule是Element的value
Expand All @@ -19,24 +30,27 @@ public class RecordDb {
public static final Logger log = LoggerFactory.getLogger(RecordDb.class);

private static final String ENCODING_TYPE = "UTF-8";
private static final int CACHE_SIZE = 128 << 20;

private DB db;
private RocksDB db;
private Options options;
private final String requestIdPrefix = "x-cos-requestId-";

public RecordDb() {}

public boolean init(String historyDbFolder, String comment) {
Options options = new Options();
options.cacheSize(CACHE_SIZE);
options.createIfMissing(true);

try {
db = factory.open(new File(historyDbFolder), options);
} catch (IOException e) {
options = new Options();
options.setCreateIfMissing(true);
options.setWriteBufferSize(16 * SizeUnit.MB).setMaxWriteBufferNumber(4)
.setMaxBackgroundCompactions(4);
db = RocksDB.open(options, historyDbFolder);
} catch (RocksDBException e) {
log.error(e.toString());
return false;
}


String commentFile = historyDbFolder + "/README";
try {
BufferedOutputStream bos =
Expand All @@ -50,7 +64,6 @@ public boolean init(String historyDbFolder, String comment) {
log.error(e.toString());
return false;
}

return true;
}

Expand All @@ -61,6 +74,62 @@ public boolean saveRecord(RecordElement recordElement) {
return saveKV(key, value);
}

public boolean saveRequestId(String cosKey, String requestId) {
String dbKey = requestIdPrefix + cosKey;
if (requestId == null) {
log.warn("requestId is null for cosKey " + cosKey);
return saveKV(dbKey, "Null");
} else {
return saveKV(dbKey, requestId);
}
}

public void dumpRequestId(String saveFilePath) {
ReadOptions readOptions = null;
RocksIterator rocksIterator = null;
BufferedOutputStream bos = null;
try {
bos = new BufferedOutputStream(new FileOutputStream(saveFilePath));
readOptions = new ReadOptions();
rocksIterator = db.newIterator(readOptions);
rocksIterator.seek(requestIdPrefix.getBytes(ENCODING_TYPE));
while (rocksIterator.isValid()) {
String key = new String(rocksIterator.key(), ENCODING_TYPE).trim();
key = key.substring(requestIdPrefix.length());
String value = new String(rocksIterator.value(), ENCODING_TYPE).trim();
String content = String.format("%s \t %s\n", key, value);
bos.write(content.getBytes(ENCODING_TYPE));
rocksIterator.next();
}
} catch (Exception e) {
final String errMsg = "dumpRequestId error.";
System.err.println(errMsg);
log.error(errMsg, e);
} finally {
if (readOptions != null) {
readOptions.close();
}
if (rocksIterator != null) {
rocksIterator.close();
}
if (bos != null) {
IOUtils.closeQuietly(bos, log);
}
}
}

public void queryRequestId(String cosKey) {
String dbKey = requestIdPrefix + cosKey;
String requestIdValue = queryKV(dbKey);
if (requestIdValue == null) {
requestIdValue = "Null";
}
String infoMsg = String.format("query requestid, [key: %s], [requestid: %s]", cosKey,
requestIdValue);
System.out.println(infoMsg);
log.info(infoMsg);
}

public String buildMultipartUploadSavePointKey(String bucketName, String cosKey,
String localFilePath, long mtime, long partSize, long mutlipartUploadThreshold) {
String key = String.format(
Expand Down Expand Up @@ -112,7 +181,7 @@ private String queryKV(String key) {
byte[] valueByte;
try {
valueByte = db.get(key.getBytes(ENCODING_TYPE));
} catch (DBException e) {
} catch (RocksDBException e) {
log.error("query db failed, key:{}, exception: {}", key, e.toString());
return null;
} catch (UnsupportedEncodingException e) {
Expand All @@ -137,7 +206,7 @@ private boolean saveKV(String key, String value) {
try {
db.put(key.getBytes(ENCODING_TYPE), value.getBytes(ENCODING_TYPE));
return true;
} catch (DBException e) {
} catch (RocksDBException e) {
log.error("update db failed, key:{}, value:{}, exception: {}", key, value,
e.toString());
return false;
Expand All @@ -147,12 +216,12 @@ private boolean saveKV(String key, String value) {
return false;
}
}

private boolean deleteKey(String key) {
try {
db.delete(key.getBytes(ENCODING_TYPE));
return true;
} catch (DBException e) {
} catch (RocksDBException e) {
log.error("update db failed, key:{}, exception: {}", key, e.toString());
return false;
} catch (UnsupportedEncodingException e) {
Expand All @@ -164,8 +233,15 @@ private boolean deleteKey(String key) {
public void shutdown() {
if (db != null) {
try {
FlushOptions flushOptions = new FlushOptions();
flushOptions.setWaitForFlush(true);
db.flush(flushOptions);
flushOptions.close();
db.close();
} catch (IOException e) {
if (options != null) {
options.close();
}
} catch (RocksDBException e) {
log.error("close db occur a exception: " + e.toString());
}
}
Expand Down
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public MigrateAliTask(CopyFromAliConfig config, OSSClient ossClient, String srcK
}

private String buildCOSPath() {
String srcPrefix = ((CopyFromAliConfig)config).getSrcPrefix();
String srcPrefix = ((CopyFromAliConfig) config).getSrcPrefix();
int lastDelimiter = srcPrefix.lastIndexOf("/");
String keyName = srcKey.substring(lastDelimiter + 1);
String cosPrefix = config.getCosPath();
Expand Down Expand Up @@ -134,15 +134,15 @@ public void doTask() {
TaskStatics.instance.addSkipCnt();
return;
}

Map<String, String> userMetaMap = null;
try {
// download
// 下载object到文件
GetObjectProgressListener downloadProgressListener =
new GetObjectProgressListener(srcKey);
ObjectMetadata objMeta = ossClient.getObject(
new GetObjectRequest(((CopyFromAliConfig)config).getSrcBucket(), srcKey)
new GetObjectRequest(((CopyFromAliConfig) config).getSrcBucket(), srcKey)
.<GetObjectRequest>withProgressListener(downloadProgressListener),
new File(localPath));
if (!downloadProgressListener.isSucceed()) {
Expand Down Expand Up @@ -211,11 +211,12 @@ public void doTask() {
}

try {
uploadFile(config.getBucketName(), cosPath, localFile, config.getStorageClass(),
config.isEntireFileMd5Attached(), userMetaMap);
String requestId = uploadFile(config.getBucketName(), cosPath, localFile,
config.getStorageClass(), config.isEntireFileMd5Attached(), userMetaMap);
saveRecord(ossRecordElement);
saveRequestId(cosPath, requestId);
TaskStatics.instance.addSuccessCnt();
String printMsg = String.format("[ok] task_info: %s", ossRecordElement.buildKey());
String printMsg = String.format("[ok] [requestid: %s], task_info: %s", requestId == null ? "NULL" : requestId, ossRecordElement.buildKey());
System.out.println(printMsg);
log.info(printMsg);
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ public void doTask() {
// 下载object到文件
String localPath = config.getTempFolderPath() + UUID.randomUUID().toString();
File localFile = new File(localPath);
Map<String , String> userMetaMap;
Map<String, String> userMetaMap;
try {
GetObjectProgressListener getObjectProgressListener =
new GetObjectProgressListener(srcKey);
Expand All @@ -154,7 +154,7 @@ public void doTask() {
TaskStatics.instance.addFailCnt();
return;
}

userMetaMap = objectMetadata.getUserMetadata();
if (localFile.length() != this.fileSize) {
String printMsg =
Expand Down Expand Up @@ -187,11 +187,12 @@ public void doTask() {

// upload file
try {
uploadFile(config.getBucketName(), cosPath, localFile, config.getStorageClass(),
config.isEntireFileMd5Attached(), userMetaMap);
String requestId = uploadFile(config.getBucketName(), cosPath, localFile,
config.getStorageClass(), config.isEntireFileMd5Attached(), userMetaMap);
saveRecord(awsRecordElement);
saveRequestId(cosPath, requestId);
TaskStatics.instance.addSuccessCnt();
String printMsg = String.format("[ok] task_info: %s", awsRecordElement.buildKey());
String printMsg = String.format("[ok] [requestid: %s], task_info: %s", requestId == null ? "NULL" : requestId, awsRecordElement.buildKey());
System.out.println(printMsg);
log.info(printMsg);
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

import com.qcloud.cos.COSClient;
import com.qcloud.cos.model.CopyObjectRequest;
import com.qcloud.cos.model.CopyObjectResult;
import com.qcloud.cos.model.CopyResult;
import com.qcloud.cos.region.Region;
import com.qcloud.cos.transfer.Copy;
import com.qcloud.cos.transfer.TransferManager;
Expand Down Expand Up @@ -53,14 +55,16 @@ public void doTask() {
}
CopyObjectRequest copyObjectRequest = new CopyObjectRequest(new Region(srcRegion),
srcBucketName, srcKey, destBucketName, destKey);
copyObjectRequest.setSourceEndpointSuffix(srcEndpointSuffx);
// copyObjectRequest.setSourceEndpointSuffix(srcEndpointSuffx);
try {
Copy copy = smallFileTransfer.copy(copyObjectRequest, srcCOSClient, null);
copy.waitForCompletion();
CopyResult copyResult = copy.waitForCopyResult();
String requestId = copyResult.getRequestId();
saveRecord(migrateCopyBucketRecordElement);
saveRequestId(destKey, requestId);
TaskStatics.instance.addSuccessCnt();
String printMsg =
String.format("[ok] task_info: %s", migrateCopyBucketRecordElement.buildKey());
String.format("[ok] [requestid: %s], task_info: %s", requestId == null ? "NULL" : requestId, migrateCopyBucketRecordElement.buildKey());
System.out.println(printMsg);
log.info(printMsg);
} catch (Exception e) {
Expand Down
Loading

0 comments on commit 354fd6f

Please sign in to comment.