diff --git a/spec/msg_segment_meta_files_spec.cr b/spec/msg_segment_meta_files_spec.cr index 8fe70db807..e27df5d398 100644 --- a/spec/msg_segment_meta_files_spec.cr +++ b/spec/msg_segment_meta_files_spec.cr @@ -47,7 +47,7 @@ describe "Message segment metadata files" do # Publish enough messages to trigger new segment creation segment_size = LavinMQ::Config.instance.segment_size - message_size = 50 + message_size = 42 messages_needed = (segment_size / message_size).to_i + 10 messages_needed.times do |i| diff --git a/src/lavinmq/amqp/stream/stream_message_store.cr b/src/lavinmq/amqp/stream/stream_message_store.cr index 45e688f8ac..eb3b3a1e78 100644 --- a/src/lavinmq/amqp/stream/stream_message_store.cr +++ b/src/lavinmq/amqp/stream/stream_message_store.cr @@ -9,8 +9,8 @@ module LavinMQ::AMQP property max_age : Time::Span | Time::MonthSpan | Nil getter last_offset : Int64 @segment_last_ts = Hash(UInt32, Int64).new(0i64) # used for max-age - @offset_index = Hash(UInt32, Int64).new # segment_id => offset of first msg - @timestamp_index = Hash(UInt32, Int64).new # segment_id => ts of first msg + @segment_first_offset = Hash(UInt32, Int64).new # segment_id => offset of first msg + @segment_first_ts = Hash(UInt32, Int64).new # segment_id => ts of first msg @consumer_offsets : MFile @consumer_offset_positions = Hash(String, Int64).new # used for consumer offsets @@ -35,15 +35,9 @@ module LavinMQ::AMQP private def get_last_offset : Int64 return 0i64 if @size.zero? - bytesize = 0_u32 - mfile = @segments.last_value - loop do - bytesize = BytesMessage.skip(mfile) - rescue IO::EOFError - break - end - msg = BytesMessage.from_bytes(mfile.to_slice + (mfile.pos - bytesize)) - offset_from_headers(msg.properties.headers) + offset = @segment_first_offset.last_value + offset += @segment_msg_count.last_value + offset end # Used once when a consumer is started @@ -86,8 +80,12 @@ module LavinMQ::AMQP private def offset_at(seg, pos, retried = false) : Tuple(Int64, UInt32, UInt32) return {@last_offset, seg, pos} if @size.zero? mfile = @segments[seg] - msg = BytesMessage.from_bytes(mfile.to_slice + pos) - offset = offset_from_headers(msg.properties.headers) + offset = @segment_first_offset[seg] + mfile.pos = 4 + while mfile.pos < pos + BytesMessage.skip(mfile) + offset += 1 + end {offset, seg, pos} rescue ex : IndexError # first segment can be empty if message size >= segment size return offset_at(seg + 1, 4_u32, true) unless retried @@ -101,24 +99,25 @@ module LavinMQ::AMQP private def find_offset_in_segments(offset : Int | Time) : Tuple(Int64, UInt32, UInt32) segment = offset_index_lookup(offset) pos = 4u32 - msg_offset = 0i64 + msg_offset = @segment_first_offset[segment] || 0i64 loop do rfile = @segments[segment]? if rfile.nil? || pos == rfile.size if segment = @segments.each_key.find { |sid| sid > segment } rfile = @segments[segment] - pos = 4_u32 + msg_offset = @segment_first_offset[segment] else return last_offset_seg_pos end end msg = BytesMessage.from_bytes(rfile.to_slice + pos) - msg_offset = offset_from_headers(msg.properties.headers) + case offset in Int then break if offset <= msg_offset in Time then break if offset <= Time.unix_ms(msg.timestamp) end - pos += msg.bytesize + msg_offset += 1 + pos += msg.bytesize.to_u32 rescue ex raise rfile ? Error.new(rfile, cause: ex) : ex end @@ -129,12 +128,12 @@ module LavinMQ::AMQP seg = @segments.first_key case offset when Int - @offset_index.each do |seg_id, first_seg_offset| + @segment_first_offset.each do |seg_id, first_seg_offset| break if first_seg_offset > offset seg = seg_id end when Time - @timestamp_index.each do |seg_id, first_seg_ts| + @segment_first_ts.each do |seg_id, first_seg_ts| break if Time.unix_ms(first_seg_ts) > offset seg = seg_id end @@ -230,7 +229,7 @@ module LavinMQ::AMQP def shift?(consumer : AMQP::StreamConsumer) : Envelope? raise ClosedError.new if @closed - if env = shift_requeued(consumer.requeued) + if env = shift_requeued(consumer) return env end @@ -243,6 +242,7 @@ module LavinMQ::AMQP begin msg = BytesMessage.from_bytes(rfile.to_slice + consumer.pos) sp = SegmentPosition.new(consumer.segment, consumer.pos, msg.bytesize.to_u32) + msg.properties.headers = add_offset_header(msg.properties.headers, consumer.offset) consumer.pos += sp.bytesize consumer.offset += 1 return unless consumer.filter_match?(msg.properties.headers) @@ -252,11 +252,13 @@ module LavinMQ::AMQP end end - private def shift_requeued(requeued) : Envelope? - while sp = requeued.shift? + private def shift_requeued(consumer) : Envelope? + while sp = consumer.requeued.shift? if segment = @segments[sp.segment]? # segment might have expired since requeued begin msg = BytesMessage.from_bytes(segment.to_slice + sp.position) + offset, _, _ = offset_at(sp.segment, sp.position) + msg.properties.headers = add_offset_header(msg.properties.headers, offset) return Envelope.new(sp, msg, redelivered: true) rescue ex raise Error.new(segment, cause: ex) @@ -279,7 +281,7 @@ module LavinMQ::AMQP def push(msg) : SegmentPosition raise ClosedError.new if @closed - msg.properties.headers = add_offset_header(msg.properties.headers, @last_offset += 1) + @last_offset += 1 sp = write_to_disk(msg) @bytesize += sp.bytesize @size += 1 @@ -290,15 +292,15 @@ module LavinMQ::AMQP private def open_new_segment(next_msg_size = 0) : MFile super.tap do drop_overflow - @offset_index[@segments.last_key] = @last_offset + 1 - @timestamp_index[@segments.last_key] = RoughTime.unix_ms + @segment_first_offset[@segments.last_key] = @last_offset unless @last_offset.zero? + @segment_first_ts[@segments.last_key] = RoughTime.unix_ms end end private def write_metadata(io, seg) super - io.write_bytes @offset_index[seg] - io.write_bytes @timestamp_index[seg] + io.write_bytes @segment_first_offset[seg] + io.write_bytes @segment_first_ts[seg] end def drop_overflow @@ -330,8 +332,8 @@ module LavinMQ::AMQP msg_count = @segment_msg_count.delete(seg_id) @size -= msg_count if msg_count @segment_last_ts.delete(seg_id) - @offset_index.delete(seg_id) - @timestamp_index.delete(seg_id) + @segment_first_offset.delete(seg_id) + @segment_first_ts.delete(seg_id) @bytesize -= mfile.size - 4 delete_file(mfile, including_meta: true) true @@ -361,23 +363,23 @@ module LavinMQ::AMQP end end - private def offset_from_headers(headers) : Int64 - headers.not_nil!("Message lacks headers")["x-stream-offset"].as(Int64) - end - private def produce_metadata(seg, mfile) super - msg = BytesMessage.from_bytes(mfile.to_slice + 4u32) - @offset_index[seg] = offset_from_headers(msg.properties.headers) - @timestamp_index[seg] = msg.timestamp - rescue IndexError - @offset_index[seg] = @last_offset - @timestamp_index[seg] = RoughTime.unix_ms + if @empty + @segment_first_offset[seg] = @last_offset + 1 + @segment_first_ts[seg] = RoughTime.unix_ms + else + previous_segment_last_offset = @segment_first_offset[seg - 1]? || 0i64 + previous_segment_last_offset += @segment_msg_count[seg - 1]? || 0i64 + msg = BytesMessage.from_bytes(mfile.to_slice + 4u32) + @segment_first_offset[seg] = previous_segment_last_offset + 1 + @segment_first_ts[seg] = msg.timestamp + end end private def read_extra_metadata_fields(file : File, seg : UInt32) - @offset_index[seg] = file.read_bytes(Int64) - @timestamp_index[seg] = file.read_bytes(Int64) + @segment_first_offset[seg] = file.read_bytes(Int64) + @segment_first_ts[seg] = file.read_bytes(Int64) end class OffsetError < Exception