Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion spec/msg_segment_meta_files_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ describe "Message segment metadata files" do

# Publish enough messages to trigger new segment creation
segment_size = LavinMQ::Config.instance.segment_size
message_size = 50
message_size = 42
messages_needed = (segment_size / message_size).to_i + 10

messages_needed.times do |i|
Expand Down
56 changes: 31 additions & 25 deletions src/lavinmq/amqp/stream/stream_message_store.cr
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,9 @@ module LavinMQ::AMQP

private def get_last_offset : Int64
return 0i64 if @size.zero?
bytesize = 0_u32
mfile = @segments.last_value
loop do
bytesize = BytesMessage.skip(mfile)
rescue IO::EOFError
break
end
msg = BytesMessage.from_bytes(mfile.to_slice + (mfile.pos - bytesize))
offset_from_headers(msg.properties.headers)
offset = @offset_index.last_value
offset += @segment_msg_count.last_value
offset
end

# Used once when a consumer is started
Expand Down Expand Up @@ -86,8 +80,15 @@ module LavinMQ::AMQP
private def offset_at(seg, pos, retried = false) : Tuple(Int64, UInt32, UInt32)
return {@last_offset, seg, pos} if @size.zero?
mfile = @segments[seg]
msg = BytesMessage.from_bytes(mfile.to_slice + pos)
offset = offset_from_headers(msg.properties.headers)
offset = @offset_index[seg]
mfile.pos = 4
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not introduced, or only a problem here, but using the internal MFile pos wont be thread safe. Doing a .dup of the MFile is one way to solve it, there are probably more efficent ways.

loop do
BytesMessage.skip(mfile)
break if mfile.pos >= pos
offset += 1
rescue IO::Error
break
end
{offset, seg, pos}
rescue ex : IndexError # first segment can be empty if message size >= segment size
return offset_at(seg + 1, 4_u32, true) unless retried
Expand All @@ -101,24 +102,26 @@ module LavinMQ::AMQP
private def find_offset_in_segments(offset : Int | Time) : Tuple(Int64, UInt32, UInt32)
segment = offset_index_lookup(offset)
pos = 4u32
msg_offset = 0i64
msg_offset = @offset_index[segment] || 0i64
loop do
rfile = @segments[segment]?
if rfile.nil? || pos == rfile.size
if segment = @segments.each_key.find { |sid| sid > segment }
rfile = @segments[segment]
pos = 4_u32
rfile.pos = 4u32
msg_offset = @offset_index[segment]
else
return last_offset_seg_pos
end
end
msg = BytesMessage.from_bytes(rfile.to_slice + pos)
msg_offset = offset_from_headers(msg.properties.headers)

case offset
in Int then break if offset <= msg_offset
in Time then break if offset <= Time.unix_ms(msg.timestamp)
end
pos += msg.bytesize
msg_offset += 1
pos += msg.bytesize.to_u32
rescue ex
raise rfile ? Error.new(rfile, cause: ex) : ex
end
Expand Down Expand Up @@ -230,7 +233,7 @@ module LavinMQ::AMQP
def shift?(consumer : AMQP::StreamConsumer) : Envelope?
raise ClosedError.new if @closed

if env = shift_requeued(consumer.requeued)
if env = shift_requeued(consumer)
return env
end

Expand All @@ -243,6 +246,7 @@ module LavinMQ::AMQP
begin
msg = BytesMessage.from_bytes(rfile.to_slice + consumer.pos)
sp = SegmentPosition.new(consumer.segment, consumer.pos, msg.bytesize.to_u32)
msg.properties.headers = add_offset_header(msg.properties.headers, consumer.offset)
consumer.pos += sp.bytesize
consumer.offset += 1
return unless consumer.filter_match?(msg.properties.headers)
Expand All @@ -252,11 +256,13 @@ module LavinMQ::AMQP
end
end

private def shift_requeued(requeued) : Envelope?
while sp = requeued.shift?
private def shift_requeued(consumer) : Envelope?
while sp = consumer.requeued.shift?
if segment = @segments[sp.segment]? # segment might have expired since requeued
begin
msg = BytesMessage.from_bytes(segment.to_slice + sp.position)
offset, _, _ = offset_at(sp.segment, sp.position)
msg.properties.headers = add_offset_header(msg.properties.headers, offset)
return Envelope.new(sp, msg, redelivered: true)
rescue ex
raise Error.new(segment, cause: ex)
Expand All @@ -279,18 +285,19 @@ module LavinMQ::AMQP

def push(msg) : SegmentPosition
raise ClosedError.new if @closed
msg.properties.headers = add_offset_header(msg.properties.headers, @last_offset += 1)
@last_offset += 1
sp = write_to_disk(msg)
@bytesize += sp.bytesize
@size += 1
@segment_last_ts[sp.segment] = msg.timestamp
@offset_index[sp.segment] = @last_offset if (@offset_index[sp.segment]? || 0i64).zero?
sp
end

private def open_new_segment(next_msg_size = 0) : MFile
super.tap do
drop_overflow
@offset_index[@segments.last_key] = @last_offset + 1
@offset_index[@segments.last_key] = @last_offset
@timestamp_index[@segments.last_key] = RoughTime.unix_ms
end
end
Expand Down Expand Up @@ -361,14 +368,13 @@ module LavinMQ::AMQP
end
end

private def offset_from_headers(headers) : Int64
headers.not_nil!("Message lacks headers")["x-stream-offset"].as(Int64)
end

private def produce_metadata(seg, mfile)
super
previous_segment_last_offset = @offset_index[seg - 1]? || 0i64
previous_segment_last_offset += @segment_msg_count[seg - 1]? || 0i64

msg = BytesMessage.from_bytes(mfile.to_slice + 4u32)
@offset_index[seg] = offset_from_headers(msg.properties.headers)
@offset_index[seg] = previous_segment_last_offset + 1
@timestamp_index[seg] = msg.timestamp
rescue IndexError
@offset_index[seg] = @last_offset
Expand Down
Loading