Skip to content

Commit

Permalink
[batch] Compact And Drop Records from `job_group_inst_coll_cancellabl…
Browse files Browse the repository at this point in the history
…e_resources`

Resolves: hail-is#14623
  • Loading branch information
ehigham committed Jul 30, 2024
1 parent ee6d17e commit fd1b7eb
Showing 1 changed file with 85 additions and 0 deletions.
85 changes: 85 additions & 0 deletions batch/batch/driver/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -1492,6 +1492,90 @@ async def delete_prev_cancelled_job_group_cancellable_resources_records(db: Data
)


async def compact_job_group_cancellable_resources_records(app, db: Database):
if not app['feature_flags']['compact_billing_tables']:
return

keyfields = [
'batch_id',
'update_id',
'job_group_id',
'inst_coll',
]

rowfields = [
*keyfields,
'token',
'n_creating_cancellable_jobs',
'n_ready_cancellable_jobs',
'n_running_cancellable_jobs',
'ready_cancellable_cores_mcpu',
'running_cancellable_cores_mcpu',
]

@transaction(db)
async def compact(tx: Transaction, record: dict):
await tx.just_execute(
"""\
DELETE FROM job_group_inst_coll_cancellable_resources
WHERE batch_id = %s, update_id = %s, job_group_id = %s, inst_coll = %s
""",
(record[k] for k in keyfields),
)

await tx.execute_insertone(
f"""\
INSERT INTO job_group_inst_coll_cancellable_resources ({','.join(rowfields)})
VALUES ({','.join(['%s' for _ in rowfields])}),
""",
[{**record, 'token': 0}[k] for k in rowfields],
)

targets = db.execute_and_fetchall(
"""\
SELECT R.*
FROM job_groups AS G
INNER JOIN job_group_self_and_ancestors AS D
ON G.batch_id = D.batch_id
AND G.job_group_id = D.job_group_id
LEFT JOIN job_groups_cancelled AS C
ON C.id = G.batch_id
AND C.job_group_id = D.ancestor_id
INNER JOIN LATERAL (
SELECT R.batch_id
, R.update_id
, R.job_group_id
, R.inst_coll
, SUM(R.n_creating_cancellable_jobs) AS n_creating_cancellable_jobs
, SUM(R.n_ready_cancellable_jobs) AS n_ready_cancellable_jobs
, SUM(R.n_running_cancellable_jobs) AS n_running_cancellable_jobs
, SUM(R.ready_cancellable_cores_mcpu) AS ready_cancellable_cores_mcpu
, SUM(R.running_cancellable_cores_mcpu) AS running_cancellable_cores_mcpu
, COUNT(*) as `count`
FROM job_group_inst_coll_cancellable_resources AS R
WHERE R.batch_id = G.batch_id
AND R.job_group_id = G.job_group_id
GROUP BY R.batch_id
, R.update_id
, R.job_group_id
, R.inst_coll
ORDER BY R.batch_id ASC
, R.update_id ASC
, R.job_group_id ASC
, R.inst_coll ASC
) AS R ON TRUE
WHERE G.time_completed IS NOT NULL
AND C.id IS NULL
AND R.`count` > 1
LIMIT 1000
""",
query_name='find_finished_cancellable_resources_records_to_compact',
)

async for target in targets:
await compact(target)


async def compact_agg_billing_project_users_table(app, db: Database):
if not app['feature_flags']['compact_billing_tables']:
return
Expand Down Expand Up @@ -1754,6 +1838,7 @@ async def close_and_wait():
task_manager.ensure_future(periodically_call(60, compact_agg_billing_project_users_by_date_table, app, db))
task_manager.ensure_future(periodically_call(60, delete_committed_job_groups_inst_coll_staging_records, db))
task_manager.ensure_future(periodically_call(60, delete_prev_cancelled_job_group_cancellable_resources_records, db))
task_manager.ensure_future(periodically_call(60, compact_job_group_cancellable_resources_records, app, db))


async def on_cleanup(app):
Expand Down

0 comments on commit fd1b7eb

Please sign in to comment.