Skip to content
This repository has been archived by the owner on Sep 4, 2023. It is now read-only.

Commit

Permalink
Merge pull request #50 from aws-samples/multiple-tasks
Browse files Browse the repository at this point in the history
don't start task in error handler
  • Loading branch information
svozza authored Sep 13, 2019
2 parents 32fe49e + 6fa9de3 commit 328cdb8
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 75 deletions.
46 changes: 24 additions & 22 deletions src/backend/functions/orchestrator/lib/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,10 @@ const startTranscription = R.curry((ecs, {cluster, taskName, tasksTableName, med

return ecs.runTask(params)
.promise()
.then(({tasks}) => ({tasksTableName, mediaUrl, taskArn: tasks[0].taskArn}))
.then(({tasks}) => {
console.log(`Task started: ${tasks[0].taskArn}`);
return {tasksTableName, mediaUrl, taskArn: tasks[0].taskArn}
})
});

// startTranscription :: ({k: v} -> Promise {k: v}) -> {k: v} -> Promise {k: v}
Expand Down Expand Up @@ -117,45 +120,41 @@ function createDeleteParams({mediaUrl, tasksTableName}) {
// terminated :: ({k:v} -> Promise {k: v}) -> {k: v} -> {k: v} -> Promise {k:v}
const terminated = R.curry(deleteItem => o(deleteItem, createDeleteParams));

// errorUpdate :: ({k: v} -> Promise {k: v}) -> {k: v} -> Promise {k: v}
const errorUpdate = R.curry((updateItem, {tasksTableName, mediaUrl, taskArn}) => {
const unrecoverableError = R.curry((updateItem, {tasksTableName, mediaUrl}) => {
const params = {
TableName: tasksTableName,
Key: {
MediaUrl: mediaUrl
},
UpdateExpression: 'ADD Retries :val SET TaskStatus = :status, TaskArn = :task',
UpdateExpression: 'SET TaskStatus = :status',
ExpressionAttributeValues: {
':val': 1,
':status': 'WAITING',
':task': taskArn
':status': 'UNRECOVERABLE_ERROR'
},
ReturnValues: 'ALL_NEW'
};

return updateItem(params)
.then(({Attributes}) => ({...Attributes, tasksTableName}))
return updateItem(params);
});

const unrecoverableError = R.curry((updateItem, {tasksTableName, mediaUrl}) => {
// error :: ({k: v} -> Promise {k: v}) -> {k: v} -> Promise {k: v}
const error = R.curry((updateItem, {tasksTableName, mediaUrl}) => {
const params = {
TableName: tasksTableName,
Key: {
MediaUrl: mediaUrl
},
UpdateExpression: 'SET TaskStatus = :status',
UpdateExpression: 'ADD Retries :val SET TaskStatus = :status',
ExpressionAttributeValues: {
':status': 'UNRECOVERABLE_ERROR'
':val': 1,
':status': 'WAITING'
},
ReturnValues: 'ALL_NEW'
};

return updateItem(params);
return updateItem(params)
.then(({Attributes}) => ({...Attributes, tasksTableName}))
});

// error :: AWS.ECS -> AWS.DynamoDB -> {k: v} -> Promise {k:v}
const error = R.curry((ecs, updateItem) => composeP(errorUpdate(updateItem), startTranscription(ecs)));

function isUnrecoverableError(retries, retryThreshold, taskStatus) {
return retries > retryThreshold && !R.includes(taskStatus, ['TERMINATING', 'TERMINATED']);
}
Expand All @@ -170,7 +169,7 @@ module.exports = (ecs, ddb, env) => {
PROCESSING: noop,
TERMINATING: terminating(ecs),
TERMINATED: terminated(params => ddb.delete(params).promise()),
ERROR: error(ecs, update),
ERROR: error(update),
UNRECOVERABLE_ERROR: noop
};

Expand All @@ -185,13 +184,15 @@ module.exports = (ecs, ddb, env) => {

const params = R.mergeRight(normalisedEnv, convertImage(image));

const {mediaUrl, retries = 0, taskStatus, tasksTableName, retryThreshold} = params;
const {mediaUrl, retries = 0, taskStatus, tasksTableName, retryThreshold = '3'} = params;

console.log(`Status: ${taskStatus}`);

console.log('Status: ' + taskStatus);
if(taskStatus === 'UNRECOVERABLE_ERROR') return Promise.resolve({});

if(isUnrecoverableError(retries, R.defaultTo(3, parseInt(retryThreshold)), taskStatus)) {
console.log('Error threshold exceeded');
return unrecoverableError(update, {mediaUrl, tasksTableName});
console.log(`Error threshold exceeded. Retries: ${retries}`);
return unrecoverableError(update, {mediaUrl, tasksTableName})
}

const handler = R.defaultTo(noop, handlers[taskStatus]);
Expand All @@ -205,7 +206,8 @@ module.exports = (ecs, ddb, env) => {
const {mediaUrl, tasksTableName} = err;

const stop = err.taskArn != null ?
stopTranscription(ecs, {mediaUrl, cluster: normalisedEnv.cluster, taskArn: err.taskArn, tasksTableName}) :
stopTranscription(ecs, {mediaUrl, cluster: normalisedEnv.cluster, taskArn: err.taskArn, tasksTableName})
.then(() => console.log(`${err.taskArn} stopped`)) :
Promise.resolve({});

return stop.then(() => unrecoverableError(update, {tasksTableName, mediaUrl}));
Expand Down
9 changes: 4 additions & 5 deletions src/backend/functions/orchestrator/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,13 @@
"build": "rm -rf dist && rsync -avz . dist/ --exclude 'node_modules' --exclude 'test'",
"test": "mocha --recursive"
},
"dependencies": {
"aws-sdk": "2.369.0",
"ramda": "0.26.1"
},
"dependencies": {},
"devDependencies": {
"aws-sdk": "2.369.0",
"chai": "4.2.0",
"mocha": "6.2.0",
"mocked-env": "^1.3.1",
"mocked-env": "1.3.1",
"ramda": "0.26.1",
"rewire": "4.0.1",
"sinon": "7.3.2"
}
Expand Down
53 changes: 5 additions & 48 deletions src/backend/functions/orchestrator/test/lib/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -214,57 +214,23 @@ describe('lib/index.js', () => {
})
});

it('should restart transcription when ERROR state received', () => {
const runTaskStub = sinon.stub().returns({
promise: () => Promise.resolve({
tasks: [{
taskArn: 'taskArn'
}]
})
});

it('should transition to WAITING state when ERROR state received', () => {
const updateStub = sinon.stub().returns({promise: () => Promise.resolve('yay')});

const expectedTaskParams = {
taskDefinition: 'transcriber',
cluster:'MyCluster',
launchType: 'FARGATE',
networkConfiguration: {
awsvpcConfiguration: {
subnets: ['subnet1', 'subnet2'],
assignPublicIp: 'DISABLED'
}
},
overrides: {
containerOverrides: [
{
name: 'transcriber',
environment: [
{
name: 'MEDIA_URL',
value: 'https://foo.bar/foo'
}
]
}
]
}
};

const expectedUpdateParams = {
TableName: 'MediaAnalysisTasks',
Key: {
MediaUrl: 'https://foo.bar/foo'
},
UpdateExpression: 'ADD Retries :val SET TaskStatus = :status, TaskArn = :task',
UpdateExpression: 'ADD Retries :val SET TaskStatus = :status',
ExpressionAttributeValues: {
':val': 1,
':status': 'WAITING',
':task': 'taskArn'
':status': 'WAITING'
},
ReturnValues: 'ALL_NEW'
};

const handler = index({runTask: runTaskStub}, {update: updateStub}, {
const handler = index({}, {update: updateStub}, {
TASKS_TABLE_NAME: 'MediaAnalysisTasks',
CLUSTER: 'MyCluster',
RETRY_THRESHOLD: '3',
Expand All @@ -274,20 +240,11 @@ describe('lib/index.js', () => {

return handler(errorEvent, {})
.then(() => {
sinon.assert.calledWith(runTaskStub, expectedTaskParams);
sinon.assert.calledWith(updateStub, expectedUpdateParams);
})
});

it('should not restart transcription when ERROR state received after 3 retries', () => {
const runTaskStub = sinon.stub().returns({
promise: () => Promise.resolve({
tasks: [{
taskArn: 'taskArn'
}]
})
});

const updateStub = sinon.stub()
.returns({promise: () => Promise.resolve({
Attributes: {
Expand All @@ -312,7 +269,7 @@ describe('lib/index.js', () => {
ReturnValues: 'ALL_NEW'
};

const handler = index({runTask: runTaskStub}, {update: updateStub}, {
const handler = index({}, {update: updateStub}, {
TASKS_TABLE_NAME: 'MediaAnalysisTasks',
CLUSTER: 'MyCluster',
RETRY_THRESHOLD: '3',
Expand Down
1 change: 1 addition & 0 deletions src/backend/transcriber/transcriber.iml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
</content>
<orderEntry type="jdk" jdkName="1.8" jdkType="JavaSDK" />
<orderEntry type="sourceFolder" forTests="false" />
<orderEntry type="library" name="Maven: org.slf4j:slf4j-api:1.7.25" level="project" />
<orderEntry type="library" name="Maven: org.apache.logging.log4j:log4j-api:2.12.1" level="project" />
<orderEntry type="library" name="Maven: org.apache.logging.log4j:log4j-core:2.12.1" level="project" />
<orderEntry type="library" name="Maven: org.slf4j:slf4j-simple:1.7.26" level="project" />
Expand Down

0 comments on commit 328cdb8

Please sign in to comment.