126 changes: 126 additions & 0 deletions batch/batch/driver/main.py
@@ -1492,6 +1492,130 @@
)


async def compact_job_group_cancellable_resources_records(db: Database):
    keyfields = [
        'batch_id',
        'update_id',
        'job_group_id',
        'inst_coll',
    ]

    rowfields = [
        *keyfields,
        'token',
        'n_creating_cancellable_jobs',
        'n_ready_cancellable_jobs',
        'n_running_cancellable_jobs',
        'ready_cancellable_cores_mcpu',
        'running_cancellable_cores_mcpu',
    ]

    @transaction(db)
    async def compact(tx: Transaction, record: dict):
        # Atomically replace every row for this key with the one pre-summed `record`.
        await tx.just_execute(
            f"""\
DELETE FROM job_group_inst_coll_cancellable_resources
WHERE {' AND '.join([f'{k} = %s' for k in keyfields])};
""",
            [record[k] for k in keyfields],
        )

Codacy Production / Codacy Static Code Analysis, warning on batch/batch/driver/main.py#L1516 (the DELETE above):
Possible SQL injection vector through string-based query construction.

        await tx.execute_insertone(
            f"""\
INSERT INTO job_group_inst_coll_cancellable_resources ({','.join(rowfields)})
VALUES ({','.join(['%s' for _ in rowfields])});
""",
            [record[k] for k in rowfields],
        )
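For what it's worth on the Codacy finding above: the only strings interpolated into that statement are column names drawn from the hardcoded keyfields list, and the row values are bound separately as %s parameters. Expanded by hand (shown for illustration, not new code in the PR), the f-string renders to:

    DELETE FROM job_group_inst_coll_cancellable_resources
    WHERE batch_id = %s AND update_id = %s AND job_group_id = %s AND inst_coll = %s;

No externally supplied text reaches the SQL, so the warning looks like a false positive.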

    keys = ','.join([f'R.{k}' for k in keyfields])
    targets = db.execute_and_fetchall(
        f"""\
SELECT R.*
FROM job_groups AS G
LEFT JOIN LATERAL (
  SELECT C.id FROM job_groups_cancelled AS C
  INNER JOIN job_group_self_and_ancestors AS D
    ON C.id = D.batch_id
    AND C.job_group_id = D.ancestor_id
  WHERE D.batch_id = G.batch_id
  AND D.job_group_id = G.job_group_id
) AS C ON TRUE
INNER JOIN LATERAL (
  SELECT {keys}
  , SUM(R.n_creating_cancellable_jobs) AS n_creating_cancellable_jobs
  , SUM(R.n_ready_cancellable_jobs) AS n_ready_cancellable_jobs
  , SUM(R.n_running_cancellable_jobs) AS n_running_cancellable_jobs
  , SUM(R.ready_cancellable_cores_mcpu) AS ready_cancellable_cores_mcpu
  , SUM(R.running_cancellable_cores_mcpu) AS running_cancellable_cores_mcpu
  FROM job_group_inst_coll_cancellable_resources AS R
  WHERE R.batch_id = G.batch_id
  AND R.job_group_id = G.job_group_id
  GROUP BY {keys}
  HAVING COUNT(*) > 1
) AS R ON TRUE
WHERE G.time_completed IS NOT NULL
AND C.id IS NULL
LIMIT 1000;
""",
        query_name='find_finished_cancellable_resources_records_to_compact',
    )

ehigham (Member, Author) commented on the LIMIT 1000 line:

    Calling out the limit on both queries. Other queries also seem to limit to 1000, but I'm not sure where this number comes from. Without compacting, the query to find rows to compact takes forever, as it scans through a large chunk of the db. On the other hand, there are millions of rows, so reducing this number would make the background task take longer to churn through records. Suggestions?

ehigham (Member, Author), Aug 1, 2024:

    FWIW, my 2-week-old prod snapshot has 173,561,655 rows in job_group_inst_coll_cancellable_resources and 8,567,769 job_groups. Assuming (incorrectly) instant execution, it will take 100 days to churn through the db.
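As a rough check on that estimate (my arithmetic, not part of the PR): at one call per minute, each handling at most 1,000 target records,

    173,561,655 rows / 1,000 per minute ≈ 173,562 minutes ≈ 2,893 hours ≈ 120 days,

so ~100 days is the right order of magnitude. Since every compacted record covers at least two physical rows (the query requires COUNT(*) > 1), the actual row throughput is higher and the true figure somewhat lower.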

    async for target in targets:
        await compact({**target, 'token': 0})
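To make the compaction concrete, a sketch with hypothetical numbers (only two of the five counters shown): suppose one (batch_id, update_id, job_group_id, inst_coll) key has accumulated rows under several tokens, presumably to spread write contention:

    token=3: n_ready_cancellable_jobs=2, ready_cancellable_cores_mcpu=2000
    token=7: n_ready_cancellable_jobs=1, ready_cancellable_cores_mcpu=1000

The query above returns the per-key sums, and compact({**target, 'token': 0}) deletes both rows and re-inserts the single row

    token=0: n_ready_cancellable_jobs=3, ready_cancellable_cores_mcpu=3000

inside one transaction, so readers see either the old shards or the new total, never a partial state.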


async def delete_dead_job_group_cancellable_resources_records(db: Database):
    keyfields = [
        'batch_id',
        'update_id',
        'job_group_id',
        'inst_coll',
    ]

    keys = ','.join([f'R.{k}' for k in keyfields])
    targets = db.execute_and_fetchall(
        f"""\
SELECT {keys}
FROM job_groups AS G
LEFT JOIN LATERAL (
  SELECT C.id FROM job_groups_cancelled AS C
  INNER JOIN job_group_self_and_ancestors AS D
    ON C.id = D.batch_id
    AND C.job_group_id = D.ancestor_id
  WHERE D.batch_id = G.batch_id
  AND D.job_group_id = G.job_group_id
) AS C ON TRUE
INNER JOIN LATERAL (
  SELECT {keys}
  FROM job_group_inst_coll_cancellable_resources AS R
  WHERE R.batch_id = G.batch_id
  AND R.job_group_id = G.job_group_id
  GROUP BY {keys}
  HAVING COUNT(*) = 1
  AND MAX(R.n_creating_cancellable_jobs) = 0
  AND MAX(R.n_ready_cancellable_jobs) = 0
  AND MAX(R.n_running_cancellable_jobs) = 0
  AND MAX(R.ready_cancellable_cores_mcpu) = 0
  AND MAX(R.running_cancellable_cores_mcpu) = 0
) AS R ON TRUE
WHERE G.time_completed IS NOT NULL
AND C.id IS NULL
LIMIT 1000;
""",
        query_name='find_dead_cancellable_resources_records_to_delete',
    )

    async for target in targets:
        await db.just_execute(
            f"""\
DELETE FROM job_group_inst_coll_cancellable_resources
WHERE {' AND '.join([f'{k} = %s' for k in keyfields])};
""",
            [target[k] for k in keyfields],
        )
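Reading the two tasks together (my interpretation, not stated in the diff): compaction first folds each completed, non-cancelled job group's shards into one token-0 row per key. Because COUNT(*) = 1 makes every MAX(...) in the HAVING clause just that single row's value, this second query then picks out exactly the lone rows whose counters have all reached zero, and the loop above deletes them. Dead keys therefore leave the table entirely rather than lingering as all-zero rows.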


async def compact_agg_billing_project_users_table(app, db: Database):
    if not app['feature_flags']['compact_billing_tables']:
        return
@@ -1754,6 +1878,8 @@
    task_manager.ensure_future(periodically_call(60, compact_agg_billing_project_users_by_date_table, app, db))
    task_manager.ensure_future(periodically_call(60, delete_committed_job_groups_inst_coll_staging_records, db))
    task_manager.ensure_future(periodically_call(60, delete_prev_cancelled_job_group_cancellable_resources_records, db))
    task_manager.ensure_future(periodically_call(60, compact_job_group_cancellable_resources_records, db))
    task_manager.ensure_future(periodically_call(60, delete_dead_job_group_cancellable_resources_records, db))
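For context, judging from these call sites (the helper itself is unchanged by this PR): periodically_call(60, fn, *args) appears to invoke fn roughly every 60 seconds, so each of the two new maintenance tasks churns through at most 1,000 records per minute, the rate behind the ~100-day estimate in the review thread above.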


async def on_cleanup(app):