Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Parquet File Metadata caching implementation #541

Open
wants to merge 10 commits into
base: project-antalya
Choose a base branch
from
2 changes: 2 additions & 0 deletions src/Common/ProfileEvents.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -806,6 +806,8 @@ The server successfully detected this situation and will download merged part fr
M(GWPAsanAllocateSuccess, "Number of successful allocations done by GWPAsan") \
M(GWPAsanAllocateFailed, "Number of failed allocations done by GWPAsan (i.e. filled pool)") \
M(GWPAsanFree, "Number of free operations done by GWPAsan") \
M(ParquetMetaDataCacheHits, "Number of times the parquet file metadata cache was hit while reading a parquet file.") \
M(ParquetMetaDataCacheMisses, "Number of times the parquet file metadata cache was missed while reading a parquet file.") \


#ifdef APPLY_FOR_EXTERNAL_EVENTS
Expand Down
2 changes: 2 additions & 0 deletions src/Core/Settings.h
Original file line number Diff line number Diff line change
Expand Up @@ -1293,6 +1293,8 @@ class IColumn;
M(Bool, precise_float_parsing, false, "Prefer more precise (but slower) float parsing algorithm", 0) \
M(DateTimeOverflowBehavior, date_time_overflow_behavior, "ignore", "Overflow mode for Date, Date32, DateTime, DateTime64 types. Possible values: 'ignore', 'throw', 'saturate'.", 0) \
M(Bool, validate_experimental_and_suspicious_types_inside_nested_types, true, "Validate usage of experimental and suspicious types inside nested types like Array/Map/Tuple", 0) \
M(Bool, input_format_parquet_use_metadata_cache, false, "Enable parquet file metadata caching.", 0) \
arthurpassos marked this conversation as resolved.
Show resolved Hide resolved
M(UInt64, input_format_parquet_metadata_cache_max_entries, 100000, "Maximum number of parquet file metadata to cache.", 0) \


// End of FORMAT_FACTORY_SETTINGS
Expand Down
3 changes: 3 additions & 0 deletions src/Processors/Formats/IInputFormat.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,9 @@ class IInputFormat : public SourceWithKeyCondition

void needOnlyCount() { need_only_count = true; }

/// Set a storage-level unique identifier for the data behind the ReadBuffer
/// (e.g. "path:etag" from object storage), so the format can key caches on it.
/// Default implementation is a no-op; formats that support it override this.
virtual void setStorageRelatedUniqueKey(const Settings & /*settings*/, const String & /*key*/) {}

protected:
ReadBuffer & getReadBuffer() const { chassert(in); return *in; }

Expand Down
48 changes: 47 additions & 1 deletion src/Processors/Formats/Impl/ParquetBlockInputFormat.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@

#if USE_PARQUET

#include <Core/Settings.h>
#include <Common/ProfileEvents.h>
#include <Common/logger_useful.h>
#include <Common/ThreadPool.h>
#include <Formats/FormatFactory.h>
Expand Down Expand Up @@ -34,6 +36,12 @@ namespace CurrentMetrics
extern const Metric ParquetDecoderThreadsScheduled;
}

namespace ProfileEvents
{
extern const Event ParquetMetaDataCacheHits;
extern const Event ParquetMetaDataCacheMisses;
}

namespace DB
{

Expand Down Expand Up @@ -426,6 +434,15 @@ static std::vector<Range> getHyperrectangleForRowGroup(const parquet::FileMetaDa
return hyperrectangle;
}

/// Delegates the capacity limit to CacheBase; no other state to initialize.
ParquetFileMetaDataCache::ParquetFileMetaDataCache(UInt64 max_cache_entries)
: CacheBase(max_cache_entries) {}

/// Returns the process-wide metadata cache (Meyers singleton, thread-safe init).
/// NOTE(review): `max_cache_entries` is honored only on the very first call;
/// later calls with a different value silently reuse the existing cache.
ParquetFileMetaDataCache * ParquetFileMetaDataCache::instance(UInt64 max_cache_entries)
{
    static ParquetFileMetaDataCache instance(max_cache_entries);
    return &instance;
}

ParquetBlockInputFormat::ParquetBlockInputFormat(
ReadBuffer & buf,
const Block & header_,
Expand All @@ -450,6 +467,27 @@ ParquetBlockInputFormat::~ParquetBlockInputFormat()
pool->wait();
}

std::shared_ptr<parquet::FileMetaData> ParquetBlockInputFormat::getFileMetaData()
{
    /// Bypass the process-wide cache when it is disabled or no stable key was provided.
    const bool cache_usable = metadata_cache.use_cache && !metadata_cache.key.empty();
    if (!cache_usable)
        return parquet::ReadMetaData(arrow_file);

    auto load_metadata = [this] { return parquet::ReadMetaData(arrow_file); };
    auto [file_metadata, was_loaded]
        = ParquetFileMetaDataCache::instance(metadata_cache.max_entries)->getOrSet(metadata_cache.key, load_metadata);

    /// getOrSet reports whether the loader had to run: a fresh load is a cache miss.
    ProfileEvents::increment(
        was_loaded ? ProfileEvents::ParquetMetaDataCacheMisses : ProfileEvents::ParquetMetaDataCacheHits);

    return file_metadata;
}

void ParquetBlockInputFormat::initializeIfNeeded()
{
if (std::exchange(is_initialized, true))
Expand All @@ -463,7 +501,7 @@ void ParquetBlockInputFormat::initializeIfNeeded()
if (is_stopped)
return;

metadata = parquet::ReadMetaData(arrow_file);
metadata = getFileMetaData();

std::shared_ptr<arrow::Schema> schema;
THROW_ARROW_NOT_OK(parquet::arrow::FromParquetSchema(metadata->schema(), &schema));
Expand Down Expand Up @@ -843,6 +881,14 @@ const BlockMissingValues & ParquetBlockInputFormat::getMissingValues() const
return previous_block_missing_values;
}

void ParquetBlockInputFormat::setStorageRelatedUniqueKey(const Settings & settings, const String & key_)
{
metadata_cache.key = key_;
metadata_cache.use_cache = settings.parquet_use_metadata_cache;
metadata_cache.max_entries = settings.parquet_metadata_cache_max_entries;
}


ParquetSchemaReader::ParquetSchemaReader(ReadBuffer & in_, const FormatSettings & format_settings_)
: ISchemaReader(in_), format_settings(format_settings_)
{
Expand Down
21 changes: 21 additions & 0 deletions src/Processors/Formats/Impl/ParquetBlockInputFormat.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
#include "config.h"
#if USE_PARQUET

#include <Common/CacheBase.h>
#include <Processors/Formats/IInputFormat.h>
#include <Processors/Formats/ISchemaReader.h>
#include <Formats/FormatSettings.h>
Expand Down Expand Up @@ -65,6 +66,8 @@ class ParquetBlockInputFormat : public IInputFormat

size_t getApproxBytesReadForChunk() const override { return previous_approx_bytes_read_for_chunk; }

void setStorageRelatedUniqueKey(const Settings & settings, const String & key_) override;
arthurpassos marked this conversation as resolved.
Show resolved Hide resolved

private:
Chunk read() override;

Expand All @@ -83,6 +86,8 @@ class ParquetBlockInputFormat : public IInputFormat

void threadFunction(size_t row_group_batch_idx);

std::shared_ptr<parquet::FileMetaData> getFileMetaData();

// Data layout in the file:
//
// row group 0
Expand Down Expand Up @@ -288,6 +293,12 @@ class ParquetBlockInputFormat : public IInputFormat
std::exception_ptr background_exception = nullptr;
std::atomic<int> is_stopped{0};
bool is_initialized = false;
/// Metadata-cache parameters for this file, filled by setStorageRelatedUniqueKey().
struct Cache
{
    String key;              /// Storage-unique key (e.g. "path:etag"); empty disables caching.
    bool use_cache = false;  /// Mirrors the user setting that enables metadata caching.
    UInt64 max_entries{0};   /// Capacity passed to ParquetFileMetaDataCache::instance().
} metadata_cache;
};

class ParquetSchemaReader : public ISchemaReader
Expand All @@ -306,6 +317,16 @@ class ParquetSchemaReader : public ISchemaReader
std::shared_ptr<parquet::FileMetaData> metadata;
};

/// Process-wide cache of parquet file metadata, keyed by a storage-unique string
/// (e.g. "path:etag"). Singleton built on top of CacheBase.
class ParquetFileMetaDataCache : public CacheBase<String, parquet::FileMetaData>
{
public:
    /// Returns the process-wide instance. `max_cache_entries` is applied only on
    /// the first call; later values are ignored.
    static ParquetFileMetaDataCache * instance(UInt64 max_cache_entries);

    /// No-op that hides CacheBase::clear() — presumably so cached metadata survives
    /// generic cache-clearing paths. NOTE(review): confirm this hiding is deliberate.
    void clear() {}

private:
    explicit ParquetFileMetaDataCache(UInt64 max_cache_entries);
};

}

#endif
3 changes: 3 additions & 0 deletions src/Storages/ObjectStorage/StorageObjectStorageSource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -381,6 +381,9 @@ StorageObjectStorageSource::ReaderHolder StorageObjectStorageSource::createReade
if (need_only_count)
input_format->needOnlyCount();

if (object_info->getPath().length())
input_format->setStorageRelatedUniqueKey(context_->getSettingsRef(), object_info->getPath() + ":" + object_info->metadata->etag);

builder.init(Pipe(input_format));

if (read_from_format_info.columns_description.hasDefaults())
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
10
10
10
28 changes: 28 additions & 0 deletions tests/queries/0_stateless/03262_parquet_s3_metadata_cache.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
-- Tags: no-parallel, no-fasttest
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you also add a few tests for parquet_metadata_cache_max_entries?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That would have to be an integration test, maybe with 10s or 100's of parquet files. I can add it in another PR.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If local files also benefited from metadata cache, an integration test wouldn't be needed I suppose. But doesn't look like we want to do it

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For local Parquet files, OS file cache will be in effect.


DROP TABLE IF EXISTS t_parquet_03262;

CREATE TABLE t_parquet_03262 (a UInt64)
ENGINE = S3(s3_conn, filename = 'test_03262_{_partition_id}', format = Parquet)
PARTITION BY a;

INSERT INTO t_parquet_03262 SELECT number FROM numbers(10) SETTINGS s3_truncate_on_insert=1;

-- First read populates the metadata cache; the second read should hit it.
-- The setting name must match its declaration in src/Core/Settings.h
-- (input_format_parquet_use_metadata_cache).
SELECT COUNT(*)
FROM s3(s3_conn, filename = 'test_03262_*', format = Parquet)
SETTINGS input_format_parquet_use_metadata_cache=1;

-- custom_x is only a marker so this query can be found in system.query_log below.
SELECT COUNT(*)
FROM s3(s3_conn, filename = 'test_03262_*', format = Parquet)
SETTINGS input_format_parquet_use_metadata_cache=1, custom_x='test03262';

SYSTEM FLUSH LOGS;

SELECT ProfileEvents['ParquetMetaDataCacheHits']
FROM system.query_log
WHERE query LIKE '%test03262%'
AND type = 'QueryFinish'
ORDER BY event_time DESC
LIMIT 1;

DROP TABLE t_parquet_03262;
Loading