Skip to content

Commit

Permalink
Changed behaviour of stateless actor
Browse files Browse the repository at this point in the history
  • Loading branch information
ncthbrt committed Oct 28, 2017
1 parent 48b18d0 commit f15ff70
Show file tree
Hide file tree
Showing 3 changed files with 79 additions and 16 deletions.
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,9 @@ This should print `Hello Erlich Bachman` to the console.

To complete this example, we need to shutdown our system. We can do this by calling `system.stop()`

> Note: Stateless actors can service multiple requests at the same time. Statelessness means that the actor
> does not have to cater for concurrency issues.
## Stateful Actors
[![Remix on Glitch](https://cdn.glitch.com/2703baf2-b643-4da7-ab91-7ee2a2d00b5b%2Fremix-button.svg)](https://glitch.com/edit/#!/remix/nact-stateful-greeter)

Expand Down
4 changes: 2 additions & 2 deletions lib/actor.js
Original file line number Diff line number Diff line change
Expand Up @@ -154,8 +154,8 @@ const spawn = (parent, f, name) =>

const spawnStateless = (parent, f, name) =>
spawn(parent, function (state, msg, ctx) {
const next = f.call(ctx, msg, ctx);
return (next && next.then && next.then(() => true)) || true;
f.call(ctx, msg, ctx);
return true;
}, name);

module.exports.spawn = spawn;
Expand Down
88 changes: 74 additions & 14 deletions test/actor.js
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ const retry = async (assertion, remainingAttempts, retryInterval = 0) => {
return assertion();
} else {
try {
assertion();
await Promise.resolve(assertion());
} catch (e) {
await delay(retryInterval);
await retry(assertion, remainingAttempts - 1, retryInterval);
Expand Down Expand Up @@ -76,11 +76,12 @@ describe('Actor', function () {

it('allows promises to resolve inside actor', async function () {
const getMockValue = () => Promise.resolve(2);
let child = spawnStateless(
let child = spawn(
system,
async function (msg) {
async function (state = {}, msg) {
let result = await getMockValue();
this.sender.dispatch(result, this.self);
return state;
}
);

Expand Down Expand Up @@ -108,24 +109,67 @@ describe('Actor', function () {
result.should.equal('Hello World. The time has come!!');
});

it('evalutes in order when returning a promise from the actor function', async function () {
it('evalutes in order when returning a promise from a stateful actor function', async function () {
let child = spawn(
system,
async function (state = {}, msg) {
if (msg.number === 2) {
await delay(30);
}
msg.listener.dispatch({ number: msg.number });
return state;
},
'testActor'
);

let listener = spawn(
system,
async function (state = [], msg) {
if (msg.number) {
return [...state, msg.number];
} else {
this.sender.dispatch(state);
}
return state;
},
'listener'
);

child.dispatch({ listener, number: 1 });
child.dispatch({ listener, number: 2 });
child.dispatch({ listener, number: 3 });
await retry(async () => (await listener.query({})).should.deep.equal([1, 2, 3]), 5, 10);
});

it('evalutes out of order when returning a promise from a stateless actor function', async function () {
let child = spawnStateless(
system,
async function (msg) {
if (msg === 2) {
await delay(10);
if (msg.number === 2) {
await delay(30);
}
this.sender.dispatch(msg, this.self);
msg.listener.dispatch({ number: msg.number });
},
'testActor'
);

let result1 = await child.query(1);
let result2 = await child.query(2);
let result3 = await child.query(3);
result1.should.equal(1);
result2.should.equal(2);
result3.should.equal(3);
let listener = spawn(
system,
async function (state = [], msg) {
if (msg.number) {
return [...state, msg.number];
} else {
this.sender.dispatch(state);
}
return state;
},
'listener'
);

child.dispatch({ listener, number: 1 });
child.dispatch({ listener, number: 2 });
child.dispatch({ listener, number: 3 });
await retry(async () => (await listener.query({})).should.deep.equal([1, 3, 2]), 5, 10);
});

it('should automatically stop if error is thrown', async function () {
Expand All @@ -137,7 +181,7 @@ describe('Actor', function () {

it('should automatically stop if rejected promise is thrown', async function () {
console.error = ignore;
let child = spawnStateless(system, (msg) => Promise.reject(new Error('testError')));
let child = spawn(system, (state = {}, msg) => Promise.reject(new Error('testError')));
child.dispatch();
await retry(() => isStopped(child).should.be.true, 12, 10);
});
Expand All @@ -159,6 +203,22 @@ describe('Actor', function () {
(() => spawnStateless(child, () => ignore)).should.throw(Error);
});

it('should not process any more messages after being stopped', async function () {
let child = spawn(system, async (state = {}, msg, ctx) => {
if (msg === 1) {
await delay(20);
} else {
ctx.sender.dispatch(msg);
}
return state;
});
child.dispatch(1);
let resultPromise = child.query(2, 100);
await delay(20);
child.stop();
await resultPromise.should.be.rejectedWith(Error);
});

it('stops children when parent is stopped', async function () {
let actor = spawnChildrenEchoer(system);
let child1 = spawnChildrenEchoer(actor, 'child1');
Expand Down

0 comments on commit f15ff70

Please sign in to comment.