Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 48 additions & 0 deletions spec/api/queues_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -533,6 +533,7 @@ describe LavinMQ::HTTP::QueuesController do
end
end
end

describe "PUT /api/queues/vhost/name/pause" do
it "should pause the queue" do
with_http_server do |http, s|
Expand All @@ -548,6 +549,7 @@ describe LavinMQ::HTTP::QueuesController do
end
end
end

describe "PUT /api/queues/vhost/name/resume" do
it "should resume the queue" do
with_http_server do |http, s|
Expand All @@ -573,6 +575,7 @@ describe LavinMQ::HTTP::QueuesController do
end
end
end

describe "GET /api/queues/vhost/name effective_arguments" do
it "should include x-max-age in effective_arguments for streams" do
with_http_server do |http, s|
Expand All @@ -589,4 +592,49 @@ describe LavinMQ::HTTP::QueuesController do
end
end
end

describe "PUT /api/queues/vhost/name/restart" do
it "should restart a queue" do
with_http_server do |http, s|
with_channel(s) do |ch|
queue_name = "restart_queue"
ch.queue(queue_name)

q = s.vhosts["/"].queues[queue_name]
q.close

response = http.get("/api/queues/%2f/#{queue_name}")
response.status_code.should eq 200
body = JSON.parse(response.body)
body["state"].should eq "closed"

response = http.put("/api/queues/%2f/#{queue_name}/restart")
response.status_code.should eq 204

response = http.get("/api/queues/%2f/#{queue_name}")
response.status_code.should eq 200
body = JSON.parse(response.body)
body["state"].should eq "running"
end
end
end

it "should not restart if queue is still running" do
with_http_server do |http, s|
with_channel(s) do |ch|
queue_name = "restart_queue"
ch.queue(queue_name)

s.vhosts["/"].queues[queue_name]
response = http.put("/api/queues/%2f/#{queue_name}/restart")
response.status_code.should eq 400

response = http.get("/api/queues/%2f/#{queue_name}")
response.status_code.should eq 200
body = JSON.parse(response.body)
body["state"].should eq "running"
end
end
end
end
end
115 changes: 115 additions & 0 deletions spec/queue_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,121 @@ describe LavinMQ::AMQP::Queue do
end
end

describe "Restarting queues" do
q_name = "restart"
it "should restart a closed queue" do
with_amqp_server do |s|
with_channel(s) do |ch|
q = ch.queue(q_name, durable: true)
queue = s.vhosts["/"].queues[q_name].as(LavinMQ::AMQP::DurableQueue)

# Publish a message
q.publish_confirm "test message"
queue.message_count.should eq 1

# Close the queue
queue.close
queue.closed?.should be_true

# Restart the queue & verify
queue.restart!.should be_true
queue.closed?.should be_false
queue.message_count.should eq 1
msg = q.get(no_ack: true)
msg.should_not be_nil
msg.not_nil!.body_io.to_s.should eq "test message"
end
end
end

it "should restart after corrupt data closes the queue" do
with_amqp_server do |s|
vhost = s.vhosts.create("restart_vhost")
with_channel(s, vhost: vhost.name) do |ch|
q = ch.queue(q_name, durable: true)
queue = vhost.queues[q_name].as(LavinMQ::AMQP::DurableQueue)
q.publish_confirm "test message"
queue.message_count.should eq 1

# Write corrupt data to the segment file
mfile = queue.@[email protected]_value
File.open(mfile.path, "w+") do |f|
f.seek(mfile.size - mfile.size + 4)
f.write(("x"*10).to_slice)
end

# Try to consume, which will trigger the close due to corrupt data
q.subscribe(tag: "tag", no_ack: false, &.ack)
should_eventually(be_true) { queue.state.closed? }

# Delete corrupted segment file
File.delete(mfile.path)

# Restart the queue & verify that it is running
queue.restart!.should be_true
queue.closed?.should be_false
queue.state.running?.should be_true
queue.message_count.should eq 0
end
end
end

it "should expire msgs after restarting a queue" do
with_amqp_server do |s|
with_channel(s) do |ch|
q = ch.queue(q_name, durable: true, args: AMQP::Client::Arguments.new(
{"x-message-ttl" => 500, "x-dead-letter-exchange" => "", "x-dead-letter-routing-key" => "dlq"}
))
queue = s.vhosts["/"].queues[q_name].as(LavinMQ::AMQP::DurableQueue)

# Publish a message
q.publish_confirm "test message"
queue.message_count.should eq 1

# Close the queue
queue.close
queue.closed?.should be_true

# Restart the queue & verify
queue.restart!.should be_true
queue.closed?.should be_false
queue.message_count.should eq 1
should_eventually(be_true) { queue.message_count == 0 }
end
end
end

it "should expire queue after restarting a queue" do
with_amqp_server do |s|
with_channel(s) do |ch|
ch.queue(q_name, durable: true, args: AMQP::Client::Arguments.new({"x-expires" => 100}))
queue = s.vhosts["/"].queues[q_name].as(LavinMQ::AMQP::DurableQueue)

# Close the queue
queue.close
queue.closed?.should be_true

# Restart the queue & verify
queue.restart!.should be_true
queue.closed?.should be_false
should_eventually(be_true) { queue.closed? }
end
end
end

it "should not restart if queue is still running" do
with_amqp_server do |s|
with_channel(s) do |ch|
ch.queue(q_name, durable: true)
queue = s.vhosts["/"].queues[q_name].as(LavinMQ::AMQP::DurableQueue)

# Try to restart without closing
queue.restart!.should be_false
end
end
end
end

describe "Purge" do
x_name = "purge"
q_name = "purge"
Expand Down
60 changes: 60 additions & 0 deletions spec/stream_queue_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -648,4 +648,64 @@ describe LavinMQ::AMQP::Stream do
end
end
end

describe "Restarting stream" do
queue_name = Random::Secure.hex
it "should restart a closed stream" do
with_amqp_server do |s|
with_channel(s) do |ch|
ch.prefetch 1
args = {"x-queue-type": "stream"}
q = ch.queue(queue_name, args: AMQP::Client::Arguments.new(args))
stream = s.vhosts["/"].queues[queue_name].as(LavinMQ::AMQP::Stream)
q.publish_confirm "test message"
stream.message_count.should eq 1

# Close the stream
stream.close
stream.closed?.should be_true

# Restart the stream & verify
stream.restart!
stream.closed?.should be_false
stream.message_count.should eq 1

msgs = Channel(AMQP::Client::DeliverMessage).new
q.subscribe(no_ack: false, args: AMQP::Client::Arguments.new({"x-stream-offset": 0})) do |msg|
msgs.send msg
msg.ack
end
if msg = msgs.receive
msg.body_io.to_s.should eq "test message"
else
fail("Did not receive message after stream restart")
end
end
end
end

it "should resume consuming from the correct position after a restart" do
queue_name = Random::Secure.hex
consumer_tag = Random::Secure.hex
c_args = AMQP::Client::Arguments.new({"x-stream-offset": 0, "x-stream-automatic-offset-tracking": "true"})

with_amqp_server do |s|
StreamSpecHelpers.publish(s, queue_name, 2)

# tracks offset
msg = StreamSpecHelpers.consume_one(s, queue_name, consumer_tag, c_args)
StreamSpecHelpers.offset_from_headers(msg.properties.headers).should eq 1

stream = s.vhosts["/"].queues[queue_name].as(LavinMQ::AMQP::Stream)
stream.close
stream.closed?.should be_true
stream.restart!
stream.closed?.should be_false

# should continue from tracked offset
msg = StreamSpecHelpers.consume_one(s, queue_name, consumer_tag, c_args)
StreamSpecHelpers.offset_from_headers(msg.properties.headers).should eq 2
end
end
end
end
52 changes: 41 additions & 11 deletions src/lavinmq/amqp/queue/queue.cr
Original file line number Diff line number Diff line change
Expand Up @@ -182,21 +182,48 @@ module LavinMQ::AMQP
@metadata = ::Log::Metadata.new(nil, {queue: @name, vhost: @vhost.name})
@log = Logger.new(Log, @metadata)
File.open(File.join(@data_dir, ".queue"), "w") { |f| f.sync = true; f.print @name }
if File.exists?(File.join(@data_dir, ".paused")) # Migrate '.paused' files to 'paused'
File.rename(File.join(@data_dir, ".paused"), File.join(@data_dir, "paused"))
end
if File.exists?(File.join(@data_dir, "paused"))
@state = QueueState::Paused
@paused.set(true)
end
@msg_store = init_msg_store(@data_dir)
@empty = @msg_store.empty
start
end

private def start : Bool
if @msg_store.closed
close
!close
else
if File.exists?(File.join(@data_dir, ".paused")) # Migrate '.paused' files to 'paused'
File.rename(File.join(@data_dir, ".paused"), File.join(@data_dir, "paused"))
end
if File.exists?(File.join(@data_dir, "paused"))
@state = QueueState::Paused
@paused.set(true)
end
handle_arguments
spawn queue_expire_loop, name: "Queue#queue_expire_loop #{@vhost.name}/#{@name}" if @expires
spawn message_expire_loop, name: "Queue#message_expire_loop #{@vhost.name}/#{@name}"
true
end
end

def restart! : Bool
return false unless @closed
reset_queue_state
@msg_store = init_msg_store(@data_dir)
@empty = @msg_store.empty
handle_arguments
spawn queue_expire_loop, name: "Queue#queue_expire_loop #{@vhost.name}/#{@name}" if @expires
spawn message_expire_loop, name: "Queue#message_expire_loop #{@vhost.name}/#{@name}"
start
end

private def reset_queue_state
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I feel that we already have a lot of state to keep track of, while this doesn't add more states it adds more places where we need to keep track of all the states for a queue.
Not sure if there is a better solution for this at the moment. I just wanted to flag that the more state changes we add to more difficult it will be to keep everything in correct state.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, there is a lot to keep track of. I guess another option would be to delete the queue object in VHost and recreate it, but I think that just moves the complexity to VHost instead.

@closed = false
@state = QueueState::Running
# Recreate channels that were closed
@queue_expiration_ttl_change = ::Channel(Nil).new
@message_ttl_change = ::Channel(Nil).new
@paused = BoolChannel.new(false)
@consumers_empty = BoolChannel.new(true)
@single_active_consumer_change = ::Channel(Client::Channel::Consumer).new
@unacked_count.set(0u32, :relaxed)
@unacked_bytesize.set(0u64, :relaxed)
end

# own method so that it can be overriden in other queue implementations
Expand Down Expand Up @@ -390,6 +417,9 @@ module LavinMQ::AMQP
@msg_store_lock.synchronize do
@msg_store.close
end
@deliveries.clear
@basic_get_unacked.clear
@deduper = nil
# TODO: When closing due to ReadError, queue is deleted if exclusive
delete if !durable? || @exclusive
Fiber.yield
Expand Down
11 changes: 10 additions & 1 deletion src/lavinmq/amqp/stream/stream.cr
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ module LavinMQ::AMQP
@exclusive = false, @auto_delete = false,
@arguments = AMQP::Table.new)
super
spawn unmap_and_remove_segments_loop, name: "Stream#unmap_and_remove_segments_loop"
end

private def apply_policy_argument(key : String, value : JSON::Any)
Expand Down Expand Up @@ -75,6 +74,16 @@ module LavinMQ::AMQP
# Streams doesn't handle queue expiration
end

private def start : Bool
if @msg_store.closed
!close
else
handle_arguments
spawn unmap_and_remove_segments_loop, name: "Stream#unmap_and_remove_segments_loop"
true
end
end

private def init_msg_store(data_dir)
replicator = @vhost.@replicator
@msg_store = StreamMessageStore.new(data_dir, replicator, metadata: @metadata)
Expand Down
12 changes: 12 additions & 0 deletions src/lavinmq/http/controller/queues.cr
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,18 @@ module LavinMQ
end
end

put "/api/queues/:vhost/:name/restart" do |context, params|
with_vhost(context, params) do |vhost|
refuse_unless_management(context, user(context), vhost)
q = find_queue(context, params, vhost)
if q.restart!
context.response.status_code = 204
else
bad_request(context, "Queue was not restarted")
end
end
end

delete "/api/queues/:vhost/:name" do |context, params|
with_vhost(context, params) do |vhost|
refuse_unless_management(context, user(context), vhost)
Expand Down
Loading
Loading