Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions src/llfs/ioring.test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,12 @@ TEST(IoRingTest, Test)
/*mode=*/0644);
ASSERT_GE(fd, 0) << std::strerror(errno);

std::array<char, 1023> buf;
std::array<char, kDirectIOBlockSize + kDirectIOBlockAlign - 1> buf;
std::intptr_t i = (std::intptr_t)buf.data();
i = (i + 511) & ~std::intptr_t{511};
i = (i + static_cast<std::intptr_t>(kDirectIOBlockAlign - 1)) &
~static_cast<std::intptr_t>(kDirectIOBlockAlign - 1);

ASSERT_EQ(i % kDirectIOBlockAlign, 0);

std::ostringstream oss;
oss << "hello, io_uring world!";
Expand Down
178 changes: 128 additions & 50 deletions src/llfs/ioring_page_file_device.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,10 @@ StatusOr<std::shared_ptr<PageBuffer>> IoRingPageFileDevice::prepare(PageId page_
return {batt::StatusCode::kFailedPrecondition};
}

if (this->known_file_size_.load() == 0) {
BATT_REQUIRE_OK(this->init_known_file_size());
}

StatusOr<u64> physical_page = this->get_physical_page(page_id);
BATT_REQUIRE_OK(physical_page);

Expand Down Expand Up @@ -137,6 +141,7 @@ void IoRingPageFileDevice::write(std::shared_ptr<const PageBuffer>&& page_buffer
const PackedPageHeader& page_header = get_page_header(*page_buffer);
ConstBuffer remaining_data = [&] {
ConstBuffer buffer = get_const_buffer(page_buffer);

if (page_header.unused_end == page_header.size &&
page_header.unused_begin < page_header.unused_end) {
buffer = resize_buffer(
Expand Down Expand Up @@ -192,6 +197,10 @@ void IoRingPageFileDevice::write_some(i64 page_offset_in_file,
remaining_data += bytes_written;
page_offset_in_file += bytes_written;

// Update the known file size.
//
this->update_known_file_size(page_offset_in_file);

// The write was short; write again from the new stop point.
//
this->write_some(page_offset_in_file, std::move(page_buffer), remaining_data,
Expand All @@ -213,6 +222,10 @@ void IoRingPageFileDevice::read(PageId page_id, ReadHandler&& handler)
return;
}

if (this->known_file_size_.load() == 0) {
this->init_known_file_size().IgnoreError();
}

PageDeviceMetrics::instance() //
.read_count_per_page_size_log2[this->layout().page_size_log2]
.add(1);
Expand All @@ -238,64 +251,99 @@ void IoRingPageFileDevice::read_some(PageId page_id, i64 page_offset_in_file,

this->file_.async_read_some(
page_offset_in_file + n_read_so_far, buffer,
bind_handler(
std::move(handler),
[this, page_id, page_offset_in_file, page_buffer = std::move(page_buffer),
page_buffer_size, n_read_so_far](ReadHandler&& handler, StatusOr<i32> result) mutable {
if (!result.ok()) {
if (batt::status_is_retryable(result.status())) {
this->read_some(page_id, page_offset_in_file, std::move(page_buffer),
page_buffer_size, n_read_so_far, std::move(handler));
return;
}

LLFS_LOG_WARNING() << "IoRingPageFileDevice::read failed; page_offset_in_file+"
<< n_read_so_far << "=" << page_offset_in_file + n_read_so_far
<< " n_read_so_far=" << n_read_so_far
<< " page_offset_in_file=" << page_offset_in_file;

handler(result.status());
return;
}
BATT_CHECK_GT(*result, 0) << "We must either make progress or receive an error code!";
bind_handler(std::move(handler), [this, page_id, page_offset_in_file,
page_buffer = std::move(page_buffer), page_buffer_size,
n_read_so_far,
file_size_before = this->known_file_size_.load()](
ReadHandler&& handler, StatusOr<i32> result) mutable {
if (!result.ok()) {
if (batt::status_is_retryable(result.status())) {
this->read_some(page_id, page_offset_in_file, std::move(page_buffer), page_buffer_size,
n_read_so_far, std::move(handler));
return;
}

// Sanity check the page header and fail fast if something looks wrong.
//
const usize n_read_before = n_read_so_far;
n_read_so_far += *result;

if (!this->is_sharded_view_ && (n_read_before < sizeof(PackedPageHeader) &&
n_read_so_far >= sizeof(PackedPageHeader))) {
Status status = get_page_header(*page_buffer)
.sanity_check(PageSize{BATT_CHECKED_CAST(u32, page_buffer_size)},
page_id, this->page_ids_);
if (!status.ok()) {
// If the only sanity check that failed was a bad generation number, then we report
// page not found.
//
if (status == StatusCode::kPageHeaderBadGeneration) {
status = batt::StatusCode::kNotFound;
}
handler(status);
return;
}
VLOG(1) << "Short read: " << BATT_INSPECT(page_id)
<< BATT_INSPECT(page_offset_in_file) << BATT_INSPECT(page_buffer_size)
<< BATT_INSPECT(n_read_so_far);
}
LLFS_LOG_WARNING() << "IoRingPageFileDevice::read failed; page_offset_in_file+"
<< n_read_so_far << "=" << page_offset_in_file + n_read_so_far
<< " n_read_so_far=" << n_read_so_far
<< " page_offset_in_file=" << page_offset_in_file
<< BATT_INSPECT(result.status());

// If we have reached the end of the buffer, invoke the handler. Success!
//
if (n_read_so_far == page_buffer_size) {
handler(result.status());
return;
}

// If this is a grow-on-demand page file, and we are trying to read the last written
// page, fill with zeros if necessary.
//
if (*result == 0 && n_read_so_far > 0) {
const i64 file_size_now = this->known_file_size_.load();

if (this->physical_layout_.is_last_in_file) {
StatusOr<i64> file_size = sizeof_fd(this->file_.get_fd());
if (file_size.ok() &&
page_offset_in_file + n_read_so_far == BATT_CHECKED_CAST(u64, *file_size)) {
MutableBuffer buffer = get_mutable_buffer(page_buffer) + n_read_so_far;
std::memset(buffer.data(), 0, buffer.size());

// Success!
//
handler(std::move(page_buffer));
return;
}
}

// The write was short; write again from the new stop point.
//
// If file has grown, then try again.
//
if (file_size_now != file_size_before) {
this->read_some(page_id, page_offset_in_file, std::move(page_buffer), page_buffer_size,
n_read_so_far, std::move(handler));
}));
return;
}

BATT_CHECK_GT(*result, 0)
<< "We must either make progress or receive an error code!"
<< BATT_INSPECT(page_offset_in_file) << BATT_INSPECT(n_read_so_far)
<< BATT_INSPECT(page_offset_in_file + n_read_so_far)
<< BATT_INSPECT(this->physical_layout_.is_last_in_file);
}

// Sanity check the page header and fail fast if something looks wrong.
//
const usize n_read_before = n_read_so_far;
n_read_so_far += *result;

if (!this->is_sharded_view_ && (n_read_before < sizeof(PackedPageHeader) &&
n_read_so_far >= sizeof(PackedPageHeader))) {
Status status = get_page_header(*page_buffer)
.sanity_check(PageSize{BATT_CHECKED_CAST(u32, page_buffer_size)},
page_id, this->page_ids_);
if (!status.ok()) {
// If the only sanity check that failed was a bad generation number, then we report
// page not found.
//
if (status == StatusCode::kPageHeaderBadGeneration) {
status = batt::StatusCode::kNotFound;
}
handler(status);
return;
}
VLOG(1) << "Short read: " << BATT_INSPECT(page_id) << BATT_INSPECT(page_offset_in_file)
<< BATT_INSPECT(page_buffer_size) << BATT_INSPECT(n_read_so_far);
}

// If we have reached the end of the buffer, invoke the handler. Success!
//
if (n_read_so_far == page_buffer_size) {
handler(std::move(page_buffer));
return;
}

// The write was short; write again from the new stop point.
//
this->read_some(page_id, page_offset_in_file, std::move(page_buffer), page_buffer_size,
n_read_so_far, std::move(handler));
}));
}

//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - -
Expand All @@ -322,6 +370,36 @@ bool IoRingPageFileDevice::is_last_in_file() const
return this->layout().is_last_in_file;
}

//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - -
//
Status IoRingPageFileDevice::init_known_file_size()
{
StatusOr<i64> file_size = sizeof_fd(this->file_.get_fd());
BATT_REQUIRE_OK(file_size);

this->update_known_file_size(*file_size);

return OkStatus();
}

//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - -
//
i64 IoRingPageFileDevice::update_known_file_size(i64 target_min_size)
{
i64 observed_size = this->known_file_size_.load();
for (;;) {
if (observed_size >= target_min_size) {
break;
}

if (this->known_file_size_.compare_exchange_weak(observed_size, target_min_size)) {
observed_size = target_min_size;
break;
}
}
return observed_size;
}

//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - -
//
StatusOr<u64> IoRingPageFileDevice::get_physical_page(PageId page_id) const
Expand Down
8 changes: 8 additions & 0 deletions src/llfs/ioring_page_file_device.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,10 @@ class IoRingPageFileDevice : public PageDevice

private:
//+++++++++++-+-+--+----- --- -- - - - -
Status init_known_file_size();

i64 update_known_file_size(i64 target_min_size);

StatusOr<u64> get_physical_page(PageId page_id) const;

StatusOr<i64> get_file_offset_of_page(PageId page_id) const;
Expand Down Expand Up @@ -145,6 +149,10 @@ class IoRingPageFileDevice : public PageDevice
// Used to construct and parse PageIds for this device.
//
PageIdFactory page_ids_;

// If this is not read-only, then track the known file size.
//
std::atomic<i64> known_file_size_{0};
};

} // namespace llfs
Expand Down
39 changes: 39 additions & 0 deletions src/llfs/page_cache_slot.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,45 @@ class PageCacheSlot
return this->size_;
}

/** \brief Takes the size allocated to `that` and moves it to `this`.
*/
void subsume(ExternalAllocation&& that)
{
if (that.pool_ == nullptr || that.pool_ == 0) {
return;
}

if (this->pool_ == nullptr || this->size_ == 0) {
*this = std::move(that);
return;
}

BATT_CHECK_EQ(this->pool_, that.pool_);
this->size_ += that.size_;

that.pool_ = nullptr;
that.size_ = 0;
}

/** \brief Divides this allocation into two, returning a new object with the passed size, which
* is subtracted from this->size().
*
* If byte_count is too large, returns batt::StatusCode::kOutOfRange.
*/
StatusOr<ExternalAllocation> split(usize byte_count)
{
if (this->size_ < byte_count) {
return {batt::StatusCode::kOutOfRange};
}

this->size_ -= byte_count;
if (this->size_ == 0) {
this->pool_ = nullptr;
}

return ExternalAllocation{*this->pool_, byte_count};
}

explicit operator bool() const noexcept
{
return this->pool_ && this->size_ != 0;
Expand Down
6 changes: 4 additions & 2 deletions src/llfs/raw_block_file.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
//

#include <batteries/hint.hpp>
#include "llfs/config.hpp"

namespace llfs {

Expand All @@ -36,14 +37,15 @@ Status RawBlockFile::validate_buffer(const ConstBuffer& buffer, i64 offset)
//
/*static*/ i64 RawBlockFile::align_up(i64 n)
{
return (n + 511) & ~511ll;
return (n + static_cast<i64>(kDirectIOBlockAlign - 1)) &
~static_cast<i64>(kDirectIOBlockAlign - 1);
}

// Return the greatest block-aligned value not greater than n.
//
/*static*/ i64 RawBlockFile::align_down(i64 n)
{
return n & ~511ll;
return n & ~static_cast<i64>(kDirectIOBlockAlign - 1);
}

//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - -
Expand Down