Skip to content

Conversation

@viktorerlingsson
Copy link
Member

WHAT is this pull request doing?

Optimize stream message storage by only storing offsets for the first message in each segment instead of every message. The offset for subsequent messages is calculated on-demand based on the segment's first offset and message position in the segment.

  • Removes x-stream-offset header from messages at write time, reducing disk I/O and message size
  • Calculates offsets dynamically when consuming messages or looking up positions

Increases write-performance by around ~30% with small messages in my tests. Doesn't have a big impact on read performance.

Works with streams created before this change as well, but messages published with the changes in this PR can not be read by older lavinmq versions.

HOW can this pull request be tested?

Should be well covered by existing specs.

…es following messages based on that. saves us from having to write "x-stream-offset":i64 for each message
@viktorerlingsson viktorerlingsson requested a review from a team as a code owner November 14, 2025 14:03
@claude
Copy link

claude bot commented Nov 14, 2025

Claude finished @viktorerlingsson's task —— View job


Code Review Progress

  • Read and analyze changed files
  • Identify bugs, security issues, performance problems, error handling gaps, and Crystal anti-patterns
  • Provide review feedback

Issues Found

Potential race condition in consumer offset handling (stream_message_store.cr:368-377):
The produce_metadata method calculates segment first offset based on previous segments, but this calculation could be incorrect if segments are being dropped concurrently due to max-age/max-length policies.

Missing error handling in offset calculation (stream_message_store.cr:102):
The line msg_offset = @segment_first_offset[segment] || 0i64 defaults to 0 if the segment offset is missing, but this could lead to incorrect offset calculations if the segment legitimately should have a different starting offset.

Inconsistent state handling (stream_message_store.cr:295):
The condition unless @last_offset.zero? in open_new_segment may not handle the case where the first message in a new message store has offset 1, but @last_offset is 0.

@viktorerlingsson viktorerlingsson self-assigned this Nov 17, 2025
msg = BytesMessage.from_bytes(mfile.to_slice + pos)
offset = offset_from_headers(msg.properties.headers)
offset = @segment_first_offset[seg]
mfile.pos = 4
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not introduced, or only a problem here, but using the internal MFile pos wont be thread safe. Doing a .dup of the MFile is one way to solve it, there are probably more efficent ways.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants