Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion spec/message_store_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ describe LavinMQ::MessageStore do
# Create an orphaned acks file
File.write(File.join(dir, "acks.0000000002"), "data")

store = LavinMQ::MessageStore.new(dir, nil)
store = LavinMQ::QueueMessageStore.new(dir, nil)
store.close

File.exists?(File.join(dir, "acks.0000000001")).should be_true
Expand Down
62 changes: 36 additions & 26 deletions spec/msg_segment_meta_files_spec.cr
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
require "./spec_helper"
# require "../src/lavinmq/message_store"

describe "Message segment metadata files" do
describe "Metadata file creation" do
Expand All @@ -19,15 +20,16 @@ describe "Message segment metadata files" do
end

# Should have multiple segments or be able to verify metadata behavior
total_segments = queue.@[email protected]
total_segments = queue.@msg_store.as(LavinMQ::QueueMessageStore).@segments.size

# Check behavior: either we have multiple segments with .meta files,
# or we can verify the metadata creation logic works
store = queue.@msg_store.as(LavinMQ::QueueMessageStore)
if total_segments > 1
# Check that .meta files exist for completed segments
completed_segments_with_meta = 0
queue.@msg_store[email protected] do |seg_id, mfile|
next if seg_id == queue.@msg_store[email protected]_key # skip current writing segment
store[email protected] do |seg_id, mfile|
next if seg_id == store[email protected]_key # skip current writing segment
meta_path = "#{mfile.path}.meta"
if File.exists?(meta_path)
completed_segments_with_meta += 1
Expand All @@ -40,7 +42,7 @@ describe "Message segment metadata files" do
else
# If we only have one segment, verify that it doesn't have a .meta file
# since it's the current writing segment
mfile = queue.@msg_store[email protected]_value
mfile = store[email protected]_value
meta_path = "#{mfile.path}.meta"
File.exists?(meta_path).should be_false
end
Expand All @@ -53,26 +55,27 @@ describe "Message segment metadata files" do
vhost = s.vhosts.create("test_vhost")
with_channel(s, vhost: vhost.name) do |ch|
q = ch.queue("stream_meta_test", args: AMQP::Client::Arguments.new({"x-queue-type" => "stream"}))
queue = vhost.queues["stream_meta_test"].as(LavinMQ::AMQP::StreamQueue)
queue = vhost.queues["stream_meta_test"].as(LavinMQ::AMQP::Stream)

# Publish enough messages to trigger new segment creation
segment_size = LavinMQ::Config.instance.segment_size
message_size = 50
messages_needed = (segment_size / message_size).to_i + 10

initial_segments = queue.@[email protected]
store = queue.@msg_store.as(LavinMQ::AMQP::StreamMessageStore)
initial_segments = [email protected]

messages_needed.times do |i|
q.publish_confirm "message #{i}"
end

# Should have created new segments
final_segments = queue.@msg_store[email protected]
final_segments = store[email protected]
final_segments.should be > initial_segments

# Check meta file exists and contains stream-specific data
queue.@msg_store[email protected] do |seg_id, mfile|
next if seg_id == queue.@msg_store[email protected]_key # skip current writing segment
store[email protected] do |seg_id, mfile|
next if seg_id == store[email protected]_key # skip current writing segment
meta_path = "#{mfile.path}.meta"
File.exists?(meta_path).should be_true

Expand All @@ -99,13 +102,14 @@ describe "Message segment metadata files" do
with_channel(s, vhost: vhost.name) do |ch|
q = ch.queue("metadata_load_test")
queue = vhost.queues["metadata_load_test"].as(LavinMQ::AMQP::DurableQueue)
store = queue.@msg_store.as(LavinMQ::QueueMessageStore)

# Publish messages but not enough to trigger new segment
50.times { |i| q.publish_confirm "message #{i}" }

# Manually create a new message store to simulate restart behavior
msg_dir = queue.@msg_store.@msg_dir
new_store = LavinMQ::MessageStore.new(msg_dir, nil)
msg_dir = store.@msg_dir
new_store = LavinMQ::QueueMessageStore.new(msg_dir, nil)

# Should have loaded messages correctly
new_store.size.should eq 50
Expand All @@ -128,18 +132,19 @@ describe "Message segment metadata files" do
with_channel(s, vhost: vhost.name) do |ch|
q = ch.queue("fallback_test")
queue = vhost.queues["fallback_test"].as(LavinMQ::AMQP::DurableQueue)
store = queue.@msg_store.as(LavinMQ::QueueMessageStore)

# Publish messages
25.times { |i| q.publish_confirm "message #{i}" }

# Get message directory and create new store without metadata
msg_dir = queue.@msg_store.@msg_dir
msg_dir = store.@msg_dir

# Remove any .meta files to simulate missing metadata
Dir.glob(File.join(msg_dir, "*.meta")).each { |path| File.delete(path) }

# Create new store - should fall back to message scanning
new_store = LavinMQ::MessageStore.new(msg_dir, nil)
new_store = LavinMQ::QueueMessageStore.new(msg_dir, nil)
new_store.size.should eq 25
new_store.close
end
Expand All @@ -154,6 +159,7 @@ describe "Message segment metadata files" do
with_channel(s, vhost: vhost.name) do |ch|
q = ch.queue("delete_test")
queue = vhost.queues["delete_test"].as(LavinMQ::AMQP::DurableQueue)
store = queue.@msg_store.as(LavinMQ::QueueMessageStore)

# Fill multiple segments
segment_size = LavinMQ::Config.instance.segment_size
Expand All @@ -163,12 +169,12 @@ describe "Message segment metadata files" do
messages_needed.times { |i| q.publish_confirm "message #{i}" }

# Should have multiple segments now
queue.@msg_store[email protected] be > 1
store[email protected] be > 1

# Get paths before deletion (only for completed segments that have .meta files)
existing_meta_paths = [] of String
queue.@msg_store[email protected] do |seg_id, mfile|
next if seg_id == queue.@msg_store[email protected]_key # skip current writing segment
store[email protected] do |seg_id, mfile|
next if seg_id == store[email protected]_key # skip current writing segment
meta_path = "#{mfile.path}.meta"
if File.exists?(meta_path)
existing_meta_paths << meta_path
Expand All @@ -190,6 +196,7 @@ describe "Message segment metadata files" do
with_channel(s, vhost: vhost.name) do |ch|
q = ch.queue("purge_test")
queue = vhost.queues["purge_test"].as(LavinMQ::AMQP::DurableQueue)
store = queue.@msg_store.as(LavinMQ::QueueMessageStore)

# Fill segments enough to create multiple segments
segment_size = LavinMQ::Config.instance.segment_size
Expand All @@ -200,8 +207,8 @@ describe "Message segment metadata files" do

# Collect existing meta file paths
existing_meta_paths = [] of String
queue.@msg_store[email protected] do |seg_id, mfile|
next if seg_id == queue.@msg_store[email protected]_key # skip current writing segment
store[email protected] do |seg_id, mfile|
next if seg_id == store[email protected]_key # skip current writing segment
meta_path = "#{mfile.path}.meta"
existing_meta_paths << meta_path if File.exists?(meta_path)
end
Expand All @@ -221,6 +228,7 @@ describe "Message segment metadata files" do
with_channel(s, vhost: vhost.name) do |ch|
q = ch.queue("cleanup_test")
queue = vhost.queues["cleanup_test"].as(LavinMQ::AMQP::DurableQueue)
store = queue.@msg_store.as(LavinMQ::QueueMessageStore)

# Create enough messages to span multiple segments
segment_size = LavinMQ::Config.instance.segment_size
Expand All @@ -229,14 +237,14 @@ describe "Message segment metadata files" do

messages_needed.times { |i| q.publish_confirm "message #{i}" }

initial_segments = queue.@msg_store[email protected]
initial_segments = store[email protected]
initial_segments.should be > 1

# Verify only completed segments have .meta files
completed_segments = 0
queue.@msg_store[email protected] do |seg_id, mfile|
store[email protected] do |seg_id, mfile|
meta_path = "#{mfile.path}.meta"
if seg_id == queue.@msg_store[email protected]_key
if seg_id == store[email protected]_key
# Current writing segment should not have .meta file
File.exists?(meta_path).should be_false
else
Expand All @@ -263,6 +271,7 @@ describe "Message segment metadata files" do
with_channel(s, vhost: vhost.name) do |ch|
q = ch.queue("count_test")
queue = vhost.queues["count_test"].as(LavinMQ::AMQP::DurableQueue)
store = queue.@msg_store.as(LavinMQ::QueueMessageStore)

# Create enough messages to fill multiple segments
segment_size = LavinMQ::Config.instance.segment_size
Expand All @@ -272,16 +281,16 @@ describe "Message segment metadata files" do
messages_needed.times { |i| q.publish_confirm "message #{i}" }

# Verify .meta files contain message counts for completed segments
queue.@msg_store[email protected] do |seg_id, mfile|
next if seg_id == queue.@msg_store[email protected]_key # skip current writing segment
store[email protected] do |seg_id, mfile|
next if seg_id == store[email protected]_key # skip current writing segment
meta_path = "#{mfile.path}.meta"

if File.exists?(meta_path)
count = File.open(meta_path, &.read_bytes(UInt32))
count.should be > 0

# The count should match what's stored in the segment
stored_count = queue.@msg_store.@segment_msg_count[seg_id]
stored_count = store.@segment_msg_count[seg_id]
count.should eq stored_count
end
end
Expand All @@ -295,16 +304,17 @@ describe "Message segment metadata files" do
with_channel(s, vhost: vhost.name) do |ch|
q = ch.queue("init_test")
queue = vhost.queues["init_test"].as(LavinMQ::AMQP::DurableQueue)
store = queue.@msg_store.as(LavinMQ::QueueMessageStore)

# Publish messages
message_count = 42
message_count.times { |i| q.publish_confirm "message #{i}" }

# Get the message directory
msg_dir = queue.@msg_store.@msg_dir
msg_dir = store.@msg_dir

# Create a new message store instance - should use metadata files if available
new_store = LavinMQ::MessageStore.new(msg_dir, nil)
new_store = LavinMQ::QueueMessageStore.new(msg_dir, nil)

# Should have correct message count
new_store.size.should eq message_count
Expand Down
6 changes: 3 additions & 3 deletions spec/priority_queue_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ describe LavinMQ::AMQP::PriorityQueue do
it "can replicate migration" do
with_clustering do |cluster|
Dir.mkdir_p cluster.config.data_dir
old_store = LavinMQ::MessageStore.new(cluster.config.data_dir, cluster.replicator, durable: true)
old_store = LavinMQ::QueueMessageStore.new(cluster.config.data_dir, cluster.replicator, durable: true)
60u8.times do |prio|
props = AMQP::Client::Properties.new(priority: prio % 6)
msg = LavinMQ::Message.new("ex", "rk", "body", properties: props)
Expand Down Expand Up @@ -84,7 +84,7 @@ describe LavinMQ::AMQP::PriorityQueue do
data_dir = File.tempname
Dir.mkdir data_dir

old_store = LavinMQ::MessageStore.new(data_dir, nil, durable: true)
old_store = LavinMQ::QueueMessageStore.new(data_dir, nil, durable: true)
60u8.times do |prio|
props = AMQP::Client::Properties.new(priority: prio % 6)
msg = LavinMQ::Message.new("ex", "rk", "body", properties: props)
Expand All @@ -99,7 +99,7 @@ describe LavinMQ::AMQP::PriorityQueue do
store.@stores[i].size.should eq 10
end

old_store = LavinMQ::MessageStore.new(data_dir, nil, durable: true)
old_store = LavinMQ::QueueMessageStore.new(data_dir, nil, durable: true)
old_store.size.should eq 0
old_store.close
ensure
Expand Down
2 changes: 1 addition & 1 deletion spec/queue_factory_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ describe LavinMQ::QueueFactory do
frame = AMQ::Protocol::Frame::Method::Queue::Declare.new(0, 0, "test", false, true, false,
false, false, queue_args)
q = LavinMQ::QueueFactory.make(s.vhosts["/"], frame)
q.should be_a LavinMQ::AMQP::StreamQueue
q.should be_a LavinMQ::AMQP::Stream
end
end
end
Expand Down
24 changes: 12 additions & 12 deletions spec/queue_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ describe LavinMQ::AMQP::Queue do
s.vhosts.create("/")
v = s.vhosts["/"].not_nil!
v.declare_queue("q", true, false)
data_dir = s.vhosts["/"].queues["q"].as(LavinMQ::AMQP::Queue).@msg_store.@msg_dir
data_dir = s.vhosts["/"].queues["q"].as(LavinMQ::AMQP::Queue).@msg_store.as(LavinMQ::QueueMessageStore).@msg_dir
s.vhosts["/"].queues["q"].pause!
File.exists?(File.join(data_dir, "paused")).should be_true
s.restart
Expand All @@ -104,7 +104,7 @@ describe LavinMQ::AMQP::Queue do
s.vhosts.create("/")
v = s.vhosts["/"].not_nil!
v.declare_queue("q", true, false)
data_dir = s.vhosts["/"].queues["q"].as(LavinMQ::AMQP::Queue).@msg_store.@msg_dir
data_dir = s.vhosts["/"].queues["q"].as(LavinMQ::AMQP::Queue).@msg_store.as(LavinMQ::QueueMessageStore).@msg_dir
File.touch(File.join(data_dir, ".paused"))
s.restart
File.exists?(File.join(data_dir, "paused")).should be_true
Expand Down Expand Up @@ -292,7 +292,7 @@ describe LavinMQ::AMQP::Queue do
with_channel(s) do |ch|
ch.queue "transient", durable: false
end
data_dir = s.vhosts["/"].queues["transient"].as(LavinMQ::AMQP::Queue).@msg_store.@msg_dir
data_dir = s.vhosts["/"].queues["transient"].as(LavinMQ::AMQP::Queue).@msg_store.as(LavinMQ::QueueMessageStore).@msg_dir
s.stop
Dir.exists?(data_dir).should be_false
end
Expand All @@ -303,7 +303,7 @@ describe LavinMQ::AMQP::Queue do
with_channel(s) do |ch|
ch.queue "transient", durable: false
end
data_dir = s.vhosts["/"].queues["transient"].as(LavinMQ::AMQP::Queue).@msg_store.@msg_dir
data_dir = s.vhosts["/"].queues["transient"].as(LavinMQ::AMQP::Queue).@msg_store.as(LavinMQ::QueueMessageStore).@msg_dir
Dir.exists?(data_dir).should be_true
File.exists?("#{data_dir}/msgs.0000000001").should be_false
end
Expand All @@ -315,7 +315,7 @@ describe LavinMQ::AMQP::Queue do
with_channel(s) do |ch|
q = ch.queue "transient", durable: false
q.publish_confirm "foobar"
data_dir = s.vhosts["/"].queues["transient"].as(LavinMQ::AMQP::Queue).@msg_store.@msg_dir
data_dir = s.vhosts["/"].queues["transient"].as(LavinMQ::AMQP::Queue).@msg_store.as(LavinMQ::QueueMessageStore).@msg_dir
FileUtils.cp_r data_dir, "#{s.vhosts["/"].data_dir}.copy"
end
s.stop
Expand All @@ -335,7 +335,7 @@ describe LavinMQ::AMQP::Queue do
with_amqp_server do |s|
with_channel(s) do |ch|
q = ch.queue("q", auto_delete: true)
data_dir = s.vhosts["/"].queues["q"].as(LavinMQ::AMQP::Queue).@msg_store.@msg_dir
data_dir = s.vhosts["/"].queues["q"].as(LavinMQ::AMQP::Queue).@msg_store.as(LavinMQ::QueueMessageStore).@msg_dir
sub = q.subscribe(no_ack: true) { |_| }
Dir.exists?(data_dir).should be_true
q.unsubscribe(sub)
Expand Down Expand Up @@ -365,7 +365,7 @@ describe LavinMQ::AMQP::Queue do
data_dir = File.join(LavinMQ::Config.instance.data_dir, "msgstore")
Dir.mkdir_p data_dir
begin
store = LavinMQ::MessageStore.new(data_dir, nil)
store = LavinMQ::QueueMessageStore.new(data_dir, nil)
body = IO::Memory.new(Random::DEFAULT.random_bytes(LavinMQ::Config.instance.segment_size), writeable: false)
msg = LavinMQ::Message.new(0i64, "amq.topic", "rk", AMQ::Protocol::Properties.new, body.size.to_u64, body)
sps = Array(LavinMQ::SegmentPosition).new(10) { store.push msg }
Expand All @@ -385,12 +385,12 @@ describe LavinMQ::AMQP::Queue do
body = IO::Memory.new(Random::DEFAULT.random_bytes(LavinMQ::Config.instance.segment_size), writeable: false)
msg = LavinMQ::Message.new(0i64, "amq.topic", "rk", AMQ::Protocol::Properties.new, body.size.to_u64, body)

store = LavinMQ::MessageStore.new(data_dir, nil)
store = LavinMQ::QueueMessageStore.new(data_dir, nil)
2.times { store.push msg }
store.close

# recreate store to let it read the segments and cleanup
LavinMQ::MessageStore.new(data_dir, nil)
LavinMQ::QueueMessageStore.new(data_dir, nil)
Dir.glob(File.join(data_dir, "acks.*")).should eq [] of String
ensure
FileUtils.rm_rf data_dir
Expand All @@ -400,9 +400,9 @@ describe LavinMQ::AMQP::Queue do
it "should yield fiber while purging" do
tmpdir = File.tempname "lavin", ".spec"
Dir.mkdir_p tmpdir
store = LavinMQ::MessageStore.new(tmpdir, nil)
store = LavinMQ::QueueMessageStore.new(tmpdir, nil)

(LavinMQ::MessageStore::PURGE_YIELD_INTERVAL * 2 + 1).times do
(LavinMQ::QueueMessageStore::PURGE_YIELD_INTERVAL * 2 + 1).times do
store.push(LavinMQ::Message.new(0i64, "a", "b", AMQ::Protocol::Properties.new, 0u64, IO::Memory.new(0)))
end

Expand Down Expand Up @@ -434,7 +434,7 @@ describe LavinMQ::AMQP::Queue do
it "should not raise NotFoundError if segment is gone when deleting" do
tmpdir = File.tempname "lavin", ".spec"
Dir.mkdir_p tmpdir
store = LavinMQ::MessageStore.new(tmpdir, nil)
store = LavinMQ::QueueMessageStore.new(tmpdir, nil)
data = Random::Secure.hex(512)
io = IO::Memory.new(data.to_slice)

Expand Down
6 changes: 3 additions & 3 deletions spec/schema_version_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ describe LavinMQ::SchemaVersion do
file.resize(LavinMQ::Config.instance.segment_size)
end
# init new message store
msg_store = LavinMQ::MessageStore.new(data_dir, nil)
msg_store = LavinMQ::QueueMessageStore.new(data_dir, nil)
[email protected]_value.size.should eq 4
end
end
Expand All @@ -40,13 +40,13 @@ describe LavinMQ::SchemaVersion do
with_amqp_server do |s|
v = s.vhosts["/"]
v.declare_queue("q", true, false)
data_dir = s.vhosts["/"].queues["q"].as(LavinMQ::AMQP::Queue).@msg_store.@msg_dir
data_dir = s.vhosts["/"].queues["q"].as(LavinMQ::AMQP::Queue).@msg_store.as(LavinMQ::QueueMessageStore).@msg_dir
path = File.join(data_dir, "msgs.0000000002")
MFile.open(path, LavinMQ::Config.instance.segment_size) do |file|
file.resize(LavinMQ::Config.instance.segment_size)
end
# init new message store
msg_store = LavinMQ::MessageStore.new(data_dir, nil)
msg_store = LavinMQ::QueueMessageStore.new(data_dir, nil)
[email protected] eq 1
end
end
Expand Down
Loading