async def compact_job_group_cancellable_resources_records(app, db: Database):
    """Collapse duplicate cancellable-resources rows of completed job groups.

    Finds up to 1000 ``(batch_id, update_id, job_group_id, inst_coll)`` keys
    in ``job_group_inst_coll_cancellable_resources`` that are stored as more
    than one row, belong to a job group whose ``time_completed`` is set, and
    have no cancelled job group among the group itself or its ancestors.
    For each key, all of its rows are deleted and a single row carrying the
    summed counters is inserted with ``token = 0``.

    Does nothing unless the ``compact_billing_tables`` feature flag is set.

    Args:
        app: Application mapping; only ``app['feature_flags']`` is read.
        db: Database handle used for the search query and the per-key
            transactions.
    """
    if not app['feature_flags']['compact_billing_tables']:
        return

    keyfields = [
        'batch_id',
        'update_id',
        'job_group_id',
        'inst_coll',
    ]

    rowfields = [
        *keyfields,
        'token',
        'n_creating_cancellable_jobs',
        'n_ready_cancellable_jobs',
        'n_running_cancellable_jobs',
        'ready_cancellable_cores_mcpu',
        'running_cancellable_cores_mcpu',
    ]

    # Derive the SQL fragments from the field lists so the statements cannot
    # drift out of sync with them.
    key_predicate = ' AND '.join(f'{k} = %s' for k in keyfields)
    columns = ','.join(rowfields)
    placeholders = ','.join('%s' for _ in rowfields)

    @transaction(db)
    async def compact(tx: Transaction, record: dict):
        # NOTE(review): `record` was aggregated outside this transaction. If
        # rows for a completed job group can still change, the sums should be
        # recomputed here under a lock before deleting -- confirm.
        await tx.just_execute(
            f"""\
DELETE FROM job_group_inst_coll_cancellable_resources
WHERE {key_predicate}
""",
            # Materialise the parameters: drivers expect a sequence, not a
            # generator.
            [record[k] for k in keyfields],
        )

        # The query result has no `token` column (and carries an extra
        # `count`); picking by `rowfields` keeps exactly the table columns.
        compacted = {**record, 'token': 0}
        await tx.execute_insertone(
            f"""\
INSERT INTO job_group_inst_coll_cancellable_resources ({columns})
VALUES ({placeholders})
""",
            [compacted[k] for k in rowfields],
        )

    # The cancellation check is a NOT EXISTS anti-join: joining
    # self-and-ancestors directly would yield one row per ancestor, which
    # both duplicates targets and fails to exclude groups that have a
    # cancelled ancestor alongside a non-cancelled one.
    targets = db.execute_and_fetchall(
        """\
SELECT R.*
FROM job_groups AS G
INNER JOIN LATERAL (
  SELECT R.batch_id
       , R.update_id
       , R.job_group_id
       , R.inst_coll
       , SUM(R.n_creating_cancellable_jobs) AS n_creating_cancellable_jobs
       , SUM(R.n_ready_cancellable_jobs) AS n_ready_cancellable_jobs
       , SUM(R.n_running_cancellable_jobs) AS n_running_cancellable_jobs
       , SUM(R.ready_cancellable_cores_mcpu) AS ready_cancellable_cores_mcpu
       , SUM(R.running_cancellable_cores_mcpu) AS running_cancellable_cores_mcpu
       , COUNT(*) as `count`
  FROM job_group_inst_coll_cancellable_resources AS R
  WHERE R.batch_id = G.batch_id
    AND R.job_group_id = G.job_group_id
  GROUP BY R.batch_id
         , R.update_id
         , R.job_group_id
         , R.inst_coll
  ORDER BY R.batch_id ASC
         , R.update_id ASC
         , R.job_group_id ASC
         , R.inst_coll ASC
) AS R ON TRUE
WHERE G.time_completed IS NOT NULL
  AND NOT EXISTS (
    SELECT 1
    FROM job_group_self_and_ancestors AS D
    INNER JOIN job_groups_cancelled AS C
      ON C.id = D.batch_id
      AND C.job_group_id = D.ancestor_id
    WHERE D.batch_id = G.batch_id
      AND D.job_group_id = G.job_group_id
  )
  AND R.`count` > 1
LIMIT 1000
""",
        query_name='find_finished_cancellable_resources_records_to_compact',
    )

    async for target in targets:
        await compact(target)