Skip to content

Commit cc86ffe

Browse files
MarcialRosalesmichaelklishin
authored andcommitted
Fix issue around rendering a mqtt qos0 queue
1 parent 8e78c10 commit cc86ffe

File tree

11 files changed

+293
-5
lines changed

11 files changed

+293
-5
lines changed

deps/rabbitmq_management/priv/www/js/main.js

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1762,7 +1762,14 @@ function is_internal(queue) {
17621762
}
17631763

17641764
function get_queue_type (queue) {
1765-
return queue.type;
1765+
switch(queue.type) {
1766+
case "classic":
1767+
case "quorum":
1768+
case "stream":
1769+
return queue.type;
1770+
default:
1771+
return "default"
1772+
}
17661773
}
17671774

17681775
function is_quorum(queue) {

selenium/full-suite-management-ui

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@ authnz-mgt/oauth-with-keycloak.sh
1414
authnz-mgt/oauth-with-keycloak-with-verify-none.sh
1515
authnz-mgt/oauth-with-uaa-down-but-with-basic-auth.sh
1616
authnz-mgt/oauth-with-uaa-down.sh
17+
mgt/amqp10-connections.sh
18+
mgt/mqtt-connections.sh
1719
mgt/vhosts.sh
1820
mgt/definitions.sh
1921
mgt/exchanges.sh

selenium/short-suite-management-ui

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,3 +8,4 @@ mgt/exchanges.sh
88
mgt/queuesAndStreams.sh
99
mgt/limits.sh
1010
mgt/amqp10-connections.sh
11+
mgt/mqtt-connections.sh
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
#!/usr/bin/env bash
2+
3+
SCRIPT="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
4+
5+
TEST_CASES_PATH=/connections/mqtt
6+
TEST_CONFIG_PATH=/basic-auth
7+
8+
source $SCRIPT/../../bin/suite_template $@
9+
run
Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,3 @@
11
[rabbitmq_management,rabbitmq_stream,rabbitmq_stream_common,rabbitmq_stream_management,
2-
rabbitmq_top,rabbitmq_tracing,rabbitmq_federation_management,rabbitmq_shovel_management].
2+
rabbitmq_top,rabbitmq_tracing,rabbitmq_federation_management,rabbitmq_shovel_management,
3+
rabbitmq_mqtt].
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
const { By, Key, until, Builder } = require('selenium-webdriver')
2+
require('chromedriver')
3+
const assert = require('assert')
4+
const { buildDriver, goToHome, captureScreensFor, teardown, doUntil } = require('../../utils')
5+
const { openConnection, getConnectionOptions } = require('../../mqtt')
6+
7+
const LoginPage = require('../../pageobjects/LoginPage')
8+
const OverviewPage = require('../../pageobjects/OverviewPage')
9+
const ConnectionsPage = require('../../pageobjects/ConnectionsPage');
10+
11+
12+
describe('List MQTT connections', function () {
13+
let login
14+
let overview
15+
let captureScreen
16+
let mqttClient
17+
18+
before(async function () {
19+
driver = buildDriver()
20+
await goToHome(driver)
21+
login = new LoginPage(driver)
22+
overview = new OverviewPage(driver)
23+
connectionsPage = new ConnectionsPage(driver)
24+
captureScreen = captureScreensFor(driver, __filename)
25+
26+
await login.login('management', 'guest')
27+
if (!await overview.isLoaded()) {
28+
throw new Error('Failed to login')
29+
}
30+
31+
})
32+
33+
it('mqtt 5.0 connection', async function () {
34+
mqttClient = openConnection(getConnectionOptions())
35+
36+
let connected = new Promise((resolve, reject) => {
37+
mqttClient.on('error', function(err) {
38+
reject(err)
39+
assert.fail("Mqtt connection failed due to " + err)
40+
}),
41+
mqttClient.on('connect', function(err2) {
42+
resolve("ok")
43+
})
44+
})
45+
assert.equal("ok", await connected)
46+
47+
try {
48+
await overview.clickOnConnectionsTab()
49+
50+
let table = await doUntil(async function() {
51+
return connectionsPage.getConnectionsTable()
52+
}, function(table) {
53+
return table.length > 0
54+
}, 6000)
55+
assert.equal(table[0][5], "MQTT 5-0")
56+
57+
} finally {
58+
if (mqttClient) mqttClient.end()
59+
}
60+
61+
})
62+
63+
after(async function () {
64+
await teardown(driver, this, captureScreen)
65+
66+
})
67+
})

selenium/test/exchanges/management.js

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,9 +76,12 @@ describe('Exchange management', function () {
7676
return table.length > 0
7777
})
7878

79+
log("Opening selectable columns popup...")
7980
await exchanges.clickOnSelectTableColumns()
81+
log("Getting all selectable dolumns ...")
8082
let table = await exchanges.getSelectableTableColumns()
8183

84+
log("Asserting selectable dolumns ...")
8285
let overviewGroup = {
8386
"name" : "Overview:",
8487
"columns": [

selenium/test/mqtt.js

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
const mqtt = require('mqtt')
2+
3+
module.exports = {
4+
5+
openConnection: (mqttOptions) => {
6+
let rabbit = process.env.RABBITMQ_HOSTNAME || 'localhost'
7+
let mqttUrl = process.env.RABBITMQ_MQTT_URL || "mqtt://" + rabbit + ":1883"
8+
return mqtt.connect(mqttUrl, mqttOptions)
9+
},
10+
getConnectionOptions: () => {
11+
let mqttProtocol = process.env.MQTT_PROTOCOL || 'mqtt'
12+
let usemtls = process.env.MQTT_USE_MTLS || false
13+
let username = process.env.RABBITMQ_AMQP_USERNAME || 'management'
14+
let password = process.env.RABBITMQ_AMQP_PASSWORD || 'guest'
15+
let client_id = process.env.RABBITMQ_AMQP_USERNAME || 'selenium-client'
16+
17+
mqttOptions = {
18+
clientId: client_id,
19+
protocolId: 'MQTT',
20+
protocol: mqttProtocol,
21+
protocolVersion: 5,
22+
keepalive: 10000,
23+
clean: false,
24+
reconnectPeriod: '1000',
25+
properties: {
26+
sessionExpiryInterval: 0
27+
}
28+
}
29+
30+
if (mqttProtocol == 'mqtts') {
31+
mqttOptions["ca"] = [fs.readFileSync(process.env.RABBITMQ_CERTS + "/ca_rabbitmq_certificate.pem")]
32+
}
33+
if (usemtls) {
34+
mqttOptions["cert"] = fs.readFileSync(process.env.RABBITMQ_CERTS + "/client_rabbitmq_certificate.pem")
35+
mqttOptions["key"] = fs.readFileSync(process.env.RABBITMQ_CERTS + "/client_rabbitmq_key.pem")
36+
} else {
37+
mqttOptions["username"] = username
38+
mqttOptions["password"] = password
39+
}
40+
return mqttOptions
41+
}
42+
}
Lines changed: 148 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,148 @@
1+
const { By, Key, until, Builder } = require('selenium-webdriver')
2+
require('chromedriver')
3+
const assert = require('assert')
4+
const { buildDriver, goToHome, goToQueue, captureScreensFor, teardown, doUntil, findTableRow } = require('../utils')
5+
const { createQueue, deleteQueue, getManagementUrl, basicAuthorization } = require('../mgt-api')
6+
const mqtt = require('mqtt')
7+
8+
const LoginPage = require('../pageobjects/LoginPage')
9+
const OverviewPage = require('../pageobjects/OverviewPage')
10+
const QueuesAndStreamsPage = require('../pageobjects/QueuesAndStreamsPage')
11+
const QueuePage = require('../pageobjects/QueuePage')
12+
const ConnectionsPage = require('../pageobjects/ConnectionsPage');
13+
14+
15+
describe('Given a mqtt 5.0 connection with a qos 0 subscription with zero sessionExpiryInterval', function () {
16+
let login
17+
let queuesAndStreams
18+
let queuePage
19+
let overview
20+
let captureScreen
21+
let queueName
22+
let mqttOptions
23+
24+
let mqttProtocol = process.env.MQTT_PROTOCOL || 'mqtt'
25+
let usemtls = process.env.MQTT_USE_MTLS || false
26+
let rabbit = process.env.RABBITMQ_HOSTNAME || 'localhost'
27+
let mqttUrl = process.env.RABBITMQ_MQTT_URL || "mqtt://" + rabbit + ":1883"
28+
let username = process.env.RABBITMQ_AMQP_USERNAME || 'management'
29+
let password = process.env.RABBITMQ_AMQP_PASSWORD || 'guest'
30+
let client_id = process.env.RABBITMQ_AMQP_USERNAME || 'selenium-client'
31+
let mqttClient
32+
33+
before(async function () {
34+
driver = buildDriver()
35+
await goToHome(driver)
36+
login = new LoginPage(driver)
37+
overview = new OverviewPage(driver)
38+
queuePage = new QueuePage(driver)
39+
connectionsPage = new ConnectionsPage(driver)
40+
queuesAndStreamsPage = new QueuesAndStreamsPage(driver)
41+
captureScreen = captureScreensFor(driver, __filename)
42+
43+
await login.login('management', 'guest')
44+
if (!await overview.isLoaded()) {
45+
throw new Error('Failed to login')
46+
}
47+
//await overview.selectRefreshOption("Do not refresh")
48+
49+
queueName = "test_" + Math.floor(Math.random() * 1000)
50+
createQueue(getManagementUrl(), basicAuthorization("management", "guest"),
51+
"/", queueName, {
52+
"x-queue-type": "quorum"
53+
})
54+
55+
mqttOptions = {
56+
clientId: client_id,
57+
protocolId: 'MQTT',
58+
protocol: mqttProtocol,
59+
protocolVersion: 5,
60+
keepalive: 10000,
61+
clean: false,
62+
reconnectPeriod: '1000',
63+
properties: {
64+
sessionExpiryInterval: 0
65+
}
66+
}
67+
if (mqttProtocol == 'mqtts') {
68+
mqttOptions["ca"] = [fs.readFileSync(process.env.RABBITMQ_CERTS + "/ca_rabbitmq_certificate.pem")]
69+
}
70+
if (usemtls) {
71+
mqttOptions["cert"] = fs.readFileSync(process.env.RABBITMQ_CERTS + "/client_rabbitmq_certificate.pem")
72+
mqttOptions["key"] = fs.readFileSync(process.env.RABBITMQ_CERTS + "/client_rabbitmq_key.pem")
73+
} else {
74+
mqttOptions["username"] = username
75+
mqttOptions["password"] = password
76+
}
77+
78+
mqttClient = mqtt.connect(mqttUrl, mqttOptions)
79+
let subscribed = new Promise((resolve, reject) => {
80+
mqttClient.on('error', function(err) {
81+
reject(err)
82+
assert.fail("Mqtt connection failed due to " + err)
83+
}),
84+
mqttClient.on('connect', function(err) {
85+
mqttClient.subscribe(queueName, {qos:0}, function (err2) {
86+
if (!err2) {
87+
resolve("ok")
88+
}else {
89+
reject(err2)
90+
}
91+
})
92+
})
93+
})
94+
assert.equal("ok", await subscribed)
95+
96+
})
97+
98+
it('should be an mqtt connection listed', async function () {
99+
await overview.clickOnConnectionsTab()
100+
101+
let table = await doUntil(async function() {
102+
return connectionsPage.getConnectionsTable()
103+
}, function(table) {
104+
return table.length > 0
105+
}, 6000)
106+
assert.equal(table[0][5], "MQTT 5-0")
107+
108+
})
109+
110+
it('should be an mqtt qos0 queue listed', async function () {
111+
await overview.clickOnQueuesTab()
112+
113+
await doUntil(function() {
114+
return queuesAndStreamsPage.getQueuesTable()
115+
}, function(table) {
116+
return findTableRow(table, function(row) {
117+
return row[2] === 'rabbit_mqtt_qos0_queue'
118+
})
119+
})
120+
121+
})
122+
123+
it('can view mqtt qos0 queue', async function () {
124+
await overview.clickOnQueuesTab()
125+
126+
let table = await doUntil(function() {
127+
return queuesAndStreamsPage.getQueuesTable()
128+
}, function(t) {
129+
return findTableRow(t, function(row) {
130+
return row[2] === 'rabbit_mqtt_qos0_queue'
131+
})
132+
})
133+
let mqttQueueName = findTableRow(table, function(row) {
134+
return row[2] === 'rabbit_mqtt_qos0_queue'
135+
})[1]
136+
137+
await goToQueue(driver, "/", mqttQueueName)
138+
await queuePage.isLoaded()
139+
140+
})
141+
142+
after(async function () {
143+
await teardown(driver, this, captureScreen)
144+
if (mqttClient) mqttClient.end()
145+
deleteQueue(getManagementUrl(), basicAuthorization("management", "guest"),
146+
"/", queueName)
147+
})
148+
})

selenium/test/queuesAndStreams/view-qq-consumers.js

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ describe('Given a quorum queue configured with SAC', function () {
8383
ch1Consumer = ch1.consume(queueName, (msg) => {}, {consumerTag: "one"})
8484
})
8585

86-
it('it should have one consumer as active', async function() {
86+
it('it should have one consumer listed as active', async function() {
8787
await doUntil(async function() {
8888
await queuePage.refresh()
8989
await queuePage.isLoaded()
@@ -111,7 +111,7 @@ describe('Given a quorum queue configured with SAC', function () {
111111
ch2Consumer = ch2.consume(queueName, (msg) => {}, {consumerTag: "two", priority: 10})
112112
})
113113

114-
it('the latter consumer should be active and the former waiting', async function() {
114+
it('the latter consumer should be listed as active and the former waiting', async function() {
115115

116116
await doUntil(async function() {
117117
await queuePage.refresh()
@@ -177,7 +177,7 @@ describe('Given a quorum queue configured with SAC', function () {
177177
ch1Consumer = ch1.consume(queueName, (msg) => {}, {consumerTag: "one", priority: 10})
178178
})
179179

180-
it('it should have one consumer as active', async function() {
180+
it('it should have one consumer listed as active', async function() {
181181
await doUntil(async function() {
182182
await queuePage.refresh()
183183
await queuePage.isLoaded()

selenium/test/utils.js

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,10 +126,18 @@ module.exports = {
126126
return d.driver.get(d.baseUrl + '#/login?access_token=' + token)
127127
},
128128

129+
goToConnections: (d) => {
130+
return d.driver.get(d.baseUrl + '#/connections')
131+
},
132+
129133
goToExchanges: (d) => {
130134
return d.driver.get(d.baseUrl + '#/exchanges')
131135
},
132136

137+
goToQueues: (d) => {
138+
return d.driver.get(d.baseUrl + '#/queues')
139+
},
140+
133141
goToQueue(d, vhost, queue) {
134142
return d.driver.get(d.baseUrl + '#/queues/' + encodeURIComponent(vhost) + '/' + encodeURIComponent(queue))
135143
},

0 commit comments

Comments
 (0)