Skip to content

Commit 11b95e1

Browse files
committed
Separate stores
1 parent a4a450f commit 11b95e1

23 files changed

+1225
-758
lines changed

spec/clustering_spec.cr

Lines changed: 249 additions & 249 deletions
Large diffs are not rendered by default.

spec/message_store_spec.cr

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ describe LavinMQ::MessageStore do
2323
# Create an orphaned acks file
2424
File.write(File.join(dir, "acks.0000000002"), "data")
2525

26-
store = LavinMQ::MessageStore.new(dir, nil)
26+
store = LavinMQ::QueueMessageStore.new(dir, nil)
2727
store.close
2828

2929
File.exists?(File.join(dir, "acks.0000000001")).should be_true

spec/msg_segment_meta_files_spec.cr

Lines changed: 35 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
require "./spec_helper"
2+
# require "../src/lavinmq/message_store"
23

34
describe "Message segment metadata files" do
45
describe "Metadata file creation" do
@@ -19,15 +20,16 @@ describe "Message segment metadata files" do
1920
end
2021

2122
# Should have multiple segments or be able to verify metadata behavior
22-
total_segments = queue.@msg_store.@segments.size
23+
total_segments = queue.@msg_store.as(LavinMQ::QueueMessageStore).@segments.size
2324

2425
# Check behavior: either we have multiple segments with .meta files,
2526
# or we can verify the metadata creation logic works
27+
store = queue.@msg_store.as(LavinMQ::QueueMessageStore)
2628
if total_segments > 1
2729
# Check that .meta files exist for completed segments
2830
completed_segments_with_meta = 0
29-
queue.@msg_store.@segments.each do |seg_id, mfile|
30-
next if seg_id == queue.@msg_store.@segments.last_key # skip current writing segment
31+
store.@segments.each do |seg_id, mfile|
32+
next if seg_id == store.@segments.last_key # skip current writing segment
3133
meta_path = "#{mfile.path}.meta"
3234
if File.exists?(meta_path)
3335
completed_segments_with_meta += 1
@@ -40,7 +42,7 @@ describe "Message segment metadata files" do
4042
else
4143
# If we only have one segment, verify that it doesn't have a .meta file
4244
# since it's the current writing segment
43-
mfile = queue.@msg_store.@segments.first_value
45+
mfile = store.@segments.first_value
4446
meta_path = "#{mfile.path}.meta"
4547
File.exists?(meta_path).should be_false
4648
end
@@ -60,19 +62,20 @@ describe "Message segment metadata files" do
6062
message_size = 50
6163
messages_needed = (segment_size / message_size).to_i + 10
6264

63-
initial_segments = queue.@msg_store.@segments.size
65+
store = queue.@msg_store.as(LavinMQ::AMQP::StreamMessageStore)
66+
initial_segments = store.@segments.size
6467

6568
messages_needed.times do |i|
6669
q.publish_confirm "message #{i}"
6770
end
6871

6972
# Should have created new segments
70-
final_segments = queue.@msg_store.@segments.size
73+
final_segments = store.@segments.size
7174
final_segments.should be > initial_segments
7275

7376
# Check meta file exists and contains stream-specific data
74-
queue.@msg_store.@segments.each do |seg_id, mfile|
75-
next if seg_id == queue.@msg_store.@segments.last_key # skip current writing segment
77+
store.@segments.each do |seg_id, mfile|
78+
next if seg_id == store.@segments.last_key # skip current writing segment
7679
meta_path = "#{mfile.path}.meta"
7780
File.exists?(meta_path).should be_true
7881

@@ -99,13 +102,14 @@ describe "Message segment metadata files" do
99102
with_channel(s, vhost: vhost.name) do |ch|
100103
q = ch.queue("metadata_load_test")
101104
queue = vhost.queues["metadata_load_test"].as(LavinMQ::AMQP::DurableQueue)
105+
store = queue.@msg_store.as(LavinMQ::QueueMessageStore)
102106

103107
# Publish messages but not enough to trigger new segment
104108
50.times { |i| q.publish_confirm "message #{i}" }
105109

106110
# Manually create a new message store to simulate restart behavior
107-
msg_dir = queue.@msg_store.@msg_dir
108-
new_store = LavinMQ::MessageStore.new(msg_dir, nil)
111+
msg_dir = store.@msg_dir
112+
new_store = LavinMQ::QueueMessageStore.new(msg_dir, nil)
109113

110114
# Should have loaded messages correctly
111115
new_store.size.should eq 50
@@ -128,18 +132,19 @@ describe "Message segment metadata files" do
128132
with_channel(s, vhost: vhost.name) do |ch|
129133
q = ch.queue("fallback_test")
130134
queue = vhost.queues["fallback_test"].as(LavinMQ::AMQP::DurableQueue)
135+
store = queue.@msg_store.as(LavinMQ::QueueMessageStore)
131136

132137
# Publish messages
133138
25.times { |i| q.publish_confirm "message #{i}" }
134139

135140
# Get message directory and create new store without metadata
136-
msg_dir = queue.@msg_store.@msg_dir
141+
msg_dir = store.@msg_dir
137142

138143
# Remove any .meta files to simulate missing metadata
139144
Dir.glob(File.join(msg_dir, "*.meta")).each { |path| File.delete(path) }
140145

141146
# Create new store - should fall back to message scanning
142-
new_store = LavinMQ::MessageStore.new(msg_dir, nil)
147+
new_store = LavinMQ::QueueMessageStore.new(msg_dir, nil)
143148
new_store.size.should eq 25
144149
new_store.close
145150
end
@@ -154,6 +159,7 @@ describe "Message segment metadata files" do
154159
with_channel(s, vhost: vhost.name) do |ch|
155160
q = ch.queue("delete_test")
156161
queue = vhost.queues["delete_test"].as(LavinMQ::AMQP::DurableQueue)
162+
store = queue.@msg_store.as(LavinMQ::QueueMessageStore)
157163

158164
# Fill multiple segments
159165
segment_size = LavinMQ::Config.instance.segment_size
@@ -163,12 +169,12 @@ describe "Message segment metadata files" do
163169
messages_needed.times { |i| q.publish_confirm "message #{i}" }
164170

165171
# Should have multiple segments now
166-
queue.@msg_store.@segments.size.should be > 1
172+
store.@segments.size.should be > 1
167173

168174
# Get paths before deletion (only for completed segments that have .meta files)
169175
existing_meta_paths = [] of String
170-
queue.@msg_store.@segments.each do |seg_id, mfile|
171-
next if seg_id == queue.@msg_store.@segments.last_key # skip current writing segment
176+
store.@segments.each do |seg_id, mfile|
177+
next if seg_id == store.@segments.last_key # skip current writing segment
172178
meta_path = "#{mfile.path}.meta"
173179
if File.exists?(meta_path)
174180
existing_meta_paths << meta_path
@@ -190,6 +196,7 @@ describe "Message segment metadata files" do
190196
with_channel(s, vhost: vhost.name) do |ch|
191197
q = ch.queue("purge_test")
192198
queue = vhost.queues["purge_test"].as(LavinMQ::AMQP::DurableQueue)
199+
store = queue.@msg_store.as(LavinMQ::QueueMessageStore)
193200

194201
# Fill segments enough to create multiple segments
195202
segment_size = LavinMQ::Config.instance.segment_size
@@ -200,8 +207,8 @@ describe "Message segment metadata files" do
200207

201208
# Collect existing meta file paths
202209
existing_meta_paths = [] of String
203-
queue.@msg_store.@segments.each do |seg_id, mfile|
204-
next if seg_id == queue.@msg_store.@segments.last_key # skip current writing segment
210+
store.@segments.each do |seg_id, mfile|
211+
next if seg_id == store.@segments.last_key # skip current writing segment
205212
meta_path = "#{mfile.path}.meta"
206213
existing_meta_paths << meta_path if File.exists?(meta_path)
207214
end
@@ -221,6 +228,7 @@ describe "Message segment metadata files" do
221228
with_channel(s, vhost: vhost.name) do |ch|
222229
q = ch.queue("cleanup_test")
223230
queue = vhost.queues["cleanup_test"].as(LavinMQ::AMQP::DurableQueue)
231+
store = queue.@msg_store.as(LavinMQ::QueueMessageStore)
224232

225233
# Create enough messages to span multiple segments
226234
segment_size = LavinMQ::Config.instance.segment_size
@@ -229,14 +237,14 @@ describe "Message segment metadata files" do
229237

230238
messages_needed.times { |i| q.publish_confirm "message #{i}" }
231239

232-
initial_segments = queue.@msg_store.@segments.size
240+
initial_segments = store.@segments.size
233241
initial_segments.should be > 1
234242

235243
# Verify only completed segments have .meta files
236244
completed_segments = 0
237-
queue.@msg_store.@segments.each do |seg_id, mfile|
245+
store.@segments.each do |seg_id, mfile|
238246
meta_path = "#{mfile.path}.meta"
239-
if seg_id == queue.@msg_store.@segments.last_key
247+
if seg_id == store.@segments.last_key
240248
# Current writing segment should not have .meta file
241249
File.exists?(meta_path).should be_false
242250
else
@@ -263,6 +271,7 @@ describe "Message segment metadata files" do
263271
with_channel(s, vhost: vhost.name) do |ch|
264272
q = ch.queue("count_test")
265273
queue = vhost.queues["count_test"].as(LavinMQ::AMQP::DurableQueue)
274+
store = queue.@msg_store.as(LavinMQ::QueueMessageStore)
266275

267276
# Create enough messages to fill multiple segments
268277
segment_size = LavinMQ::Config.instance.segment_size
@@ -272,16 +281,16 @@ describe "Message segment metadata files" do
272281
messages_needed.times { |i| q.publish_confirm "message #{i}" }
273282

274283
# Verify .meta files contain message counts for completed segments
275-
queue.@msg_store.@segments.each do |seg_id, mfile|
276-
next if seg_id == queue.@msg_store.@segments.last_key # skip current writing segment
284+
store.@segments.each do |seg_id, mfile|
285+
next if seg_id == store.@segments.last_key # skip current writing segment
277286
meta_path = "#{mfile.path}.meta"
278287

279288
if File.exists?(meta_path)
280289
count = File.open(meta_path, &.read_bytes(UInt32))
281290
count.should be > 0
282291

283292
# The count should match what's stored in the segment
284-
stored_count = queue.@msg_store.@segment_msg_count[seg_id]
293+
stored_count = store.@segment_msg_count[seg_id]
285294
count.should eq stored_count
286295
end
287296
end
@@ -295,16 +304,17 @@ describe "Message segment metadata files" do
295304
with_channel(s, vhost: vhost.name) do |ch|
296305
q = ch.queue("init_test")
297306
queue = vhost.queues["init_test"].as(LavinMQ::AMQP::DurableQueue)
307+
store = queue.@msg_store.as(LavinMQ::QueueMessageStore)
298308

299309
# Publish messages
300310
message_count = 42
301311
message_count.times { |i| q.publish_confirm "message #{i}" }
302312

303313
# Get the message directory
304-
msg_dir = queue.@msg_store.@msg_dir
314+
msg_dir = store.@msg_dir
305315

306316
# Create a new message store instance - should use metadata files if available
307-
new_store = LavinMQ::MessageStore.new(msg_dir, nil)
317+
new_store = LavinMQ::QueueMessageStore.new(msg_dir, nil)
308318

309319
# Should have correct message count
310320
new_store.size.should eq message_count

spec/priority_queue_spec.cr

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ describe LavinMQ::AMQP::PriorityQueue do
4949
it "can replicate migration" do
5050
with_clustering do |cluster|
5151
Dir.mkdir_p cluster.config.data_dir
52-
old_store = LavinMQ::MessageStore.new(cluster.config.data_dir, cluster.replicator, durable: true)
52+
old_store = LavinMQ::QueueMessageStore.new(cluster.config.data_dir, cluster.replicator, durable: true)
5353
60u8.times do |prio|
5454
props = AMQP::Client::Properties.new(priority: prio % 6)
5555
msg = LavinMQ::Message.new("ex", "rk", "body", properties: props)
@@ -84,7 +84,7 @@ describe LavinMQ::AMQP::PriorityQueue do
8484
data_dir = File.tempname
8585
Dir.mkdir data_dir
8686

87-
old_store = LavinMQ::MessageStore.new(data_dir, nil, durable: true)
87+
old_store = LavinMQ::QueueMessageStore.new(data_dir, nil, durable: true)
8888
60u8.times do |prio|
8989
props = AMQP::Client::Properties.new(priority: prio % 6)
9090
msg = LavinMQ::Message.new("ex", "rk", "body", properties: props)
@@ -99,7 +99,7 @@ describe LavinMQ::AMQP::PriorityQueue do
9999
store.@stores[i].size.should eq 10
100100
end
101101

102-
old_store = LavinMQ::MessageStore.new(data_dir, nil, durable: true)
102+
old_store = LavinMQ::QueueMessageStore.new(data_dir, nil, durable: true)
103103
old_store.size.should eq 0
104104
old_store.close
105105
ensure

spec/queue_spec.cr

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ describe LavinMQ::AMQP::Queue do
8989
s.vhosts.create("/")
9090
v = s.vhosts["/"].not_nil!
9191
v.declare_queue("q", true, false)
92-
data_dir = s.vhosts["/"].queues["q"].as(LavinMQ::AMQP::Queue).@msg_store.@msg_dir
92+
data_dir = s.vhosts["/"].queues["q"].as(LavinMQ::AMQP::Queue).@msg_store.as(LavinMQ::QueueMessageStore).@msg_dir
9393
s.vhosts["/"].queues["q"].pause!
9494
File.exists?(File.join(data_dir, "paused")).should be_true
9595
s.restart
@@ -292,7 +292,7 @@ describe LavinMQ::AMQP::Queue do
292292
with_channel(s) do |ch|
293293
ch.queue "transient", durable: false
294294
end
295-
data_dir = s.vhosts["/"].queues["transient"].as(LavinMQ::AMQP::Queue).@msg_store.@msg_dir
295+
data_dir = s.vhosts["/"].queues["transient"].as(LavinMQ::AMQP::Queue).@msg_store.as(LavinMQ::QueueMessageStore).@msg_dir
296296
s.stop
297297
Dir.exists?(data_dir).should be_false
298298
end
@@ -303,7 +303,7 @@ describe LavinMQ::AMQP::Queue do
303303
with_channel(s) do |ch|
304304
ch.queue "transient", durable: false
305305
end
306-
data_dir = s.vhosts["/"].queues["transient"].as(LavinMQ::AMQP::Queue).@msg_store.@msg_dir
306+
data_dir = s.vhosts["/"].queues["transient"].as(LavinMQ::AMQP::Queue).@msg_store.as(LavinMQ::QueueMessageStore).@msg_dir
307307
Dir.exists?(data_dir).should be_true
308308
File.exists?("#{data_dir}/msgs.0000000001").should be_false
309309
end
@@ -315,7 +315,7 @@ describe LavinMQ::AMQP::Queue do
315315
with_channel(s) do |ch|
316316
q = ch.queue "transient", durable: false
317317
q.publish_confirm "foobar"
318-
data_dir = s.vhosts["/"].queues["transient"].as(LavinMQ::AMQP::Queue).@msg_store.@msg_dir
318+
data_dir = s.vhosts["/"].queues["transient"].as(LavinMQ::AMQP::Queue).@msg_store.as(LavinMQ::QueueMessageStore).@msg_dir
319319
FileUtils.cp_r data_dir, "#{s.vhosts["/"].data_dir}.copy"
320320
end
321321
s.stop
@@ -335,7 +335,7 @@ describe LavinMQ::AMQP::Queue do
335335
with_amqp_server do |s|
336336
with_channel(s) do |ch|
337337
q = ch.queue("q", auto_delete: true)
338-
data_dir = s.vhosts["/"].queues["q"].as(LavinMQ::AMQP::Queue).@msg_store.@msg_dir
338+
data_dir = s.vhosts["/"].queues["q"].as(LavinMQ::AMQP::Queue).@msg_store.as(LavinMQ::QueueMessageStore).@msg_dir
339339
sub = q.subscribe(no_ack: true) { |_| }
340340
Dir.exists?(data_dir).should be_true
341341
q.unsubscribe(sub)
@@ -365,7 +365,7 @@ describe LavinMQ::AMQP::Queue do
365365
data_dir = File.join(LavinMQ::Config.instance.data_dir, "msgstore")
366366
Dir.mkdir_p data_dir
367367
begin
368-
store = LavinMQ::MessageStore.new(data_dir, nil)
368+
store = LavinMQ::QueueMessageStore.new(data_dir, nil)
369369
body = IO::Memory.new(Random::DEFAULT.random_bytes(LavinMQ::Config.instance.segment_size), writeable: false)
370370
msg = LavinMQ::Message.new(0i64, "amq.topic", "rk", AMQ::Protocol::Properties.new, body.size.to_u64, body)
371371
sps = Array(LavinMQ::SegmentPosition).new(10) { store.push msg }
@@ -385,12 +385,12 @@ describe LavinMQ::AMQP::Queue do
385385
body = IO::Memory.new(Random::DEFAULT.random_bytes(LavinMQ::Config.instance.segment_size), writeable: false)
386386
msg = LavinMQ::Message.new(0i64, "amq.topic", "rk", AMQ::Protocol::Properties.new, body.size.to_u64, body)
387387

388-
store = LavinMQ::MessageStore.new(data_dir, nil)
388+
store = LavinMQ::QueueMessageStore.new(data_dir, nil)
389389
2.times { store.push msg }
390390
store.close
391391

392392
# recreate store to let it read the segments and cleanup
393-
LavinMQ::MessageStore.new(data_dir, nil)
393+
LavinMQ::QueueMessageStore.new(data_dir, nil)
394394
Dir.glob(File.join(data_dir, "acks.*")).should eq [] of String
395395
ensure
396396
FileUtils.rm_rf data_dir
@@ -400,9 +400,9 @@ describe LavinMQ::AMQP::Queue do
400400
it "should yield fiber while purging" do
401401
tmpdir = File.tempname "lavin", ".spec"
402402
Dir.mkdir_p tmpdir
403-
store = LavinMQ::MessageStore.new(tmpdir, nil)
403+
store = LavinMQ::QueueMessageStore.new(tmpdir, nil)
404404

405-
(LavinMQ::MessageStore::PURGE_YIELD_INTERVAL * 2 + 1).times do
405+
(LavinMQ::QueueMessageStore::PURGE_YIELD_INTERVAL * 2 + 1).times do
406406
store.push(LavinMQ::Message.new(0i64, "a", "b", AMQ::Protocol::Properties.new, 0u64, IO::Memory.new(0)))
407407
end
408408

@@ -434,7 +434,7 @@ describe LavinMQ::AMQP::Queue do
434434
it "should not raise NotFoundError if segment is gone when deleting" do
435435
tmpdir = File.tempname "lavin", ".spec"
436436
Dir.mkdir_p tmpdir
437-
store = LavinMQ::MessageStore.new(tmpdir, nil)
437+
store = LavinMQ::QueueMessageStore.new(tmpdir, nil)
438438
data = Random::Secure.hex(512)
439439
io = IO::Memory.new(data.to_slice)
440440

spec/schema_version_spec.cr

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ describe LavinMQ::SchemaVersion do
3131
file.resize(LavinMQ::Config.instance.segment_size)
3232
end
3333
# init new message store
34-
msg_store = LavinMQ::MessageStore.new(data_dir, nil)
34+
msg_store = LavinMQ::QueueMessageStore.new(data_dir, nil)
3535
msg_store.@segments.first_value.size.should eq 4
3636
end
3737
end
@@ -40,13 +40,13 @@ describe LavinMQ::SchemaVersion do
4040
with_amqp_server do |s|
4141
v = s.vhosts["/"]
4242
v.declare_queue("q", true, false)
43-
data_dir = s.vhosts["/"].queues["q"].as(LavinMQ::AMQP::Queue).@msg_store.@msg_dir
43+
data_dir = s.vhosts["/"].queues["q"].as(LavinMQ::AMQP::Queue).@msg_store.as(LavinMQ::QueueMessageStore).@msg_dir
4444
path = File.join(data_dir, "msgs.0000000002")
4545
MFile.open(path, LavinMQ::Config.instance.segment_size) do |file|
4646
file.resize(LavinMQ::Config.instance.segment_size)
4747
end
4848
# init new message store
49-
msg_store = LavinMQ::MessageStore.new(data_dir, nil)
49+
msg_store = LavinMQ::QueueMessageStore.new(data_dir, nil)
5050
msg_store.@segments.size.should eq 1
5151
end
5252
end

0 commit comments

Comments
 (0)