Skip to content

Commit

Permalink
Optional read size
Browse files Browse the repository at this point in the history
  • Loading branch information
archang19 committed Jan 29, 2025
1 parent a6322b9 commit 2e53604
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 13 deletions.
25 changes: 16 additions & 9 deletions file/file_prefetch_buffer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,10 @@ void FilePrefetchBuffer::PrepareBufferForRead(

Status FilePrefetchBuffer::Read(BufferInfo* buf, const IOOptions& opts,
RandomAccessFileReader* reader,
const uint64_t original_length,
uint64_t read_len, uint64_t aligned_useful_len,
uint64_t start_offset, bool use_fs_buffer) {
uint64_t start_offset, bool use_fs_buffer,
bool for_compaction) {
Slice result;
Status s;
char* to_buf = nullptr;
Expand All @@ -102,8 +104,11 @@ Status FilePrefetchBuffer::Read(BufferInfo* buf, const IOOptions& opts,
read_len, result);
} else {
to_buf = buf->buffer_.BufferStart() + aligned_useful_len;
s = reader->Read(opts, start_offset + aligned_useful_len, read_len, &result,
to_buf, /*aligned_buf=*/nullptr);
IOOptions tuned_read_opts =
MaybeTuneIOOptions(opts, for_compaction, reader->use_direct_io(),
original_length, read_len);
s = reader->Read(tuned_read_opts, start_offset + aligned_useful_len,
read_len, &result, to_buf, /*aligned_buf=*/nullptr);
}

#ifndef NDEBUG
Expand Down Expand Up @@ -196,8 +201,9 @@ Status FilePrefetchBuffer::Prefetch(const IOOptions& opts,

Status s;
if (read_len > 0) {
s = Read(buf, opts, reader, read_len, aligned_useful_len, rounddown_offset,
use_fs_buffer);
s = Read(buf, opts, reader, /*original_length=*/n, read_len,
aligned_useful_len, rounddown_offset, use_fs_buffer,
/*for_compaction=*/false);
}

if (usage_ == FilePrefetchBufferUsage::kTableOpenPrefetchTail && s.ok()) {
Expand Down Expand Up @@ -592,6 +598,7 @@ Status FilePrefetchBuffer::PrefetchInternal(const IOOptions& opts,
RandomAccessFileReader* reader,
uint64_t offset, size_t length,
size_t readahead_size,
bool for_compaction,
bool& copy_to_overlap_buffer) {
if (!enable_) {
return Status::OK();
Expand All @@ -603,7 +610,7 @@ Status FilePrefetchBuffer::PrefetchInternal(const IOOptions& opts,
Status s;
uint64_t tmp_offset = offset;
size_t tmp_length = length;
size_t original_length = length;
const size_t original_length = length;

// Abort outdated IO.
if (!explicit_prefetch_submitted_) {
Expand Down Expand Up @@ -734,8 +741,8 @@ Status FilePrefetchBuffer::PrefetchInternal(const IOOptions& opts,
}

if (read_len1 > 0) {
s = Read(buf, opts, reader, read_len1, aligned_useful_len1, start_offset1,
use_fs_buffer);
s = Read(buf, opts, reader, original_length, read_len1, aligned_useful_len1,
start_offset1, use_fs_buffer, for_compaction);
if (!s.ok()) {
AbortAllIOs();
FreeAllBuffers();
Expand Down Expand Up @@ -839,7 +846,7 @@ bool FilePrefetchBuffer::TryReadFromCacheUntracked(
s = PrefetchInternal(
opts, reader, offset, n,
(num_buffers_ > 1 ? readahead_size_ / 2 : readahead_size_),
copy_to_overlap_buffer);
for_compaction, copy_to_overlap_buffer);
explicit_prefetch_submitted_ = false;
if (!s.ok()) {
if (status) {
Expand Down
19 changes: 15 additions & 4 deletions file/file_prefetch_buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -434,12 +434,12 @@ class FilePrefetchBuffer {

Status PrefetchInternal(const IOOptions& opts, RandomAccessFileReader* reader,
uint64_t offset, size_t length, size_t readahead_size,
bool& copy_to_third_buffer);
bool for_compaction, bool& copy_to_third_buffer);

Status Read(BufferInfo* buf, const IOOptions& opts,
RandomAccessFileReader* reader, uint64_t read_len,
uint64_t aligned_useful_len, uint64_t start_offset,
bool use_fs_buffer);
RandomAccessFileReader* reader, const uint64_t original_length,
uint64_t read_len, uint64_t aligned_useful_len,
uint64_t start_offset, bool use_fs_buffer, bool for_compaction);

Status ReadAsync(BufferInfo* buf, const IOOptions& opts,
RandomAccessFileReader* reader, uint64_t read_len,
Expand Down Expand Up @@ -573,6 +573,17 @@ class FilePrefetchBuffer {
uint64_t& offset, uint64_t& end_offset,
size_t& read_len, uint64_t& aligned_useful_len);

IOOptions MaybeTuneIOOptions(const IOOptions& opts, bool for_compaction,
bool use_direct_io, uint64_t original_length,
uint64_t read_length) {
if (for_compaction && !use_direct_io && original_length < read_length) {
IOOptions updated_opts = opts;
updated_opts.optional_read_size = read_length - original_length;
return updated_opts;
}
return opts;
}

void UpdateStats(bool found_in_buffer, size_t length_found) {
if (found_in_buffer) {
RecordTick(stats_, PREFETCH_HITS);
Expand Down
10 changes: 10 additions & 0 deletions include/rocksdb/file_system.h
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,16 @@ struct IOOptions {
// FSSupportedOps, otherwise this feature will not be used.
bool verify_and_reconstruct_read;

// EXPERIMENTAL: Enables the file system to return less data than
// requested, even when the end of file has not been reached. Normally, our
// read semantics are defined so that we assume that less data is only
// returned when the end of file has been reached or an error has occurred. It
// may be useful to set optional_read_size > 0 when prefetching is being
// performed and some of the data is not needed immediately. In that case, the
// file system has the freedom to tune the read size optimally based on its
// storage internals.
size_t optional_read_size = 0;

// EXPERIMENTAL
Env::IOActivity io_activity = Env::IOActivity::kUnknown;

Expand Down

0 comments on commit 2e53604

Please sign in to comment.