-
Notifications
You must be signed in to change notification settings - Fork 2k
[CHORE][wal3] encapsulate fragment reading in FragmentConsumer trait #6107
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
Reviewer ChecklistPlease leverage this checklist to ensure your code review is thorough before approving Testing, Bugs, Errors, Logs, Documentation
System Compatibility
Quality
|
|
It also updates the downstream log-service and s3heap-service consumers to align with the new constructor signatures, adds coverage for relative and absolute parquet offsets, and refreshes the manifest utilities to follow the refactored call pattern. Affected Areas• rust/wal3/src/interfaces/mod.rs This summary was automatically generated by @propel-code-bot |
8174102 to
6758bfc
Compare
| async fn read_fragment(&self, path: &str, _: LogPosition) -> Result<Option<Fragment>, Error> { | ||
| super::read_fragment(&self.storage, &self.prefix, path, None).await |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[Logic] The LogPosition argument is ignored here as well. It should be passed to read_fragment to support relative offsets.
Context for Agents
The `LogPosition` argument is ignored here as well. It should be passed to `read_fragment` to support relative offsets.
File: rust/wal3/src/interfaces/s3/fragment_puller.rs
Line: 44There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No. I changed this elsewhere.
| async fn read_parquet( | ||
| &self, | ||
| path: &str, | ||
| _: LogPosition, | ||
| ) -> Result<(Setsum, Vec<(LogPosition, Vec<u8>)>, u64), Error> { | ||
| super::read_parquet(&self.storage, &self.prefix, path, None).await |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[Logic] The LogPosition argument is ignored here, effectively passing None to read_parquet. This prevents reading fragments with relative offsets (which require a base position). Consider passing Some(pos) to support both absolute and relative offset files.
Note: This change requires updating read_parquet in mod.rs to accept (Some(_), false) (see my other comment).
Context for Agents
The `LogPosition` argument is ignored here, effectively passing `None` to `read_parquet`. This prevents reading fragments with relative offsets (which require a base position). Consider passing `Some(pos)` to support both absolute and relative offset files.
Note: This change requires updating `read_parquet` in `mod.rs` to accept `(Some(_), false)` (see my other comment).
File: rust/wal3/src/interfaces/s3/fragment_puller.rs
Line: 40There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
By design. Pass in None when you know it's absolute, pass in Some when you know it's relative.
| (Some(starting_log_position), true) => { | ||
| for record in records.iter_mut() { | ||
| record.0 = LogPosition::from_offset( | ||
| starting_log_position | ||
| .offset() | ||
| .checked_add(record.0.offset()) | ||
| .ok_or(Error::Overflow(format!( | ||
| "log position overflow: {} + {}", | ||
| starting_log_position.offset(), | ||
| record.0.offset() | ||
| )))?, | ||
| ); | ||
| } | ||
| Ok((setsum, records, num_bytes)) | ||
| } | ||
| (None, false) => Ok((setsum, records, num_bytes)), | ||
| (Some(_), false) => Err(Error::internal(file!(), line!())), | ||
| (None, true) => Err(Error::internal(file!(), line!())), | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[Logic] The current pattern matching treats (Some(_), false) as an error, which causes failures if a caller provides a position for a file with absolute offsets. To support S3FragmentPuller passing the position (needed for relative offsets), this case should be relaxed to return the records as-is.
| (Some(starting_log_position), true) => { | |
| for record in records.iter_mut() { | |
| record.0 = LogPosition::from_offset( | |
| starting_log_position | |
| .offset() | |
| .checked_add(record.0.offset()) | |
| .ok_or(Error::Overflow(format!( | |
| "log position overflow: {} + {}", | |
| starting_log_position.offset(), | |
| record.0.offset() | |
| )))?, | |
| ); | |
| } | |
| Ok((setsum, records, num_bytes)) | |
| } | |
| (None, false) => Ok((setsum, records, num_bytes)), | |
| (Some(_), false) => Err(Error::internal(file!(), line!())), | |
| (None, true) => Err(Error::internal(file!(), line!())), | |
| } | |
| (Some(starting_log_position), true) => { | |
| for record in records.iter_mut() { | |
| record.0 = LogPosition::from_offset( | |
| starting_log_position | |
| .offset() | |
| .checked_add(record.0.offset()) | |
| .ok_or(Error::Overflow(format!( | |
| "log position overflow: {} + {}", | |
| starting_log_position.offset(), | |
| record.0.offset() | |
| )))?, | |
| ); | |
| } | |
| Ok((setsum, records, num_bytes)) | |
| } | |
| (_, false) => Ok((setsum, records, num_bytes)), | |
| (None, true) => Err(Error::internal(file!(), line!())), | |
| } |
Context for Agents
The current pattern matching treats `(Some(_), false)` as an error, which causes failures if a caller provides a position for a file with absolute offsets. To support `S3FragmentPuller` passing the position (needed for relative offsets), this case should be relaxed to return the records as-is.
```suggestion
(Some(starting_log_position), true) => {
for record in records.iter_mut() {
record.0 = LogPosition::from_offset(
starting_log_position
.offset()
.checked_add(record.0.offset())
.ok_or(Error::Overflow(format!(
"log position overflow: {} + {}",
starting_log_position.offset(),
record.0.offset()
)))?,
);
}
Ok((setsum, records, num_bytes))
}
(_, false) => Ok((setsum, records, num_bytes)),
(None, true) => Err(Error::internal(file!(), line!())),
}
```
File: rust/wal3/src/interfaces/s3/mod.rs
Line: 2836758bfc to
bc1ffd3
Compare
|
|
||
| /// Concrete type alias for the LogReader with S3 consumers. | ||
| type S3LogReader = LogReader<(FragmentSeqNo, LogPosition), FragmentPuller, ManifestReader>; | ||
| type S3LogReader = LogReader<(FragmentSeqNo, LogPosition), S3FragmentPuller, ManifestReader>; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: this type def seems to be in two places - here and in rust/log-service/src/lib.rs. Is there a shared util somewhere between s3heap and wal3?
bc1ffd3 to
cae531a
Compare
| for record in records.iter_mut() { | ||
| record.0 = LogPosition::from_offset( | ||
| starting_log_position | ||
| .offset() | ||
| .checked_add(record.0.offset()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[Logic] checksum_parquet already adds starting_log_position to the offsets when processing relative offset files (see offset_base calculation in interfaces/mod.rs).
Adding it again here results in double application of the offset (e.g., base + (base + offset)).
Since checksum_parquet handles the offset translation and overflow checking, this logic should be removed. Just return the records from checksum_parquet.
Context for Agents
`checksum_parquet` already adds `starting_log_position` to the offsets when processing relative offset files (see `offset_base` calculation in `interfaces/mod.rs`).
Adding it again here results in double application of the offset (e.g., `base + (base + offset)`).
Since `checksum_parquet` handles the offset translation and overflow checking, this logic should be removed. Just return the records from `checksum_parquet`.
File: rust/wal3/src/interfaces/s3/mod.rs
Line: 270There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I get this right with unit tests up the stack.
Move fragment reading methods (read_raw_bytes, read_parquet, read_fragment) into the FragmentConsumer trait, eliminating the need for LogReader to hold a Storage reference. This improves encapsulation and simplifies the LogReader interface. - Rename FragmentPuller to S3FragmentPuller for naming consistency - Add read_raw_bytes, read_parquet, read_fragment to FragmentConsumer trait - Move checksum_parquet utility from reader.rs to interfaces/mod.rs - Remove storage parameter from LogReader::new and LogReader::open - Remove unused _writer_name parameters from make_log_reader helpers Co-authored-by: AI
cae531a to
6b9e844
Compare
Description of changes
Move fragment reading methods (read_raw_bytes, read_parquet, read_fragment)
into the FragmentConsumer trait, eliminating the need for LogReader to hold
a Storage reference. This improves encapsulation and simplifies the LogReader
interface.
Test plan
Pass locally + CI
Migration plan
N/A
Observability plan
N/A
Documentation Changes
N/A
Co-authored-by: AI