Skip to content

Commit 496b2d0

Browse files
Delete mqtt qos0 when connection closes
1 parent f7a238a commit 496b2d0

File tree

8 files changed

+146
-3
lines changed

8 files changed

+146
-3
lines changed

deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1906,7 +1906,7 @@ log_delayed_will_failure(Topic, ClientId, Reason) ->
19061906
[Topic, ClientId, Reason]).
19071907

19081908
maybe_delete_mqtt_qos0_queue(
1909-
State = #state{cfg = #cfg{clean_start = true},
1909+
State = #state{cfg = #cfg{session_expiry_interval_secs = 0},
19101910
auth_state = #auth_state{user = #user{username = Username}}}) ->
19111911
case get_queue(?QOS_0, State) of
19121912
{ok, Q} ->

deps/rabbitmq_mqtt/test/mqtt_shared_SUITE.erl

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,7 @@ cluster_size_1_tests() ->
113113
,block
114114
,amqp_to_mqtt_qos0
115115
,clean_session_disconnect_client
116+
,zero_session_expiry_interval_disconnect_client
116117
,clean_session_node_restart
117118
,clean_session_node_kill
118119
,rabbit_status_connection_count
@@ -211,6 +212,7 @@ init_per_testcase(T, Config)
211212
init_per_testcase0(T, Config);
212213
init_per_testcase(T, Config)
213214
when T =:= clean_session_disconnect_client;
215+
T =:= zero_session_expiry_interval_disconnect_client;
214216
T =:= clean_session_node_restart;
215217
T =:= clean_session_node_kill;
216218
T =:= notify_consumer_qos0_queue_deleted ->
@@ -229,6 +231,7 @@ end_per_testcase(T, Config)
229231
end_per_testcase0(T, Config);
230232
end_per_testcase(T, Config)
231233
when T =:= clean_session_disconnect_client;
234+
T =:= zero_session_expiry_interval_disconnect_client;
232235
T =:= clean_session_node_restart;
233236
T =:= clean_session_node_kill;
234237
T =:= notify_consumer_qos0_queue_deleted ->
@@ -1583,6 +1586,18 @@ clean_session_disconnect_client(Config) ->
15831586
L = rpc(Config, rabbit_amqqueue, list, []),
15841587
?assertEqual(0, length(L)).
15851588

1589+
zero_session_expiry_interval_disconnect_client(Config) ->
1590+
C = connect(?FUNCTION_NAME, Config, [{properties, #{'Session-Expiry-Interval' => 0}}]),
1591+
{ok, _, _} = emqtt:subscribe(C, <<"topic0">>, qos0),
1592+
QsQos0 = rpc(Config, rabbit_amqqueue, list_by_type, [rabbit_mqtt_qos0_queue]),
1593+
?assertEqual(1, length(QsQos0)),
1594+
1595+
ok = emqtt:disconnect(C),
1596+
%% After terminating a clean session, we expect any session state to be cleaned up on the server.
1597+
timer:sleep(200), %% Give some time to clean up exclusive classic queue.
1598+
L = rpc(Config, rabbit_amqqueue, list, []),
1599+
?assertEqual(0, length(L)).
1600+
15861601
clean_session_node_restart(Config) ->
15871602
clean_session_node_down(stop_node, Config).
15881603

deps/rabbitmq_web_mqtt/test/web_mqtt_shared_SUITE.erl

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,3 +103,4 @@ maintenance(Config) -> mqtt_shared_SUITE:?FUNCTION_NAME(Config).
103103
notify_consumer_classic_queue_deleted(Config) -> mqtt_shared_SUITE:?FUNCTION_NAME(Config).
104104
notify_consumer_quorum_queue_deleted(Config) -> mqtt_shared_SUITE:?FUNCTION_NAME(Config).
105105
notify_consumer_qos0_queue_deleted(Config) -> mqtt_shared_SUITE:?FUNCTION_NAME(Config).
106+
zero_session_expiry_interval_disconnect_client(Config) -> mqtt_shared_SUITE:?FUNCTION_NAME(Config).

selenium/package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
"author": "",
1313
"license": "ISC",
1414
"dependencies": {
15-
"chromedriver": "^135.0",
15+
"chromedriver": "^137.0",
1616
"ejs": "^3.1.8",
1717
"express": "^4.18.2",
1818
"geckodriver": "^3.0.2",
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
#!/usr/bin/env bash
2+
3+
SCRIPT="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
4+
5+
TEST_CASES_PATH=/queuesAndStreams
6+
TEST_CONFIG_PATH=/basic-auth
7+
PROFILES="disable-metrics"
8+
9+
source $SCRIPT/../../bin/suite_template $@
10+
run
Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
11
auth_backends.1 = rabbit_auth_backend_internal
22

3-
management.login_session_timeout = 1
43
load_definitions = ${IMPORT_DIR}/users.json
54

5+
management.login_session_timeout = 1
6+
67
loopback_users = none
8+
9+
log.console.level = debug
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
2+
management.disable_stats = true
3+
management_agent.disable_metrics_collector = true
Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
const { By, Key, until, Builder } = require('selenium-webdriver')
2+
require('chromedriver')
3+
const assert = require('assert')
4+
const { buildDriver, goToHome, goToQueue, captureScreensFor, teardown, doUntil, findTableRow } = require('../utils')
5+
const { createQueue, getManagementUrl, basicAuthorization } = require('../mgt-api')
6+
const { openConnection, getConnectionOptions } = require('../mqtt')
7+
8+
const LoginPage = require('../pageobjects/LoginPage')
9+
const OverviewPage = require('../pageobjects/OverviewPage')
10+
const QueuesAndStreamsPage = require('../pageobjects/QueuesAndStreamsPage')
11+
const QueuePage = require('../pageobjects/QueuePage')
12+
const ConnectionsPage = require('../pageobjects/ConnectionsPage');
13+
14+
15+
describe('Given an MQTT 5.0 connection with a qos 0 subscription with zero sessionExpiryInterval', function () {
16+
let login
17+
let queuesAndStreamsPage
18+
let queuePage
19+
let overview
20+
let captureScreen
21+
let queueName
22+
23+
let mqttClient
24+
25+
before(async function () {
26+
driver = buildDriver()
27+
await goToHome(driver)
28+
login = new LoginPage(driver)
29+
overview = new OverviewPage(driver)
30+
queuePage = new QueuePage(driver)
31+
connectionsPage = new ConnectionsPage(driver)
32+
queuesAndStreamsPage = new QueuesAndStreamsPage(driver)
33+
captureScreen = captureScreensFor(driver, __filename)
34+
35+
await login.login('management', 'guest')
36+
if (!await overview.isLoaded()) {
37+
throw new Error('Failed to login')
38+
}
39+
//await overview.selectRefreshOption("Do not refresh")
40+
41+
queueName = "test_" + Math.floor(Math.random() * 1000)
42+
createQueue(getManagementUrl(), basicAuthorization("management", "guest"),
43+
"/", queueName, {
44+
"x-queue-type": "quorum"
45+
})
46+
47+
mqttClient = openConnection(getConnectionOptions())
48+
let subscribed = new Promise((resolve, reject) => {
49+
mqttClient.on('error', function(err) {
50+
reject(err)
51+
assert.fail("Mqtt connection failed due to " + err)
52+
}),
53+
mqttClient.on('connect', function(err) {
54+
mqttClient.subscribe(queueName, {qos:0}, function (err2) {
55+
if (!err2) {
56+
resolve("ok")
57+
}else {
58+
reject(err2)
59+
}
60+
})
61+
})
62+
})
63+
assert.equal("ok", await subscribed)
64+
65+
})
66+
67+
it('can view mqtt qos0 queue', async function () {
68+
await overview.clickOnQueuesTab()
69+
70+
let table = await doUntil(function() {
71+
return queuesAndStreamsPage.getQueuesTable()
72+
}, function(t) {
73+
return findTableRow(t, function(row) {
74+
return row[2] === 'rabbit_mqtt_qos0_queue'
75+
})
76+
})
77+
let mqttQueueName = findTableRow(table, function(row) {
78+
return row[2] === 'rabbit_mqtt_qos0_queue'
79+
})[1]
80+
81+
await goToQueue(driver, "/", mqttQueueName)
82+
await queuePage.isLoaded()
83+
84+
})
85+
86+
it('when the connection is closed, the mqtt qos0 queue should be removed', async function () {
87+
88+
mqttClient.end()
89+
90+
await overview.clickOnConnectionsTab()
91+
await doUntil(async function() {
92+
return connectionsPage.getPagingSectionHeaderText()
93+
}, function(header) {
94+
return header === "All connections (0)"
95+
}, 6000)
96+
97+
await overview.clickOnQueuesTab()
98+
await doUntil(function() {
99+
return queuesAndStreamsPage.getQueuesTable()
100+
}, function(table) {
101+
return !findTableRow(table, function(row) {
102+
return row[2] === 'rabbit_mqtt_qos0_queue'
103+
})
104+
})
105+
106+
})
107+
108+
after(async function () {
109+
await teardown(driver, this, captureScreen)
110+
})
111+
})

0 commit comments

Comments
 (0)