Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion spec/msg_segment_meta_files_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ describe "Message segment metadata files" do

# Publish enough messages to trigger new segment creation
segment_size = LavinMQ::Config.instance.segment_size
message_size = 50
message_size = 42
messages_needed = (segment_size / message_size).to_i + 10

messages_needed.times do |i|
Expand Down
84 changes: 43 additions & 41 deletions src/lavinmq/amqp/stream/stream_message_store.cr
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ module LavinMQ::AMQP
property max_age : Time::Span | Time::MonthSpan | Nil
getter last_offset : Int64
@segment_last_ts = Hash(UInt32, Int64).new(0i64) # used for max-age
@offset_index = Hash(UInt32, Int64).new # segment_id => offset of first msg
@timestamp_index = Hash(UInt32, Int64).new # segment_id => ts of first msg
@segment_first_offset = Hash(UInt32, Int64).new # segment_id => offset of first msg
@segment_first_ts = Hash(UInt32, Int64).new # segment_id => ts of first msg
@consumer_offsets : MFile
@consumer_offset_positions = Hash(String, Int64).new # used for consumer offsets

Expand All @@ -35,15 +35,9 @@ module LavinMQ::AMQP

private def get_last_offset : Int64
return 0i64 if @size.zero?
bytesize = 0_u32
mfile = @segments.last_value
loop do
bytesize = BytesMessage.skip(mfile)
rescue IO::EOFError
break
end
msg = BytesMessage.from_bytes(mfile.to_slice + (mfile.pos - bytesize))
offset_from_headers(msg.properties.headers)
offset = @segment_first_offset.last_value
offset += @segment_msg_count.last_value
offset
end

# Used once when a consumer is started
Expand Down Expand Up @@ -86,8 +80,12 @@ module LavinMQ::AMQP
private def offset_at(seg, pos, retried = false) : Tuple(Int64, UInt32, UInt32)
return {@last_offset, seg, pos} if @size.zero?
mfile = @segments[seg]
msg = BytesMessage.from_bytes(mfile.to_slice + pos)
offset = offset_from_headers(msg.properties.headers)
offset = @segment_first_offset[seg]
mfile.pos = 4
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not introduced, or only a problem here, but using the internal MFile pos wont be thread safe. Doing a .dup of the MFile is one way to solve it, there are probably more efficent ways.

while mfile.pos < pos
BytesMessage.skip(mfile)
offset += 1
end
{offset, seg, pos}
rescue ex : IndexError # first segment can be empty if message size >= segment size
return offset_at(seg + 1, 4_u32, true) unless retried
Expand All @@ -101,24 +99,25 @@ module LavinMQ::AMQP
private def find_offset_in_segments(offset : Int | Time) : Tuple(Int64, UInt32, UInt32)
segment = offset_index_lookup(offset)
pos = 4u32
msg_offset = 0i64
msg_offset = @segment_first_offset[segment] || 0i64
loop do
rfile = @segments[segment]?
if rfile.nil? || pos == rfile.size
if segment = @segments.each_key.find { |sid| sid > segment }
rfile = @segments[segment]
pos = 4_u32
msg_offset = @segment_first_offset[segment]
else
return last_offset_seg_pos
end
end
msg = BytesMessage.from_bytes(rfile.to_slice + pos)
msg_offset = offset_from_headers(msg.properties.headers)

case offset
in Int then break if offset <= msg_offset
in Time then break if offset <= Time.unix_ms(msg.timestamp)
end
pos += msg.bytesize
msg_offset += 1
pos += msg.bytesize.to_u32
rescue ex
raise rfile ? Error.new(rfile, cause: ex) : ex
end
Expand All @@ -129,12 +128,12 @@ module LavinMQ::AMQP
seg = @segments.first_key
case offset
when Int
@offset_index.each do |seg_id, first_seg_offset|
@segment_first_offset.each do |seg_id, first_seg_offset|
break if first_seg_offset > offset
seg = seg_id
end
when Time
@timestamp_index.each do |seg_id, first_seg_ts|
@segment_first_ts.each do |seg_id, first_seg_ts|
break if Time.unix_ms(first_seg_ts) > offset
seg = seg_id
end
Expand Down Expand Up @@ -230,7 +229,7 @@ module LavinMQ::AMQP
def shift?(consumer : AMQP::StreamConsumer) : Envelope?
raise ClosedError.new if @closed

if env = shift_requeued(consumer.requeued)
if env = shift_requeued(consumer)
return env
end

Expand All @@ -243,6 +242,7 @@ module LavinMQ::AMQP
begin
msg = BytesMessage.from_bytes(rfile.to_slice + consumer.pos)
sp = SegmentPosition.new(consumer.segment, consumer.pos, msg.bytesize.to_u32)
msg.properties.headers = add_offset_header(msg.properties.headers, consumer.offset)
consumer.pos += sp.bytesize
consumer.offset += 1
return unless consumer.filter_match?(msg.properties.headers)
Expand All @@ -252,11 +252,13 @@ module LavinMQ::AMQP
end
end

private def shift_requeued(requeued) : Envelope?
while sp = requeued.shift?
private def shift_requeued(consumer) : Envelope?
while sp = consumer.requeued.shift?
if segment = @segments[sp.segment]? # segment might have expired since requeued
begin
msg = BytesMessage.from_bytes(segment.to_slice + sp.position)
offset, _, _ = offset_at(sp.segment, sp.position)
msg.properties.headers = add_offset_header(msg.properties.headers, offset)
return Envelope.new(sp, msg, redelivered: true)
rescue ex
raise Error.new(segment, cause: ex)
Expand All @@ -279,7 +281,7 @@ module LavinMQ::AMQP

def push(msg) : SegmentPosition
raise ClosedError.new if @closed
msg.properties.headers = add_offset_header(msg.properties.headers, @last_offset += 1)
@last_offset += 1
sp = write_to_disk(msg)
@bytesize += sp.bytesize
@size += 1
Expand All @@ -290,15 +292,15 @@ module LavinMQ::AMQP
private def open_new_segment(next_msg_size = 0) : MFile
super.tap do
drop_overflow
@offset_index[@segments.last_key] = @last_offset + 1
@timestamp_index[@segments.last_key] = RoughTime.unix_ms
@segment_first_offset[@segments.last_key] = @last_offset unless @last_offset.zero?
@segment_first_ts[@segments.last_key] = RoughTime.unix_ms
end
end

private def write_metadata(io, seg)
super
io.write_bytes @offset_index[seg]
io.write_bytes @timestamp_index[seg]
io.write_bytes @segment_first_offset[seg]
io.write_bytes @segment_first_ts[seg]
end

def drop_overflow
Expand Down Expand Up @@ -330,8 +332,8 @@ module LavinMQ::AMQP
msg_count = @segment_msg_count.delete(seg_id)
@size -= msg_count if msg_count
@segment_last_ts.delete(seg_id)
@offset_index.delete(seg_id)
@timestamp_index.delete(seg_id)
@segment_first_offset.delete(seg_id)
@segment_first_ts.delete(seg_id)
@bytesize -= mfile.size - 4
delete_file(mfile, including_meta: true)
true
Expand Down Expand Up @@ -361,23 +363,23 @@ module LavinMQ::AMQP
end
end

private def offset_from_headers(headers) : Int64
headers.not_nil!("Message lacks headers")["x-stream-offset"].as(Int64)
end

private def produce_metadata(seg, mfile)
super
msg = BytesMessage.from_bytes(mfile.to_slice + 4u32)
@offset_index[seg] = offset_from_headers(msg.properties.headers)
@timestamp_index[seg] = msg.timestamp
rescue IndexError
@offset_index[seg] = @last_offset
@timestamp_index[seg] = RoughTime.unix_ms
if @empty
@segment_first_offset[seg] = @last_offset + 1
@segment_first_ts[seg] = RoughTime.unix_ms
else
previous_segment_last_offset = @segment_first_offset[seg - 1]? || 0i64
previous_segment_last_offset += @segment_msg_count[seg - 1]? || 0i64
msg = BytesMessage.from_bytes(mfile.to_slice + 4u32)
@segment_first_offset[seg] = previous_segment_last_offset + 1
@segment_first_ts[seg] = msg.timestamp
end
end

private def read_extra_metadata_fields(file : File, seg : UInt32)
@offset_index[seg] = file.read_bytes(Int64)
@timestamp_index[seg] = file.read_bytes(Int64)
@segment_first_offset[seg] = file.read_bytes(Int64)
@segment_first_ts[seg] = file.read_bytes(Int64)
end

class OffsetError < Exception
Expand Down
Loading