Skip to content

Commit fd54c51

Browse files
committed
[batch] Compact And Drop Records from job_group_inst_coll_cancellable_resources
Resolves: #14623
1 parent cb4c95b commit fd54c51

File tree

1 file changed

+126
-0
lines changed

1 file changed

+126
-0
lines changed

batch/batch/driver/main.py

Lines changed: 126 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1492,6 +1492,130 @@ async def delete_prev_cancelled_job_group_cancellable_resources_records(db: Data
14921492
)
14931493

14941494

1495+
async def compact_job_group_cancellable_resources_records(db: Database):
    """Merge duplicate rows of job_group_inst_coll_cancellable_resources.

    Looks up at most 1000 (batch_id, update_id, job_group_id, inst_coll) keys
    that have more than one row in the table and whose job group has
    ``time_completed`` set and no entry in ``job_groups_cancelled`` for any
    ancestor listed in ``job_group_self_and_ancestors``. For every such key,
    a transaction deletes all rows sharing the key and inserts one row with
    ``token`` 0 holding the per-key sums computed by the lookup query.

    NOTE(review): ``db.execute_and_fetchall`` is consumed with ``async for``
    and each result is unpacked as a mapping -- presumably it yields dict-like
    rows; confirm against the Database helper.
    """
    key_columns = ['batch_id', 'update_id', 'job_group_id', 'inst_coll']
    row_columns = [
        *key_columns,
        'token',
        'n_creating_cancellable_jobs',
        'n_ready_cancellable_jobs',
        'n_running_cancellable_jobs',
        'ready_cancellable_cores_mcpu',
        'running_cancellable_cores_mcpu',
    ]

    # Pre-render the SQL fragments that depend only on the column lists.
    key_predicate = ' AND '.join(f'{col} = %s' for col in key_columns)
    column_list = ','.join(row_columns)
    placeholders = ','.join(['%s'] * len(row_columns))
    grouped_keys = ','.join(f'R.{col}' for col in key_columns)

    delete_sql = f"""\
DELETE FROM job_group_inst_coll_cancellable_resources
WHERE {key_predicate};
"""

    insert_sql = f"""\
INSERT INTO job_group_inst_coll_cancellable_resources ({column_list})
VALUES ({placeholders});
"""

    find_sql = f"""\
SELECT R.*
FROM job_groups AS G
LEFT JOIN LATERAL (
SELECT C.id FROM job_groups_cancelled AS C
INNER JOIN job_group_self_and_ancestors AS D
ON C.id = D.batch_id
AND C.job_group_id = D.ancestor_id
WHERE D.batch_id = G.batch_id
AND D.job_group_id = G.job_group_id
) AS C ON TRUE
INNER JOIN LATERAL (
SELECT {grouped_keys}
, SUM(R.n_creating_cancellable_jobs) AS n_creating_cancellable_jobs
, SUM(R.n_ready_cancellable_jobs) AS n_ready_cancellable_jobs
, SUM(R.n_running_cancellable_jobs) AS n_running_cancellable_jobs
, SUM(R.ready_cancellable_cores_mcpu) AS ready_cancellable_cores_mcpu
, SUM(R.running_cancellable_cores_mcpu) AS running_cancellable_cores_mcpu
FROM job_group_inst_coll_cancellable_resources AS R
WHERE R.batch_id = G.batch_id
AND R.job_group_id = G.job_group_id
GROUP BY {grouped_keys}
HAVING COUNT(*) > 1
) AS R ON TRUE
WHERE G.time_completed IS NOT NULL
AND C.id IS NULL
LIMIT 1000;
"""

    @transaction(db)
    async def replace_with_single_row(tx: Transaction, row: dict):
        # Drop every row for the key first, then write the single merged row,
        # both through the same transaction handle.
        await tx.just_execute(delete_sql, [row[col] for col in key_columns])
        await tx.execute_insertone(insert_sql, [row[col] for col in row_columns])

    duplicated = db.execute_and_fetchall(
        find_sql,
        query_name='find_finished_cancellable_resources_records_to_compact',
    )

    async for summed in duplicated:
        merged = dict(summed)
        merged['token'] = 0  # the merged row is always stored under token 0
        await replace_with_single_row(merged)
1566+
1567+
1568+
async def delete_dead_job_group_cancellable_resources_records(db: Database):
    """Delete all-zero rows of job_group_inst_coll_cancellable_resources.

    Looks up at most 1000 (batch_id, update_id, job_group_id, inst_coll) keys
    that have exactly one row in the table, with every cancellable counter
    equal to 0, and whose job group has ``time_completed`` set and no entry
    in ``job_groups_cancelled`` for any ancestor listed in
    ``job_group_self_and_ancestors``. Each matching key is then removed with
    its own DELETE statement issued through ``db.just_execute``.

    NOTE(review): ``db.execute_and_fetchall`` is consumed with ``async for``
    and each result is indexed by column name -- presumably it yields
    dict-like rows; confirm against the Database helper.
    """
    key_columns = ['batch_id', 'update_id', 'job_group_id', 'inst_coll']

    # Pre-render the SQL fragments that depend only on the key columns.
    grouped_keys = ','.join(f'R.{col}' for col in key_columns)
    key_predicate = ' AND '.join(f'{col} = %s' for col in key_columns)

    find_sql = f"""\
SELECT {grouped_keys}
FROM job_groups AS G
LEFT JOIN LATERAL (
SELECT C.id FROM job_groups_cancelled AS C
INNER JOIN job_group_self_and_ancestors AS D
ON C.id = D.batch_id
AND C.job_group_id = D.ancestor_id
WHERE D.batch_id = G.batch_id
AND D.job_group_id = G.job_group_id
) AS C ON TRUE
INNER JOIN LATERAL (
SELECT {grouped_keys}
FROM job_group_inst_coll_cancellable_resources AS R
WHERE R.batch_id = G.batch_id
AND R.job_group_id = G.job_group_id
GROUP BY {grouped_keys}
HAVING COUNT(*) = 1
AND MAX(R.n_creating_cancellable_jobs) = 0
AND MAX(R.n_ready_cancellable_jobs) = 0
AND MAX(R.n_running_cancellable_jobs) = 0
AND MAX(R.ready_cancellable_cores_mcpu) = 0
AND MAX(R.running_cancellable_cores_mcpu) = 0
) AS R ON TRUE
WHERE G.time_completed IS NOT NULL
AND C.id IS NULL
LIMIT 1000;
"""

    delete_sql = f"""\
DELETE FROM job_group_inst_coll_cancellable_resources
WHERE {key_predicate};
"""

    dead_keys = db.execute_and_fetchall(
        find_sql,
        query_name='find_dead_cancellable_resources_records_to_delete',
    )

    async for dead in dead_keys:
        key_values = [dead[col] for col in key_columns]
        await db.just_execute(delete_sql, key_values)
1617+
1618+
14951619
async def compact_agg_billing_project_users_table(app, db: Database):
14961620
if not app['feature_flags']['compact_billing_tables']:
14971621
return
@@ -1754,6 +1878,8 @@ async def close_and_wait():
17541878
task_manager.ensure_future(periodically_call(60, compact_agg_billing_project_users_by_date_table, app, db))
17551879
task_manager.ensure_future(periodically_call(60, delete_committed_job_groups_inst_coll_staging_records, db))
17561880
task_manager.ensure_future(periodically_call(60, delete_prev_cancelled_job_group_cancellable_resources_records, db))
1881+
task_manager.ensure_future(periodically_call(60, compact_job_group_cancellable_resources_records, db))
1882+
task_manager.ensure_future(periodically_call(60, delete_dead_job_group_cancellable_resources_records, db))
17571883

17581884

17591885
async def on_cleanup(app):

0 commit comments

Comments
 (0)