diff --git a/nodes/ff-mqtt.html b/nodes/ff-mqtt.html
index d0c0b42..33a2a2d 100644
--- a/nodes/ff-mqtt.html
+++ b/nodes/ff-mqtt.html
@@ -158,6 +158,10 @@
+
@@ -227,12 +236,178 @@
+
+
diff --git a/nodes/ff-mqtt.js b/nodes/ff-mqtt.js
index afb9b60..e01c3ec 100644
--- a/nodes/ff-mqtt.js
+++ b/nodes/ff-mqtt.js
@@ -59,34 +59,9 @@ module.exports = function (RED) {
agent: ProxyHelper.getHTTPProxyAgent(forgeSettings.forgeURL, { timeout: 4000 })
})
- /** @type {MQTTBrokerNode} */
- const sharedBroker = new MQTTBrokerNode(mqttSettings)
-
- /* Monitor link status and attept to relink if node has users but is unlinked */
+ // /* Monitor link status and attempt to relink if node has users but is unlinked */
let linkTryCount = 0
const MAX_LINK_ATTEMPTS = 5
- sharedBroker.linkMonitorInterval = setInterval(async function () {
- if (Object.keys(sharedBroker.users).length < 1) {
- return // no users registered (yet)
- }
- if (sharedBroker.linked && !sharedBroker.linkFailed) {
- clearInterval(sharedBroker.linkMonitorInterval)
- sharedBroker.linkMonitorInterval = null
- }
- if (sharedBroker.linkFailed) {
- try {
- linkTryCount++
- await sharedBroker.link()
- linkTryCount = 0
- } catch (_err) {
- if (linkTryCount >= MAX_LINK_ATTEMPTS) {
- clearInterval(sharedBroker.linkMonitorInterval)
- sharedBroker.linkMonitorInterval = null
- sharedBroker.warn('Maximum Failed Link Attempts. Restart or redeploy to re-establish the connection.')
- }
- }
- }
- }, (Math.floor(Math.random() * 10000) + 55000)) // 55-65 seconds
const mqtt = require('mqtt')
const isUtf8 = require('is-utf8')
@@ -449,9 +424,9 @@ module.exports = function (RED) {
function updateStatus (node, allNodes) {
let setStatus = setStatusDisconnected
- if (sharedBroker?.connecting) {
+ if (node?.connecting) {
setStatus = setStatusConnecting
- } else if (sharedBroker?.connected) {
+ } else if (node?.connected) {
setStatus = setStatusConnected
}
setStatus(node, allNodes)
@@ -573,6 +548,7 @@ module.exports = function (RED) {
// #region "Broker node"
function MQTTBrokerNode (n) {
/** @type {MQTTBrokerNode} */
+ RED.nodes.createNode(this, n)
const node = this
node._initialised = false
node._initialising = false
@@ -614,6 +590,29 @@ module.exports = function (RED) {
node._linked = mqttSettings.linked || false
node._linkFailed = false
+ node.linkMonitorInterval = setInterval(async function () {
+ if (Object.keys(node.users).length < 1) {
+ return // no users registered (yet)
+ }
+ if (node.linked && !node.linkFailed) {
+ clearInterval(node.linkMonitorInterval)
+ node.linkMonitorInterval = null
+ }
+ if (node.linkFailed) {
+ try {
+ linkTryCount++
+ await node.link()
+ linkTryCount = 0
+ } catch (_err) {
+ if (linkTryCount >= MAX_LINK_ATTEMPTS) {
+ clearInterval(node.linkMonitorInterval)
+ node.linkMonitorInterval = null
+ node.warn('Maximum Failed Link Attempts. Restart or redeploy to re-establish the connection.')
+ }
+ }
+ }
+ }, (Math.floor(Math.random() * 10000) + 55000)) // 55-65 seconds
+
node.link = async function () {
if (node.linkPromise) {
return node.linkPromise // already linking, return the existing promise
@@ -679,6 +678,21 @@ module.exports = function (RED) {
settings.keepalive = mqttSettings.keepalive || 60
settings.cleansession = mqttSettings.cleansession !== false // default to true
settings.topicAliasMaximum = mqttSettings.topicAliasMaximum || 0
+ settings.birthTopic = n.birthTopic
+ settings.birthPayload = n.birthPayload
+ settings.birthRetain = n.birthRetain
+ settings.birthQos = n.birthQos
+ settings.birthMsg = n.birthMsg
+ settings.closeTopic = n.closeTopic
+ settings.closePayload = n.closePayload
+ settings.closeRetain = n.closeRetain
+ settings.closeQos = n.closeQos
+ settings.closeMsg = n.closeMsg
+ settings.willTopic = n.willTopic
+ settings.willPayload = n.willPayload
+ settings.willRetain = n.willRetain
+ settings.willQos = n.willQos
+ settings.willMsg = n.willMsg
node.setOptions(settings, true) // initial options
node._initialised = true
} catch (error) {
@@ -1091,7 +1105,6 @@ module.exports = function (RED) {
node._clientOn('error', function (error) {
})
} catch (err) {
- // eslint-disable-next-line no-console
console.error(err)
}
}
@@ -1447,12 +1460,11 @@ module.exports = function (RED) {
}
}
- // no `on` or `close` handlers for the static broker node
- // node.on('close', function (done) {
- // node.disconnect(function () {
- // done()
- // })
- // })
+ node.on('close', function (done) {
+ node.disconnect(function () {
+ done()
+ })
+ })
// fake the node.status function if it is not already defined
if (typeof node.status !== 'function') {
@@ -1523,6 +1535,8 @@ module.exports = function (RED) {
}
}
+ RED.nodes.registerType('ff-mqtt-conf', MQTTBrokerNode, {})
+
// #endregion "Broker node"
// #region MQTTIn node
@@ -1530,7 +1544,7 @@ module.exports = function (RED) {
RED.nodes.createNode(this, n)
/** @type {MQTTInNode} */const node = this
- /** @type {MQTTBrokerNode} */node.brokerConn = sharedBroker // RED.nodes.getNode(node.broker)
+ /** @type {MQTTBrokerNode} */node.brokerConn = RED.nodes.getNode(n.lwt) // RED.nodes.getNode(node.broker)
node.dynamicSubs = {}
node.isDynamic = hasProperty(n, 'inputs') && +n.inputs === 1
@@ -1759,7 +1773,7 @@ module.exports = function (RED) {
// node.payloadFormatIndicator = n.payloadFormatIndicator; //https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901111
// node.subscriptionIdentifier = n.subscriptionIdentifier;//https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901117
- /** @type {MQTTBrokerNode} */node.brokerConn = sharedBroker // RED.nodes.getNode(node.broker)
+ /** @type {MQTTBrokerNode} */node.brokerConn = RED.nodes.getNode(n.lwt) // RED.nodes.getNode(node.broker)
const Actions = {
CONNECT: 'connect',
diff --git a/test/unit/ff-mqtt_spec.js b/test/unit/ff-mqtt_spec.js
index e456d4b..0869de7 100644
--- a/test/unit/ff-mqtt_spec.js
+++ b/test/unit/ff-mqtt_spec.js
@@ -105,7 +105,7 @@ describe('FF MQTT Nodes', function () {
it('should be loaded and have default values', async function () {
this.timeout = 2000
- const { flow } = buildBasicMQTTSendRecvFlow({ id: 'mqtt.in', topic: 'in_topic' }, { id: 'mqtt.out', topic: 'out_topic' })
+ const { flow } = buildBasicMQTTSendRecvFlow({ id: 'mqtt.in', topic: 'in_topic', lwt: 'broker.node' }, { id: 'mqtt.out', topic: 'out_topic', lwt: 'broker.node' })
await helper.load(mqttNodes, flow)
const mqttIn = helper.getNode('mqtt.in')
@@ -125,13 +125,13 @@ describe('FF MQTT Nodes', function () {
mqttOut.should.have.property('brokerConn').and.be.type('object')
mqttOut.brokerConn.should.equal(mqttIn.brokerConn) // should be the same broker connection
- const mqttBroker = mqttOut.brokerConn
- clearInterval(mqttBroker.linkMonitorInterval) // clear the link monitor interval so the test can exit
+ // const mqttBroker = mqttOut.brokerConn
+ // clearInterval(mqttBroker.linkMonitorInterval) // clear the link monitor interval so the test can exit
})
it('should publish with QoS 1 and retain false by default', async function () {
this.timeout = 2000
- const { flow } = buildBasicMQTTSendRecvFlow({ id: 'mqtt.in', topic: 'in_topic' }, { id: 'mqtt.out', topic: 'out_topic' })
+ const { flow } = buildBasicMQTTSendRecvFlow({ id: 'mqtt.in', topic: 'in_topic', lwt: 'broker.node' }, { id: 'mqtt.out', topic: 'out_topic', lwt: 'broker.node' })
await helper.load(mqttNodes, flow)
const mqttOut = helper.getNode('mqtt.out')
@@ -161,7 +161,7 @@ describe('FF MQTT Nodes', function () {
it('should publish with specified QoS and retain values', async function () {
this.timeout = 2000
- const { flow } = buildBasicMQTTSendRecvFlow({ id: 'mqtt.in', topic: 'in_topic' }, { id: 'mqtt.out', topic: '' })
+ const { flow } = buildBasicMQTTSendRecvFlow({ id: 'mqtt.in', topic: 'in_topic', lwt: 'broker.node' }, { id: 'mqtt.out', topic: '', lwt: 'broker.node' })
await helper.load(mqttNodes, flow)
const mqttOut = helper.getNode('mqtt.out')
@@ -823,6 +823,7 @@ function testSendRecv (brokerOptions, inNodeOptions, outNodeOptions, options, ho
function buildBasicMQTTSendRecvFlow (inOptions, outOptions) {
const inNode = buildMQTTInNode(inOptions.id, inOptions.name, inOptions.topic, inOptions, ['helper.node'])
const outNode = buildMQTTOutNode(outOptions.id, outOptions.name, outOptions.topic, outOptions)
+ const brokerNode = buildMQTTBrokerNode('broker', 'mqtt.broker', 'localhost', 1880, {})
const helper = buildNode('helper', 'helper.node', 'helper_node', {})
const catchNode = buildNode('catch', 'catch.node', 'catch_node', { scope: ['mqtt.in'] }, ['helper.node'])
return {
@@ -830,9 +831,10 @@ function buildBasicMQTTSendRecvFlow (inOptions, outOptions) {
[inNode.name]: inNode,
[outNode.name]: outNode,
[helper.name]: helper,
- [catchNode.name]: catchNode
+ [catchNode.name]: catchNode,
+ [brokerNode.name]: brokerNode
},
- flow: [inNode, outNode, helper, catchNode]
+ flow: [brokerNode, inNode, outNode, helper, catchNode]
}
}
@@ -840,7 +842,7 @@ function buildMQTTBrokerNode (id, name, brokerHost, brokerPort, options) {
// url,broker,port,clientid,autoConnect,usetls,usews,verifyservercert,compatmode,protocolVersion,keepalive,
// cleansession,sessionExpiry,topicAliasMaximum,maximumPacketSize,receiveMaximum,userProperties,userPropertiesType,autoUnsubscribe
options = options || {}
- const node = buildNode('mqtt-broker', id || 'mqtt.broker', name || 'mqtt_broker', options)
+ const node = buildNode('ff-mqtt-conf', id || 'mqtt.broker', name || 'mqtt_broker', options)
node.url = options.url
node.broker = brokerHost || options.broker || BROKER_HOST
node.port = brokerPort || options.port || BROKER_PORT