Skip to content

Commit

Permalink
Refactored timeout and snapshotting to be simpler
Browse files Browse the repository at this point in the history
ncthbrt committed Dec 9, 2017
1 parent 1406529 commit 42b04d1
Showing 11 changed files with 66 additions and 211 deletions.
16 changes: 8 additions & 8 deletions README.md
Original file line number Diff line number Diff line change
@@ -521,41 +521,42 @@ const spawnUserContactService = (parent, userId) => spawnPersistent(

Sometimes actors accumulate a lot of persisted events. This is problematic because it means that it can take a potentially long time for an actor to recover. For time-sensitive applictions, this would make nact an unsuitable proposition. Snapshotting is a way to skip replaying every single event. When a persistent actor starts up again, nact checks to see if there are any snapshots available in the *snapshot store*. Nact selects the latest snapshot. The snapshot contains the sequence number at which it was taken. The snapshot is passed as the initial state to the actor, and only the events which were persisted after the snapshot are replayed.

To modify the user contacts service to support snapshotting, we import the `every` function and refactor the code to the following:
To modify the user contacts service to support snapshotting, we refactor the code to the following:

```js
const { messages } = require('nact');
const spawnUserContactService = (parent, userId) => spawnPersistent(
parent,
// Same function as before
async (state = { contacts:{} }, msg, ctx) => {},
`contacts:${userId}`,
userId,
{ snapshot: every(20).messages.or(12).hours }
{ snapshotEvery: 20 * messages }
);
```

The final argument to `spawnPersistent` is the actor properties object. Here we are using `every` to instruct nact to make a snapshot every 20 messages or 12 hours (the timer till the next snapshot is reset if a snapshot is made sooner and visa-versa).
The final argument to `spawnPersistent` is the actor properties object. Here we are using `snapshotEvery` to instruct nact to make a snapshot every 20 messages.

### Timeouts

While not strictly a part of the persistent actor, timeouts are frequently used with snapshotting. Actors take up memory, which is still a limited resource. If an actor has not processed messages in a while, it makes sense to shut it down until it is again needed; this frees up memory. Adding a timeout to the user contacts service is similar to snapshotting:

```js
const { messages, minutes } = require('nact');
const spawnUserContactService = (parent, userId) => spawnPersistent(
parent,
// Same function as before
async (state = { contacts:{} }, msg, ctx) => {},
`contacts:${userId}`,
userId,
{ snapshot: every(20).messages.or(12).hours,
timeout: after(10).minutes
{ snapshotEvery: 20 * messages,
shutdownAfter: 10 * minutes
}
);
```

In the code above, the user contacts service stops if it hasn't received a new message in 10 minutes.


# API

## Functions
@@ -569,8 +570,7 @@ In the code above, the user contacts service stops if it hasn't received a new m
| `spawnPersistent(parent, func, persistenceKey, name = auto, options = {})` | `ActorReference` | Creates a persistent actor. Persistent actors extend stateful actors but also add a persist method to the actor context. When an actor restarts after persisting messages, the persisted messages are played back in order until no futher messages remain. The actor may then start processing new messages. The `persistenceKey` is used to retrieve the persisted messages from the actor. |
| `start(...plugins)` | `SystemReference` | Starts the actor system. Plugins is a variadic list of middleware. Currently this is only being used with `configurePersistence` |
| `state$(actor)` | `Observable<'state>` | Creates an observable which streams the current state of the actor to subscribers. |
| every(amount).[unit] | `Duration | MessageInterval` | |
| after(amount).[unit] | `Duration | MessageInterval` | |


### communication

10 changes: 5 additions & 5 deletions lib/actor.js
Original file line number Diff line number Diff line change
@@ -9,7 +9,7 @@ const { Subject } = require('rxjs');
const { stop } = require('./functions');

class Actor {
constructor (parent, name, system, f, { shutdown } = {}) {
constructor (parent, name, system, f, { shutdownAfter } = {}) {
this.parent = parent;
if (!name) {
name = `anonymous-${parent.children.size}`;
@@ -31,11 +31,11 @@ class Actor {
this.mailbox = new Queue();
this.immediate = undefined;
this.parent.childSpawned(this);
if (shutdown) {
if (!shutdown.duration) {
throw new Error('Shutdown should be specified as a duration. It is recommended to use the after() function to do this');
if (shutdownAfter) {
if (typeof (shutdownAfter) !== 'number') {
throw new Error('Shutdown should be specified as a number in milliseconds');
}
this.shutdownPeriod = Actor.getSafeTimeout(shutdown.duration);
this.shutdownPeriod = Actor.getSafeTimeout(shutdownAfter);
this.setTimeout();
}
}
4 changes: 2 additions & 2 deletions lib/index.js
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
const { spawn, spawnStateless } = require('./actor');
const { stop, state$, query, dispatch } = require('./functions');
const { spawnPersistent, configurePersistence } = require('./persistence');
const utils = require('./utils');
const time = require('./time');
module.exports = {
...require('./system'),
...utils,
...time,
spawn,
spawnStateless,
query,
58 changes: 17 additions & 41 deletions lib/persistence/persistent-actor.js
Original file line number Diff line number Diff line change
@@ -5,7 +5,7 @@ const { Promise } = require('bluebird');
const freeze = require('deep-freeze-node');

class PersistentActor extends Actor {
constructor (parent, name, system, f, key, persistenceEngine, { snapshot, ...properties } = {}) {
constructor (parent, name, system, f, key, persistenceEngine, { snapshotEvery, ...properties } = {}) {
super(parent, name, system, f, properties);
if (!key) {
throw new Error('Persistence key required');
@@ -19,12 +19,11 @@ class PersistentActor extends Actor {
this.busy = true;
this.key = key;

if (snapshot) {
this.snapshotDuration = snapshot.duration ? Actor.getSafeTimeout(snapshot.duration) : false;
this.snapshotMessageInterval = snapshot.messageInterval || false;
if (!this.snapshotMessageInterval && !this.snapshotDuration) {
throw new Error('Snapshot requires a duration and/or messages field. Correctly specifying the snapshot rule is most easily done using every()');
if (snapshotEvery) {
if (typeof (snapshotEvery) !== 'number') {
throw new Error('Shutdown should be specified as a number. The value indicates how many persisted messages ');
}
this.snapshotMessageInterval = snapshotEvery;
}

setImmediate(() => this.recover());
@@ -63,7 +62,6 @@ class PersistentActor extends Actor {
this.messagesToNextSnapshot = this.snapshotMessageInterval - messageCount;
}

this.resetSnapshotInterval();
this.processNext(state, sequenceNumber === 0);
});
});
@@ -72,52 +70,30 @@ class PersistentActor extends Actor {
}
}

resetSnapshotInterval () {
if (this.snapshotDuration) {
clearInterval(this.snapshotInterval);
this.snapshotInterval = setInterval(async () => {
const snapshot = new PersistedSnapshot(this.state, this.sequenceNumber, this.key);
this.messagesToNextSnapshot = this.snapshotMessageInterval;
try {
await this.persistenceEngine.takeSnapshot(snapshot);
} catch (e) {
console.error(`Failed to save snapshot ${e}`);
}
}, this.snapshotDuration);
}
}

async processNext (next, initial = false) {
if (!this.stopped && this.snapshotMessageInterval && !initial) {
--this.messagesToNextSnapshot;
if (this.messagesToNextSnapshot <= 0) {
this.resetSnapshotInterval();
this.messagesToNextSnapshot = this.snapshotMessageInterval;
await this.takeSnapshot(next);
}
}
super.processNext(next, initial);
}

async takeSnapshot (state) {
async takeSnapshot (state, sequenceNumber, key) {
try {
const snapshot = new PersistedSnapshot(state, this.sequenceNumber, this.key);
const snapshot = new PersistedSnapshot(state, sequenceNumber, key);
await this.persistenceEngine.takeSnapshot(snapshot);
} catch (e) {
console.error(`Failed to take snapshot ${e}`);
}
}

async persist (msg, tags = []) {
if (this.snapshotMessageInterval) {
--this.messagesToNextSnapshot;
if (this.messagesToNextSnapshot <= 0) {
this.messagesToNextSnapshot = this.snapshotMessageInterval;
const sequenceNumber = this.sequenceNumber;
const state = this.state;
const key = this.key;
this.takeSnapshot(state, sequenceNumber, key);
}
}
const persistedEvent = new PersistedEvent(msg, ++this.sequenceNumber, this.key, tags);
return (await (this.persistenceEngine.persist(persistedEvent))).data;
}

stop () {
super.stop();
clearInterval(this.snapshotInterval);
}

createContext () {
return { ...super.createContext.apply(this, arguments), persist: this.persist.bind(this) };
}
17 changes: 17 additions & 0 deletions lib/time.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
const milliseconds = 1;
const seconds = 1000 * milliseconds;
const minutes = 60 * seconds;
const hours = 60 * minutes;

module.exports = {
milliseconds,
millisecond: milliseconds,
seconds,
second: seconds,
minutes,
minute: minutes,
hours,
hour: hours,
messages: 1,
message: 1
};
72 changes: 0 additions & 72 deletions lib/utils.js

This file was deleted.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "nact",
"version": "4.1.0",
"version": "4.2.0",
"description": "nact ⇒ node.js + actors = your services have never been so µ",
"main": "lib/index.js",
"scripts": {
12 changes: 6 additions & 6 deletions test/actor.js
Original file line number Diff line number Diff line change
@@ -5,7 +5,7 @@ const chai = require('chai');
const chaiAsPromised = require('chai-as-promised');
chai.use(chaiAsPromised);
chai.should();
const { start, spawn, after, spawnStateless, dispatch, stop, query, state$ } = require('../lib');
const { start, spawn, spawnStateless, dispatch, stop, query, state$, milliseconds } = require('../lib');
const { Promise } = require('bluebird');
const { LocalPath } = require('../lib/paths');
const delay = Promise.delay.bind(Promise);
@@ -195,7 +195,7 @@ describe('Actor', function () {
});
});

describe('timeout', function () {
describe('shutdownAfter', function () {
let system;
beforeEach(() => { system = start(); });
afterEach(() => {
@@ -206,21 +206,21 @@ describe('Actor', function () {

it('should automatically stop after timeout if timeout is specified', async function () {
console.error = ignore;
let child = spawnStateless(system, (msg) => {}, 'test', { shutdown: after(100).milliseconds });
let child = spawnStateless(system, (msg) => {}, 'test', { shutdownAfter: 100 * milliseconds });
await delay(110);
isStopped(child).should.be.true;
});

it('should automatically renew timeout after message', async function () {
let child = spawnStateless(system, ignore, 'test1', { shutdown: after(60).milliseconds });
let child = spawnStateless(system, ignore, 'test1', { shutdownAfter: 60 * milliseconds });
await delay(30);
dispatch(child, {});
await delay(40);
isStopped(child).should.not.be.true;
});

it('should throw if timeout does not include a duration field', async function () {
(() => spawnStateless(system, ignore, 'test1', { shutdown: {} })).should.throw(Error);
it('should throw if timeout is not a number', async function () {
(() => spawnStateless(system, ignore, 'test1', { shutdownAfter: {} })).should.throw(Error);
});
});

2 changes: 1 addition & 1 deletion test/mock-persistence-engine.js
Original file line number Diff line number Diff line change
@@ -21,7 +21,7 @@ class MockPersistenceEngine extends AbstractPersistenceEngine {
this._snapshots[persistedSnapshot.key] = [...prev, persistedSnapshot];
return Promise.resolve(persistedSnapshot);
} else {
throw new Error('Elvis has left the building');
return Promise.reject(new Error('Elvis has left the building'));
}
}

38 changes: 9 additions & 29 deletions test/persistent-actor.js
Original file line number Diff line number Diff line change
@@ -5,7 +5,7 @@ chai.should();
const { MockPersistenceEngine } = require('./mock-persistence-engine');
const { BrokenPersistenceEngine } = require('./broken-persistence-engine');
const { PartiallyBrokenPersistenceEngine } = require('./partially-broken-persistence-engine');
const { start, dispatch, query, stop, every } = require('../lib');
const { start, dispatch, query, stop, messages } = require('../lib');
const { PersistedEvent, PersistedSnapshot, spawnPersistent, configurePersistence } = require('../lib/persistence');
const chaiAsPromised = require('chai-as-promised');
chai.use(chaiAsPromised);
@@ -203,62 +203,42 @@ describe('PersistentActor', () => {
system = start(configurePersistence(persistenceEngine));
const actor = spawnPersistent(
system,
concatenativeFunction(''),
concatenativeFunction('', async (state, msg, ctx) => { await ctx.persist(msg); }),
'iceland',
'test',
{ snapshot: every(5).messages }
{ snapshotEvery: 5 * messages }
);
const expectedResult = 'iceland is cold';
const expectedResult = 'iceland\'s cold';
expectedResult.split('').forEach(msg => {
dispatch(actor, msg);
});
(await query(actor, '!', 30));
(await query(actor, '', 30));
const snapshots = persistenceEngine._snapshots['iceland'];
snapshots.length.should.equal(3);
snapshots[snapshots.length - 1].data.should.equal(expectedResult);
});

it('should be able to persist a snapshot after a specified duration', async () => {
const persistenceEngine = new MockPersistenceEngine();
system = start(configurePersistence(persistenceEngine));
const actor = spawnPersistent(
system,
concatenativeFunction(''),
'iceland',
'test',
{ snapshot: every(40).milliseconds }
);
const expectedResult = 'iceland is cold';
expectedResult.split('').forEach(msg => {
dispatch(actor, msg);
});
await delay(50);
const snapshots = persistenceEngine._snapshots['iceland'];
snapshots[snapshots.length - 1].data.should.equal(expectedResult);
});

it('should be able to continue processing messages even after failing to save a snapshot when snapshotting', async () => {
console.error = ignore;
const persistenceEngine = new MockPersistenceEngine(undefined, undefined, false); // Disable takeSnapshot
system = start(configurePersistence(persistenceEngine));
const actor = spawnPersistent(
system,
concatenativeFunction(''),
concatenativeFunction('', async (state, msg, ctx) => { await ctx.persist(msg); }),
'iceland',
'test',
{ snapshot: every(5).messages.and(30).milliseconds }
{ snapshotEvery: 5 * messages }
);
const expectedResult = 'iceland is cold';
expectedResult.split('').forEach(msg => {
dispatch(actor, msg);
});
await delay(50);
(await query(actor, '', 30)).should.equal(expectedResult);
});

it('should throw if snapshot does not include a duration field', async function () {
it('should throw if snapshot is not a number', async function () {
const persistenceEngine = new MockPersistenceEngine(); // Disable takeSnapshot
system = start(configurePersistence(persistenceEngine));
(() => spawnPersistent(system, ignore, 'test1', undefined, { snapshot: {} })).should.throw(Error);
(() => spawnPersistent(system, ignore, 'test1', undefined, { snapshotEvery: {} })).should.throw(Error);
});
});
46 changes: 0 additions & 46 deletions test/utils.js

This file was deleted.

2 comments on commit 42b04d1

@qm3ster
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So, persistence can only rely on number of messages now, and not time?

@ncthbrt
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That is correct. Rehydration duration is proportional to the number of messages, so it makes more sense to rely on that as the snapshot criterion.

Please sign in to comment.