Skip to content

Commit

Permalink
реализация модуля шины
Browse files Browse the repository at this point in the history
  • Loading branch information
Vasilii Ermilov committed Aug 30, 2020
1 parent bbe4ba9 commit 43af732
Show file tree
Hide file tree
Showing 6 changed files with 360 additions and 0 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -102,3 +102,5 @@ dist

# TernJS port file
.tern-port

*.swp
186 changes: 186 additions & 0 deletions lib/impl/AiBus.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,186 @@
const IBus = require('../interfaces/IBus');
const IonError = require('core/IonError');
const uuid = require('uuid');

class AiBus extends IBus {
/**
* @param {{}} options
* @param {IQueue} options.queue
* @param {Repository} options.storage
* @param {Number} [options.timeout]
*/
constructor(options) {
super();
if (!options || !options.queue || !options.storage)
throw new IonError('Invalid options');

this.live = false;
this.queue = options.queue;
this.storage = options.storage;
this.defaultTimeout = options.timeout || 10000;
this.events = {};
this.handleOnce = {};
}

/**
* @param {ISignal} signal
* @returns {Promise}
*/
_handle(signal) {
signal.thread = signal.thread || uuid.v1();
return this.queue.enqueue(signal);
}

/**
* @param {String} code
* @param {String} sender
* @param {String} thread
* @param {String} id
* @param {Function} handler
*/
_addEvent(code, sender, thread, id, handler) {
sender = sender || '';
this.events[code] = this.events[code] || {};
this.events[code][thread] = this.events[code][thread] || {};
this.events[code][thread][sender] = this.events[code][thread][sender] || {};
if (!this.events[code][thread][sender][id])
this.events[code][thread][sender][id] = handler;
}

/**
* @param {String} code
* @param {String} sender
* @param {String} thread
* @param {Number} [timeout]
* @returns {Promise}
*/
_wait(code, sender, thread, timeout) {
return new Promise((resolve, reject) => {
const handler = function(signal) {
return resolve(signal);
};
const timeOver = function() {
return reject(new IonError(`Timeout for ${code} ${sender} ${thread} is over`));
};
const id = `wait-${uuid.v1()}`;
if (code && thread)
this._addEvent(code, sender, thread, id, handler);
else
this.handleOnce[id] = handler;

setTimeout(timeOver, timeout || this.defaultTimeout);
});
}

/**
* @param {IAgent} agent
* @param {ISignal} trigger
* @returns {Promise}
*/
async _register(agent, trigger) {
const handler = agent.construct(this);
if (typeof handler !== 'function')
throw new IonError('No handler');

const agentData = await this.storage.get(agent.id);
if (!agentData)
await this.storage.set(agent.id, {});

if (trigger) {
const {
code, thread, sender
} = trigger;
this._addEvent(code, sender, thread, agent.id, handler);
} else {
this.handleOnce[agent.id] = handler;
}
}

/**
* @param {String} id
* @returns {Promise}
*/
async _eject(id) {
await this.storage.set(id, {});
delete this.handleOnce[id];
Object.values(this.events).forEach((code) => {
Object.values(code).forEach((thread) => {
Object.values(thread).forEach((sender) => {
if (Object.keys(sender).includes(id))
delete sender[id];
});
});
});
}

/**
* @param {String} id
* @returns {Promise}
*/
async _status(id) {
const agentData = await this.storage.get(id);
if (agentData)
return agentData.status;

throw new IonError('No agent');
}

/**
* @param {String} id
* @param {{}} data
* @returns {Promise}
*/
async _setStatus(id, data) {
let agentData = await this.storage.get(id);
agentData = agentData || {};
agentData.status = data;
return this.storage.set(id, agentData);
}

/**
* @param {ISignal} signal
*/
_onSignal(signal) {
if (Object.keys(this.handleOnce).length) {
Object.values(this.handleOnce).forEach((handler) => {
handler(signal);
});
this.handleOnce = {};
}
if (this.events[signal.code] && this.events[signal.code][signal.thread]) {
const sender = signal.sender || '';
if (this.events[signal.code][signal.thread][sender])
Object.values(this.events[signal.code][signal.thread][sender]).forEach(handler => handler(signal));
}
}

/**
* @returns {undefined}
*/
_serve() {
if (this.live)
return;

this.live = true;
const worker = async () => {
if (!this.live)
return;

const signal = await this.queue.dequeue();
if (signal)
this._onSignal(signal);

setImmediate(worker);
};
worker();
}

/**
* @returns {undefined}
*/
_stop() {
this.live = false;
}
}

module.exports = AiBus;
25 changes: 25 additions & 0 deletions lib/interfaces/IAgent.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
class IAgent {
/**
* @returns {String}
*/
get id() {
return this._getId();
}

/**
* @param {String} value
*/
set id(value) {
return this._setId(value);
}

/**
* @param {IBus} bus
* @returns {Function}
*/
construct(bus) {
return this._construct(bus);
}
}

module.exports = IAgent;
70 changes: 70 additions & 0 deletions lib/interfaces/IBus.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
class IBus {
/**
* @param {ISignal} signal
* @returns {Promise}
*/
handle(signal) {
return this._handle(signal);
}

/**
* @param {String} code
* @param {String} sender
* @param {String} thread
* @param {Number} [timeout]
* @returns {Promise}
*/
wait(code, sender, thread, timeout) {
return this._wait(code, sender, thread, timeout);
}

/**
* @param {IAgent} agent
* @param {ISignal} trigger
* @returns {Promise}
*/
register(agent, trigger) {
return this._register(agent, trigger);
}

/**
* @param {String} id
* @returns {Promise}
*/
eject(id) {
return this._eject(id);
}

/**
* @param {String} id
* @returns {Promise}
*/
status(id) {
return this._status(id);
}

/**
* @param {String} id
* @param {{}} data
* @returns {Promise}
*/
setStatus(id, data) {
return this._setStatus(id, data);
}

/**
* @returns {undefined}
*/
serve() {
return this._serve();
}

/**
* @returns {undefined}
*/
stop() {
return this._stop();
}
}

module.exports = IBus;
18 changes: 18 additions & 0 deletions lib/interfaces/IQueue.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
class IQueue {
/**
* @param {ISignal} signal
* @returns {Promise}
*/
enqueue(signal) {
return this._enqueue(signal);
}

/**
* @returns {Promise.<ISignal>}
*/
dequeue() {
return this._dequeue();
}
}

module.exports = IQueue;
59 changes: 59 additions & 0 deletions lib/interfaces/ISignal.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
class ISignal {
/**
* @returns {String}
*/
get code() {
return this._getCode();
}

/**
* @param {String} value
*/
set code(value) {
return this._setCode(value);
}

/**
* @returns {String}
*/
get sender() {
return this._getSender();
}

/**
* @param {String} value
*/
set sender(value) {
return this._setSender(value);
}

/**
* @returns {String}
*/
get thread() {
return this._getThread();
}

/**
* @param {String} value
*/
set thread(value) {
return this._setThread(value);
}

/**
* @returns {{}}
*/
get data() {
return this._getData();
}

/**
* @param {{}} value
*/
set data(value) {
return this._setData(value);
}
}

module.exports = ISignal;

0 comments on commit 43af732

Please sign in to comment.