Skip to content

Commit

Permalink
Update queue counters on dropping a document
Browse files Browse the repository at this point in the history
closes #33
  • Loading branch information
marcopeg committed Mar 15, 2021
1 parent 03a0891 commit 2d7f1dc
Show file tree
Hide file tree
Showing 3 changed files with 186 additions and 34 deletions.
44 changes: 35 additions & 9 deletions ssr/feature/api-v1/routes/v1-document-drop.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,32 @@
const schema = require('./v1-document-drop.schema');

const getMetricsToDecrease = ({ status_prev, version_prev }) => {
const queries = ['cnt', `v${version_prev}`];

switch (status_prev) {
case 3:
queries.push('cpl');
break;
case 2:
queries.push('act');
break;
case 1:
queries.push('pnd');
break;
case 0:
queries.push('pln');
break;
case -1:
queries.push('kll');
break;
}

return queries;
};

const buildDecrementSql = (queueName) => (metric) =>
`SELECT FROM "fetchq"."metric_log_decrement"('${queueName}', '${metric}', 1);`;

const v1QueueDocumentDrop = {
method: 'POST',
url: '/api/v1/queues/:name/drop/:subject',
Expand All @@ -11,8 +38,10 @@ const v1QueueDocumentDrop = {
try {
const _sql = `
DELETE FROM "fetchq_data"."${params.name}__docs"
WHERE "subject" = '${params.subject}';
WHERE "subject" = '${params.subject}'
RETURNING status AS status_prev, version AS version_prev;
`;

const res = await fetchq.pool.query(_sql);

// Handle subject or queue not existing
Expand All @@ -27,18 +56,15 @@ const v1QueueDocumentDrop = {
});
}

// Log decrement of the amount of items in the queue
// this is useful to keep the pagination correct.
const _sqlCnt = `SELECT FROM "fetchq"."metric_log_decrement"('${params.name}', 'cnt', 1)`;
await fetchq.pool.query(_sqlCnt);

// TODO: it should look into the status of the deleted document
// and update counters accordingly.
// Update the metrics according to the previous status of the document:
const toSql = buildDecrementSql(params.name);
const _sqlMetrics = getMetricsToDecrease(res.rows[0]).map(toSql).join('');
_sqlMetrics && (await fetchq.pool.query(_sqlMetrics));

reply.send({
success: true,
data: {
_sql,
_sql: [_sql, ..._sqlMetrics].join('\n'),
},
});
} catch (err) {
Expand Down
119 changes: 119 additions & 0 deletions ssr/feature/api-v1/routes/v1-document-drop.test.e2e.js
Original file line number Diff line number Diff line change
@@ -1,3 +1,15 @@
const getQueueMetrics = async (queue) => {
const _sqlCnt = `SELECT * FROM fetchq.metric_get('${queue}')`;
const res = await global.query(_sqlCnt);
return res.rows.reduce(
(acc, curr) => ({
...acc,
[curr.metric]: Number(curr.current_value),
}),
{},
);
};

describe('v1QueueDocumentDrop', () => {
beforeEach(global.resetSchema);

Expand Down Expand Up @@ -55,4 +67,111 @@ describe('v1QueueDocumentDrop', () => {
expect(calls[0][0].response.status).toBe(404);
expect(calls[0][0].response.data.success).toBe(false);
});

it('should update counters from ACTIVE status', async () => {
await global.query(`SELECT FROM fetchq.queue_create('q1')`);
await global.query(`SELECT FROM fetchq.queue_set_max_attempts('q1', 1)`);
await global.query(`SELECT FROM fetchq.doc_push('q1', 's1')`);
await global.query(`SELECT FROM fetchq.doc_pick('q1', 0, 1, '1y')`);
await global.query(`SELECT FROM fetchq.mnt()`);

const m1 = await getQueueMetrics('q1');
expect(m1).toEqual({ act: 1, cnt: 1, ent: 1, pkd: 1, pnd: 0, v0: 1 });

await global.post('/api/v1/queues/q1/drop/s1');
await global.query(`SELECT FROM fetchq.mnt()`);

const m2 = await getQueueMetrics('q1');
expect(m2).toEqual({ act: 0, cnt: 0, ent: 1, pkd: 1, pnd: 0, v0: 0 });
});

it('should update counters from PLANNED status', async () => {
await global.query(`SELECT FROM fetchq.queue_create('q1')`);
await global.query(`SELECT FROM fetchq.queue_set_max_attempts('q1', 1)`);
await global.query(
`SELECT FROM fetchq.doc_push('q1', 's1', 0, 0, NOW() + INTERVAL '1m', '{}')`,
);
await global.query(`SELECT FROM fetchq.mnt()`);

const m1 = await getQueueMetrics('q1');
expect(m1).toEqual({ cnt: 1, ent: 1, pln: 1, v0: 1 });

await global.post('/api/v1/queues/q1/drop/s1');
await global.query(`SELECT FROM fetchq.mnt()`);

const m2 = await getQueueMetrics('q1');
expect(m2).toEqual({ cnt: 0, ent: 1, pln: 0, v0: 0 });
});

it('should update counters from COMPLETED status', async () => {
await global.query(`SELECT FROM fetchq.queue_create('q1')`);
await global.query(`SELECT FROM fetchq.queue_set_max_attempts('q1', 1)`);
await global.query(`SELECT FROM fetchq.doc_push('q1', 's1')`);
await global.query(`SELECT FROM fetchq.doc_pick('q1', 0, 1, '1y')`);
await global.query(`SELECT FROM fetchq.doc_complete('q1', 's1')`);
await global.query(`SELECT FROM fetchq.mnt()`);

const m1 = await getQueueMetrics('q1');
expect(m1).toEqual({
act: 0,
cnt: 1,
cpl: 1,
ent: 1,
pkd: 1,
pnd: 0,
prc: 1,
v0: 1,
});

await global.post('/api/v1/queues/q1/drop/s1');
await global.query(`SELECT FROM fetchq.mnt()`);

const m2 = await getQueueMetrics('q1');
expect(m2).toEqual({
act: 0,
cnt: 0,
cpl: 0,
ent: 1,
pkd: 1,
pnd: 0,
prc: 1,
v0: 0,
});
});

it('should update counters from KILLED status', async () => {
await global.query(`SELECT FROM fetchq.queue_create('q1')`);
await global.query(`SELECT FROM fetchq.queue_set_max_attempts('q1', 1)`);
await global.query(`SELECT FROM fetchq.doc_push('q1', 's1')`);
await global.query(`SELECT FROM fetchq.doc_pick('q1', 0, 1, '1y')`);
await global.query(`SELECT FROM fetchq.doc_kill('q1', 's1')`);
await global.query(`SELECT FROM fetchq.mnt()`);

const m1 = await getQueueMetrics('q1');
expect(m1).toEqual({
act: 0,
cnt: 1,
kll: 1,
ent: 1,
pkd: 1,
pnd: 0,
prc: 1,
v0: 1,
});

await global.post('/api/v1/queues/q1/drop/s1');
await global.query(`SELECT FROM fetchq.mnt()`);

const m2 = await getQueueMetrics('q1');
expect(m2).toEqual({
act: 0,
cnt: 0,
kll: 0,
ent: 1,
pkd: 1,
pnd: 0,
prc: 1,
v0: 0,
});
});
});
57 changes: 32 additions & 25 deletions ssr/feature/api-v1/routes/v1-document-play.js
Original file line number Diff line number Diff line change
@@ -1,32 +1,34 @@
const schema = require('./v1-document-play.schema');

const getUpdateMetricsSql = (queueName, prevStatus) => {
if (prevStatus === 2) {
return `
SELECT FROM "fetchq"."metric_log_increment"('${queueName}', 'pnd', 1);
SELECT FROM "fetchq"."metric_log_decrement"('${queueName}', 'act', 1);
`;
}
if (prevStatus === 0) {
return `
SELECT FROM "fetchq"."metric_log_increment"('${queueName}', 'pnd', 1);
SELECT FROM "fetchq"."metric_log_decrement"('${queueName}', 'pln', 1);
`;
}
if (prevStatus === 3) {
return `
SELECT FROM "fetchq"."metric_log_increment"('${queueName}', 'pnd', 1);
SELECT FROM "fetchq"."metric_log_decrement"('${queueName}', 'cpl', 1);
`;
}
if (prevStatus === -1) {
return `
SELECT FROM "fetchq"."metric_log_increment"('${queueName}', 'pnd', 1);
SELECT FROM "fetchq"."metric_log_decrement"('${queueName}', 'kll', 1);
`;
const getUpdateMetricsSql = ({ status_prev }) => {
const increment = [];
const decrement = [];

switch (status_prev) {
case 3:
increment.push('pnd');
decrement.push('cpl');
break;
case 2:
increment.push('pnd');
decrement.push('act');
break;
case 0:
increment.push('pnd');
decrement.push('pln');
break;
case -1:
increment.push('pnd');
decrement.push('kll');
break;
}

return [increment, decrement];
};

const buildMetricSql = (queueName, action) => (metric) =>
`SELECT FROM "fetchq"."metric_log_${action}"('${queueName}', '${metric}', 1);`;

/**
* POST://api/v1/queues/:name/play/:subject
*/
Expand Down Expand Up @@ -71,7 +73,12 @@ const v1QueueDocumentPlay = {
const doc = res.rows[0];

// Update the metrics according to the previous status of the document:
const _sqlMetrics = getUpdateMetricsSql(params.name, doc.status_prev);
const [increment, decrement] = getUpdateMetricsSql(doc);
const _sqlMetrics = [
...increment.map(buildMetricSql(params.name, 'increment')),
...decrement.map(buildMetricSql(params.name, 'decrement')),
].join('');

_sqlMetrics && (await fetchq.pool.query(_sqlMetrics));

reply.send({
Expand Down

0 comments on commit 2d7f1dc

Please sign in to comment.