diff --git a/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl b/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl index 22c95cfadb04..18ad6d9735bf 100644 --- a/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl +++ b/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl @@ -1898,7 +1898,7 @@ log_delayed_will_failure(Topic, ClientId, Reason) -> [Topic, ClientId, Reason]). maybe_delete_mqtt_qos0_queue( - State = #state{cfg = #cfg{clean_start = true}, + State = #state{cfg = #cfg{session_expiry_interval_secs = 0}, auth_state = #auth_state{user = #user{username = Username}}}) -> case get_queue(?QOS_0, State) of {ok, Q} -> diff --git a/deps/rabbitmq_mqtt/test/mqtt_shared_SUITE.erl b/deps/rabbitmq_mqtt/test/mqtt_shared_SUITE.erl index 1db38072c43c..d6964017dec1 100644 --- a/deps/rabbitmq_mqtt/test/mqtt_shared_SUITE.erl +++ b/deps/rabbitmq_mqtt/test/mqtt_shared_SUITE.erl @@ -211,6 +211,7 @@ init_per_testcase(T, Config) init_per_testcase0(T, Config); init_per_testcase(T, Config) when T =:= clean_session_disconnect_client; + T =:= zero_session_expiry_interval_disconnect_client; T =:= clean_session_node_restart; T =:= clean_session_node_kill; T =:= notify_consumer_qos0_queue_deleted -> @@ -229,6 +230,7 @@ end_per_testcase(T, Config) end_per_testcase0(T, Config); end_per_testcase(T, Config) when T =:= clean_session_disconnect_client; + T =:= zero_session_expiry_interval_disconnect_client; T =:= clean_session_node_restart; T =:= clean_session_node_kill; T =:= notify_consumer_qos0_queue_deleted -> diff --git a/deps/rabbitmq_mqtt/test/v5_SUITE.erl b/deps/rabbitmq_mqtt/test/v5_SUITE.erl index 44a195094430..d0cff4eda23b 100644 --- a/deps/rabbitmq_mqtt/test/v5_SUITE.erl +++ b/deps/rabbitmq_mqtt/test/v5_SUITE.erl @@ -71,6 +71,7 @@ cluster_size_1_tests() -> session_expiry_reconnect_non_zero, session_expiry_reconnect_zero, session_expiry_reconnect_infinity_to_zero, + zero_session_expiry_disconnect_autodeletes_qos0_queue, client_publish_qos2, client_rejects_publish, client_receive_maximum_min, @@ -188,6 +189,12 @@ init_per_testcase(T, Config) ok = rpc(Config, application, set_env, [?APP, Par, infinity]), Config1 = rabbit_ct_helpers:set_config(Config, {Par, Default}), init_per_testcase0(T, Config1); + +init_per_testcase(T, Config) + when T =:= zero_session_expiry_disconnect_autodeletes_qos0_queue -> + rpc(Config, rabbit_registry, register, [queue, <<"qos0">>, rabbit_mqtt_qos0_queue]), + init_per_testcase0(T, Config); + init_per_testcase(T, Config) -> init_per_testcase0(T, Config). @@ -202,6 +209,11 @@ end_per_testcase(T, Config) Default = ?config(Par, Config), ok = rpc(Config, application, set_env, [?APP, Par, Default]), end_per_testcase0(T, Config); +end_per_testcase(T, Config) + when T =:= zero_session_expiry_disconnect_autodeletes_qos0_queue -> + ok = rpc(Config, rabbit_registry, unregister, [queue, <<"qos0">>]), + init_per_testcase0(T, Config); + end_per_testcase(T, Config) -> end_per_testcase0(T, Config). @@ -389,6 +401,22 @@ session_expiry_quorum_queue_disconnect_decrease(Config) -> ok = session_expiry_disconnect_decrease(rabbit_quorum_queue, Config), ok = rpc(Config, application, unset_env, [?APP, durable_queue_type]). +zero_session_expiry_disconnect_autodeletes_qos0_queue(Config) -> + ClientId = ?FUNCTION_NAME, + C = connect(ClientId, Config, [ + {clean_start, false}, + {properties, #{'Session-Expiry-Interval' => 0}}]), + {ok, _, _} = emqtt:subscribe(C, <<"topic0">>, qos0), + QsQos0 = rpc(Config, rabbit_amqqueue, list_by_type, [rabbit_mqtt_qos0_queue]), + ?assertEqual(1, length(QsQos0)), + + ok = emqtt:disconnect(C), + %% After terminating a clean session, we expect any session state to be cleaned up on the server. + %% Give the node some time to clean up the MQTT QoS 0 queue. + timer:sleep(200), + L = rpc(Config, rabbit_amqqueue, list, []), + ?assertEqual(0, length(L)). + session_expiry_disconnect_decrease(QueueType, Config) -> ClientId = ?FUNCTION_NAME, C1 = connect(ClientId, Config, [{properties, #{'Session-Expiry-Interval' => 100}}]), diff --git a/deps/rabbitmq_web_mqtt/test/web_mqtt_shared_SUITE.erl b/deps/rabbitmq_web_mqtt/test/web_mqtt_shared_SUITE.erl index 8083d481578f..dc5fd9377378 100644 --- a/deps/rabbitmq_web_mqtt/test/web_mqtt_shared_SUITE.erl +++ b/deps/rabbitmq_web_mqtt/test/web_mqtt_shared_SUITE.erl @@ -103,3 +103,5 @@ maintenance(Config) -> mqtt_shared_SUITE:?FUNCTION_NAME(Config). notify_consumer_classic_queue_deleted(Config) -> mqtt_shared_SUITE:?FUNCTION_NAME(Config). notify_consumer_quorum_queue_deleted(Config) -> mqtt_shared_SUITE:?FUNCTION_NAME(Config). notify_consumer_qos0_queue_deleted(Config) -> mqtt_shared_SUITE:?FUNCTION_NAME(Config). +zero_session_expiry_interval_disconnect_client(Config) -> mqtt_shared_SUITE:?FUNCTION_NAME(Config). +zero_session_expiry_disconnect_autodeletes_qos0_queue(Config) -> mqtt_shared_SUITE:?FUNCTION_NAME(Config). \ No newline at end of file diff --git a/selenium/package.json b/selenium/package.json index c84f5668ff73..c79d91274d10 100644 --- a/selenium/package.json +++ b/selenium/package.json @@ -12,7 +12,7 @@ "author": "", "license": "ISC", "dependencies": { - "chromedriver": "^135.0", + "chromedriver": "^137.0", "ejs": "^3.1.8", "express": "^4.18.2", "geckodriver": "^3.0.2", diff --git a/selenium/test/basic-auth/rabbitmq.conf b/selenium/test/basic-auth/rabbitmq.conf index 7bacc14af27a..8bdbec84dd39 100644 --- a/selenium/test/basic-auth/rabbitmq.conf +++ b/selenium/test/basic-auth/rabbitmq.conf @@ -1,6 +1,9 @@ auth_backends.1 = rabbit_auth_backend_internal -management.login_session_timeout = 1 load_definitions = ${IMPORT_DIR}/users.json +management.login_session_timeout = 1 + loopback_users = none + +log.console.level = debug diff --git a/selenium/test/queuesAndStreams/autodelete-mqtt-qos0.js b/selenium/test/queuesAndStreams/autodelete-mqtt-qos0.js new file mode 100644 index 000000000000..1e90f82d02c1 --- /dev/null +++ b/selenium/test/queuesAndStreams/autodelete-mqtt-qos0.js @@ -0,0 +1,111 @@ +const { By, Key, until, Builder } = require('selenium-webdriver') +require('chromedriver') +const assert = require('assert') +const { buildDriver, goToHome, goToQueue, captureScreensFor, teardown, doUntil, findTableRow } = require('../utils') +const { createQueue, getManagementUrl, basicAuthorization } = require('../mgt-api') +const { openConnection, getConnectionOptions } = require('../mqtt') + +const LoginPage = require('../pageobjects/LoginPage') +const OverviewPage = require('../pageobjects/OverviewPage') +const QueuesAndStreamsPage = require('../pageobjects/QueuesAndStreamsPage') +const QueuePage = require('../pageobjects/QueuePage') +const ConnectionsPage = require('../pageobjects/ConnectionsPage'); + + +describe('Given an MQTT 5.0 connection with a qos 0 subscription with zero sessionExpiryInterval', function () { + let login + let queuesAndStreamsPage + let queuePage + let overview + let captureScreen + let queueName + + let mqttClient + + before(async function () { + driver = buildDriver() + await goToHome(driver) + login = new LoginPage(driver) + overview = new OverviewPage(driver) + queuePage = new QueuePage(driver) + connectionsPage = new ConnectionsPage(driver) + queuesAndStreamsPage = new QueuesAndStreamsPage(driver) + captureScreen = captureScreensFor(driver, __filename) + + await login.login('management', 'guest') + if (!await overview.isLoaded()) { + throw new Error('Failed to login') + } + //await overview.selectRefreshOption("Do not refresh") + + queueName = "test_" + Math.floor(Math.random() * 1000) + createQueue(getManagementUrl(), basicAuthorization("management", "guest"), + "/", queueName, { + "x-queue-type": "quorum" + }) + + mqttClient = openConnection(getConnectionOptions()) + let subscribed = new Promise((resolve, reject) => { + mqttClient.on('error', function(err) { + reject(err) + assert.fail("Mqtt connection failed due to " + err) + }), + mqttClient.on('connect', function(err) { + mqttClient.subscribe(queueName, {qos:0}, function (err2) { + if (!err2) { + resolve("ok") + }else { + reject(err2) + } + }) + }) + }) + assert.equal("ok", await subscribed) + + }) + + it('can view mqtt qos0 queue', async function () { + await overview.clickOnQueuesTab() + + let table = await doUntil(function() { + return queuesAndStreamsPage.getQueuesTable() + }, function(t) { + return findTableRow(t, function(row) { + return row[2] === 'rabbit_mqtt_qos0_queue' + }) + }) + let mqttQueueName = findTableRow(table, function(row) { + return row[2] === 'rabbit_mqtt_qos0_queue' + })[1] + + await goToQueue(driver, "/", mqttQueueName) + await queuePage.isLoaded() + + }) + + it('when the connection is closed, the mqtt qos0 queue should be removed', async function () { + + mqttClient.end() + + await overview.clickOnConnectionsTab() + await doUntil(async function() { + return connectionsPage.getPagingSectionHeaderText() + }, function(header) { + return header === "All connections (0)" + }, 6000) + + await overview.clickOnQueuesTab() + await doUntil(function() { + return queuesAndStreamsPage.getQueuesTable() + }, function(table) { + return !findTableRow(table, function(row) { + return row[2] === 'rabbit_mqtt_qos0_queue' + }) + }) + + }) + + after(async function () { + await teardown(driver, this, captureScreen) + }) +})