Skip to content

Commit

Permalink
Merge pull request #50 from ncthbrt/persistence-query
Browse files Browse the repository at this point in the history
Persistence query
  • Loading branch information
ncthbrt authored May 18, 2018
2 parents 91cad12 + bcd8c0f commit 17a10cf
Show file tree
Hide file tree
Showing 8 changed files with 525 additions and 34 deletions.
4 changes: 2 additions & 2 deletions lib/actor.js
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ class Actor {
}

assertNotStopped () { assert(!this.stopped); return true; }
postMessage () { }
afterMessage () { }

dispatch (message, sender = new Nobody()) {
this.assertNotStopped();
Expand Down Expand Up @@ -189,7 +189,7 @@ class Actor {
let ctx = this.createContext(sender);
let next = await Promise.resolve(this.f.call(ctx, this.state, message, ctx));
this.state = next;
this.postMessage();
this.afterMessage();
this.processNext();
} catch (e) {
this.handleFault(message, sender, e);
Expand Down
3 changes: 2 additions & 1 deletion lib/index.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
const { spawn, spawnStateless } = require('./actor');
const { stop, query, dispatch } = require('./functions');
const { spawnPersistent, configurePersistence } = require('./persistence');
const { spawnPersistent, configurePersistence, persistentQuery } = require('./persistence');
const { logNothing, logToConsole } = require('./monitoring');
const { Nobody } = require('./references');
const time = require('./time');
Expand All @@ -11,6 +11,7 @@ module.exports = {
spawn,
spawnStateless,
query,
persistentQuery,
dispatch,
stop,
spawnPersistent,
Expand Down
3 changes: 2 additions & 1 deletion lib/persistence/index.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
const { spawnPersistent } = require('./persistent-actor');
const { persistentQuery } = require('./persistent-query');
const { PersistedEvent, PersistedSnapshot, AbstractPersistenceEngine } = require('./persistence-engine');

const configurePersistence = (engine) => (system) => {
Expand All @@ -8,4 +9,4 @@ const configurePersistence = (engine) => (system) => {
return Object.assign(system, { persistenceEngine: engine });
};

module.exports = { configurePersistence, spawnPersistent, PersistedSnapshot, PersistedEvent, AbstractPersistenceEngine };
module.exports = { configurePersistence, spawnPersistent, PersistedSnapshot, PersistedEvent, AbstractPersistenceEngine, persistentQuery };
47 changes: 18 additions & 29 deletions lib/persistence/persistent-actor.js
Original file line number Diff line number Diff line change
Expand Up @@ -23,23 +23,25 @@ class PersistentActor extends Actor {
this.snapshotEncoder = snapshotEncoder;
this.decoder = decoder;
this.encoder = encoder;

if (snapshotEvery) {
if (typeof (snapshotEvery) !== 'number') {
throw new Error('Shutdown should be specified as a number. The value indicates how many persisted messages ');
}
this.snapshotMessageInterval = snapshotEvery;
if (!snapshotEvery) {
this.postMessage = () => {};
this.snapshotEvery = Number.POSITIVE_INFINITY;
} else if (typeof (snapshotEvery) !== 'number') {
throw new Error('snapshotEvery should be specified as a number. The value indicates how many messages are persisted between snapshots');
} else {
this.snapshotEvery = snapshotEvery;
}

this.immediate = setImmediate(() => this.recover());
}

postMessage () {
if (this.snapshotMessageInterval && this.messagesToNextSnapshot <= 0) {
this.messagesToNextSnapshot = this.snapshotMessageInterval;
afterMessage () {
if (this.messagesToNextSnapshot <= 0) {
const snapshotState = this.snapshotEncoder(this.state);
this.messagesToNextSnapshot = this.snapshotEvery;
const sequenceNumber = this.sequenceNumber;
const key = this.key;
this.takeSnapshot(this.state, sequenceNumber, key);
const snapshot = new PersistedSnapshot(snapshotState, sequenceNumber, key);
return this.persistenceEngine.takeSnapshot(snapshot).catch(e => console.error('Failed to take snapshot due to error: ', e));
}
}

Expand Down Expand Up @@ -79,15 +81,13 @@ class PersistentActor extends Actor {
// Subscribe to the end result and start processing new messages
let snapshot = await this.persistenceEngine.latestSnapshot(this.key);

let sequenceNumber = 0;
let initialState;
if (snapshot) {
initialState = this.snapshotDecoder(snapshot.data);
sequenceNumber = snapshot.sequenceNumber;
this.state = this.snapshotDecoder(snapshot.data);
this.sequenceNumber = snapshot.sequenceNumber;
}

let result = await this.persistenceEngine
.events(this.key, sequenceNumber)
.events(this.key, this.sequenceNumber)
.reduce(async (prev, msg, index) => {
if (await prev) {
const [state, prevIndex] = await prev;
Expand All @@ -108,12 +108,11 @@ class PersistentActor extends Actor {
}
}
}
}, Promise.resolve([initialState, 0, sequenceNumber]));
// Message count can be different to sequenceNumber if events have been deleted from the database
}, Promise.resolve([this.state, 0, this.sequenceNumber]));
if (result) {
const [state, messageCount, seq] = result;
this.sequenceNumber = seq;
this.messagesToNextSnapshot = this.snapshotMessageInterval - messageCount;
this.messagesToNextSnapshot = this.snapshotEvery - messageCount;
this.state = state;
this.processNext();
}
Expand All @@ -122,16 +121,6 @@ class PersistentActor extends Actor {
}
}

async takeSnapshot (state, sequenceNumber, key) {
const snapshotState = this.snapshotEncoder(state);
try {
const snapshot = new PersistedSnapshot(snapshotState, sequenceNumber, key);
await this.persistenceEngine.takeSnapshot(snapshot);
} catch (e) {
console.error(`Failed to take snapshot ${e}`);
}
}

reset () {
this.state = undefined;
this.busy = true;
Expand Down
147 changes: 147 additions & 0 deletions lib/persistence/persistent-query.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
const { PersistedSnapshot } = require('./persistence-engine');
const { applyOrThrowIfStopped } = require('../system-map');
const id = x => x;

class PersistentQuery {
constructor (system, f, key, persistenceEngine, { snapshotKey, snapshotEvery, cacheDuration, snapshotEncoder = id, snapshotDecoder = id, encoder = id, decoder = id } = {}) {
if (!key) {
throw new Error('Persistence key required');
}
if (typeof (key) !== 'string') {
throw new Error('Persistence key must be a string');
}
this.persistenceEngine = persistenceEngine;

this.clearCache();
this.proxy = new Proxy(() => {}, {
apply: () => {
if (!this.promise) {
this.promise = this.query();
} else {
this.resetTimeout();
}
return this.promise;
}
});
this.f = f;
this.key = key;
this.snapshotKey = snapshotKey;
if (!snapshotKey) {
this.latestSnapshot = () => { this.snapshotRestored = true; return Promise.resolve(); };
} else if (typeof (snapshotKey) !== 'string') {
throw new Error('snapshotKey must be of type string');
}
this.snapshotDecoder = snapshotDecoder;
this.snapshotEncoder = snapshotEncoder;
this.decoder = decoder;
this.encoder = encoder;
this.promise = undefined;

if (!snapshotEvery) {
this.takeSnapshot = () => Promise.resolve();
this.snapshotEvery = Number.POSITIVE_INFINITY;
} else if (typeof (snapshotEvery) !== 'number') {
throw new Error('snapshotEvery should be specified as a number. The value indicates how many messages are persisted between snapshots');
} else if (!snapshotKey) {
throw new Error('snapshotKey must be specified in order to enable snapshotting');
} else {
this.snapshotEvery = snapshotEvery;
this.messagesToNextSnapshot = snapshotEvery;
}
if (!cacheDuration) {
this.setTimeout = () => {};
} else if (typeof (cacheDuration) !== 'number') {
throw new Error('cacheDuration should be specified as a number in milliseconds');
} else {
this.cacheDuration = PersistentQuery.getSafeTimeout(cacheDuration);
this.setTimeout();
}
}

clearCache () {
this.state = undefined;
this.sequenceNumber = 0;
this.messagesToNextSnapshot = undefined;
this.snapshotRestored = false;
}

static getSafeTimeout (timeoutDuration) {
timeoutDuration = timeoutDuration | 0;
const MAX_TIMEOUT = 2147483647;
return Math.min(MAX_TIMEOUT, timeoutDuration);
}

setTimeout () {
this.timeout = setTimeout(() => this.clearCache(), this.cacheDuration);
}

clearTimeout () {
clearTimeout(this.timeout);
}

resetTimeout () {
this.clearTimeout();
this.setTimeout();
}

async takeSnapshot () {
if (this.messagesToNextSnapshot <= 0) {
const snapshotState = this.snapshotEncoder(this.state);
const sequenceNumber = this.sequenceNumber;
const key = this.snapshotKey;
try {
const snapshot = new PersistedSnapshot(snapshotState, sequenceNumber, key);
await this.persistenceEngine.takeSnapshot(snapshot);
this.messagesToNextSnapshot = this.snapshotEvery;
} catch (e) {
console.error(`Failed to take snapshot ${e}`);
}
}
}

async latestSnapshot () {
if (!this.snapshotRestored) {
const snapshot = await this.persistenceEngine.latestSnapshot(this.snapshotKey);
if (snapshot) {
this.state = this.snapshotDecoder(snapshot.data);
this.sequenceNumber = snapshot.sequenceNumber;
}
this.messagesToNextSnapshot = this.snapshotEvery;
this.snapshotRestored = true;
}
}

async query () {
this.clearTimeout();
await this.latestSnapshot();
let result = await this.persistenceEngine
.events(this.key, this.sequenceNumber)
.reduce(async (prev, msg, index) => {
const [state, prevIndex] = await prev;
if (msg.isDeleted) {
return [state, prevIndex, msg.sequenceNumber];
} else {
const decodedMsg = this.decoder(msg.data);
// Might not be an async function. Using promise.resolve to force it into that form
const nextState = await Promise.resolve(this.f.call(undefined, state, decodedMsg));
return [nextState, index, msg.sequenceNumber];
}
}, Promise.resolve([this.state, 0, this.sequenceNumber]));
const [state, messageCount, seq] = result;
this.sequenceNumber = seq;
this.messagesToNextSnapshot -= messageCount;
this.state = state;
this.takeSnapshot();
this.promise = undefined;
this.resetTimeout();
return state;
}
}

const persistentQuery = (parent, f, key, properties) =>
applyOrThrowIfStopped(
parent,
parent => new PersistentQuery(parent.system, f, key, parent.system.persistenceEngine, properties).proxy
);

module.exports.persistentQuery = persistentQuery;
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "nact",
"version": "7.0.2",
"version": "7.1.0",
"description": "nact ⇒ node.js + actors = your services have never been so µ",
"main": "lib/index.js",
"scripts": {
Expand Down
Loading

0 comments on commit 17a10cf

Please sign in to comment.