Skip to content

Commit d8b3288

Browse files
Merge pull request #14006 from rabbitmq/delete-qos0-queue
Delete mqtt qos0 queue when mqtt 5.0 connection is closed
2 parents 3a086e8 + ae9e195 commit d8b3288

File tree

7 files changed

+148
-3
lines changed

7 files changed

+148
-3
lines changed

deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1906,7 +1906,7 @@ log_delayed_will_failure(Topic, ClientId, Reason) ->
19061906
[Topic, ClientId, Reason]).
19071907

19081908
maybe_delete_mqtt_qos0_queue(
1909-
State = #state{cfg = #cfg{clean_start = true},
1909+
State = #state{cfg = #cfg{session_expiry_interval_secs = 0},
19101910
auth_state = #auth_state{user = #user{username = Username}}}) ->
19111911
case get_queue(?QOS_0, State) of
19121912
{ok, Q} ->

deps/rabbitmq_mqtt/test/mqtt_shared_SUITE.erl

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -211,6 +211,7 @@ init_per_testcase(T, Config)
211211
init_per_testcase0(T, Config);
212212
init_per_testcase(T, Config)
213213
when T =:= clean_session_disconnect_client;
214+
T =:= zero_session_expiry_interval_disconnect_client;
214215
T =:= clean_session_node_restart;
215216
T =:= clean_session_node_kill;
216217
T =:= notify_consumer_qos0_queue_deleted ->
@@ -229,6 +230,7 @@ end_per_testcase(T, Config)
229230
end_per_testcase0(T, Config);
230231
end_per_testcase(T, Config)
231232
when T =:= clean_session_disconnect_client;
233+
T =:= zero_session_expiry_interval_disconnect_client;
232234
T =:= clean_session_node_restart;
233235
T =:= clean_session_node_kill;
234236
T =:= notify_consumer_qos0_queue_deleted ->

deps/rabbitmq_mqtt/test/v5_SUITE.erl

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@ cluster_size_1_tests() ->
7171
session_expiry_reconnect_non_zero,
7272
session_expiry_reconnect_zero,
7373
session_expiry_reconnect_infinity_to_zero,
74+
zero_session_expiry_disconnect_autodeletes_qos0_queue,
7475
client_publish_qos2,
7576
client_rejects_publish,
7677
client_receive_maximum_min,
@@ -188,6 +189,12 @@ init_per_testcase(T, Config)
188189
ok = rpc(Config, application, set_env, [?APP, Par, infinity]),
189190
Config1 = rabbit_ct_helpers:set_config(Config, {Par, Default}),
190191
init_per_testcase0(T, Config1);
192+
193+
init_per_testcase(T, Config)
194+
when T =:= zero_session_expiry_disconnect_autodeletes_qos0_queue ->
195+
rpc(Config, rabbit_registry, register, [queue, <<"qos0">>, rabbit_mqtt_qos0_queue]),
196+
init_per_testcase0(T, Config);
197+
191198
init_per_testcase(T, Config) ->
192199
init_per_testcase0(T, Config).
193200

@@ -202,6 +209,11 @@ end_per_testcase(T, Config)
202209
Default = ?config(Par, Config),
203210
ok = rpc(Config, application, set_env, [?APP, Par, Default]),
204211
end_per_testcase0(T, Config);
212+
end_per_testcase(T, Config)
213+
when T =:= zero_session_expiry_disconnect_autodeletes_qos0_queue ->
214+
ok = rpc(Config, rabbit_registry, unregister, [queue, <<"qos0">>]),
215+
init_per_testcase0(T, Config);
216+
205217
end_per_testcase(T, Config) ->
206218
end_per_testcase0(T, Config).
207219

@@ -389,6 +401,22 @@ session_expiry_quorum_queue_disconnect_decrease(Config) ->
389401
ok = session_expiry_disconnect_decrease(rabbit_quorum_queue, Config),
390402
ok = rpc(Config, application, unset_env, [?APP, durable_queue_type]).
391403

404+
zero_session_expiry_disconnect_autodeletes_qos0_queue(Config) ->
405+
ClientId = ?FUNCTION_NAME,
406+
C = connect(ClientId, Config, [
407+
{clean_start, false},
408+
{properties, #{'Session-Expiry-Interval' => 0}}]),
409+
{ok, _, _} = emqtt:subscribe(C, <<"topic0">>, qos0),
410+
QsQos0 = rpc(Config, rabbit_amqqueue, list_by_type, [rabbit_mqtt_qos0_queue]),
411+
?assertEqual(1, length(QsQos0)),
412+
413+
ok = emqtt:disconnect(C),
414+
%% After terminating a clean session, we expect any session state to be cleaned up on the server.
415+
%% Give the node some time to clean up the MQTT QoS 0 queue.
416+
timer:sleep(200),
417+
L = rpc(Config, rabbit_amqqueue, list, []),
418+
?assertEqual(0, length(L)).
419+
392420
session_expiry_disconnect_decrease(QueueType, Config) ->
393421
ClientId = ?FUNCTION_NAME,
394422
C1 = connect(ClientId, Config, [{properties, #{'Session-Expiry-Interval' => 100}}]),

deps/rabbitmq_web_mqtt/test/web_mqtt_shared_SUITE.erl

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,3 +103,4 @@ maintenance(Config) -> mqtt_shared_SUITE:?FUNCTION_NAME(Config).
103103
notify_consumer_classic_queue_deleted(Config) -> mqtt_shared_SUITE:?FUNCTION_NAME(Config).
104104
notify_consumer_quorum_queue_deleted(Config) -> mqtt_shared_SUITE:?FUNCTION_NAME(Config).
105105
notify_consumer_qos0_queue_deleted(Config) -> mqtt_shared_SUITE:?FUNCTION_NAME(Config).
106+
zero_session_expiry_interval_disconnect_client(Config) -> mqtt_shared_SUITE:?FUNCTION_NAME(Config).

selenium/package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
"author": "",
1313
"license": "ISC",
1414
"dependencies": {
15-
"chromedriver": "^135.0",
15+
"chromedriver": "^137.0",
1616
"ejs": "^3.1.8",
1717
"express": "^4.18.2",
1818
"geckodriver": "^3.0.2",
Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
11
auth_backends.1 = rabbit_auth_backend_internal
22

3-
management.login_session_timeout = 1
43
load_definitions = ${IMPORT_DIR}/users.json
54

5+
management.login_session_timeout = 1
6+
67
loopback_users = none
8+
9+
log.console.level = debug
Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
const { By, Key, until, Builder } = require('selenium-webdriver')
2+
require('chromedriver')
3+
const assert = require('assert')
4+
const { buildDriver, goToHome, goToQueue, captureScreensFor, teardown, doUntil, findTableRow } = require('../utils')
5+
const { createQueue, getManagementUrl, basicAuthorization } = require('../mgt-api')
6+
const { openConnection, getConnectionOptions } = require('../mqtt')
7+
8+
const LoginPage = require('../pageobjects/LoginPage')
9+
const OverviewPage = require('../pageobjects/OverviewPage')
10+
const QueuesAndStreamsPage = require('../pageobjects/QueuesAndStreamsPage')
11+
const QueuePage = require('../pageobjects/QueuePage')
12+
const ConnectionsPage = require('../pageobjects/ConnectionsPage');
13+
14+
15+
describe('Given an MQTT 5.0 connection with a qos 0 subscription with zero sessionExpiryInterval', function () {
16+
let login
17+
let queuesAndStreamsPage
18+
let queuePage
19+
let overview
20+
let captureScreen
21+
let queueName
22+
23+
let mqttClient
24+
25+
before(async function () {
26+
driver = buildDriver()
27+
await goToHome(driver)
28+
login = new LoginPage(driver)
29+
overview = new OverviewPage(driver)
30+
queuePage = new QueuePage(driver)
31+
connectionsPage = new ConnectionsPage(driver)
32+
queuesAndStreamsPage = new QueuesAndStreamsPage(driver)
33+
captureScreen = captureScreensFor(driver, __filename)
34+
35+
await login.login('management', 'guest')
36+
if (!await overview.isLoaded()) {
37+
throw new Error('Failed to login')
38+
}
39+
//await overview.selectRefreshOption("Do not refresh")
40+
41+
queueName = "test_" + Math.floor(Math.random() * 1000)
42+
createQueue(getManagementUrl(), basicAuthorization("management", "guest"),
43+
"/", queueName, {
44+
"x-queue-type": "quorum"
45+
})
46+
47+
mqttClient = openConnection(getConnectionOptions())
48+
let subscribed = new Promise((resolve, reject) => {
49+
mqttClient.on('error', function(err) {
50+
reject(err)
51+
assert.fail("Mqtt connection failed due to " + err)
52+
}),
53+
mqttClient.on('connect', function(err) {
54+
mqttClient.subscribe(queueName, {qos:0}, function (err2) {
55+
if (!err2) {
56+
resolve("ok")
57+
}else {
58+
reject(err2)
59+
}
60+
})
61+
})
62+
})
63+
assert.equal("ok", await subscribed)
64+
65+
})
66+
67+
it('can view mqtt qos0 queue', async function () {
68+
await overview.clickOnQueuesTab()
69+
70+
let table = await doUntil(function() {
71+
return queuesAndStreamsPage.getQueuesTable()
72+
}, function(t) {
73+
return findTableRow(t, function(row) {
74+
return row[2] === 'rabbit_mqtt_qos0_queue'
75+
})
76+
})
77+
let mqttQueueName = findTableRow(table, function(row) {
78+
return row[2] === 'rabbit_mqtt_qos0_queue'
79+
})[1]
80+
81+
await goToQueue(driver, "/", mqttQueueName)
82+
await queuePage.isLoaded()
83+
84+
})
85+
86+
it('when the connection is closed, the mqtt qos0 queue should be removed', async function () {
87+
88+
mqttClient.end()
89+
90+
await overview.clickOnConnectionsTab()
91+
await doUntil(async function() {
92+
return connectionsPage.getPagingSectionHeaderText()
93+
}, function(header) {
94+
return header === "All connections (0)"
95+
}, 6000)
96+
97+
await overview.clickOnQueuesTab()
98+
await doUntil(function() {
99+
return queuesAndStreamsPage.getQueuesTable()
100+
}, function(table) {
101+
return !findTableRow(table, function(row) {
102+
return row[2] === 'rabbit_mqtt_qos0_queue'
103+
})
104+
})
105+
106+
})
107+
108+
after(async function () {
109+
await teardown(driver, this, captureScreen)
110+
})
111+
})

0 commit comments

Comments
 (0)