From 43af732c0ade85660fef2a506a96e2cc9fea88c1 Mon Sep 17 00:00:00 2001 From: Vasilii Ermilov Date: Sun, 30 Aug 2020 18:24:57 +1000 Subject: [PATCH] =?UTF-8?q?=D1=80=D0=B5=D0=B0=D0=BB=D0=B8=D0=B7=D0=B0?= =?UTF-8?q?=D1=86=D0=B8=D1=8F=20=D0=BC=D0=BE=D0=B4=D1=83=D0=BB=D1=8F=20?= =?UTF-8?q?=D1=88=D0=B8=D0=BD=D1=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .gitignore | 2 + lib/impl/AiBus.js | 186 ++++++++++++++++++++++++++++++++++++++ lib/interfaces/IAgent.js | 25 +++++ lib/interfaces/IBus.js | 70 ++++++++++++++ lib/interfaces/IQueue.js | 18 ++++ lib/interfaces/ISignal.js | 59 ++++++++++++ 6 files changed, 360 insertions(+) create mode 100644 lib/impl/AiBus.js create mode 100644 lib/interfaces/IAgent.js create mode 100644 lib/interfaces/IBus.js create mode 100644 lib/interfaces/IQueue.js create mode 100644 lib/interfaces/ISignal.js diff --git a/.gitignore b/.gitignore index 6704566..eea40bb 100644 --- a/.gitignore +++ b/.gitignore @@ -102,3 +102,5 @@ dist # TernJS port file .tern-port + +*.swp diff --git a/lib/impl/AiBus.js b/lib/impl/AiBus.js new file mode 100644 index 0000000..8ba04a2 --- /dev/null +++ b/lib/impl/AiBus.js @@ -0,0 +1,186 @@ +const IBus = require('../interfaces/IBus'); +const IonError = require('core/IonError'); +const uuid = require('uuid'); + +class AiBus extends IBus { + /** + * @param {{}} options + * @param {IQueue} options.queue + * @param {Repository} options.storage + * @param {Number} [options.timeout] + */ + constructor(options) { + super(); + if (!options || !options.queue || !options.storage) + throw new IonError('Invalid options'); + + this.live = false; + this.queue = options.queue; + this.storage = options.storage; + this.defaultTimeout = options.timeout || 10000; + this.events = {}; + this.handleOnce = {}; + } + + /** + * @param {ISignal} signal + * @returns {Promise} + */ + _handle(signal) { + signal.thread = signal.thread || uuid.v1(); + return this.queue.enqueue(signal); + } + + /** + * @param {String} code + * @param {String} sender + * @param {String} thread + * @param {String} id + * @param {Function} handler + */ + _addEvent(code, sender, thread, id, handler) { + sender = sender || ''; + this.events[code] = this.events[code] || {}; + this.events[code][thread] = this.events[code][thread] || {}; + this.events[code][thread][sender] = this.events[code][thread][sender] || {}; + if (!this.events[code][thread][sender][id]) + this.events[code][thread][sender][id] = handler; + } + + /** + * @param {String} code + * @param {String} sender + * @param {String} thread + * @param {Number} [timeout] + * @returns {Promise} + */ + _wait(code, sender, thread, timeout) { + return new Promise((resolve, reject) => { + const handler = function(signal) { + return resolve(signal); + }; + const timeOver = function() { + return reject(new IonError(`Timeout for ${code} ${sender} ${thread} is over`)); + }; + const id = `wait-${uuid.v1()}`; + if (code && thread) + this._addEvent(code, sender, thread, id, handler); + else + this.handleOnce[id] = handler; + + setTimeout(timeOver, timeout || this.defaultTimeout); + }); + } + + /** + * @param {IAgent} agent + * @param {ISignal} trigger + * @returns {Promise} + */ + async _register(agent, trigger) { + const handler = agent.construct(this); + if (typeof handler !== 'function') + throw new IonError('No handler'); + + const agentData = await this.storage.get(agent.id); + if (!agentData) + await this.storage.set(agent.id, {}); + + if (trigger) { + const { + code, thread, sender + } = trigger; + this._addEvent(code, sender, thread, agent.id, handler); + } else { + this.handleOnce[agent.id] = handler; + } + } + + /** + * @param {String} id + * @returns {Promise} + */ + async _eject(id) { + await this.storage.set(id, {}); + delete this.handleOnce[id]; + Object.values(this.events).forEach((code) => { + Object.values(code).forEach((thread) => { + Object.values(thread).forEach((sender) => { + if (Object.keys(sender).includes(id)) + delete sender[id]; + }); + }); + }); + } + + /** + * @param {String} id + * @returns {Promise} + */ + async _status(id) { + const agentData = await this.storage.get(id); + if (agentData) + return agentData.status; + + throw new IonError('No agent'); + } + + /** + * @param {String} id + * @param {{}} data + * @returns {Promise} + */ + async _setStatus(id, data) { + let agentData = await this.storage.get(id); + agentData = agentData || {}; + agentData.status = data; + return this.storage.set(id, agentData); + } + + /** + * @param {ISignal} signal + */ + _onSignal(signal) { + if (Object.keys(this.handleOnce).length) { + Object.values(this.handleOnce).forEach((handler) => { + handler(signal); + }); + this.handleOnce = {}; + } + if (this.events[signal.code] && this.events[signal.code][signal.thread]) { + const sender = signal.sender || ''; + if (this.events[signal.code][signal.thread][sender]) + Object.values(this.events[signal.code][signal.thread][sender]).forEach(handler => handler(signal)); + } + } + + /** + * @returns {undefined} + */ + _serve() { + if (this.live) + return; + + this.live = true; + const worker = async () => { + if (!this.live) + return; + + const signal = await this.queue.dequeue(); + if (signal) + this._onSignal(signal); + + setImmediate(worker); + }; + worker(); + } + + /** + * @returns {undefined} + */ + _stop() { + this.live = false; + } +} + +module.exports = AiBus; diff --git a/lib/interfaces/IAgent.js b/lib/interfaces/IAgent.js new file mode 100644 index 0000000..d6726b5 --- /dev/null +++ b/lib/interfaces/IAgent.js @@ -0,0 +1,25 @@ +class IAgent { + /** + * @returns {String} + */ + get id() { + return this._getId(); + } + + /** + * @param {String} value + */ + set id(value) { + return this._setId(value); + } + + /** + * @param {IBus} bus + * @returns {Function} + */ + construct(bus) { + return this._construct(bus); + } +} + +module.exports = IAgent; diff --git a/lib/interfaces/IBus.js b/lib/interfaces/IBus.js new file mode 100644 index 0000000..e67d771 --- /dev/null +++ b/lib/interfaces/IBus.js @@ -0,0 +1,70 @@ +class IBus { + /** + * @param {ISignal} signal + * @returns {Promise} + */ + handle(signal) { + return this._handle(signal); + } + + /** + * @param {String} code + * @param {String} sender + * @param {String} thread + * @param {Number} [timeout] + * @returns {Promise} + */ + wait(code, sender, thread, timeout) { + return this._wait(code, sender, thread, timeout); + } + + /** + * @param {IAgent} agent + * @param {ISignal} trigger + * @returns {Promise} + */ + register(agent, trigger) { + return this._register(agent, trigger); + } + + /** + * @param {String} id + * @returns {Promise} + */ + eject(id) { + return this._eject(id); + } + + /** + * @param {String} id + * @returns {Promise} + */ + status(id) { + return this._status(id); + } + + /** + * @param {String} id + * @param {{}} data + * @returns {Promise} + */ + setStatus(id, data) { + return this._setStatus(id, data); + } + + /** + * @returns {undefined} + */ + serve() { + return this._serve(); + } + + /** + * @returns {undefined} + */ + stop() { + return this._stop(); + } +} + +module.exports = IBus; diff --git a/lib/interfaces/IQueue.js b/lib/interfaces/IQueue.js new file mode 100644 index 0000000..6fbad4d --- /dev/null +++ b/lib/interfaces/IQueue.js @@ -0,0 +1,18 @@ +class IQueue { + /** + * @param {ISignal} signal + * @returns {Promise} + */ + enqueue(signal) { + return this._enqueue(signal); + } + + /** + * @returns {Promise.} + */ + dequeue() { + return this._dequeue(); + } +} + +module.exports = IQueue; diff --git a/lib/interfaces/ISignal.js b/lib/interfaces/ISignal.js new file mode 100644 index 0000000..5b06874 --- /dev/null +++ b/lib/interfaces/ISignal.js @@ -0,0 +1,59 @@ +class ISignal { + /** + * @returns {String} + */ + get code() { + return this._getCode(); + } + + /** + * @param {String} value + */ + set code(value) { + return this._setCode(value); + } + + /** + * @returns {String} + */ + get sender() { + return this._getSender(); + } + + /** + * @param {String} value + */ + set sender(value) { + return this._setSender(value); + } + + /** + * @returns {String} + */ + get thread() { + return this._getThread(); + } + + /** + * @param {String} value + */ + set thread(value) { + return this._setThread(value); + } + + /** + * @returns {{}} + */ + get data() { + return this._getData(); + } + + /** + * @param {{}} value + */ + set data(value) { + return this._setData(value); + } +} + +module.exports = ISignal;