Skip to content

Commit

Permalink
Merge pull request #41 from ncthbrt/refactor/supervision
Browse files Browse the repository at this point in the history
Refactor/supervision
  • Loading branch information
ncthbrt authored Mar 5, 2018
2 parents 7ffb695 + 7bfaf43 commit 0cd4065
Show file tree
Hide file tree
Showing 7 changed files with 43 additions and 45 deletions.
6 changes: 1 addition & 5 deletions .vscode/settings.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
{
"editor.tabSize": 2,
"prettier.tabWidth": 2,
"javascript.validate.enable": false,
"standard.semistandard": true,
"standard.enable": true,
Expand All @@ -12,8 +11,5 @@
"**/CVS": true,
"**/.DS_Store": true,
"**/coverage": true
}
}
}
},

}
5 changes: 1 addition & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,7 @@
[![Coveralls](https://img.shields.io/coveralls/ncthbrt/nact.svg?style=flat-square)]() [![Dependencies](https://david-dm.org/ncthbrt/nact.svg?branch=master&style=flat-square)](https://david-dm.org/ncthbrt/nact) [![DeepScan Grade](https://deepscan.io/api/projects/908/branches/1863/badge/grade.svg)](https://deepscan.io/dashboard/#view=project&pid=908&bid=1863)


[![npm](https://img.shields.io/npm/v/nact.svg?style=flat-square)](https://www.npmjs.com/package/nact) [![FOSSA Status](https://app.fossa.io/api/projects/git%2Bgithub.com%2Fncthbrt%2Fnact.svg?type=shield)](https://app.fossa.io/projects/git%2Bgithub.com%2Fncthbrt%2Fnact?ref=badge_shield)

[![js-semistandard-style](https://img.shields.io/badge/code%20style-semistandard-blue.svg?style=flat-square)](https://github.com/Flet/semistandard)
[![we are reactive](https://img.shields.io/badge/we_are-reactive-blue.svg?style=flat-square)](https://www.reactivemanifesto.org/)
[![npm](https://img.shields.io/npm/v/nact.svg?style=flat-square)](https://www.npmjs.com/package/nact) [![FOSSA Status](https://app.fossa.io/api/projects/git%2Bgithub.com%2Fncthbrt%2Fnact.svg?type=shield)](https://app.fossa.io/projects/git%2Bgithub.com%2Fncthbrt%2Fnact?ref=badge_shield) [![js-semistandard-style](https://img.shields.io/badge/code%20style-semistandard-blue.svg?style=flat-square)](https://github.com/Flet/semistandard) [![we are reactive](https://img.shields.io/badge/we_are-reactive-blue.svg?style=flat-square)](https://www.reactivemanifesto.org/)



Expand Down
33 changes: 17 additions & 16 deletions lib/actor.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,12 @@ const { stop } = require('./functions');
const { defaultSupervisionPolicy, SupervisionActions } = require('./supervision');

class Actor {
constructor (parent, name, system, f, { shutdownAfter, whenChildCrashes } = {}) {
constructor (parent, name, system, f, { shutdownAfter, onCrash } = {}) {
this.parent = parent;
if (!name) {
name = `anonymous-${Math.abs(Math.random() * Number.MAX_SAFE_INTEGER) | 0}`;
}
if (name && parent.children.has(name)) {
if (parent.children.has(name)) {
throw new Error(`child actor of name ${name} already exists`);
}
this.name = name;
Expand All @@ -33,7 +33,8 @@ class Actor {
this.mailbox = new Queue();
this.immediate = undefined;
this.parent.childSpawned(this);
this.whenChildCrashes = whenChildCrashes || defaultSupervisionPolicy;

this.onCrash = onCrash || defaultSupervisionPolicy;

if (shutdownAfter) {
if (typeof (shutdownAfter) !== 'number') {
Expand Down Expand Up @@ -120,7 +121,7 @@ class Actor {
this.clearTimeout();
this.parent && this.parent.childStopped(this);
delete this.parent;
[...this.children.values()].map(stop);
[...this.children.values()].forEach(stop);
this.stopped = true;
this.subject.complete();
}
Expand Down Expand Up @@ -151,24 +152,24 @@ class Actor {
}
}

async handleFault (child, msg, sender, error) {
const ctx = this.createSupervisionContext(child, msg, sender, error);
const decision = await Promise.resolve(this.whenChildCrashes(msg, error, ctx));
async handleFault (msg, sender, error) {
const ctx = this.createSupervisionContext(msg, sender, error);
const decision = await Promise.resolve(this.onCrash(msg, error, ctx));
switch (decision) {
case SupervisionActions.stop:
child.stop();
this.stop();
break;
case SupervisionActions.stopAll:
[...this.children.values()].forEach(x => x.stop());
[...this.parent.children.values()].forEach(x => x.stop());
break;
case SupervisionActions.resume:
child.resume();
this.resume();
break;
case SupervisionActions.reset:
child.reset();
this.reset();
break;
case SupervisionActions.resetAll:
[...this.children.values()].forEach(x => x.reset());
[...this.parent.children.values()].forEach(x => x.reset());
break;
case SupervisionActions.escalate:
default:
Expand All @@ -181,9 +182,9 @@ class Actor {
this.processNext(this.state, true);
}

createSupervisionContext (child, msg, sender, error) {
createSupervisionContext (msg, sender, error) {
const ctx = this.createContext(this);
return { ...ctx, ...SupervisionActions, child: child.reference };
return { ...ctx, ...SupervisionActions };
}

createContext (sender) {
Expand All @@ -205,12 +206,12 @@ class Actor {
let ctx = this.createContext(sender);
let next = this.f.call(ctx, freeze(this.state), message, ctx);
if (next && next.then && next.catch) {
next.then(result => this.processNext(result)).catch(err => this.parent.handleFault(this, message, sender, err));
next.then(result => this.processNext(result)).catch(err => this.handleFault(message, sender, err));
} else {
this.processNext(next);
}
} catch (e) {
this.parent.handleFault(this, message, sender, e);
this.handleFault(this, message, sender, e);
}
});
}
Expand Down
2 changes: 1 addition & 1 deletion lib/supervision.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ const SupervisionActions = {
};

const defaultSupervisionPolicy = (msg, err, ctx) => {
let path = ctx.child.path.toString();
let path = ctx.path.toString();
console.error(`${path}: The following error was raised when processing message %O:\n%O\nTerminating faulted actor`, msg, err);
return ctx.stop;
};
Expand Down
6 changes: 2 additions & 4 deletions lib/system.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ const { ActorSystemReference, Nobody } = require('./references');
const { ActorPath } = require('./paths');
const assert = require('assert');
const { stop } = require('./functions');
const { defaultSupervisionPolicy, SupervisionActions } = require('./supervision');
const { LoggingFacade } = require('./monitoring');
const systemMap = require('./system-map');

Expand All @@ -25,7 +24,6 @@ class ActorSystem {
this.tempReferences = new Map();
this.stopped = false;
this.system = this;
this.whenChildCrashes = defaultSupervisionPolicy;
assert(extensions instanceof Array);
systemMap.add(this);
([...(typeof (hd) === 'function') ? [hd] : [], ...tail]).forEach(extension => extension(this));
Expand Down Expand Up @@ -64,7 +62,7 @@ class ActorSystem {
}

handleFault (child, msg, sender, error) {
defaultSupervisionPolicy(msg, error, { child: child.reference, ...SupervisionActions });
console.error('Stopping top level actor,', child.name, 'due to a fault');
child.stop();
}

Expand All @@ -79,7 +77,7 @@ class ActorSystem {
}

stop () {
[...this.children.values()].map(stop);
[...this.children.values()].forEach(stop);
this.stopped = true;
systemMap.remove(this.name);
}
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "nact",
"version": "4.4.4",
"version": "5.0.0",
"description": "nact ⇒ node.js + actors = your services have never been so µ",
"main": "lib/index.js",
"scripts": {
Expand Down
34 changes: 20 additions & 14 deletions test/actor.js
Original file line number Diff line number Diff line change
Expand Up @@ -401,22 +401,23 @@ describe('Actor', function () {
});
});

describe('#whenChildCrashes', function () {
describe('#onCrash', function () {
let system;
beforeEach(() => { system = start(); });
afterEach(() => stop(system));

const createSupervisor = (parent, name, whenChildCrashes) => spawn(parent, (state = true, msg, ctx) => state, name, { whenChildCrashes });
const createSupervisor = (parent, name) => spawn(parent, (state = true, msg, ctx) => state, name);

it('should be able to continue processing messages without loss of state', async function () {
const parent = createSupervisor(system, 'test1', (msg, err, ctx) => ctx.resume);
const resume = (msg, err, ctx) => ctx.resume;
const parent = createSupervisor(system, 'test1');
const child = spawn(parent, (state = 0, msg, ctx) => {
if (state + 1 === 3 && msg !== 'msg3') {
throw new Error('Very bad thing');
}
dispatch(ctx.sender, state + 1);
return state + 1;
});
}, 'test', { onCrash: resume });
dispatch(child, 'msg0');
dispatch(child, 'msg1');
dispatch(child, 'msg2');
Expand All @@ -425,14 +426,15 @@ describe('Actor', function () {
});

it('should be able to be reset', async function () {
const parent = createSupervisor(system, 'test1', (msg, err, ctx) => ctx.reset);
const reset = (msg, err, ctx) => ctx.reset;
const parent = createSupervisor(system, 'test1');
const child = spawn(parent, (state = 0, msg, ctx) => {
if (state + 1 === 3 && msg !== 'msg3') {
throw new Error('Very bad thing');
}
dispatch(ctx.sender, state + 1);
return state + 1;
});
}, 'test', { onCrash: reset });

const grandchild = spawn(child, (state = 0, msg, ctx) => {
dispatch(ctx.sender, state + 1);
Expand All @@ -452,14 +454,15 @@ describe('Actor', function () {
});

it('should be able to stop', async function () {
const parent = createSupervisor(system, 'test1', (msg, err, ctx) => ctx.stop);
const stop = (msg, err, ctx) => ctx.stop;
const parent = createSupervisor(system, 'test1');
const child = spawn(parent, (state = 0, msg, ctx) => {
if (state + 1 === 3 && msg !== 'msg3') {
throw new Error('Very bad thing');
}
dispatch(ctx.sender, state + 1);
return state + 1;
});
}, 'test', { onCrash: stop });
dispatch(child, 'msg0');
dispatch(child, 'msg1');
dispatch(child, 'msg2');
Expand All @@ -468,14 +471,15 @@ describe('Actor', function () {
});

it('should be able to escalate', async function () {
const parent = createSupervisor(system, 'test1', (msg, err, ctx) => ctx.escalate);
const escalate = (msg, err, ctx) => ctx.escalate;
const parent = createSupervisor(system, 'test1');
const child = spawn(parent, (state = 0, msg, ctx) => {
if (state + 1 === 3 && msg !== 'msg3') {
throw new Error('Very bad thing');
}
dispatch(ctx.sender, state + 1);
return state + 1;
});
}, 'test', { onCrash: escalate });
dispatch(child, 'msg0');
dispatch(child, 'msg1');
dispatch(child, 'msg2');
Expand All @@ -485,14 +489,15 @@ describe('Actor', function () {
});

it('should be able to stop all children', async function () {
const parent = createSupervisor(system, 'test1', (msg, err, ctx) => ctx.stopAll);
const stopAll = (msg, err, ctx) => ctx.stopAll;
const parent = createSupervisor(system, 'test1');
const child1 = spawn(parent, (state = 0, msg, ctx) => {
if (state + 1 === 3 && msg !== 'msg3') {
throw new Error('Very bad thing');
}
dispatch(ctx.sender, state + 1);
return state + 1;
});
}, 'test', { onCrash: stopAll });
const child2 = spawn(parent, (state = 0, msg, ctx) => {
dispatch(ctx.sender, state + 1);
return state + 1;
Expand All @@ -506,14 +511,15 @@ describe('Actor', function () {
});

it('should be able to reset all children', async function () {
const parent = createSupervisor(system, 'test1', (msg, err, ctx) => ctx.resetAll);
const resetAll = (msg, err, ctx) => ctx.resetAll;
const parent = createSupervisor(system, 'test1');
const child1 = spawn(parent, (state = 0, msg, ctx) => {
if (state + 1 === 3 && msg !== 'msg3') {
throw new Error('Very bad thing');
}
dispatch(ctx.sender, state + 1);
return state + 1;
});
}, 'test', { onCrash: resetAll });
const child2 = spawn(parent, (state = 0, msg, ctx) => {
dispatch(ctx.sender, state + 1);
return state + 1;
Expand Down

0 comments on commit 0cd4065

Please sign in to comment.