Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[batch] A procedure to rename job_groups_cancelled.id -> job_groups_cancelled.batch_id #14672

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions batch/batch/driver/canceller.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ async def user_cancelled_ready_jobs(user, remaining) -> AsyncIterator[Dict[str,
SELECT 1 AS cancelled
FROM job_group_self_and_ancestors
INNER JOIN job_groups_cancelled
ON job_group_self_and_ancestors.batch_id = job_groups_cancelled.id AND
ON job_group_self_and_ancestors.batch_id = job_groups_cancelled.batch_id AND
job_group_self_and_ancestors.ancestor_id = job_groups_cancelled.job_group_id
WHERE job_groups.batch_id = job_group_self_and_ancestors.batch_id AND
job_groups.job_group_id = job_group_self_and_ancestors.job_group_id
Expand Down Expand Up @@ -204,7 +204,7 @@ async def user_cancelled_creating_jobs(user, remaining) -> AsyncIterator[Dict[st
SELECT 1 AS cancelled
FROM job_group_self_and_ancestors
INNER JOIN job_groups_cancelled
ON job_group_self_and_ancestors.batch_id = job_groups_cancelled.id AND
ON job_group_self_and_ancestors.batch_id = job_groups_cancelled.batch_id AND
job_group_self_and_ancestors.ancestor_id = job_groups_cancelled.job_group_id
WHERE job_groups.batch_id = job_group_self_and_ancestors.batch_id AND
job_groups.job_group_id = job_group_self_and_ancestors.job_group_id
Expand Down Expand Up @@ -311,7 +311,7 @@ async def user_cancelled_running_jobs(user, remaining) -> AsyncIterator[Dict[str
SELECT 1 AS cancelled
FROM job_group_self_and_ancestors
INNER JOIN job_groups_cancelled
ON job_group_self_and_ancestors.batch_id = job_groups_cancelled.id AND
ON job_group_self_and_ancestors.batch_id = job_groups_cancelled.batch_id AND
job_group_self_and_ancestors.ancestor_id = job_groups_cancelled.job_group_id
WHERE job_groups.batch_id = job_group_self_and_ancestors.batch_id AND
job_groups.job_group_id = job_group_self_and_ancestors.job_group_id
Expand Down
2 changes: 1 addition & 1 deletion batch/batch/driver/instance_collection/job_private.py
Original file line number Diff line number Diff line change
Expand Up @@ -360,7 +360,7 @@ async def user_runnable_jobs(user, remaining) -> AsyncIterator[Dict[str, Any]]:
SELECT 1 AS cancelled
FROM job_group_self_and_ancestors
INNER JOIN job_groups_cancelled
ON job_group_self_and_ancestors.batch_id = job_groups_cancelled.id AND
ON job_group_self_and_ancestors.batch_id = job_groups_cancelled.batch_id AND
job_group_self_and_ancestors.ancestor_id = job_groups_cancelled.job_group_id
WHERE job_groups.batch_id = job_group_self_and_ancestors.batch_id AND
job_groups.job_group_id = job_group_self_and_ancestors.job_group_id
Expand Down
4 changes: 2 additions & 2 deletions batch/batch/driver/instance_collection/pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,7 @@ async def regions_to_ready_cores_mcpu_from_estimated_job_queue(self) -> List[Tup
SELECT 1 AS cancelled
FROM job_group_self_and_ancestors
INNER JOIN job_groups_cancelled
ON job_group_self_and_ancestors.batch_id = job_groups_cancelled.id AND
ON job_group_self_and_ancestors.batch_id = job_groups_cancelled.batch_id AND
job_group_self_and_ancestors.ancestor_id = job_groups_cancelled.job_group_id
WHERE jobs.batch_id = job_group_self_and_ancestors.batch_id AND
jobs.job_group_id = job_group_self_and_ancestors.job_group_id
Expand Down Expand Up @@ -622,7 +622,7 @@ async def user_runnable_jobs(user):
SELECT 1 AS cancelled
FROM job_group_self_and_ancestors
INNER JOIN job_groups_cancelled
ON job_group_self_and_ancestors.batch_id = job_groups_cancelled.id AND
ON job_group_self_and_ancestors.batch_id = job_groups_cancelled.batch_id AND
job_group_self_and_ancestors.ancestor_id = job_groups_cancelled.job_group_id
WHERE job_groups.batch_id = job_group_self_and_ancestors.batch_id AND
job_groups.job_group_id = job_group_self_and_ancestors.job_group_id
Expand Down
6 changes: 3 additions & 3 deletions batch/batch/driver/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ async def notify_batch_job_complete(db: Database, client_session: httpx.ClientSe
SELECT batches.*,
cost_t.cost,
cost_t.cost_breakdown,
job_groups_cancelled.id IS NOT NULL AS cancelled,
job_groups_cancelled.batch_id IS NOT NULL AS cancelled,
job_groups_n_jobs_in_complete_states.n_completed,
job_groups_n_jobs_in_complete_states.n_succeeded,
job_groups_n_jobs_in_complete_states.n_failed,
Expand All @@ -56,7 +56,7 @@ async def notify_batch_job_complete(db: Database, client_session: httpx.ClientSe
GROUP BY batch_id
) AS cost_t ON TRUE
LEFT JOIN job_groups_cancelled
ON batches.id = job_groups_cancelled.id
ON batches.id = job_groups_cancelled.batch_id
WHERE batches.id = %s AND NOT deleted AND callback IS NOT NULL AND
batches.`state` = 'complete';
""",
Expand Down Expand Up @@ -123,7 +123,7 @@ async def notify_job_group_on_job_complete(
SELECT 1 AS cancelled
FROM job_group_self_and_ancestors AS self_and_ancestors
INNER JOIN job_groups_cancelled
ON self_and_ancestors.batch_id = job_groups_cancelled.id AND
ON self_and_ancestors.batch_id = job_groups_cancelled.batch_id AND
self_and_ancestors.ancestor_id = job_groups_cancelled.job_group_id
WHERE self_and_ancestors.batch_id = job_group_self_and_ancestors.batch_id AND
self_and_ancestors.job_group_id = job_group_self_and_ancestors.ancestor_id
Expand Down
6 changes: 3 additions & 3 deletions batch/batch/driver/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -1034,7 +1034,7 @@ async def check(tx):
SELECT 1 AS cancelled
FROM job_group_self_and_ancestors
INNER JOIN job_groups_cancelled
ON job_group_self_and_ancestors.batch_id = job_groups_cancelled.id AND
ON job_group_self_and_ancestors.batch_id = job_groups_cancelled.batch_id AND
job_group_self_and_ancestors.ancestor_id = job_groups_cancelled.job_group_id
WHERE job_groups.batch_id = job_group_self_and_ancestors.batch_id AND
job_groups.job_group_id = job_group_self_and_ancestors.job_group_id
Expand Down Expand Up @@ -1312,7 +1312,7 @@ async def cancel_fast_failing_job_groups(app):
SELECT 1 AS cancelled
FROM job_group_self_and_ancestors
INNER JOIN job_groups_cancelled
ON job_group_self_and_ancestors.batch_id = job_groups_cancelled.id AND
ON job_group_self_and_ancestors.batch_id = job_groups_cancelled.batch_id AND
job_group_self_and_ancestors.ancestor_id = job_groups_cancelled.job_group_id
WHERE job_groups.batch_id = job_group_self_and_ancestors.batch_id AND
job_groups.job_group_id = job_group_self_and_ancestors.job_group_id
Expand Down Expand Up @@ -1471,7 +1471,7 @@ async def delete_prev_cancelled_job_group_cancellable_resources_records(db: Data
1
FROM job_group_self_and_ancestors AS descendant
INNER JOIN job_groups_cancelled AS cancelled
ON descendant.batch_id = cancelled.id
ON descendant.batch_id = cancelled.batch_id
AND descendant.ancestor_id = cancelled.job_group_id
WHERE descendant.batch_id = group_resources.batch_id
AND descendant.job_group_id = group_resources.job_group_id
Expand Down
24 changes: 12 additions & 12 deletions batch/batch/front_end/front_end.py
Original file line number Diff line number Diff line change
Expand Up @@ -911,7 +911,7 @@ async def _create_job_group(
SELECT 1 AS cancelled
FROM job_group_self_and_ancestors
INNER JOIN job_groups_cancelled
ON job_group_self_and_ancestors.batch_id = job_groups_cancelled.id AND
ON job_group_self_and_ancestors.batch_id = job_groups_cancelled.batch_id AND
job_group_self_and_ancestors.ancestor_id = job_groups_cancelled.job_group_id
WHERE job_group_self_and_ancestors.batch_id = %s AND job_group_self_and_ancestors.job_group_id = %s;
""",
Expand Down Expand Up @@ -1857,10 +1857,10 @@ async def update(tx: Transaction):
SELECT cancelled_t.cancelled IS NOT NULL AS cancelled
FROM batches
LEFT JOIN (
SELECT id, 1 AS cancelled
SELECT batch_id, 1 AS cancelled
FROM job_groups_cancelled
WHERE id = %s AND job_group_id = %s
) AS cancelled_t ON batches.id = cancelled_t.id
WHERE batch_id = %s AND job_group_id = %s
) AS cancelled_t ON batches.id = cancelled_t.batch_id
WHERE batches.id = %s AND batches.user = %s AND NOT deleted
FOR UPDATE;
""",
Expand Down Expand Up @@ -1936,10 +1936,10 @@ async def _get_batch(app, batch_id):
LEFT JOIN job_groups_n_jobs_in_complete_states
ON job_groups.batch_id = job_groups_n_jobs_in_complete_states.id AND job_groups.job_group_id = job_groups_n_jobs_in_complete_states.job_group_id
LEFT JOIN (
SELECT id, 1 AS cancelled
SELECT batch_id, 1 AS cancelled
FROM job_groups_cancelled
WHERE id = %s AND job_group_id = %s
) AS cancelled_t ON batches.id = cancelled_t.id
WHERE batch_id = %s AND job_group_id = %s
) AS cancelled_t ON batches.id = cancelled_t.batch_id
LEFT JOIN LATERAL (
SELECT COALESCE(SUM(`usage` * rate), 0) AS cost, JSON_OBJECTAGG(resources.resource, COALESCE(`usage` * rate, 0)) AS cost_breakdown
FROM (
Expand Down Expand Up @@ -1984,7 +1984,7 @@ async def _get_job_group(app, batch_id: int, job_group_id: int) -> GetJobGroupRe
SELECT 1 AS cancelled
FROM job_group_self_and_ancestors
INNER JOIN job_groups_cancelled
ON job_group_self_and_ancestors.batch_id = job_groups_cancelled.id AND
ON job_group_self_and_ancestors.batch_id = job_groups_cancelled.batch_id AND
job_group_self_and_ancestors.ancestor_id = job_groups_cancelled.job_group_id
WHERE job_groups.batch_id = job_group_self_and_ancestors.batch_id AND
job_groups.job_group_id = job_group_self_and_ancestors.job_group_id
Expand Down Expand Up @@ -2086,7 +2086,7 @@ async def close_batch(request, userdata):
SELECT 1 AS cancelled
FROM job_group_self_and_ancestors
INNER JOIN job_groups_cancelled
ON job_group_self_and_ancestors.batch_id = job_groups_cancelled.id AND
ON job_group_self_and_ancestors.batch_id = job_groups_cancelled.batch_id AND
job_group_self_and_ancestors.ancestor_id = job_groups_cancelled.job_group_id
WHERE job_groups.batch_id = job_group_self_and_ancestors.batch_id AND
job_groups.job_group_id = job_group_self_and_ancestors.job_group_id
Expand Down Expand Up @@ -2129,10 +2129,10 @@ async def commit_update(request: web.Request, userdata):
FROM batches
LEFT JOIN batch_updates ON batches.id = batch_updates.batch_id
LEFT JOIN (
SELECT id, 1 AS cancelled
SELECT batch_id, 1 AS cancelled
FROM job_groups_cancelled
WHERE id = %s AND job_group_id = %s
) AS cancelled_t ON batches.id = cancelled_t.id
WHERE batch_id = %s AND job_group_id = %s
) AS cancelled_t ON batches.id = cancelled_t.batch_id
WHERE batches.user = %s AND batches.id = %s AND batch_updates.update_id = %s AND NOT deleted;
""",
(batch_id, ROOT_JOB_GROUP_ID, user, batch_id, update_id),
Expand Down
4 changes: 2 additions & 2 deletions batch/batch/front_end/query/query_v1.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ def parse_list_batches_query_v1(user: str, q: str, last_batch_id: Optional[int])
SELECT 1 AS cancelled
FROM job_group_self_and_ancestors
INNER JOIN job_groups_cancelled
ON job_group_self_and_ancestors.batch_id = job_groups_cancelled.id AND
ON job_group_self_and_ancestors.batch_id = job_groups_cancelled.batch_id AND
job_group_self_and_ancestors.ancestor_id = job_groups_cancelled.job_group_id
WHERE job_groups.batch_id = job_group_self_and_ancestors.batch_id AND
job_groups.job_group_id = job_group_self_and_ancestors.job_group_id
Expand Down Expand Up @@ -171,7 +171,7 @@ def parse_list_job_groups_query_v1(
SELECT 1 AS cancelled
FROM job_group_self_and_ancestors
INNER JOIN job_groups_cancelled
ON job_group_self_and_ancestors.batch_id = job_groups_cancelled.id AND
ON job_group_self_and_ancestors.batch_id = job_groups_cancelled.batch_id AND
job_group_self_and_ancestors.ancestor_id = job_groups_cancelled.job_group_id
WHERE job_groups.batch_id = job_group_self_and_ancestors.batch_id AND
job_groups.job_group_id = job_group_self_and_ancestors.job_group_id
Expand Down
2 changes: 1 addition & 1 deletion batch/batch/front_end/query/query_v2.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ def parse_list_batches_query_v2(user: str, q: str, last_batch_id: Optional[int])
ON job_groups.batch_id = job_groups_n_jobs_in_complete_states.id
AND job_groups.job_group_id = job_groups_n_jobs_in_complete_states.job_group_id
LEFT JOIN (SELECT *, 1 AS cancelled FROM job_groups_cancelled) AS cancelled_t
ON job_groups.batch_id = cancelled_t.id
ON job_groups.batch_id = cancelled_t.batch_id
AND job_groups.job_group_id = cancelled_t.job_group_id
INNER JOIN LATERAL (
WITH resource_costs AS (
Expand Down
Loading