Skip to content

Delete mqtt qos0 queue when mqtt 5.0 connection is closed #14006

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Jun 4, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1906,7 +1906,7 @@ log_delayed_will_failure(Topic, ClientId, Reason) ->
[Topic, ClientId, Reason]).

maybe_delete_mqtt_qos0_queue(
State = #state{cfg = #cfg{clean_start = true},
State = #state{cfg = #cfg{session_expiry_interval_secs = 0},
auth_state = #auth_state{user = #user{username = Username}}}) ->
case get_queue(?QOS_0, State) of
{ok, Q} ->
Expand Down
2 changes: 2 additions & 0 deletions deps/rabbitmq_mqtt/test/mqtt_shared_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,7 @@ init_per_testcase(T, Config)
init_per_testcase0(T, Config);
init_per_testcase(T, Config)
when T =:= clean_session_disconnect_client;
T =:= zero_session_expiry_interval_disconnect_client;
T =:= clean_session_node_restart;
T =:= clean_session_node_kill;
T =:= notify_consumer_qos0_queue_deleted ->
Expand All @@ -229,6 +230,7 @@ end_per_testcase(T, Config)
end_per_testcase0(T, Config);
end_per_testcase(T, Config)
when T =:= clean_session_disconnect_client;
T =:= zero_session_expiry_interval_disconnect_client;
T =:= clean_session_node_restart;
T =:= clean_session_node_kill;
T =:= notify_consumer_qos0_queue_deleted ->
Expand Down
28 changes: 28 additions & 0 deletions deps/rabbitmq_mqtt/test/v5_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ cluster_size_1_tests() ->
session_expiry_reconnect_non_zero,
session_expiry_reconnect_zero,
session_expiry_reconnect_infinity_to_zero,
zero_session_expiry_disconnect_autodeletes_qos0_queue,
client_publish_qos2,
client_rejects_publish,
client_receive_maximum_min,
Expand Down Expand Up @@ -188,6 +189,12 @@ init_per_testcase(T, Config)
ok = rpc(Config, application, set_env, [?APP, Par, infinity]),
Config1 = rabbit_ct_helpers:set_config(Config, {Par, Default}),
init_per_testcase0(T, Config1);

init_per_testcase(T, Config)
when T =:= zero_session_expiry_disconnect_autodeletes_qos0_queue ->
rpc(Config, rabbit_registry, register, [queue, <<"qos0">>, rabbit_mqtt_qos0_queue]),
init_per_testcase0(T, Config);

init_per_testcase(T, Config) ->
init_per_testcase0(T, Config).

Expand All @@ -202,6 +209,11 @@ end_per_testcase(T, Config)
Default = ?config(Par, Config),
ok = rpc(Config, application, set_env, [?APP, Par, Default]),
end_per_testcase0(T, Config);
end_per_testcase(T, Config)
when T =:= zero_session_expiry_disconnect_autodeletes_qos0_queue ->
ok = rpc(Config, rabbit_registry, unregister, [queue, <<"qos0">>]),
init_per_testcase0(T, Config);

end_per_testcase(T, Config) ->
end_per_testcase0(T, Config).

Expand Down Expand Up @@ -389,6 +401,22 @@ session_expiry_quorum_queue_disconnect_decrease(Config) ->
ok = session_expiry_disconnect_decrease(rabbit_quorum_queue, Config),
ok = rpc(Config, application, unset_env, [?APP, durable_queue_type]).

zero_session_expiry_disconnect_autodeletes_qos0_queue(Config) ->
ClientId = ?FUNCTION_NAME,
C = connect(ClientId, Config, [
{clean_start, false},
{properties, #{'Session-Expiry-Interval' => 0}}]),
{ok, _, _} = emqtt:subscribe(C, <<"topic0">>, qos0),
QsQos0 = rpc(Config, rabbit_amqqueue, list_by_type, [rabbit_mqtt_qos0_queue]),
?assertEqual(1, length(QsQos0)),

ok = emqtt:disconnect(C),
%% After terminating a clean session, we expect any session state to be cleaned up on the server.
%% Give the node some time to clean up the MQTT QoS 0 queue.
timer:sleep(200),
L = rpc(Config, rabbit_amqqueue, list, []),
?assertEqual(0, length(L)).

session_expiry_disconnect_decrease(QueueType, Config) ->
ClientId = ?FUNCTION_NAME,
C1 = connect(ClientId, Config, [{properties, #{'Session-Expiry-Interval' => 100}}]),
Expand Down
1 change: 1 addition & 0 deletions deps/rabbitmq_web_mqtt/test/web_mqtt_shared_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -103,3 +103,4 @@ maintenance(Config) -> mqtt_shared_SUITE:?FUNCTION_NAME(Config).
notify_consumer_classic_queue_deleted(Config) -> mqtt_shared_SUITE:?FUNCTION_NAME(Config).
notify_consumer_quorum_queue_deleted(Config) -> mqtt_shared_SUITE:?FUNCTION_NAME(Config).
notify_consumer_qos0_queue_deleted(Config) -> mqtt_shared_SUITE:?FUNCTION_NAME(Config).
zero_session_expiry_interval_disconnect_client(Config) -> mqtt_shared_SUITE:?FUNCTION_NAME(Config).
2 changes: 1 addition & 1 deletion selenium/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
"author": "",
"license": "ISC",
"dependencies": {
"chromedriver": "^135.0",
"chromedriver": "^137.0",
"ejs": "^3.1.8",
"express": "^4.18.2",
"geckodriver": "^3.0.2",
Expand Down
5 changes: 4 additions & 1 deletion selenium/test/basic-auth/rabbitmq.conf
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
auth_backends.1 = rabbit_auth_backend_internal

management.login_session_timeout = 1
load_definitions = ${IMPORT_DIR}/users.json

management.login_session_timeout = 1

loopback_users = none

log.console.level = debug
111 changes: 111 additions & 0 deletions selenium/test/queuesAndStreams/autodelete-mqtt-qos0.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
const { By, Key, until, Builder } = require('selenium-webdriver')
require('chromedriver')
const assert = require('assert')
const { buildDriver, goToHome, goToQueue, captureScreensFor, teardown, doUntil, findTableRow } = require('../utils')
const { createQueue, getManagementUrl, basicAuthorization } = require('../mgt-api')
const { openConnection, getConnectionOptions } = require('../mqtt')

const LoginPage = require('../pageobjects/LoginPage')
const OverviewPage = require('../pageobjects/OverviewPage')
const QueuesAndStreamsPage = require('../pageobjects/QueuesAndStreamsPage')
const QueuePage = require('../pageobjects/QueuePage')
const ConnectionsPage = require('../pageobjects/ConnectionsPage');


describe('Given an MQTT 5.0 connection with a qos 0 subscription with zero sessionExpiryInterval', function () {
let login
let queuesAndStreamsPage
let queuePage
let overview
let captureScreen
let queueName

let mqttClient

before(async function () {
driver = buildDriver()
await goToHome(driver)
login = new LoginPage(driver)
overview = new OverviewPage(driver)
queuePage = new QueuePage(driver)
connectionsPage = new ConnectionsPage(driver)
queuesAndStreamsPage = new QueuesAndStreamsPage(driver)
captureScreen = captureScreensFor(driver, __filename)

await login.login('management', 'guest')
if (!await overview.isLoaded()) {
throw new Error('Failed to login')
}
//await overview.selectRefreshOption("Do not refresh")

queueName = "test_" + Math.floor(Math.random() * 1000)
createQueue(getManagementUrl(), basicAuthorization("management", "guest"),
"/", queueName, {
"x-queue-type": "quorum"
})

mqttClient = openConnection(getConnectionOptions())
let subscribed = new Promise((resolve, reject) => {
mqttClient.on('error', function(err) {
reject(err)
assert.fail("Mqtt connection failed due to " + err)
}),
mqttClient.on('connect', function(err) {
mqttClient.subscribe(queueName, {qos:0}, function (err2) {
if (!err2) {
resolve("ok")
}else {
reject(err2)
}
})
})
})
assert.equal("ok", await subscribed)

})

it('can view mqtt qos0 queue', async function () {
await overview.clickOnQueuesTab()

let table = await doUntil(function() {
return queuesAndStreamsPage.getQueuesTable()
}, function(t) {
return findTableRow(t, function(row) {
return row[2] === 'rabbit_mqtt_qos0_queue'
})
})
let mqttQueueName = findTableRow(table, function(row) {
return row[2] === 'rabbit_mqtt_qos0_queue'
})[1]

await goToQueue(driver, "/", mqttQueueName)
await queuePage.isLoaded()

})

it('when the connection is closed, the mqtt qos0 queue should be removed', async function () {

mqttClient.end()

await overview.clickOnConnectionsTab()
await doUntil(async function() {
return connectionsPage.getPagingSectionHeaderText()
}, function(header) {
return header === "All connections (0)"
}, 6000)

await overview.clickOnQueuesTab()
await doUntil(function() {
return queuesAndStreamsPage.getQueuesTable()
}, function(table) {
return !findTableRow(table, function(row) {
return row[2] === 'rabbit_mqtt_qos0_queue'
})
})

})

after(async function () {
await teardown(driver, this, captureScreen)
})
})
Loading