Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

release 1.5.8 #291

Merged
merged 15 commits into from
Jan 16, 2024
4 changes: 2 additions & 2 deletions OpenFlow/src/Config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ export class dbConfig extends Base {
"NODE_ENV", "validate_emails", "amqp_url", "port", "saml_issuer", "saml_federation_metadata", "api_ws_url",
"domain", "enable_openapi", "enable_openapiauth", "ping_clients_interval", "tls_crt", "tls_key", "tls_ca",
"otel_metric_url", "otel_trace_url", "multi_tenant", "auto_hourly_housekeeping", "housekeeping_skip_calculate_size", "housekeeping_skip_update_user_size",
"stripe_api_secret", "stripe_api_key"].indexOf(key) > -1 ) {
"stripe_api_secret", "stripe_api_key", "enable_openflow_amqp"].indexOf(key) > -1 ) {

if(os.hostname().toLowerCase() == "nixos") {
continue;
Expand Down Expand Up @@ -121,7 +121,7 @@ export class dbConfig extends Base {
"NODE_ENV", "validate_emails", "amqp_url", "port", "saml_issuer", "saml_federation_metadata", "api_ws_url",
"domain", "enable_openapi", "enable_openapiauth", "ping_clients_interval", "tls_crt", "tls_key", "tls_ca",
"otel_metric_url", "otel_trace_url", "multi_tenant", "auto_hourly_housekeeping", "housekeeping_skip_calculate_size", "housekeeping_skip_update_user_size",
"stripe_api_secret", "stripe_api_key" ].indexOf(key) > -1 ) {
"stripe_api_secret", "stripe_api_key", "enable_openflow_amqp" ].indexOf(key) > -1 ) {
if(os.hostname().toLowerCase() == "nixos") {
continue;
}
Expand Down
49 changes: 35 additions & 14 deletions OpenFlow/src/DatabaseConnection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -168,17 +168,30 @@ export class DatabaseConnection extends events.EventEmitter {
Logger.instanse.silly("Really connected to mongodb", span);
const errEvent = (error) => {
this.isConnected = false;
this.streams = [];
Logger.instanse.info("mongodb.error", span);
Logger.instanse.error(error, span);
this.emit("disconnected");
}
const parseErrEvent = (error) => {
this.isConnected = false;
this.streams = [];
Logger.instanse.info("mongodb.parseError", span);
Logger.instanse.error(error, span);
this.emit("disconnected");
}
const closeEvent = () => {
this.streams = [];
this.isConnected = false;
Logger.instanse.info("mongodb.close", span);
Logger.instanse.silly("Disconnected from mongodb", span);
this.emit("disconnected");
}
this.cli
.on('connectionReady', () => {
})
.on('error', errEvent)
.on('parseError', errEvent)
.on('parseError', parseErrEvent)
.on('timeout', errEvent)
.on('close', closeEvent)
// .on("commandStarted", (event) => {
Expand Down Expand Up @@ -251,14 +264,7 @@ export class DatabaseConnection extends events.EventEmitter {
// Logger.instanse.debug("supports_watch: " + Config.supports_watch, span);
// if (Config.supports_watch && this.registerGlobalWatches) {
// let collections = await DatabaseConnection.toArray(this.db.listCollections());
let collections = await Logger.DBHelper.GetCollections(span);
collections = collections.filter(x => x.name.indexOf("system.") === -1);
for (var c = 0; c < collections.length; c++) {
if(["agents", "config", "mq", "nodered", "openrpa", "users", "workflow", "workitems"].indexOf(collections[c].name) == -1) continue;
if (collections[c].type != "collection") continue;
if (collections[c].name == "fs.files" || collections[c].name == "fs.chunks") continue;
this.registerGlobalWatch(collections[c].name, span);
}
this.doRegisterGlobalWatches(span);
// }
this.isConnected = true;
Logger.otel.endSpan(span);
Expand Down Expand Up @@ -598,6 +604,16 @@ export class DatabaseConnection extends events.EventEmitter {
if (!discardspan) span?.end();
}
}
async doRegisterGlobalWatches(span: Span) {
let collections = await Logger.DBHelper.GetCollections(span);
collections = collections.filter(x => x.name.indexOf("system.") === -1);
for (var c = 0; c < collections.length; c++) {
if(["agents", "config", "mq", "nodered", "openrpa", "users", "workflow", "workitems"].indexOf(collections[c].name) == -1) continue;
if (collections[c].type != "collection") continue;
if (collections[c].name == "fs.files" || collections[c].name == "fs.chunks") continue;
this.registerGlobalWatch(collections[c].name, span);
}
}
registerGlobalWatch(collectionname: string, span: Span) {
if (!this.registerGlobalWatches) return;
span?.addEvent("registerGlobalWatch", { collection: collectionname });
Expand Down Expand Up @@ -2040,6 +2056,9 @@ export class DatabaseConnection extends events.EventEmitter {

// @ts-ignore
item._id = result.insertedId;
if (collectionname === "mq" && item._type === "exchange") {
await amqpwrapper.Instance().PreRegisterExchange(item, span);
}
if (collectionname === "users" && item._type === "user") {
Base.addRight(item, item._id, item.name, [Rights.read, Rights.update, Rights.invoke]);

Expand Down Expand Up @@ -2316,6 +2335,9 @@ export class DatabaseConnection extends events.EventEmitter {
if (item._type == "exchange") item.name = item.name.toLowerCase();
if (item._type == "queue") item.name = item.name.toLowerCase();
if (item._type == "workitemqueue") { hadWorkitemQueue = true; wiqids.push(item._id); }
if (item._type === "exchange") {
await amqpwrapper.Instance().PreRegisterExchange(item, span);
}
}
if (collectionname == "workitems" && item._type == "workitem") {
// @ts-ignore
Expand Down Expand Up @@ -2971,6 +2993,9 @@ export class DatabaseConnection extends events.EventEmitter {
if (!NoderedUtil.IsNullEmpty(q.item.name)) {
if (q.item._type == "exchange") q.item.name = q.item.name.toLowerCase();
if (q.item._type == "queue") q.item.name = q.item.name.toLowerCase();
if (q.item._type === "exchange") {
await amqpwrapper.Instance().PreRegisterExchange(q.item, span);
}
}
}
if (!DatabaseConnection.usemetadata(q.collectionname)) {
Expand Down Expand Up @@ -4896,11 +4921,7 @@ export class DatabaseConnection extends events.EventEmitter {

// if (Config.supports_watch) {
Logger.instanse.info("Register global watches for each collection", span);
for (var c = 0; c < collections.length; c++) {
if (collections[c].type != "collection") continue;
if (collections[c].name == "fs.files" || collections[c].name == "fs.chunks") continue;
this.registerGlobalWatch(collections[c].name, span);
}
this.doRegisterGlobalWatches(span);
// }

DatabaseConnection.timeseries_collections = [];
Expand Down
15 changes: 14 additions & 1 deletion OpenFlow/src/Messages/Message.ts
Original file line number Diff line number Diff line change
Expand Up @@ -896,6 +896,7 @@ export class Message {
var res = await cli.RegisterExchange(tuser, msg.exchangename, msg.algorithm, msg.routingkey, addqueue, parent);
msg.queuename = res.queuename;
msg.exchangename = res.exchangename;
if(msg.queuename == null) msg.queuename = "";
delete msg.jwt;
this.data = JSON.stringify(msg);
}
Expand Down Expand Up @@ -2043,6 +2044,11 @@ export class Message {
if (Config.otel_trace_interval > 0) msg.otel_trace_interval = Config.otel_trace_interval;
if (Config.otel_metric_interval > 0) msg.otel_metric_interval = Config.otel_metric_interval;
msg.enable_analytics = Config.enable_analytics;
if(msg.user != null) {
if(msg.user.email == null || msg.user.email == "") {
msg.user.email = "";
}
}
this.data = JSON.stringify(msg);
// hrend = process.hrtime(hrstart)
} finally {
Expand Down Expand Up @@ -5041,13 +5047,16 @@ export class Message {
await this.DuplicateWorkitem(wi, failed_wiq, failed_wiqid, this.jwt, parent);
}
}
if(msg.result != null) {
if(msg.result.nextrun == null) delete msg.result.nextrun;
if(msg.result.lastrun == null) delete msg.result.lastrun;
}
delete msg.jwt;
this.data = JSON.stringify(msg);
}




async PopWorkitem(parent: Span): Promise<void> {
this.Reply();
let msg: PopWorkitemMessage;
Expand Down Expand Up @@ -5140,6 +5149,10 @@ export class Message {
// }
// }
delete msg.jwt;
if(msg.result != null) {
if(msg.result.nextrun == null) delete msg.result.nextrun;
if(msg.result.lastrun == null) delete msg.result.lastrun;
}
this.data = JSON.stringify(msg);
}

Expand Down
14 changes: 12 additions & 2 deletions OpenFlow/src/WebServer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import { flowclient } from "./proto/client";
import { WebSocketServer } from "./WebSocketServer";
import { Message } from "./Messages/Message";
import { GridFSBucket, ObjectId } from "mongodb";
import { config, protowrap, GetElementResponse, UploadResponse, DownloadResponse, BeginStream, EndStream, Stream, ErrorResponse, Workitem } from "@openiap/nodeapi";
import { config, protowrap, GetElementResponse, UploadResponse, DownloadResponse, BeginStream, EndStream, Stream, ErrorResponse, Workitem, RegisterExchangeRequest } from "@openiap/nodeapi";
const { info, warn, err } = config;
import { Any } from "@openiap/nodeapi/lib/proto/google/protobuf/any";
import { Timestamp } from "@openiap/nodeapi/lib/proto/google/protobuf/timestamp";
Expand Down Expand Up @@ -335,12 +335,19 @@ export class WebServer {
return new Promise<string>((resolve, reject) => {
const bucket = new GridFSBucket(Config.db.db);
var metadata = new Base();
metadata.name = msg.filename;
metadata._acl = [];
metadata._createdby = "root";
metadata._createdbyid = WellknownIds.root;
metadata._modifiedby = "root";
metadata._modifiedbyid = WellknownIds.root;
if(msg.metadata != null && msg.metadata != null) {
try {
metadata = Object.assign(metadata, JSON.parse(msg.metadata));
} catch (error) {
Logger.instanse.error(error, null);
}
}
if(metadata.name == null || metadata.name == "") metadata.name = msg.filename;
if(client.user)
{
Base.addRight(metadata, client.user._id , client.user.name, [Rights.full_control]);
Expand Down Expand Up @@ -431,6 +438,9 @@ export class WebServer {
try {
[command, msg, reply] = protowrap.unpack(message);
if(message.command == "") throw new Error("Invalid/empty command");
if(command == "registerexchange") {
msg = RegisterExchangeRequest.decode(message.data.value);
}
} catch (error) {
err(error);
message.command = "error";
Expand Down
74 changes: 73 additions & 1 deletion OpenFlow/src/amqpwrapper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -347,14 +347,79 @@ export class amqpwrapper extends events.EventEmitter {
Logger.otel.endSpan(span);
}
}
async checkAndDeleteExchange(exchangeName) {
let conn = await amqplib.connect(this.connectionstring);
try {
const channel = await conn.createChannel();
try {
// Try to check if exchange exists by declaring it passively
await channel.checkExchange(exchangeName);

// If no error is thrown, exchange exists, so delete it
await channel.deleteExchange(exchangeName);
// console.log(`Exchange '${exchangeName}' deleted.`);
} catch (err) {
// Error means exchange does not exist
console.log(`Exchange '${exchangeName}' does not exist or there was an error checking it.`);
}
} catch (error) {
console.error('Error connecting to RabbitMQ:', error);
} finally {
conn.close();
}
}
async PreAssertExchange(exchangeName: string, algorithm: string, ExchangeOptions: any): Promise<boolean> {
let conn = await amqplib.connect(this.connectionstring);
try {
const channel = await conn.createChannel();
try {
const _ok = await channel.assertExchange(exchangeName, algorithm, ExchangeOptions);
// console.log(`Exchange '${exchangeName}' exists.`);
return true;
} catch (err) {
// Error means exchange does not exist
console.log(`Exchange '${exchangeName}' has wrong config`);
return false;
}
} catch (error) {
console.error('Error connecting to RabbitMQ:', error);
} finally {
conn.close();
}

}
async PreRegisterExchange(exchange: any, parent: Span) {
if(exchange.name == "openflow") {
return
}
// @ts-ignore
let { algorithm, routingkey, exclusive } = exchange;
if(algorithm == null || algorithm == "") algorithm = "fanout"
if(routingkey == null || routingkey == "") routingkey = ""
if(exclusive == null || exclusive == "") exclusive = true
const AssertExchangeOptions: any = Object.assign({}, (amqpwrapper.Instance().AssertExchangeOptions));
AssertExchangeOptions.exclusive = exclusive;
// if (exchange.name != Config.amqp_dlx && exchange.name != "openflow" && exchange.name != "openflow_logs") AssertExchangeOptions.autoDelete = true;
AssertExchangeOptions.autoDelete = false;

// try and create exchange
if(! await this.PreAssertExchange(exchange.name, algorithm, AssertExchangeOptions)) {
// config differs, so delete and recreate
await this.checkAndDeleteExchange(exchange.name);
await this.PreAssertExchange(exchange.name, algorithm, AssertExchangeOptions);
}
// await amqpwrapper.Instance().AddExchangeConsumer(
// Crypt.rootUser(), exchange.name, algorithm, routingkey, AssertExchangeOptions, Crypt.rootToken(), false, null, parent);
}
async AddExchangeConsumer(user: TokenUser | User, exchange: string, algorithm: exchangealgorithm, routingkey: string, ExchangeOptions: any, jwt: string, addqueue: boolean, callback: QueueOnMessage, parent: Span): Promise<amqpexchange> {
const span: Span = Logger.otel.startSubSpan("amqpwrapper.AddExchangeConsumer", parent);
try {
if (NoderedUtil.IsNullEmpty(exchange)) throw new Error("exchange name cannot be empty");
if (this.channel == null || this.conn == null) throw new Error("Cannot Add new Exchange Consumer, not connected to rabbitmq");
const q: amqpexchange = new amqpexchange();
q.ExchangeOptions = Object.assign({}, (ExchangeOptions != null ? ExchangeOptions : this.AssertExchangeOptions));
if (exchange != Config.amqp_dlx && exchange != "openflow" && exchange != "openflow_logs") q.ExchangeOptions.autoDelete = true;
// if (exchange != Config.amqp_dlx && exchange != "openflow" && exchange != "openflow_logs") q.ExchangeOptions.autoDelete = true;
q.ExchangeOptions.autoDelete = false;
q.exchange = exchange; q.algorithm = algorithm; q.routingkey = routingkey; q.callback = callback;
const _ok = await this.channel.assertExchange(q.exchange, q.algorithm, q.ExchangeOptions);
if (addqueue) {
Expand Down Expand Up @@ -474,6 +539,10 @@ export class amqpwrapper extends events.EventEmitter {
WebSocketServer.websocket_queue_message_count.add(1, { ...Logger.otel.defaultlabels, queuename: queue });
} else {
if (NoderedUtil.IsNullEmpty(routingkey)) routingkey = "";
if(exchange != "openflow" && exchange != "openflow_logs") {
// console.log("publishing to exchange: " + exchange + " routingkey: " + routingkey + " correlationId: " + correlationId);
}
this.PreRegisterExchange
this.channel.publish(exchange, routingkey, Buffer.from(data), options);
}
}
Expand Down Expand Up @@ -521,6 +590,9 @@ export class amqpwrapper extends events.EventEmitter {
if (!NoderedUtil.IsNullUndefinded(WebSocketServer.websocket_queue_message_count))
WebSocketServer.websocket_queue_message_count.add(1, { ...Logger.otel.defaultlabels, queuename: queue });
} else {
if(exchange != "openflow" && exchange != "openflow_logs") {
// console.log("publishing to exchange: " + exchange + " routingkey: " + routingkey + " correlationId: " + correlationId);
}
this.channel.publish(exchange, routingkey, Buffer.from(data), options);
}
}
Expand Down
9 changes: 9 additions & 0 deletions OpenFlow/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -348,6 +348,13 @@ async function initDatabase(parent: Span): Promise<boolean> {
Logger.otel.endSpan(span);
}
}
async function PreRegisterExchanges(span: Span) {
var exchanges = await Config.db.query<Base>({ query: { _type: "exchange" }, collectionname: "mq", jwt: Crypt.rootToken() }, span);
for(let i = 0; i < exchanges.length; i++) {
const exchange = exchanges[i];
await amqpwrapper.Instance().PreRegisterExchange(exchange, span);
}
}

process.on('beforeExit', (code) => {
Logger.instanse.error(code as any, null);
Expand Down Expand Up @@ -461,6 +468,7 @@ var server: http.Server = null;
try {
await Config.db.connect(span);
await initamqp(span);
await PreRegisterExchanges(span);
Logger.instanse.info("VERSION: " + Config.version, span);
server = await WebServer.configure(Config.baseurl(), span);
if (GrafanaProxy != null) {
Expand All @@ -487,3 +495,4 @@ var server: http.Server = null;
Logger.otel.endSpan(span);
}
})();

2 changes: 1 addition & 1 deletion OpenFlow/src/proto/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,7 @@ export class flowclient extends client {
}
for (let i = this._exchanges.length - 1; i >= 0; i--) {
const e = this._exchanges[i];
if (e && (e.queue != null && e.queue.queue == queuename || e.queue.queuename == queuename)) {
if (e && (e.queue != null && e.queue?.queue == queuename || e.queue?.queuename == queuename)) {
try {
amqpwrapper.Instance().RemoveQueueConsumer(user, this._exchanges[i].queue, span).catch((err) => {
Logger.instanse.error(err, span, Logger.parsecli(this as any));
Expand Down
Loading
Loading