From 959082d21f3c07ac390eee2eb7bb7842db2f6007 Mon Sep 17 00:00:00 2001 From: skadefro Date: Wed, 3 Jan 2024 13:23:57 +0100 Subject: [PATCH 01/15] bump version --- package.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/package.json b/package.json index 104c0d70..987c98d8 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@openiap/openflow", - "version": "1.5.7.20", + "version": "1.5.8.1.1", "description": "Simple wrapper around NodeRed, RabbitMQ and MongoDB to support a more scaleable NodeRed implementation.\r Also the \"backend\" for [OpenRPA](https://github.com/skadefro/OpenRPA)", "main": "index.js", "scripts": { From 26b1732fcccfccbf6a6f1d829fd4a1a499a34050 Mon Sep 17 00:00:00 2001 From: skadefro Date: Tue, 9 Jan 2024 00:02:41 +0100 Subject: [PATCH 02/15] test using exchanges --- OpenFlow/src/DatabaseConnection.ts | 41 ++++++++++++++++++++---------- package.json | 2 +- 2 files changed, 28 insertions(+), 15 deletions(-) diff --git a/OpenFlow/src/DatabaseConnection.ts b/OpenFlow/src/DatabaseConnection.ts index bd16ac9f..e271ee88 100644 --- a/OpenFlow/src/DatabaseConnection.ts +++ b/OpenFlow/src/DatabaseConnection.ts @@ -168,17 +168,31 @@ export class DatabaseConnection extends events.EventEmitter { Logger.instanse.silly("Really connected to mongodb", span); const errEvent = (error) => { this.isConnected = false; + this.streams = []; + Logger.instanse.info("mongodb.error", span); + Logger.instanse.error(error, span); + this.emit("disconnected"); + } + const parseErrEvent = (error) => { + this.isConnected = false; + this.streams = []; + Logger.instanse.info("mongodb.parseError", span); Logger.instanse.error(error, span); this.emit("disconnected"); } const closeEvent = () => { + this.streams = []; this.isConnected = false; + Logger.instanse.info("mongodb.close", span); Logger.instanse.silly("Disconnected from mongodb", span); this.emit("disconnected"); } this.cli + .on('connectionReady', () => { + Logger.instanse.info("mongodb.connectionRead", span); + }) .on('error', errEvent) - .on('parseError', errEvent) + .on('parseError', parseErrEvent) .on('timeout', errEvent) .on('close', closeEvent) // .on("commandStarted", (event) => { @@ -251,14 +265,7 @@ export class DatabaseConnection extends events.EventEmitter { // Logger.instanse.debug("supports_watch: " + Config.supports_watch, span); // if (Config.supports_watch && this.registerGlobalWatches) { // let collections = await DatabaseConnection.toArray(this.db.listCollections()); - let collections = await Logger.DBHelper.GetCollections(span); - collections = collections.filter(x => x.name.indexOf("system.") === -1); - for (var c = 0; c < collections.length; c++) { - if(["agents", "config", "mq", "nodered", "openrpa", "users", "workflow", "workitems"].indexOf(collections[c].name) == -1) continue; - if (collections[c].type != "collection") continue; - if (collections[c].name == "fs.files" || collections[c].name == "fs.chunks") continue; - this.registerGlobalWatch(collections[c].name, span); - } + this.doRegisterGlobalWatches(span); // } this.isConnected = true; Logger.otel.endSpan(span); @@ -598,6 +605,16 @@ export class DatabaseConnection extends events.EventEmitter { if (!discardspan) span?.end(); } } + async doRegisterGlobalWatches(span: Span) { + let collections = await Logger.DBHelper.GetCollections(span); + collections = collections.filter(x => x.name.indexOf("system.") === -1); + for (var c = 0; c < collections.length; c++) { + if(["agents", "config", "mq", "nodered", "openrpa", "users", "workflow", "workitems"].indexOf(collections[c].name) == -1) continue; + if (collections[c].type != "collection") continue; + if (collections[c].name == "fs.files" || collections[c].name == "fs.chunks") continue; + this.registerGlobalWatch(collections[c].name, span); + } + } registerGlobalWatch(collectionname: string, span: Span) { if (!this.registerGlobalWatches) return; span?.addEvent("registerGlobalWatch", { collection: collectionname }); @@ -4896,11 +4913,7 @@ export class DatabaseConnection extends events.EventEmitter { // if (Config.supports_watch) { Logger.instanse.info("Register global watches for each collection", span); - for (var c = 0; c < collections.length; c++) { - if (collections[c].type != "collection") continue; - if (collections[c].name == "fs.files" || collections[c].name == "fs.chunks") continue; - this.registerGlobalWatch(collections[c].name, span); - } + this.doRegisterGlobalWatches(span); // } DatabaseConnection.timeseries_collections = []; diff --git a/package.json b/package.json index 987c98d8..cbabdef3 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@openiap/openflow", - "version": "1.5.8.1.1", + "version": "1.5.8.2", "description": "Simple wrapper around NodeRed, RabbitMQ and MongoDB to support a more scaleable NodeRed implementation.\r Also the \"backend\" for [OpenRPA](https://github.com/skadefro/OpenRPA)", "main": "index.js", "scripts": { From 29230d1cc5f8c399a5af754fa4ad341456c23379 Mon Sep 17 00:00:00 2001 From: skadefro Date: Tue, 9 Jan 2024 11:56:13 +0100 Subject: [PATCH 03/15] Add pre-register logic for exchanges --- OpenFlow/src/DatabaseConnection.ts | 10 ++++- OpenFlow/src/amqpwrapper.ts | 63 ++++++++++++++++++++++++++++++ OpenFlow/src/index.ts | 9 +++++ package.json | 2 +- 4 files changed, 82 insertions(+), 2 deletions(-) diff --git a/OpenFlow/src/DatabaseConnection.ts b/OpenFlow/src/DatabaseConnection.ts index e271ee88..8e1a1189 100644 --- a/OpenFlow/src/DatabaseConnection.ts +++ b/OpenFlow/src/DatabaseConnection.ts @@ -189,7 +189,6 @@ export class DatabaseConnection extends events.EventEmitter { } this.cli .on('connectionReady', () => { - Logger.instanse.info("mongodb.connectionRead", span); }) .on('error', errEvent) .on('parseError', parseErrEvent) @@ -2057,6 +2056,9 @@ export class DatabaseConnection extends events.EventEmitter { // @ts-ignore item._id = result.insertedId; + if (collectionname === "mq" && item._type === "exchange") { + await amqpwrapper.Instance().PreRegisterExchange(item, span); + } if (collectionname === "users" && item._type === "user") { Base.addRight(item, item._id, item.name, [Rights.read, Rights.update, Rights.invoke]); @@ -2333,6 +2335,9 @@ export class DatabaseConnection extends events.EventEmitter { if (item._type == "exchange") item.name = item.name.toLowerCase(); if (item._type == "queue") item.name = item.name.toLowerCase(); if (item._type == "workitemqueue") { hadWorkitemQueue = true; wiqids.push(item._id); } + if (item._type === "exchange") { + await amqpwrapper.Instance().PreRegisterExchange(item, span); + } } if (collectionname == "workitems" && item._type == "workitem") { // @ts-ignore @@ -2988,6 +2993,9 @@ export class DatabaseConnection extends events.EventEmitter { if (!NoderedUtil.IsNullEmpty(q.item.name)) { if (q.item._type == "exchange") q.item.name = q.item.name.toLowerCase(); if (q.item._type == "queue") q.item.name = q.item.name.toLowerCase(); + if (q.item._type === "exchange") { + await amqpwrapper.Instance().PreRegisterExchange(q.item, span); + } } } if (!DatabaseConnection.usemetadata(q.collectionname)) { diff --git a/OpenFlow/src/amqpwrapper.ts b/OpenFlow/src/amqpwrapper.ts index 63867e88..5bed974f 100644 --- a/OpenFlow/src/amqpwrapper.ts +++ b/OpenFlow/src/amqpwrapper.ts @@ -347,6 +347,69 @@ export class amqpwrapper extends events.EventEmitter { Logger.otel.endSpan(span); } } + async checkAndDeleteExchange(exchangeName) { + let conn = await amqplib.connect(this.connectionstring); + try { + const channel = await conn.createChannel(); + try { + // Try to check if exchange exists by declaring it passively + await channel.checkExchange(exchangeName); + + // If no error is thrown, exchange exists, so delete it + await channel.deleteExchange(exchangeName); + console.log(`Exchange '${exchangeName}' deleted.`); + } catch (err) { + // Error means exchange does not exist + console.log(`Exchange '${exchangeName}' does not exist or there was an error checking it.`); + } + } catch (error) { + console.error('Error connecting to RabbitMQ:', error); + } finally { + conn.close(); + } + } + async PreAssertExchange(exchangeName: string, algorithm: string, ExchangeOptions: any): Promise { + let conn = await amqplib.connect(this.connectionstring); + try { + const channel = await conn.createChannel(); + try { + const _ok = await channel.assertExchange(exchangeName, algorithm, ExchangeOptions); + console.log(`Exchange '${exchangeName}' exists.`); + return true; + } catch (err) { + // Error means exchange does not exist + console.log(`Exchange '${exchangeName}' has wrong config`); + return false; + } + } catch (error) { + console.error('Error connecting to RabbitMQ:', error); + } finally { + conn.close(); + } + + } + async PreRegisterExchange(exchange: any, parent: Span) { + if(exchange.name == "openflow") { + return + } + // @ts-ignore + let { algorithm, routingkey, exclusive } = exchange; + if(algorithm == null || algorithm == "") algorithm = "fanout" + if(routingkey == null || routingkey == "") routingkey = "" + if(exclusive == null || exclusive == "") exclusive = true + const AssertExchangeOptions: any = Object.assign({}, (amqpwrapper.Instance().AssertExchangeOptions)); + AssertExchangeOptions.exclusive = exclusive; + if (exchange.name != Config.amqp_dlx && exchange.name != "openflow" && exchange.name != "openflow_logs") AssertExchangeOptions.autoDelete = true; + + // try and create exchange + if(! await this.PreAssertExchange(exchange.name, algorithm, AssertExchangeOptions)) { + // config differs, so delete and recreate + await this.checkAndDeleteExchange(exchange.name); + await this.PreAssertExchange(exchange.name, algorithm, AssertExchangeOptions); + } + // await amqpwrapper.Instance().AddExchangeConsumer( + // Crypt.rootUser(), exchange.name, algorithm, routingkey, AssertExchangeOptions, Crypt.rootToken(), false, null, parent); + } async AddExchangeConsumer(user: TokenUser | User, exchange: string, algorithm: exchangealgorithm, routingkey: string, ExchangeOptions: any, jwt: string, addqueue: boolean, callback: QueueOnMessage, parent: Span): Promise { const span: Span = Logger.otel.startSubSpan("amqpwrapper.AddExchangeConsumer", parent); try { diff --git a/OpenFlow/src/index.ts b/OpenFlow/src/index.ts index 91d09012..2bde10ac 100644 --- a/OpenFlow/src/index.ts +++ b/OpenFlow/src/index.ts @@ -348,6 +348,13 @@ async function initDatabase(parent: Span): Promise { Logger.otel.endSpan(span); } } +async function PreRegisterExchanges(span: Span) { + var exchanges = await Config.db.query({ query: { _type: "exchange" }, collectionname: "mq", jwt: Crypt.rootToken() }, span); + for(let i = 0; i < exchanges.length; i++) { + const exchange = exchanges[i]; + await amqpwrapper.Instance().PreRegisterExchange(exchange, span); + } +} process.on('beforeExit', (code) => { Logger.instanse.error(code as any, null); @@ -461,6 +468,7 @@ var server: http.Server = null; try { await Config.db.connect(span); await initamqp(span); + await PreRegisterExchanges(span); Logger.instanse.info("VERSION: " + Config.version, span); server = await WebServer.configure(Config.baseurl(), span); if (GrafanaProxy != null) { @@ -487,3 +495,4 @@ var server: http.Server = null; Logger.otel.endSpan(span); } })(); + diff --git a/package.json b/package.json index cbabdef3..6530a91f 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@openiap/openflow", - "version": "1.5.8.2", + "version": "1.5.8.3", "description": "Simple wrapper around NodeRed, RabbitMQ and MongoDB to support a more scaleable NodeRed implementation.\r Also the \"backend\" for [OpenRPA](https://github.com/skadefro/OpenRPA)", "main": "index.js", "scripts": { From 1a0d87f706dd736291aa2964d43f60d5df3bb786 Mon Sep 17 00:00:00 2001 From: skadefro Date: Tue, 9 Jan 2024 12:06:44 +0100 Subject: [PATCH 04/15] add translations for agents page --- OpenFlow/src/public/Agent.html | 8 ++++---- OpenFlow/src/public/languages/da-DK/web.json | 19 +++++++++++++++++- OpenFlow/src/public/languages/en-US/web.json | 21 ++++++++++++++++++-- 3 files changed, 41 insertions(+), 7 deletions(-) diff --git a/OpenFlow/src/public/Agent.html b/OpenFlow/src/public/Agent.html index c15b15c5..d353e072 100644 --- a/OpenFlow/src/public/Agent.html +++ b/OpenFlow/src/public/Agent.html @@ -706,7 +706,7 @@

{{ctrl.sizewarningtitle}}

- terminateIfRunning + terminateifrunning
{{ctrl.sizewarningtitle}}
- allowConcurrentRuns + allowconcurrentruns
{{ctrl.sizewarningtitle}}
+ ng-click="ctrl.removepackage(schedule)" lib="web">removeschedule
@@ -762,7 +762,7 @@

+ ng-click="ctrl.addpackage(ctrl.newpackage._id)" lib="web">addschedule

diff --git a/OpenFlow/src/public/languages/da-DK/web.json b/OpenFlow/src/public/languages/da-DK/web.json index 30362e43..14c05e62 100644 --- a/OpenFlow/src/public/languages/da-DK/web.json +++ b/OpenFlow/src/public/languages/da-DK/web.json @@ -158,5 +158,22 @@ "formresource": "Form ressource", "addformresource": "Tilføj form ressource", "chat": "Chat", - "newchat": "Ny chat" + "newchat": "Ny chat", + "pods": "pods", + "docker": "docker", + "assistant": "assistent", + "daemon": "dæmon", + "packages": "Pakker", + "addagent": "Tilføj agent", + "os": "OS", + "cpu": "CPU", + "mem": "Mem", + "arch": "arch", + "options": "Indstillinger", + "addschedule": "Tilføj tidsplan", + "cron": "cron", + "enabled": "Aktiveret", + "terminateifrunning": "Afslut hvis kørende", + "allowconcurrentruns": "Tillad samtidige kørsler", + "removeschedule": "Fjern tidsplan" } \ No newline at end of file diff --git a/OpenFlow/src/public/languages/en-US/web.json b/OpenFlow/src/public/languages/en-US/web.json index ad18817a..ea2e014d 100644 --- a/OpenFlow/src/public/languages/en-US/web.json +++ b/OpenFlow/src/public/languages/en-US/web.json @@ -157,5 +157,22 @@ "formresource": "Form resource", "addformresource": "Add form resource", "chat": "Chat", - "newchat": "New chat" -} \ No newline at end of file + "newchat": "New chat", + "pods": "pods", + "docker": "docker", + "assistant": "assistant", + "daemon": "daemon", + "packages": "Packages", + "addagent": "Add agent", + "os": "OS", + "cpu": "CPU", + "mem": "Mem", + "arch": "arch", + "options": "options", + "addschedule": "Add schedule", + "cron": "cron", + "enabled": "enabled", + "terminateifrunning": "Terminate if running", + "allowconcurrentruns": "Allow concurrent runs", + "removeschedule": "Remove schedule" + } \ No newline at end of file From 3b0e793bbcfbb875505132b97e2f40cc20950006 Mon Sep 17 00:00:00 2001 From: skadefro Date: Tue, 9 Jan 2024 12:18:22 +0100 Subject: [PATCH 05/15] force autoDelete false on exchanges --- OpenFlow/src/amqpwrapper.ts | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/OpenFlow/src/amqpwrapper.ts b/OpenFlow/src/amqpwrapper.ts index 5bed974f..18d2038f 100644 --- a/OpenFlow/src/amqpwrapper.ts +++ b/OpenFlow/src/amqpwrapper.ts @@ -399,7 +399,8 @@ export class amqpwrapper extends events.EventEmitter { if(exclusive == null || exclusive == "") exclusive = true const AssertExchangeOptions: any = Object.assign({}, (amqpwrapper.Instance().AssertExchangeOptions)); AssertExchangeOptions.exclusive = exclusive; - if (exchange.name != Config.amqp_dlx && exchange.name != "openflow" && exchange.name != "openflow_logs") AssertExchangeOptions.autoDelete = true; + // if (exchange.name != Config.amqp_dlx && exchange.name != "openflow" && exchange.name != "openflow_logs") AssertExchangeOptions.autoDelete = true; + AssertExchangeOptions.autoDelete = false; // try and create exchange if(! await this.PreAssertExchange(exchange.name, algorithm, AssertExchangeOptions)) { From 01842306ac978ac2a2e22bff9c174c2323ef7735 Mon Sep 17 00:00:00 2001 From: skadefro Date: Tue, 9 Jan 2024 12:57:35 +0100 Subject: [PATCH 06/15] fix missing autoDelete force --- OpenFlow/src/amqpwrapper.ts | 10 +++++++++- package.json | 2 +- 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/OpenFlow/src/amqpwrapper.ts b/OpenFlow/src/amqpwrapper.ts index 18d2038f..0413ebf1 100644 --- a/OpenFlow/src/amqpwrapper.ts +++ b/OpenFlow/src/amqpwrapper.ts @@ -418,7 +418,8 @@ export class amqpwrapper extends events.EventEmitter { if (this.channel == null || this.conn == null) throw new Error("Cannot Add new Exchange Consumer, not connected to rabbitmq"); const q: amqpexchange = new amqpexchange(); q.ExchangeOptions = Object.assign({}, (ExchangeOptions != null ? ExchangeOptions : this.AssertExchangeOptions)); - if (exchange != Config.amqp_dlx && exchange != "openflow" && exchange != "openflow_logs") q.ExchangeOptions.autoDelete = true; + // if (exchange != Config.amqp_dlx && exchange != "openflow" && exchange != "openflow_logs") q.ExchangeOptions.autoDelete = true; + q.ExchangeOptions.autoDelete = false; q.exchange = exchange; q.algorithm = algorithm; q.routingkey = routingkey; q.callback = callback; const _ok = await this.channel.assertExchange(q.exchange, q.algorithm, q.ExchangeOptions); if (addqueue) { @@ -538,6 +539,10 @@ export class amqpwrapper extends events.EventEmitter { WebSocketServer.websocket_queue_message_count.add(1, { ...Logger.otel.defaultlabels, queuename: queue }); } else { if (NoderedUtil.IsNullEmpty(routingkey)) routingkey = ""; + if(exchange != "openflow" && exchange != "openflow_logs") { + console.log("publishing to exchange: " + exchange + " routingkey: " + routingkey + " correlationId: " + correlationId); + } + this.PreRegisterExchange this.channel.publish(exchange, routingkey, Buffer.from(data), options); } } @@ -585,6 +590,9 @@ export class amqpwrapper extends events.EventEmitter { if (!NoderedUtil.IsNullUndefinded(WebSocketServer.websocket_queue_message_count)) WebSocketServer.websocket_queue_message_count.add(1, { ...Logger.otel.defaultlabels, queuename: queue }); } else { + if(exchange != "openflow" && exchange != "openflow_logs") { + console.log("publishing to exchange: " + exchange + " routingkey: " + routingkey + " correlationId: " + correlationId); + } this.channel.publish(exchange, routingkey, Buffer.from(data), options); } } diff --git a/package.json b/package.json index 6530a91f..fb4f322f 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@openiap/openflow", - "version": "1.5.8.3", + "version": "1.5.8.5", "description": "Simple wrapper around NodeRed, RabbitMQ and MongoDB to support a more scaleable NodeRed implementation.\r Also the \"backend\" for [OpenRPA](https://github.com/skadefro/OpenRPA)", "main": "index.js", "scripts": { From 604a68b3e2c0e537d1149c2373d5a837a27f3aad Mon Sep 17 00:00:00 2001 From: skadefro Date: Thu, 11 Jan 2024 13:51:29 +0100 Subject: [PATCH 07/15] update nodeapi, to fix addqueue option --- OpenFlow/src/Messages/Message.ts | 1 + OpenFlow/src/proto/client.ts | 2 +- package.json | 4 ++-- 3 files changed, 4 insertions(+), 3 deletions(-) diff --git a/OpenFlow/src/Messages/Message.ts b/OpenFlow/src/Messages/Message.ts index 7bb0930f..4e50f50f 100644 --- a/OpenFlow/src/Messages/Message.ts +++ b/OpenFlow/src/Messages/Message.ts @@ -896,6 +896,7 @@ export class Message { var res = await cli.RegisterExchange(tuser, msg.exchangename, msg.algorithm, msg.routingkey, addqueue, parent); msg.queuename = res.queuename; msg.exchangename = res.exchangename; + if(msg.queuename == null) msg.queuename = ""; delete msg.jwt; this.data = JSON.stringify(msg); } diff --git a/OpenFlow/src/proto/client.ts b/OpenFlow/src/proto/client.ts index 1fc27138..f7edfd24 100644 --- a/OpenFlow/src/proto/client.ts +++ b/OpenFlow/src/proto/client.ts @@ -321,7 +321,7 @@ export class flowclient extends client { } for (let i = this._exchanges.length - 1; i >= 0; i--) { const e = this._exchanges[i]; - if (e && (e.queue != null && e.queue.queue == queuename || e.queue.queuename == queuename)) { + if (e && (e.queue != null && e.queue?.queue == queuename || e.queue?.queuename == queuename)) { try { amqpwrapper.Instance().RemoveQueueConsumer(user, this._exchanges[i].queue, span).catch((err) => { Logger.instanse.error(err, span, Logger.parsecli(this as any)); diff --git a/package.json b/package.json index fb4f322f..e252d909 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@openiap/openflow", - "version": "1.5.8.5", + "version": "1.5.8.6", "description": "Simple wrapper around NodeRed, RabbitMQ and MongoDB to support a more scaleable NodeRed implementation.\r Also the \"backend\" for [OpenRPA](https://github.com/skadefro/OpenRPA)", "main": "index.js", "scripts": { @@ -38,7 +38,7 @@ "@fortawesome/fontawesome-free": "^5.15.3", "@kubernetes/client-node": "^0.19.0", "@node-saml/passport-saml": "^4.0.1", - "@openiap/nodeapi": "^0.0.31", + "@openiap/nodeapi": "^0.0.33", "@openiap/openflow-api": "^2.1.11", "@opentelemetry/exporter-metrics-otlp-grpc": "^0.43.0", "@opentelemetry/sdk-node": "^0.43.0", From fddb29b1553a318caaa02571c75825f2305d1600 Mon Sep 17 00:00:00 2001 From: skadefro Date: Thu, 11 Jan 2024 23:47:22 +0100 Subject: [PATCH 08/15] fix grpc not understanding null --- OpenFlow/src/Messages/Message.ts | 14 +++++++++++++- package.json | 2 +- 2 files changed, 14 insertions(+), 2 deletions(-) diff --git a/OpenFlow/src/Messages/Message.ts b/OpenFlow/src/Messages/Message.ts index 4e50f50f..ed282090 100644 --- a/OpenFlow/src/Messages/Message.ts +++ b/OpenFlow/src/Messages/Message.ts @@ -2044,6 +2044,11 @@ export class Message { if (Config.otel_trace_interval > 0) msg.otel_trace_interval = Config.otel_trace_interval; if (Config.otel_metric_interval > 0) msg.otel_metric_interval = Config.otel_metric_interval; msg.enable_analytics = Config.enable_analytics; + if(msg.user != null) { + if(msg.user.email == null || msg.user.email == "") { + msg.user.email = ""; + } + } this.data = JSON.stringify(msg); // hrend = process.hrtime(hrstart) } finally { @@ -5042,13 +5047,16 @@ export class Message { await this.DuplicateWorkitem(wi, failed_wiq, failed_wiqid, this.jwt, parent); } } + if(msg.result != null) { + if(msg.result.nextrun == null) delete msg.result.nextrun; + if(msg.result.lastrun == null) delete msg.result.lastrun; + } delete msg.jwt; this.data = JSON.stringify(msg); } - async PopWorkitem(parent: Span): Promise { this.Reply(); let msg: PopWorkitemMessage; @@ -5141,6 +5149,10 @@ export class Message { // } // } delete msg.jwt; + if(msg.result != null) { + if(msg.result.nextrun == null) delete msg.result.nextrun; + if(msg.result.lastrun == null) delete msg.result.lastrun; + } this.data = JSON.stringify(msg); } diff --git a/package.json b/package.json index e252d909..37c9cb7c 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@openiap/openflow", - "version": "1.5.8.6", + "version": "1.5.8.10", "description": "Simple wrapper around NodeRed, RabbitMQ and MongoDB to support a more scaleable NodeRed implementation.\r Also the \"backend\" for [OpenRPA](https://github.com/skadefro/OpenRPA)", "main": "index.js", "scripts": { From 28a8fc832e6d25129869f758962c57d4fc0628f9 Mon Sep 17 00:00:00 2001 From: skadefro Date: Fri, 12 Jan 2024 00:30:49 +0100 Subject: [PATCH 09/15] re add ugly hack --- OpenFlow/src/WebServer.ts | 5 ++++- package.json | 4 ++-- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/OpenFlow/src/WebServer.ts b/OpenFlow/src/WebServer.ts index 1ae8e1bf..15fa4837 100644 --- a/OpenFlow/src/WebServer.ts +++ b/OpenFlow/src/WebServer.ts @@ -24,7 +24,7 @@ import { flowclient } from "./proto/client"; import { WebSocketServer } from "./WebSocketServer"; import { Message } from "./Messages/Message"; import { GridFSBucket, ObjectId } from "mongodb"; -import { config, protowrap, GetElementResponse, UploadResponse, DownloadResponse, BeginStream, EndStream, Stream, ErrorResponse, Workitem } from "@openiap/nodeapi"; +import { config, protowrap, GetElementResponse, UploadResponse, DownloadResponse, BeginStream, EndStream, Stream, ErrorResponse, Workitem, RegisterExchangeRequest } from "@openiap/nodeapi"; const { info, warn, err } = config; import { Any } from "@openiap/nodeapi/lib/proto/google/protobuf/any"; import { Timestamp } from "@openiap/nodeapi/lib/proto/google/protobuf/timestamp"; @@ -431,6 +431,9 @@ export class WebServer { try { [command, msg, reply] = protowrap.unpack(message); if(message.command == "") throw new Error("Invalid/empty command"); + if(command == "registerexchange") { + msg = RegisterExchangeRequest.decode(message.data.value); + } } catch (error) { err(error); message.command = "error"; diff --git a/package.json b/package.json index 37c9cb7c..8f635fb7 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@openiap/openflow", - "version": "1.5.8.10", + "version": "1.5.8.11", "description": "Simple wrapper around NodeRed, RabbitMQ and MongoDB to support a more scaleable NodeRed implementation.\r Also the \"backend\" for [OpenRPA](https://github.com/skadefro/OpenRPA)", "main": "index.js", "scripts": { @@ -38,7 +38,7 @@ "@fortawesome/fontawesome-free": "^5.15.3", "@kubernetes/client-node": "^0.19.0", "@node-saml/passport-saml": "^4.0.1", - "@openiap/nodeapi": "^0.0.33", + "@openiap/nodeapi": "^0.0.35", "@openiap/openflow-api": "^2.1.11", "@opentelemetry/exporter-metrics-otlp-grpc": "^0.43.0", "@opentelemetry/sdk-node": "^0.43.0", From a6b566238aa6c5d082b6620546f2df7fbd97d680 Mon Sep 17 00:00:00 2001 From: skadefro Date: Fri, 12 Jan 2024 11:58:41 +0100 Subject: [PATCH 10/15] add package Reinstall button --- OpenFlow/src/public/Controllers.ts | 13 +++++++++++++ OpenFlow/src/public/RunPackage.html | 4 +++- OpenFlow/src/public/languages/da-DK/web.json | 2 ++ OpenFlow/src/public/languages/en-US/web.json | 2 ++ package.json | 2 +- 5 files changed, 21 insertions(+), 2 deletions(-) diff --git a/OpenFlow/src/public/Controllers.ts b/OpenFlow/src/public/Controllers.ts index f52edbd1..4d0e6ddf 100644 --- a/OpenFlow/src/public/Controllers.ts +++ b/OpenFlow/src/public/Controllers.ts @@ -8849,6 +8849,19 @@ export class RunPackageCtrl extends entityCtrl { await NoderedUtil.Queue({ data: payload, queuename: _a.slug + "agent", correlationId: streamid }) } + async Reinstall() : Promise { + var _a = this.agents.find(x => x._id == this.id); + if (_a == null) return; + const streamid = Math.random().toString(36).substring(2, 15) + Math.random().toString(36).substring(2, 15); + var payload ={ + "command": "reinstallpackage", + "id": this.package, + "stream": true, + "queuename": this.queuename + } + await NoderedUtil.Queue({ data: payload, queuename: _a.slug + "agent", correlationId: streamid }) + + } async submit(): Promise { var _a = this.agents.find(x => x._id == this.id); if (_a == null) return; diff --git a/OpenFlow/src/public/RunPackage.html b/OpenFlow/src/public/RunPackage.html index e3a63acd..a183ba2b 100644 --- a/OpenFlow/src/public/RunPackage.html +++ b/OpenFlow/src/public/RunPackage.html @@ -69,7 +69,9 @@
+ lib="web">run +
diff --git a/OpenFlow/src/public/languages/da-DK/web.json b/OpenFlow/src/public/languages/da-DK/web.json index 14c05e62..a979566b 100644 --- a/OpenFlow/src/public/languages/da-DK/web.json +++ b/OpenFlow/src/public/languages/da-DK/web.json @@ -52,6 +52,8 @@ "webserver": "WebServer", "sleep": "Sleep", "runas": "Run as", + "run": "Run", + "reinstall": "Geninstaller", "openrpa": "OpenRPA", "audit": "Log", "empty": "Tøm", diff --git a/OpenFlow/src/public/languages/en-US/web.json b/OpenFlow/src/public/languages/en-US/web.json index ea2e014d..c319127c 100644 --- a/OpenFlow/src/public/languages/en-US/web.json +++ b/OpenFlow/src/public/languages/en-US/web.json @@ -52,6 +52,8 @@ "webserver": "WebServer", "sleep": "Sleep", "runas": "Run as", + "run": "Run", + "reinstall": "Reinstall", "openrpa": "OpenRPA", "audit": "Audit", "empty": "Empty", diff --git a/package.json b/package.json index 8f635fb7..0ee859d8 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@openiap/openflow", - "version": "1.5.8.11", + "version": "1.5.8.12", "description": "Simple wrapper around NodeRed, RabbitMQ and MongoDB to support a more scaleable NodeRed implementation.\r Also the \"backend\" for [OpenRPA](https://github.com/skadefro/OpenRPA)", "main": "index.js", "scripts": { From 3dba9c2749e693acb1d90b95c949ee0f75ecb4a6 Mon Sep 17 00:00:00 2001 From: skadefro Date: Sat, 13 Jan 2024 13:17:17 +0100 Subject: [PATCH 11/15] skip enable_openflow_amqp on dev --- OpenFlow/src/Config.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/OpenFlow/src/Config.ts b/OpenFlow/src/Config.ts index a8647071..82cedb78 100644 --- a/OpenFlow/src/Config.ts +++ b/OpenFlow/src/Config.ts @@ -69,7 +69,7 @@ export class dbConfig extends Base { "NODE_ENV", "validate_emails", "amqp_url", "port", "saml_issuer", "saml_federation_metadata", "api_ws_url", "domain", "enable_openapi", "enable_openapiauth", "ping_clients_interval", "tls_crt", "tls_key", "tls_ca", "otel_metric_url", "otel_trace_url", "multi_tenant", "auto_hourly_housekeeping", "housekeeping_skip_calculate_size", "housekeeping_skip_update_user_size", - "stripe_api_secret", "stripe_api_key"].indexOf(key) > -1 ) { + "stripe_api_secret", "stripe_api_key", "enable_openflow_amqp"].indexOf(key) > -1 ) { if(os.hostname().toLowerCase() == "nixos") { continue; @@ -121,7 +121,7 @@ export class dbConfig extends Base { "NODE_ENV", "validate_emails", "amqp_url", "port", "saml_issuer", "saml_federation_metadata", "api_ws_url", "domain", "enable_openapi", "enable_openapiauth", "ping_clients_interval", "tls_crt", "tls_key", "tls_ca", "otel_metric_url", "otel_trace_url", "multi_tenant", "auto_hourly_housekeeping", "housekeeping_skip_calculate_size", "housekeeping_skip_update_user_size", - "stripe_api_secret", "stripe_api_key" ].indexOf(key) > -1 ) { + "stripe_api_secret", "stripe_api_key", "enable_openflow_amqp" ].indexOf(key) > -1 ) { if(os.hostname().toLowerCase() == "nixos") { continue; } From c5eae24d883f17a95018ee5bb08815830499202d Mon Sep 17 00:00:00 2001 From: skadefro Date: Sat, 13 Jan 2024 13:17:29 +0100 Subject: [PATCH 12/15] remove debug messages --- OpenFlow/src/amqpwrapper.ts | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/OpenFlow/src/amqpwrapper.ts b/OpenFlow/src/amqpwrapper.ts index 0413ebf1..837d96d3 100644 --- a/OpenFlow/src/amqpwrapper.ts +++ b/OpenFlow/src/amqpwrapper.ts @@ -357,7 +357,7 @@ export class amqpwrapper extends events.EventEmitter { // If no error is thrown, exchange exists, so delete it await channel.deleteExchange(exchangeName); - console.log(`Exchange '${exchangeName}' deleted.`); + // console.log(`Exchange '${exchangeName}' deleted.`); } catch (err) { // Error means exchange does not exist console.log(`Exchange '${exchangeName}' does not exist or there was an error checking it.`); @@ -374,7 +374,7 @@ export class amqpwrapper extends events.EventEmitter { const channel = await conn.createChannel(); try { const _ok = await channel.assertExchange(exchangeName, algorithm, ExchangeOptions); - console.log(`Exchange '${exchangeName}' exists.`); + // console.log(`Exchange '${exchangeName}' exists.`); return true; } catch (err) { // Error means exchange does not exist @@ -540,7 +540,7 @@ export class amqpwrapper extends events.EventEmitter { } else { if (NoderedUtil.IsNullEmpty(routingkey)) routingkey = ""; if(exchange != "openflow" && exchange != "openflow_logs") { - console.log("publishing to exchange: " + exchange + " routingkey: " + routingkey + " correlationId: " + correlationId); + // console.log("publishing to exchange: " + exchange + " routingkey: " + routingkey + " correlationId: " + correlationId); } this.PreRegisterExchange this.channel.publish(exchange, routingkey, Buffer.from(data), options); @@ -591,7 +591,7 @@ export class amqpwrapper extends events.EventEmitter { WebSocketServer.websocket_queue_message_count.add(1, { ...Logger.otel.defaultlabels, queuename: queue }); } else { if(exchange != "openflow" && exchange != "openflow_logs") { - console.log("publishing to exchange: " + exchange + " routingkey: " + routingkey + " correlationId: " + correlationId); + // console.log("publishing to exchange: " + exchange + " routingkey: " + routingkey + " correlationId: " + correlationId); } this.channel.publish(exchange, routingkey, Buffer.from(data), options); } From 06bd069c0c5084f112b2ecdad9a9ab2ea089ac95 Mon Sep 17 00:00:00 2001 From: skadefro Date: Sun, 14 Jan 2024 19:54:36 +0100 Subject: [PATCH 13/15] add metadata for uploadfile --- OpenFlow/src/WebServer.ts | 9 ++++++++- package.json | 4 ++-- 2 files changed, 10 insertions(+), 3 deletions(-) diff --git a/OpenFlow/src/WebServer.ts b/OpenFlow/src/WebServer.ts index 15fa4837..b8b4f150 100644 --- a/OpenFlow/src/WebServer.ts +++ b/OpenFlow/src/WebServer.ts @@ -335,12 +335,19 @@ export class WebServer { return new Promise((resolve, reject) => { const bucket = new GridFSBucket(Config.db.db); var metadata = new Base(); - metadata.name = msg.filename; metadata._acl = []; metadata._createdby = "root"; metadata._createdbyid = WellknownIds.root; metadata._modifiedby = "root"; metadata._modifiedbyid = WellknownIds.root; + if(msg.metadata != null && msg.metadata != null) { + try { + metadata = Object.assign(metadata, JSON.parse(msg.metadata)); + } catch (error) { + Logger.instanse.error(error, null); + } + } + if(metadata.name == null || metadata.name == "") metadata.name = msg.filename; if(client.user) { Base.addRight(metadata, client.user._id , client.user.name, [Rights.full_control]); diff --git a/package.json b/package.json index 0ee859d8..c303039c 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@openiap/openflow", - "version": "1.5.8.12", + "version": "1.5.8.13", "description": "Simple wrapper around NodeRed, RabbitMQ and MongoDB to support a more scaleable NodeRed implementation.\r Also the \"backend\" for [OpenRPA](https://github.com/skadefro/OpenRPA)", "main": "index.js", "scripts": { @@ -38,7 +38,7 @@ "@fortawesome/fontawesome-free": "^5.15.3", "@kubernetes/client-node": "^0.19.0", "@node-saml/passport-saml": "^4.0.1", - "@openiap/nodeapi": "^0.0.35", + "@openiap/nodeapi": "^0.0.37", "@openiap/openflow-api": "^2.1.11", "@opentelemetry/exporter-metrics-otlp-grpc": "^0.43.0", "@opentelemetry/sdk-node": "^0.43.0", From 9ab72b7a7929d192a4b48a48a0f9e80597aa4458 Mon Sep 17 00:00:00 2001 From: skadefro Date: Tue, 16 Jan 2024 11:07:51 +0100 Subject: [PATCH 14/15] Add crud parameter parsing for RPAWorkflowCtrl --- OpenFlow/src/public/Controllers.ts | 77 +++++++++++++++++++++++++++++- 1 file changed, 75 insertions(+), 2 deletions(-) diff --git a/OpenFlow/src/public/Controllers.ts b/OpenFlow/src/public/Controllers.ts index 4d0e6ddf..22e2579b 100644 --- a/OpenFlow/src/public/Controllers.ts +++ b/OpenFlow/src/public/Controllers.ts @@ -1413,21 +1413,94 @@ export class RPAWorkflowCtrl extends entityCtrl { }); if (!this.$scope.$$phase) { this.$scope.$apply(); } } + public parseBoolean(s: any): boolean { + let val: string; + if (typeof s === "number") { + val = s.toString(); + } else if (typeof s === "string") { + val = s.toLowerCase().trim(); + } else if (typeof s === "boolean") { + val = s.toString(); + } else { + throw new Error("Unknown type!"); + } + switch (val) { + case "true": case "yes": case "1": return true; + case "false": case "no": case "0": case null: return false; + default: return Boolean(s); + } + } async submit(): Promise { try { this.errormessage = ""; + if (this.arguments === null || this.arguments === undefined) { this.arguments = {}; } + + var keys = Object.keys(this.arguments); + for(let i = 0; i < keys.length; i++) { + const key = keys[i]; + const param = this.model.Parameters.find(x=> x.name == key); + if(param && param.type == "System.String") this.arguments[key] = this.arguments[key] ?? ""; + if(param && param.type == "System.Int32") this.arguments[key] = parseInt(this.arguments[key]); + if(param && param.type == "System.Boolean") this.arguments[key] = this.parseBoolean(this.arguments[key]); + if(param && param.type == "System.DateTime") { + if(this.arguments[key] != null && this.arguments[key] != "") { + this.arguments[key] = new Date(this.arguments[key]).toISOString(); + } else { + this.arguments[key] = undefined; + } + } + if(param && param.type == "System.String[]" && Array.isArray(this.arguments[key]) == false ) { + var arr = this.arguments[key].split(","); + this.arguments[key] = arr; + } + if(param && param.type == "System.Int32[]" && Array.isArray(this.arguments[key]) == false ) { + var arr = this.arguments[key].split(","); + arr = arr.map(x=> parseInt(x)); + this.arguments[key] = arr; + } + if(param && param.type == "System.Boolean[]" && Array.isArray(this.arguments[key]) == false ) { + var arr = this.arguments[key].split(","); + arr = arr.map(x=> this.parseBoolean(x)); + this.arguments[key] = arr; + } + if(param && param.type == "System.DateTime[]" && Array.isArray(this.arguments[key]) == false ) { + var arr = this.arguments[key].split(","); + arr = arr.map(x=> new Date(x).toISOString()); + this.arguments[key] = arr; + } + console.log(key, this.arguments[key]) + } + const rpacommand = { command: "invoke", workflowid: this.model._id, - data: this.arguments + data: {...this.arguments} + } + for(let i = 0; i < keys.length; i++) { + const key = keys[i]; + const param = this.model.Parameters.find(x=> x.name == key); + // console.log(key, this.arguments[key]) + if(param && param.type == "System.String[]" && Array.isArray(this.arguments[key]) == true ) { + this.arguments[key] = this.arguments[key].join(","); + } + if(param && param.type == "System.Int32[]" && Array.isArray(this.arguments[key]) == true ) { + this.arguments[key] = this.arguments[key].join(","); + } + if(param && param.type == "System.Boolean[]" && Array.isArray(this.arguments[key]) == true ) { + this.arguments[key] = this.arguments[key].join(","); + } + if(param && param.type == "System.DateTime[]" && Array.isArray(this.arguments[key]) == true ) { + this.arguments[key] = this.arguments[key].join(","); + } + console.log(key, this.arguments[key]) } - if (this.arguments === null || this.arguments === undefined) { this.arguments = {}; } const result: any = await NoderedUtil.Queue({ queuename: this.user._id, replyto: this.queuename, data: rpacommand, expiration: parseInt(this.timeout), striptoken: true }); try { // result = JSON.parse(result); } catch (error) { } } catch (error) { + console.error(error); this.errormessage = error.message ? error.message : error; } if (!this.$scope.$$phase) { this.$scope.$apply(); } From eb12ff68910ab3f7ac0425679fec12491b001bd3 Mon Sep 17 00:00:00 2001 From: skadefro Date: Tue, 16 Jan 2024 11:20:51 +0100 Subject: [PATCH 15/15] up --- package.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/package.json b/package.json index c303039c..1ccca95e 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@openiap/openflow", - "version": "1.5.8.13", + "version": "1.5.8.14", "description": "Simple wrapper around NodeRed, RabbitMQ and MongoDB to support a more scaleable NodeRed implementation.\r Also the \"backend\" for [OpenRPA](https://github.com/skadefro/OpenRPA)", "main": "index.js", "scripts": {