Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add optional_len to FSReadRequest #13350

Draft
wants to merge 8 commits into
base: main
Choose a base branch
from
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 28 additions & 8 deletions file/file_prefetch_buffer.cc
Original file line number Diff line number Diff line change
@@ -93,6 +93,7 @@ void FilePrefetchBuffer::PrepareBufferForRead(
Status FilePrefetchBuffer::Read(BufferInfo* buf, const IOOptions& opts,
RandomAccessFileReader* reader,
uint64_t read_len, uint64_t aligned_useful_len,
uint64_t optional_read_len,
uint64_t start_offset, bool use_fs_buffer) {
Slice result;
Status s;
@@ -102,8 +103,13 @@ Status FilePrefetchBuffer::Read(BufferInfo* buf, const IOOptions& opts,
read_len, result);
} else {
to_buf = buf->buffer_.BufferStart() + aligned_useful_len;
s = reader->Read(opts, start_offset + aligned_useful_len, read_len, &result,
to_buf, /*aligned_buf=*/nullptr);
if (0 < optional_read_len) {
s = FlexibleRead(reader, opts, start_offset + aligned_useful_len,
read_len, optional_read_len, to_buf, result);
} else {
s = reader->Read(opts, start_offset + aligned_useful_len, read_len,
&result, to_buf, /*aligned_buf=*/nullptr);
}
}

#ifndef NDEBUG
@@ -196,8 +202,13 @@ Status FilePrefetchBuffer::Prefetch(const IOOptions& opts,

Status s;
if (read_len > 0) {
s = Read(buf, opts, reader, read_len, aligned_useful_len, rounddown_offset,
use_fs_buffer);
// Currently FilePrefetchBuffer::Prefetch is used in
// BlockBasedTable::PrefetchTail. Our optimization for FlexibleRead is meant
// for when we want to start from the beginning of the file in compaction
// and read the whole file sequentially. It is probably not worth setting
// optional_read_len > 0 in this case.
s = Read(buf, opts, reader, read_len, aligned_useful_len,
/*optional_read_len=*/0, rounddown_offset, use_fs_buffer);
}

if (usage_ == FilePrefetchBufferUsage::kTableOpenPrefetchTail && s.ok()) {
@@ -592,6 +603,7 @@ Status FilePrefetchBuffer::PrefetchInternal(const IOOptions& opts,
RandomAccessFileReader* reader,
uint64_t offset, size_t length,
size_t readahead_size,
bool for_compaction,
bool& copy_to_overlap_buffer) {
if (!enable_) {
return Status::OK();
@@ -603,7 +615,7 @@ Status FilePrefetchBuffer::PrefetchInternal(const IOOptions& opts,
Status s;
uint64_t tmp_offset = offset;
size_t tmp_length = length;
size_t original_length = length;
const size_t original_length = length;

// Abort outdated IO.
if (!explicit_prefetch_submitted_) {
@@ -734,8 +746,16 @@ Status FilePrefetchBuffer::PrefetchInternal(const IOOptions& opts,
}

if (read_len1 > 0) {
s = Read(buf, opts, reader, read_len1, aligned_useful_len1, start_offset1,
use_fs_buffer);
// This optimization applies to low priority reads that read the entire
// file in general, but for now we focus on compaction reads. Direct IO
// requires the start / end offsets to be aligned, so we don't want our read
// request to be trimmed at the end.
size_t optional_read_length = for_compaction && !reader->use_direct_io() &&
original_length < read_len1
? read_len1 - original_length
: 0;
s = Read(buf, opts, reader, read_len1, aligned_useful_len1,
optional_read_length, start_offset1, use_fs_buffer);
if (!s.ok()) {
AbortAllIOs();
FreeAllBuffers();
@@ -839,7 +859,7 @@ bool FilePrefetchBuffer::TryReadFromCacheUntracked(
s = PrefetchInternal(
opts, reader, offset, n,
(num_buffers_ > 1 ? readahead_size_ / 2 : readahead_size_),
copy_to_overlap_buffer);
for_compaction, copy_to_overlap_buffer);
explicit_prefetch_submitted_ = false;
if (!s.ok()) {
if (status) {
29 changes: 26 additions & 3 deletions file/file_prefetch_buffer.h
Original file line number Diff line number Diff line change
@@ -434,12 +434,12 @@ class FilePrefetchBuffer {

Status PrefetchInternal(const IOOptions& opts, RandomAccessFileReader* reader,
uint64_t offset, size_t length, size_t readahead_size,
bool& copy_to_third_buffer);
bool for_compaction, bool& copy_to_third_buffer);

Status Read(BufferInfo* buf, const IOOptions& opts,
RandomAccessFileReader* reader, uint64_t read_len,
uint64_t aligned_useful_len, uint64_t start_offset,
bool use_fs_buffer);
uint64_t aligned_useful_len, uint64_t optional_read_len,
uint64_t start_offset, bool use_fs_buffer);

Status ReadAsync(BufferInfo* buf, const IOOptions& opts,
RandomAccessFileReader* reader, uint64_t read_len,
@@ -541,6 +541,29 @@ class FilePrefetchBuffer {
return s;
}

// FlexibleRead issues a read whose returned size may legally fall anywhere
// in [len - optional_len, len]: the file system is allowed to trim up to
// optional_len bytes from the tail of the request. On success, `result`
// points at the data that was actually read.
IOStatus FlexibleRead(RandomAccessFileReader* reader, const IOOptions& opts,
                      uint64_t offset, size_t len, size_t optional_len,
                      char* scratch, Slice& result) {
  assert(optional_len <= len);
  FSReadRequest req;
  req.offset = offset;
  req.len = len;
  req.optional_len = optional_len;
  req.scratch = scratch;
  // MultiRead is the only reader entry point that accepts an FSReadRequest,
  // which is where optional_len is communicated to the file system.
  IOStatus io_s = reader->MultiRead(opts, &req, 1, nullptr);
  if (io_s.ok()) {
    io_s = req.status;
  }
  if (io_s.ok()) {
    result = req.result;
  }
  return io_s;
}

void DestroyAndClearIOHandle(BufferInfo* buf) {
if (buf->io_handle_ != nullptr && buf->del_fn_ != nullptr) {
buf->del_fn_(buf->io_handle_);
30 changes: 29 additions & 1 deletion include/rocksdb/file_system.h
Original file line number Diff line number Diff line change
@@ -812,6 +812,25 @@ struct FSReadRequest {
// returns fewer bytes if end of file is hit (or `status` is not OK).
size_t len;

// EXPERIMENTAL: Enables the file system to return less data than
// requested, even when the end of file has not been reached. Normally, our
// read semantics are defined so that we assume that less data is only
// returned when the end of file has been reached or an error has occurred.
//
// When optional_len > 0, if the end of the file is not reached, the
// returned data's size can be in the range [len - optional_len, len]. The
// returned data must still begin at the same offset, so only the tail end of
// the request is potentially trimmed.
//
// Note that optional_len should never exceed len.
//
// It may be useful to set optional_len > 0 when prefetching is being
// performed and some of the data is not needed immediately. In that case, the
// file system has the freedom to tune the read size optimally based on its
// storage internals.

size_t optional_len = 0;

// A buffer that MultiRead() can optionally place data in. It can
// ignore this and allocate its own buffer.
// The lifecycle of scratch will be until IO is completed.
@@ -857,11 +876,20 @@ struct FSReadRequest {
// 2. Take ownership of the object managed by fs_scratch.
// 3. Handle invoking the custom deleter function from the FSAllocationPtr.
//
// WARNING: Do NOT assume that fs_scratch points to the start of the actual
// WARNING 1: Do NOT assume that fs_scratch points to the start of the actual
// char* data returned by the read. As the type signature suggests, fs_scratch
// is a pointer to any arbitrary data type. Use result.data() to get a valid
// start to the real data. See https://github.com/facebook/rocksdb/pull/13189
// for more context.
//
// WARNING 2: Since fs_scratch is a unique pointer, FSReadRequest's copy
// constructor is implicitly disabled. This turns out to be very useful
// because we want users to be explicit when setting offset, len, and
// optional_len. Consider the possibility where optional_len ends
// up exceeding the request length because it was copied over by mistake. If
// you end up wanting to delete this field, be very careful and consider
// explicitly deleting the copy constructor, since the lack of a copy
// constructor is likely acting as a good protective measure against bugs.
FSAllocationPtr fs_scratch;
};