
Parquet File Metadata caching implementation #541

Open · wants to merge 10 commits into base: project-antalya
2 changes: 2 additions & 0 deletions src/Common/ProfileEvents.cpp
@@ -806,6 +806,8 @@ The server successfully detected this situation and will download merged part fr
M(GWPAsanAllocateSuccess, "Number of successful allocations done by GWPAsan") \
M(GWPAsanAllocateFailed, "Number of failed allocations done by GWPAsan (i.e. filled pool)") \
M(GWPAsanFree, "Number of free operations done by GWPAsan") \
M(ParquetMetaDataCacheHits, "Number of times parquet file metadata was found in the in-memory metadata cache.") \
M(ParquetMetaDataCacheMisses, "Number of times parquet file metadata was not found in the in-memory metadata cache and had to be read from the file.") \


#ifdef APPLY_FOR_EXTERNAL_EVENTS
3 changes: 2 additions & 1 deletion src/Core/ServerSettings.h
@@ -166,7 +166,8 @@ namespace DB
M(String, mutation_workload, "default", "Name of workload to be used to access resources for all mutations (may be overridden by a merge tree setting)", 0) \
M(Bool, prepare_system_log_tables_on_startup, false, "If true, ClickHouse creates all configured `system.*_log` tables before the startup. It can be helpful if some startup scripts depend on these tables.", 0) \
M(UInt64, config_reload_interval_ms, 2000, "How often clickhouse will reload config and check for new changes", 0) \
M(Bool, disable_insertion_and_mutation, false, "Disable all insert/alter/delete queries. This setting will be enabled if someone needs read-only nodes to prevent insertion and mutation affect reading performance.", 0) \
M(UInt64, input_format_parquet_metadata_cache_max_entries, 100000, "Maximum number of parquet file metadata entries to cache.", 0)
Collaborator: Turned it into a server setting; that makes more sense since it can't be changed at runtime.
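Since server settings cannot be changed per-query, the configured value can only be verified at runtime; a sketch, assuming the standard system.server_settings table:

-- Inspect the configured cap on cached parquet metadata entries.
SELECT name, value
FROM system.server_settings
WHERE name = 'input_format_parquet_metadata_cache_max_entries';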


/// If you add a setting which can be updated at runtime, please update 'changeable_settings' map in StorageSystemServerSettings.cpp

1 change: 1 addition & 0 deletions src/Core/Settings.h
@@ -1293,6 +1293,7 @@ class IColumn;
M(Bool, precise_float_parsing, false, "Prefer more precise (but slower) float parsing algorithm", 0) \
M(DateTimeOverflowBehavior, date_time_overflow_behavior, "ignore", "Overflow mode for Date, Date32, DateTime, DateTime64 types. Possible values: 'ignore', 'throw', 'saturate'.", 0) \
M(Bool, validate_experimental_and_suspicious_types_inside_nested_types, true, "Validate usage of experimental and suspicious types inside nested types like Array/Map/Tuple", 0) \
M(Bool, input_format_parquet_use_metadata_cache, false, "Enable parquet file metadata caching.", 0) \


// End of FORMAT_FACTORY_SETTINGS
3 changes: 3 additions & 0 deletions src/Processors/Formats/IInputFormat.h
@@ -66,6 +66,9 @@ class IInputFormat : public SourceWithKeyCondition

void needOnlyCount() { need_only_count = true; }

/// Set a storage-specific unique key (e.g. object path + etag) identifying the data behind the ReadBuffer, so the format can cache per-file state.
virtual void setStorageRelatedUniqueKey(const ServerSettings & /* server_settings */, const Settings & /*settings*/, const String & /*key*/) {}

protected:
ReadBuffer & getReadBuffer() const { chassert(in); return *in; }

78 changes: 75 additions & 3 deletions src/Processors/Formats/Impl/ParquetBlockInputFormat.cpp
@@ -3,6 +3,9 @@

#if USE_PARQUET

#include <Core/Settings.h>
#include <Core/ServerSettings.h>
#include <Common/ProfileEvents.h>
#include <Common/logger_useful.h>
#include <Common/ThreadPool.h>
#include <Formats/FormatFactory.h>
@@ -34,6 +37,12 @@ namespace CurrentMetrics
extern const Metric ParquetDecoderThreadsScheduled;
}

namespace ProfileEvents
{
extern const Event ParquetMetaDataCacheHits;
extern const Event ParquetMetaDataCacheMisses;
}

namespace DB
{

@@ -426,6 +435,15 @@ static std::vector<Range> getHyperrectangleForRowGroup(const parquet::FileMetaDa
return hyperrectangle;
}

ParquetFileMetaDataCache::ParquetFileMetaDataCache(UInt64 max_cache_entries)
: CacheBase(max_cache_entries) {}

ParquetFileMetaDataCache * ParquetFileMetaDataCache::instance(UInt64 max_cache_entries)
{
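// NOTE: function-local static: max_cache_entries only takes effect on the
// first call; subsequent calls ignore the argument.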
static ParquetFileMetaDataCache instance(max_cache_entries);
return &instance;
}

ParquetBlockInputFormat::ParquetBlockInputFormat(
ReadBuffer & buf,
const Block & header_,
@@ -450,20 +468,58 @@ ParquetBlockInputFormat::~ParquetBlockInputFormat()
pool->wait();
}

std::shared_ptr<parquet::FileMetaData> ParquetBlockInputFormat::readMetadataFromFile()
{
createArrowFileIfNotCreated();
return parquet::ReadMetaData(arrow_file);
}

std::shared_ptr<parquet::FileMetaData> ParquetBlockInputFormat::getFileMetaData()
{
// in-memory cache is not implemented for local file operations, only for remote files
// there is a chance the user sets `input_format_parquet_use_metadata_cache=1` for a local file operation
// and the cache_key won't be set. Therefore, we also need to check for metadata_cache.key
if (!metadata_cache.use_cache || metadata_cache.key.empty())
{
return readMetadataFromFile();
}

auto [parquet_file_metadata, loaded] = ParquetFileMetaDataCache::instance(metadata_cache.max_entries)->getOrSet(
metadata_cache.key,
[&]()
{
return readMetadataFromFile();
}
);
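// getOrSet returns {value, loaded}; loaded == true means the load lambda ran,
// i.e. the metadata was not already in the cache.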
if (loaded)
ProfileEvents::increment(ProfileEvents::ParquetMetaDataCacheMisses);
else
ProfileEvents::increment(ProfileEvents::ParquetMetaDataCacheHits);
return parquet_file_metadata;
}

void ParquetBlockInputFormat::createArrowFileIfNotCreated()
{
if (arrow_file)
{
return;
}

// Create arrow file adapter.
// TODO: Make the adapter do prefetching on IO threads, based on the full set of ranges that
// we'll need to read (which we know in advance). Use max_download_threads for that.
arrow_file = asArrowFile(*in, format_settings, is_stopped, "Parquet", PARQUET_MAGIC_BYTES, /* avoid_buffering */ true);
}

void ParquetBlockInputFormat::initializeIfNeeded()
{
if (std::exchange(is_initialized, true))
return;

if (is_stopped)
return;

metadata = getFileMetaData();

std::shared_ptr<arrow::Schema> schema;
THROW_ARROW_NOT_OK(parquet::arrow::FromParquetSchema(metadata->schema(), &schema));
@@ -494,6 +550,8 @@ void ParquetBlockInputFormat::initializeIfNeeded()
return std::min(std::max(preferred_num_rows, MIN_ROW_NUM), static_cast<size_t>(format_settings.parquet.max_block_size));
};

bool has_row_groups_to_read = false;

for (int row_group = 0; row_group < num_row_groups; ++row_group)
{
if (skip_row_groups.contains(row_group))
@@ -515,6 +573,12 @@
row_group_batches.back().total_bytes_compressed += metadata->RowGroup(row_group)->total_compressed_size();
auto rows = adaptive_chunk_size(row_group);
row_group_batches.back().adaptive_chunk_size = rows ? rows : format_settings.parquet.max_block_size;
has_row_groups_to_read = true;
}

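// Open the (possibly remote) file only if something will actually be read;
// with a warm metadata cache and all row groups pruned, no storage access happens at all.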
if (has_row_groups_to_read)
{
createArrowFileIfNotCreated();
}
}

@@ -843,6 +907,14 @@ const BlockMissingValues & ParquetBlockInputFormat::getMissingValues() const
return previous_block_missing_values;
}

void ParquetBlockInputFormat::setStorageRelatedUniqueKey(const ServerSettings & server_settings, const Settings & settings, const String & key_)
{
metadata_cache.key = key_;
metadata_cache.use_cache = settings.input_format_parquet_use_metadata_cache;
metadata_cache.max_entries = server_settings.input_format_parquet_metadata_cache_max_entries;
}


ParquetSchemaReader::ParquetSchemaReader(ReadBuffer & in_, const FormatSettings & format_settings_)
: ISchemaReader(in_), format_settings(format_settings_)
{
24 changes: 24 additions & 0 deletions src/Processors/Formats/Impl/ParquetBlockInputFormat.h
@@ -2,6 +2,7 @@
#include "config.h"
#if USE_PARQUET

#include <Common/CacheBase.h>
#include <Processors/Formats/IInputFormat.h>
#include <Processors/Formats/ISchemaReader.h>
#include <Formats/FormatSettings.h>
@@ -65,6 +66,8 @@ class ParquetBlockInputFormat : public IInputFormat

size_t getApproxBytesReadForChunk() const override { return previous_approx_bytes_read_for_chunk; }

void setStorageRelatedUniqueKey(const ServerSettings & server_settings, const Settings & settings, const String & key_) override;

private:
Chunk read() override;

@@ -83,6 +86,11 @@

void threadFunction(size_t row_group_batch_idx);

void createArrowFileIfNotCreated();
std::shared_ptr<parquet::FileMetaData> readMetadataFromFile();

std::shared_ptr<parquet::FileMetaData> getFileMetaData();

// Data layout in the file:
//
// row group 0
@@ -288,6 +296,12 @@ class ParquetBlockInputFormat : public IInputFormat
std::exception_ptr background_exception = nullptr;
std::atomic<int> is_stopped{0};
bool is_initialized = false;
struct Cache
{
String key;
bool use_cache = false;
UInt64 max_entries{0};
} metadata_cache;
};

class ParquetSchemaReader : public ISchemaReader
@@ -306,6 +320,16 @@
std::shared_ptr<parquet::FileMetaData> metadata;
};

class ParquetFileMetaDataCache : public CacheBase<String, parquet::FileMetaData>
{
public:
static ParquetFileMetaDataCache * instance(UInt64 max_cache_entries);
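/// No-op: entries are keyed by object path + etag, so a changed object gets a new cache key and stale metadata is never served.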
void clear() {}

private:
ParquetFileMetaDataCache(UInt64 max_cache_entries);
};

}

#endif
3 changes: 3 additions & 0 deletions src/Storages/ObjectStorage/StorageObjectStorageSource.cpp
@@ -381,6 +381,9 @@ StorageObjectStorageSource::ReaderHolder StorageObjectStorageSource::createReade
if (need_only_count)
input_format->needOnlyCount();

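// The cache key is "<path>:<etag>", so overwriting an object (which changes its etag)
// naturally invalidates the cached metadata.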
if (object_info->getPath().length())
input_format->setStorageRelatedUniqueKey(context_->getServerSettings(), context_->getSettingsRef(), object_info->getPath() + ":" + object_info->metadata->etag);

builder.init(Pipe(input_format));

if (read_from_format_info.columns_description.hasDefaults())
3 changes: 3 additions & 0 deletions tests/queries/0_stateless/03262_parquet_s3_metadata_cache.reference
@@ -0,0 +1,3 @@
10
10
10
28 changes: 28 additions & 0 deletions tests/queries/0_stateless/03262_parquet_s3_metadata_cache.sql
@@ -0,0 +1,28 @@
-- Tags: no-parallel, no-fasttest
Collaborator: Can you also add a few tests for parquet_metadata_cache_max_entries?

Author: That would have to be an integration test, maybe with tens or hundreds of parquet files. I can add it in another PR.

Collaborator: If local files also benefited from the metadata cache, an integration test wouldn't be needed, I suppose. But it doesn't look like we want to do that.

Author: For local Parquet files, the OS file cache is already in effect.


DROP TABLE IF EXISTS t_parquet_03262;

CREATE TABLE t_parquet_03262 (a UInt64)
ENGINE = S3(s3_conn, filename = 'test_03262_{_partition_id}', format = Parquet)
PARTITION BY a;

INSERT INTO t_parquet_03262 SELECT number FROM numbers(10) SETTINGS s3_truncate_on_insert=1;

SELECT COUNT(*)
FROM s3(s3_conn, filename = 'test_03262_*', format = Parquet)
SETTINGS input_format_parquet_use_metadata_cache=1;

SELECT COUNT(*)
FROM s3(s3_conn, filename = 'test_03262_*', format = Parquet)
SETTINGS input_format_parquet_use_metadata_cache=1, log_comment='test_03262_parquet_metadata_cache';

SYSTEM FLUSH LOGS;

SELECT ProfileEvents['ParquetMetaDataCacheHits']
FROM system.query_log
WHERE log_comment = 'test_03262_parquet_metadata_cache'
AND type = 'QueryFinish'
ORDER BY event_time DESC
LIMIT 1;

DROP TABLE t_parquet_03262;
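A complementary assertion on the cold path would look similar; a sketch, assuming the first SELECT above were tagged with its own (hypothetical) log_comment:

-- Hypothetical tag on the first, cold read: expect 10 misses (one per file) and 0 hits.
SELECT ProfileEvents['ParquetMetaDataCacheMisses']
FROM system.query_log
WHERE log_comment = 'test_03262_parquet_metadata_cache_cold'
AND type = 'QueryFinish'
ORDER BY event_time DESC
LIMIT 1;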