Skip to content

Commit

Permalink
Updated readme and moved persistence code to a more sensible location…
Browse files Browse the repository at this point in the history
…. Breaks postgresql plugin but not user facing API
  • Loading branch information
ncthbrt committed Nov 4, 2017
1 parent 0add6e0 commit eb04e1d
Show file tree
Hide file tree
Showing 11 changed files with 92 additions and 18 deletions.
83 changes: 79 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@ const contactsService = spawn(
// All these message types require an existing contact
// So check if the contact exists
const contact = state.contacts[msg.contactId];
if (contact) {
if (contact) {
switch(msg.type) {
case GET_CONTACT: {
dispatch(ctx.sender, { payload: contact, type: SUCCESS });
Expand Down Expand Up @@ -442,7 +442,82 @@ Now the only thing remaining for a MVP of our contacts service is some way of pe
## Persistence
[![Remix on Glitch](https://cdn.glitch.com/2703baf2-b643-4da7-ab91-7ee2a2d00b5b%2Fremix-button.svg)](https://glitch.com/edit/#!/remix/nact-contacts-3)

The contacts service we've been working on STILL isn't very useful. While we've extended the service to support multiple users, it has the unfortunate limitation that it loses the contacts each time the machine restarts. To remedy this, nact extends stateful actors by adding a new method: `persist`
The contacts service we've been working on *still* isn't very useful. While we've extended the service to support multiple users, it has the unfortunate limitation that it loses the contacts each time the machine restarts. To remedy this, nact extends stateful actors by adding a new method: `persist`

To use `persist`, the first thing we need to do is specify a persistence engine. Currently only a [PostgreSQL](https://github.com/ncthbrt/nact-persistence-postgres) engine is available (though it should be easy to create your own). To work with the PostgreSQL engine, install the persistent provider package using the command `npm install --save nact-persistence-postgres`. Assuming you've stored a connection string to a running database instance under the environment variable `DATABASE_URL` , we'll need to modify the code creating the system to look something like the following:

```js
const { start } = require('nact');
const { configurePersistence, spawnPersistent } = require('nact/persistence');
const { PostgresPersistenceEngine } = require('nact-persistence-postgres');
const connectionString = process.env.DATABASE_URL;
const system = start(configurePersistence(new PostgresPersistenceEngine(connectionString)));
```

The `configurePersistence` method adds the the persistence plugin to the system using the specified persistence engine.

Now the only remaining work is to modify the contacts service to allow persistence. We want to save messages which modify state and replay them when the actor starts up again:

```js

const spawnUserContactService = (parent, userId) => spawnPersistent(
parent,
async (state = { contacts:{} }, msg, ctx) => {
if(msg.type === GET_CONTACTS) {
dispatch(ctx.sender, { payload: Object.values(state.contacts), type: SUCCESS });
} else if (msg.type === CREATE_CONTACT) {
const newContact = { id: uuid(), ...msg.payload };
const nextState = { contacts: { ...state.contacts, [newContact.id]: newContact } };

// We only want to save messages which haven't been previously persisted
// Note the persist call should always be awaited. If persist is not awaited,
// then the actor will process the next message in the queue before the
// message has been safely committed.
if(!ctx.recovering) { await ctx.persist(msg); }

// Safe to dispatch while recovering.
// The message just goes to Nobody and is ignored.
dispatch(ctx.sender, { type: SUCCESS, payload: newContact });
return nextState;
} else {
const contact = state.contacts[msg.contactId];
if (contact) {
switch(msg.type) {
case GET_CONTACT: {
dispatch(ctx.sender, { payload: contact, type: SUCCESS }, ctx.self);
break;
}
case REMOVE_CONTACT: {
const nextState = { ...state.contacts, [contact.id]: undefined };
if(!ctx.recovering) { await ctx.persist(msg); }
dispatch(ctx.sender, { type: SUCCESS, payload: contact }, ctx.self);
return nextState;
}
case UPDATE_CONTACT: {
const updatedContact = {...contact, ...msg.payload };
const nextState = { ...state.contacts, [contact.id]: updatedContact };
if(!ctx.recovering) { await ctx.persist(msg); }
dispatch(ctx.sender,{ type: SUCCESS, payload: updatedContact }, ctx.self);
return nextState;
}
}
} else {
dispatch(ctx.sender, { type: NOT_FOUND, contactId: msg.contactId }, ctx.sender);
}
}
return state;
},
// Persistence key. If we want to restore actor state,
// the key must be the same. Be careful about namespacing here.
// For example if we'd just used userId, another developer might accidentally
// use the same key for an actor of a different type. This could cause difficult to
// debug runtime errors
`contacts:${userId}`,
userId
);
```



# API

Expand All @@ -458,7 +533,7 @@ The contacts service we've been working on STILL isn't very useful. While we've
| `start(...plugins)` | `SystemReference` | Starts the actor system. Plugins is a variadic list of middleware. Currently this is only being used with `configurePersistence` |
| `state$(actor)` | `Observable<'state>` | Creates an observable which streams the current state of the actor to subscribers. |

## communication
### communication

| Method | Returns | Description |
| ---------------------------------------- | :-------------- | ---------------------------------------- |
Expand Down Expand Up @@ -499,7 +574,7 @@ The contacts service we've been working on STILL isn't very useful. While we've



#### Persistenct Actors
#### Persistenct Actor

| Property | Returns | Description |
| -------------- | :-------------- | ---------------------------------------- |
Expand Down
9 changes: 4 additions & 5 deletions lib/index.js
Original file line number Diff line number Diff line change
@@ -1,15 +1,14 @@
const { spawn, spawnStateless } = require('./actor');
const { spawnPersistent, configurePersistence } = require('./extensions/persistence');
const { stop, state$, query, dispatch } = require('./functions');

const { spawnPersistent, configurePersistence } = require('./persistence');
module.exports = {
...require('./system'),
spawn,
spawnStateless,
spawnPersistent,
configurePersistence,
query,
dispatch,
stop,
state$
state$,
spawnPersistent,
configurePersistence
};
File renamed without changes.
File renamed without changes.
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
require('rxjs');
const { PersistedEvent } = require('./persistence-engine');
const { Actor } = require('../../actor');
const { Actor } = require('../actor');
const { Promise } = require('bluebird');
const freeze = require('deep-freeze-node');

Expand Down Expand Up @@ -61,7 +61,7 @@ class PersistentActor extends Actor {
}
}

const { applyOrThrowIfStopped } = require('../../references');
const { applyOrThrowIfStopped } = require('../references');
const spawnPersistent = (reference, f, key, name) =>
applyOrThrowIfStopped(
reference,
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "nact",
"version": "3.1.3",
"version": "3.1.4",
"description": "Implementation of the actor model for node",
"main": "lib/index.js",
"scripts": {
Expand Down
2 changes: 1 addition & 1 deletion test/broken-persistence-engine.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
const { AbstractPersistenceEngine } = require('../lib/extensions/persistence');
const { AbstractPersistenceEngine } = require('../lib/persistence');

class BrokenPersistenceEngine extends AbstractPersistenceEngine {
events (persistenceKey, offset, limit, tags) {
Expand Down
2 changes: 1 addition & 1 deletion test/mock-persistence-engine.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
const { AbstractPersistenceEngine } = require('../lib/extensions/persistence');
const { AbstractPersistenceEngine } = require('../lib/persistence');
const { Observable } = require('rxjs');

class MockPersistenceEngine extends AbstractPersistenceEngine {
Expand Down
2 changes: 1 addition & 1 deletion test/partially-broken-persistence-engine.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
const { AbstractPersistenceEngine } = require('../lib/extensions/persistence');
const { AbstractPersistenceEngine } = require('../lib/persistence');
const Rx = require('rxjs');
const { Observable } = Rx;

Expand Down
2 changes: 1 addition & 1 deletion test/persistence-engine.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
/* eslint-disable no-unused-expressions */
const chai = require('chai');
chai.should();
const { PersistedEvent, AbstractPersistenceEngine } = require('../lib/extensions/persistence');
const { PersistedEvent, AbstractPersistenceEngine } = require('../lib/persistence');

describe('PersistedEvent', function () {
it('should be immutable', function () {
Expand Down
4 changes: 2 additions & 2 deletions test/persistent-actor.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ chai.should();
const { MockPersistenceEngine } = require('./mock-persistence-engine');
const { BrokenPersistenceEngine } = require('./broken-persistence-engine');
const { PartiallyBrokenPersistenceEngine } = require('./partially-broken-persistence-engine');
const { start, spawnPersistent, configurePersistence, dispatch, query, stop } = require('../lib');
const { PersistedEvent } = require('../lib/extensions/persistence');
const { start, dispatch, query, stop } = require('../lib');
const { PersistedEvent, spawnPersistent, configurePersistence } = require('../lib/persistence');
const chaiAsPromised = require('chai-as-promised');
chai.use(chaiAsPromised);
const { Promise } = require('bluebird');
Expand Down

0 comments on commit eb04e1d

Please sign in to comment.