From edbfc9a4875b604c8107fa9caadf712071dd19f7 Mon Sep 17 00:00:00 2001 From: Edouard A Date: Sun, 5 Sep 2021 09:37:51 -0400 Subject: [PATCH] Fix a timeout error on Windows when writing to an S3 bucket * Binary files must be opened with the binary flag or reading may stop before EOF, causing a timeout as AWS waits for the end of the file * Added an #undef for GetFreeSpace to fix a compilation error on Windows * Added default definition when USE_AWS is not defined * Fixed integer cast * Fixed uninitialized variable warning * Fixed include --- cloud/aws/aws_env.h | 9 +++- cloud/aws/aws_s3.cc | 17 ++++---- cloud/cloud_env.cc | 86 ++++++++++++++++++++------------------- cloud/cloud_env_impl.cc | 6 +-- cloud/cloud_env_impl.h | 12 ++++-- cloud/cloud_file_cache.cc | 2 +- 6 files changed, 72 insertions(+), 60 deletions(-) diff --git a/cloud/aws/aws_env.h b/cloud/aws/aws_env.h index a1bd01704f9..da1c8a3e4b3 100644 --- a/cloud/aws/aws_env.h +++ b/cloud/aws/aws_env.h @@ -11,7 +11,6 @@ #include "cloud/cloud_env_impl.h" #ifdef USE_AWS - #include #include @@ -76,5 +75,11 @@ class AwsEnv : public CloudEnvImpl { }; } // namespace ROCKSDB_NAMESPACE - +#else +namespace ROCKSDB_NAMESPACE { +class AwsEnv : public CloudEnvImpl { + public: + static Status NewAwsEnv(Env* env, std::unique_ptr* cenv); +}; +} // namespace ROCKSDB_NAMESPACE #endif // USE_AWS diff --git a/cloud/aws/aws_s3.cc b/cloud/aws/aws_s3.cc index 09e8c377c96..6b3ccb2ef12 100644 --- a/cloud/aws/aws_s3.cc +++ b/cloud/aws/aws_s3.cc @@ -900,11 +900,10 @@ Status S3StorageProvider::DoGetCloudObject(const std::string& bucket_name, // The stream is not flushed when WaitUntilFinished() returns. // TODO(igor) Fix this once the AWS SDK's bug is fixed. 
auto ioStreamFactory = [=]() -> Aws::IOStream* { - // fallback to FStream - return Aws::New( - Aws::Utils::ARRAY_ALLOCATION_TAG, destination, - std::ios_base::out | std::ios_base::trunc); - + // fallback to FStream + return Aws::New( + Aws::Utils::ARRAY_ALLOCATION_TAG, destination, + std::ios_base::out | std::ios_base::trunc | std::ios_base::binary); }; auto handle = s3client_->DownloadFile(ToAwsString(bucket_name), @@ -934,7 +933,7 @@ Status S3StorageProvider::DoGetCloudObject(const std::string& bucket_name, // fallback to FStream return Aws::New( Aws::Utils::ARRAY_ALLOCATION_TAG, destination, - std::ios_base::out | std::ios_base::trunc); + std::ios_base::out | std::ios_base::trunc | std::ios_base::binary); } return new IOStreamWithOwnedBuf( std::unique_ptr(new WritableFileStreamBuf( @@ -982,9 +981,9 @@ Status S3StorageProvider::DoPutCloudObject(const std::string& local_file, return Status::IOError(local_file, errmsg); } } else { - auto inputData = - Aws::MakeShared(object_path.c_str(), local_file.c_str(), - std::ios_base::in | std::ios_base::out); + auto inputData = Aws::MakeShared( + object_path.c_str(), local_file.c_str(), + std::ios_base::in | std::ios_base::binary); Aws::S3::Model::PutObjectRequest putRequest; putRequest.SetBucket(ToAwsString(bucket_name)); diff --git a/cloud/cloud_env.cc b/cloud/cloud_env.cc index 594d4c6e18d..744e59e2c68 100644 --- a/cloud/cloud_env.cc +++ b/cloud/cloud_env.cc @@ -1,10 +1,10 @@ // Copyright (c) 2017 Rockset. 
#ifndef ROCKSDB_LITE -#ifndef _WIN32_WINNT -#include -#else +#ifdef WIN32 #include +#else +#include #endif #include @@ -200,7 +200,8 @@ static std::unordered_map const char* addr1, const char* addr2, std::string* /*mismatch*/) { auto bucket1 = reinterpret_cast(addr1); auto bucket2 = reinterpret_cast(addr2); - return bucket1->GetBucketName(false) == bucket2->GetBucketName(false); + return bucket1->GetBucketName(false) == + bucket2->GetBucketName(false); }}}, {"TEST", {0, OptionType::kUnknown, OptionVerificationType::kAlias, @@ -338,22 +339,23 @@ Status CloudEnvOptions::Configure(const ConfigOptions& config_options, } } if (s.ok()) { - s = OptionTypeInfo::ParseStruct(config_options, CloudEnvOptions::kName(), - &cloud_env_option_type_info, - CloudEnvOptions::kName(), opts_str, reinterpret_cast(this)); - if (!s.ok()) { // Something went wrong. Attempt to reset - OptionTypeInfo::ParseStruct(config_options, CloudEnvOptions::kName(), - &cloud_env_option_type_info, - CloudEnvOptions::kName(), current, reinterpret_cast(this)); + s = OptionTypeInfo::ParseStruct( + config_options, CloudEnvOptions::kName(), &cloud_env_option_type_info, + CloudEnvOptions::kName(), opts_str, reinterpret_cast(this)); + if (!s.ok()) { // Something went wrong. 
Attempt to reset + OptionTypeInfo::ParseStruct( + config_options, CloudEnvOptions::kName(), &cloud_env_option_type_info, + CloudEnvOptions::kName(), current, reinterpret_cast(this)); } } return s; } - -Status CloudEnvOptions::Serialize(const ConfigOptions& config_options, std::string* value) const { - return OptionTypeInfo::SerializeStruct(config_options, CloudEnvOptions::kName(), - &cloud_env_option_type_info, - CloudEnvOptions::kName(), reinterpret_cast(this), value); + +Status CloudEnvOptions::Serialize(const ConfigOptions& config_options, + std::string* value) const { + return OptionTypeInfo::SerializeStruct( + config_options, CloudEnvOptions::kName(), &cloud_env_option_type_info, + CloudEnvOptions::kName(), reinterpret_cast(this), value); } CloudEnv::CloudEnv(const CloudEnvOptions& options, Env* base, @@ -393,12 +395,13 @@ int DoRegisterCloudObjects(ObjectLibrary& library, const std::string& arg) { int count = 0; // Register the Env types library.Register( - CloudEnvImpl::kClassName(), - [](const std::string& /*uri*/, std::unique_ptr* guard, - std::string* /*errmsg*/) { - guard->reset(new CloudEnvImpl(CloudEnvOptions(), Env::Default(), nullptr)); - return guard->get(); - }); + CloudEnvImpl::kClassName(), + [](const std::string& /*uri*/, std::unique_ptr* guard, + std::string* /*errmsg*/) { + guard->reset( + new CloudEnvImpl(CloudEnvOptions(), Env::Default(), nullptr)); + return guard->get(); + }); count++; count += CloudEnvImpl::RegisterAwsObjects(library, arg); @@ -416,24 +419,24 @@ int DoRegisterCloudObjects(ObjectLibrary& library, const std::string& arg) { return guard->get(); }); count++; - + return count; } void CloudEnv::RegisterCloudObjects(const std::string& arg) { static std::once_flag do_once; - std::call_once(do_once, - [&]() { - auto library = ObjectLibrary::Default(); - DoRegisterCloudObjects(*library, arg); - }); -} - -Status CloudEnv::CreateFromString(const ConfigOptions& config_options, const std::string& value, + std::call_once(do_once, [&]() { 
+ auto library = ObjectLibrary::Default(); + DoRegisterCloudObjects(*library, arg); + }); +} + +Status CloudEnv::CreateFromString(const ConfigOptions& config_options, + const std::string& value, std::unique_ptr* result) { RegisterCloudObjects(); std::string id; - std::unordered_map options; + std::unordered_map options; Status s; if (value.find("=") == std::string::npos) { id = value; @@ -471,19 +474,20 @@ Status CloudEnv::CreateFromString(const ConfigOptions& config_options, const std } } } - + if (s.ok()) { result->reset(static_cast(env.release())); } - - return s; + + return s; } -Status CloudEnv::CreateFromString(const ConfigOptions& config_options, const std::string& value, +Status CloudEnv::CreateFromString(const ConfigOptions& config_options, + const std::string& value, const CloudEnvOptions& cloud_options, std::unique_ptr* result) { RegisterCloudObjects(); std::string id; - std::unordered_map options; + std::unordered_map options; Status s; if (value.find("=") == std::string::npos) { id = value; @@ -523,14 +527,14 @@ Status CloudEnv::CreateFromString(const ConfigOptions& config_options, const std } } } - + if (s.ok()) { result->reset(static_cast(env.release())); } - - return s; + + return s; } - + #ifndef USE_AWS Status CloudEnv::NewAwsEnv(Env* /*base_env*/, const CloudEnvOptions& /*options*/, diff --git a/cloud/cloud_env_impl.cc b/cloud/cloud_env_impl.cc index dc47c6f3f83..85b492a166e 100644 --- a/cloud/cloud_env_impl.cc +++ b/cloud/cloud_env_impl.cc @@ -877,7 +877,7 @@ std::string CloudEnvImpl::RemapFilename(const std::string& logical_path) const { return logical_path; } auto file_name = basename(logical_path); - uint64_t fileNumber; + uint64_t fileNumber = 0; FileType type; WalFileType walType; if (file_name == "MANIFEST") { @@ -1497,14 +1497,14 @@ Status CloudEnvImpl::LoadCloudManifest(const std::string& local_dbname, if (!st.ok()) { return st; } - + // Do the cleanup, but don't fail if the cleanup fails. 
if (!read_only) { st = DeleteInvisibleFiles(local_dbname); if (!st.ok()) { Log(InfoLogLevel::INFO_LEVEL, info_log_, "Failed to delete invisible files: %s", st.ToString().c_str()); - // Ignore the fail + // Ignore the fail st = Status::OK(); } } diff --git a/cloud/cloud_env_impl.h b/cloud/cloud_env_impl.h index 7dbac544310..24e0f579237 100644 --- a/cloud/cloud_env_impl.h +++ b/cloud/cloud_env_impl.h @@ -27,11 +27,11 @@ class CloudEnvImpl : public CloudEnv { // Constructor CloudEnvImpl(const CloudEnvOptions& options, Env* base_env, const std::shared_ptr& logger); - + virtual ~CloudEnvImpl(); - static const char *kClassName() { return kCloud(); } + static const char* kClassName() { return kCloud(); } virtual const char* Name() const override { return kClassName(); } - + Status NewSequentialFile(const std::string& fname, std::unique_ptr* result, const EnvOptions& options) override; @@ -226,6 +226,11 @@ class CloudEnvImpl : public CloudEnv { const ImmutableDBOptions& db_options) const override { return base_env_->OptimizeForCompactionTableRead(env_options, db_options); } + +#ifdef _WIN32_WINNT +#undef GetFreeSpace +#endif + Status GetFreeSpace(const std::string& path, uint64_t* diskfree) override { return base_env_->GetFreeSpace(path, diskfree); } @@ -249,7 +254,6 @@ class CloudEnvImpl : public CloudEnv { file_deletion_delay_ = delay; } - Status PrepareOptions(const ConfigOptions& config_options) override; Status ValidateOptions(const DBOptions& /*db_opts*/, const ColumnFamilyOptions& /*cf_opts*/) const override; diff --git a/cloud/cloud_file_cache.cc b/cloud/cloud_file_cache.cc index 95864b5835e..18af79fc537 100644 --- a/cloud/cloud_file_cache.cc +++ b/cloud/cloud_file_cache.cc @@ -147,7 +147,7 @@ void CloudEnvImpl::log(InfoLogLevel level, const std::string& fname, const std::string& msg) { uint64_t usage = cloud_env_options.sst_file_cache->GetUsage(); uint64_t capacity = cloud_env_options.sst_file_cache->GetCapacity(); - long percent = (capacity > 0 ? 
(100L * usage / capacity) : 0); + auto percent = (capacity > 0 ? (100L * usage / capacity) : 0); Log(level, info_log_, "[%s] FileCache %s %s cache-used %" PRIu64 "/%" PRIu64 "(%ld%%) bytes", Name(), fname.c_str(), msg.c_str(), usage, capacity, percent);