diff --git a/src/backend/functions/orchestrator/lib/index.js b/src/backend/functions/orchestrator/lib/index.js index c5d929e..02ae478 100644 --- a/src/backend/functions/orchestrator/lib/index.js +++ b/src/backend/functions/orchestrator/lib/index.js @@ -71,7 +71,10 @@ const startTranscription = R.curry((ecs, {cluster, taskName, tasksTableName, med return ecs.runTask(params) .promise() - .then(({tasks}) => ({tasksTableName, mediaUrl, taskArn: tasks[0].taskArn})) + .then(({tasks}) => { + console.log(`Task started: ${tasks[0].taskArn}`); + return {tasksTableName, mediaUrl, taskArn: tasks[0].taskArn} + }) }); // startTranscription :: ({k: v} -> Promise {k: v}) -> {k: v} -> Promise {k: v} @@ -117,45 +120,41 @@ function createDeleteParams({mediaUrl, tasksTableName}) { // terminated :: ({k:v} -> Promise {k: v}) -> {k: v} -> {k: v} -> Promise {k:v} const terminated = R.curry(deleteItem => o(deleteItem, createDeleteParams)); -// errorUpdate :: ({k: v} -> Promise {k: v}) -> {k: v} -> Promise {k: v} -const errorUpdate = R.curry((updateItem, {tasksTableName, mediaUrl, taskArn}) => { +const unrecoverableError = R.curry((updateItem, {tasksTableName, mediaUrl}) => { const params = { TableName: tasksTableName, Key: { MediaUrl: mediaUrl }, - UpdateExpression: 'ADD Retries :val SET TaskStatus = :status, TaskArn = :task', + UpdateExpression: 'SET TaskStatus = :status', ExpressionAttributeValues: { - ':val': 1, - ':status': 'WAITING', - ':task': taskArn + ':status': 'UNRECOVERABLE_ERROR' }, ReturnValues: 'ALL_NEW' }; - return updateItem(params) - .then(({Attributes}) => ({...Attributes, tasksTableName})) + return updateItem(params); }); -const unrecoverableError = R.curry((updateItem, {tasksTableName, mediaUrl}) => { +// error :: ({k: v} -> Promise {k: v}) -> {k: v} -> Promise {k: v} +const error = R.curry((updateItem, {tasksTableName, mediaUrl}) => { const params = { TableName: tasksTableName, Key: { MediaUrl: mediaUrl }, - UpdateExpression: 'SET TaskStatus = :status', + UpdateExpression: 'ADD Retries :val SET TaskStatus = :status', ExpressionAttributeValues: { - ':status': 'UNRECOVERABLE_ERROR' + ':val': 1, + ':status': 'WAITING' }, ReturnValues: 'ALL_NEW' }; - return updateItem(params); + return updateItem(params) + .then(({Attributes}) => ({...Attributes, tasksTableName})) }); -// error :: AWS.ECS -> AWS.DynamoDB -> {k: v} -> Promise {k:v} -const error = R.curry((ecs, updateItem) => composeP(errorUpdate(updateItem), startTranscription(ecs))); - function isUnrecoverableError(retries, retryThreshold, taskStatus) { return retries > retryThreshold && !R.includes(taskStatus, ['TERMINATING', 'TERMINATED']); } @@ -170,7 +169,7 @@ module.exports = (ecs, ddb, env) => { PROCESSING: noop, TERMINATING: terminating(ecs), TERMINATED: terminated(params => ddb.delete(params).promise()), - ERROR: error(ecs, update), + ERROR: error(update), UNRECOVERABLE_ERROR: noop }; @@ -185,13 +184,15 @@ module.exports = (ecs, ddb, env) => { const params = R.mergeRight(normalisedEnv, convertImage(image)); - const {mediaUrl, retries = 0, taskStatus, tasksTableName, retryThreshold} = params; + const {mediaUrl, retries = 0, taskStatus, tasksTableName, retryThreshold = '3'} = params; + + console.log(`Status: ${taskStatus}`); - console.log('Status: ' + taskStatus); + if(taskStatus === 'UNRECOVERABLE_ERROR') return Promise.resolve({}); if(isUnrecoverableError(retries, R.defaultTo(3, parseInt(retryThreshold)), taskStatus)) { - console.log('Error threshold exceeded'); - return unrecoverableError(update, {mediaUrl, tasksTableName}); + console.log(`Error threshold exceeded. Retries: ${retries}`); + return unrecoverableError(update, {mediaUrl, tasksTableName}) } const handler = R.defaultTo(noop, handlers[taskStatus]); @@ -205,7 +206,8 @@ module.exports = (ecs, ddb, env) => { const {mediaUrl, tasksTableName} = err; const stop = err.taskArn != null ? - stopTranscription(ecs, {mediaUrl, cluster: normalisedEnv.cluster, taskArn: err.taskArn, tasksTableName}) : + stopTranscription(ecs, {mediaUrl, cluster: normalisedEnv.cluster, taskArn: err.taskArn, tasksTableName}) + .then(() => console.log(`${err.taskArn} stopped`)) : Promise.resolve({}); return stop.then(() => unrecoverableError(update, {tasksTableName, mediaUrl})); diff --git a/src/backend/functions/orchestrator/package.json b/src/backend/functions/orchestrator/package.json index 7c4c7f0..2937712 100644 --- a/src/backend/functions/orchestrator/package.json +++ b/src/backend/functions/orchestrator/package.json @@ -8,14 +8,13 @@ "build": "rm -rf dist && rsync -avz . dist/ --exclude 'node_modules' --exclude 'test'", "test": "mocha --recursive" }, - "dependencies": { - "aws-sdk": "2.369.0", - "ramda": "0.26.1" - }, + "dependencies": {}, "devDependencies": { + "aws-sdk": "2.369.0", "chai": "4.2.0", "mocha": "6.2.0", - "mocked-env": "^1.3.1", + "mocked-env": "1.3.1", + "ramda": "0.26.1", "rewire": "4.0.1", "sinon": "7.3.2" } diff --git a/src/backend/functions/orchestrator/test/lib/index.js b/src/backend/functions/orchestrator/test/lib/index.js index 2f3d179..daffd2a 100644 --- a/src/backend/functions/orchestrator/test/lib/index.js +++ b/src/backend/functions/orchestrator/test/lib/index.js @@ -214,57 +214,23 @@ describe('lib/index.js', () => { }) }); - it('should restart transcription when ERROR state received', () => { - const runTaskStub = sinon.stub().returns({ - promise: () => Promise.resolve({ - tasks: [{ - taskArn: 'taskArn' - }] - }) - }); - + it('should transition to WAITING state when ERROR state received', () => { const updateStub = sinon.stub().returns({promise: () => Promise.resolve('yay')}); - const expectedTaskParams = { - taskDefinition: 'transcriber', - cluster:'MyCluster', - launchType: 'FARGATE', - networkConfiguration: { - awsvpcConfiguration: { - subnets: ['subnet1', 'subnet2'], - assignPublicIp: 'DISABLED' - } - }, - overrides: { - containerOverrides: [ - { - name: 'transcriber', - environment: [ - { - name: 'MEDIA_URL', - value: 'https://foo.bar/foo' - } - ] - } - ] - } - }; - const expectedUpdateParams = { TableName: 'MediaAnalysisTasks', Key: { MediaUrl: 'https://foo.bar/foo' }, - UpdateExpression: 'ADD Retries :val SET TaskStatus = :status, TaskArn = :task', + UpdateExpression: 'ADD Retries :val SET TaskStatus = :status', ExpressionAttributeValues: { ':val': 1, - ':status': 'WAITING', - ':task': 'taskArn' + ':status': 'WAITING' }, ReturnValues: 'ALL_NEW' }; - const handler = index({runTask: runTaskStub}, {update: updateStub}, { + const handler = index({}, {update: updateStub}, { TASKS_TABLE_NAME: 'MediaAnalysisTasks', CLUSTER: 'MyCluster', RETRY_THRESHOLD: '3', @@ -274,20 +240,11 @@ describe('lib/index.js', () => { return handler(errorEvent, {}) .then(() => { - sinon.assert.calledWith(runTaskStub, expectedTaskParams); sinon.assert.calledWith(updateStub, expectedUpdateParams); }) }); it('should not restart transcription when ERROR state received after 3 retries', () => { - const runTaskStub = sinon.stub().returns({ - promise: () => Promise.resolve({ - tasks: [{ - taskArn: 'taskArn' - }] - }) - }); - const updateStub = sinon.stub() .returns({promise: () => Promise.resolve({ Attributes: { @@ -312,7 +269,7 @@ describe('lib/index.js', () => { ReturnValues: 'ALL_NEW' }; - const handler = index({runTask: runTaskStub}, {update: updateStub}, { + const handler = index({}, {update: updateStub}, { TASKS_TABLE_NAME: 'MediaAnalysisTasks', CLUSTER: 'MyCluster', RETRY_THRESHOLD: '3', diff --git a/src/backend/transcriber/transcriber.iml b/src/backend/transcriber/transcriber.iml index 6f1dba4..6c41b0e 100644 --- a/src/backend/transcriber/transcriber.iml +++ b/src/backend/transcriber/transcriber.iml @@ -12,6 +12,7 @@ +