From 09f081c132717060472265b6a74ed94207c2c198 Mon Sep 17 00:00:00 2001 From: Ivan Date: Fri, 6 Sep 2024 12:07:20 -0400 Subject: [PATCH 1/5] A procedure for renaming a column (non-primary) in job_groups_cancelled table --- .../driver/instance_collection/job_private.py | 2 +- .../batch/driver/instance_collection/pool.py | 4 +- batch/batch/driver/main.py | 4 +- batch/batch/front_end/front_end.py | 6 +- batch/batch/front_end/query/query_v1.py | 4 +- batch/sql/estimated-current.sql | 18 +-- batch/sql/finalize-job-groups.sql | 6 +- .../rename-job-groups-cancelled-column.sql | 103 ++++++++++++++++++ batch/sql/rename-job-groups-tables.sql | 12 +- 9 files changed, 131 insertions(+), 28 deletions(-) create mode 100644 batch/sql/rename-job-groups-cancelled-column.sql diff --git a/batch/batch/driver/instance_collection/job_private.py b/batch/batch/driver/instance_collection/job_private.py index c4907ec590b..414a992739c 100644 --- a/batch/batch/driver/instance_collection/job_private.py +++ b/batch/batch/driver/instance_collection/job_private.py @@ -360,7 +360,7 @@ async def user_runnable_jobs(user, remaining) -> AsyncIterator[Dict[str, Any]]: SELECT 1 AS cancelled FROM job_group_self_and_ancestors INNER JOIN job_groups_cancelled - ON job_group_self_and_ancestors.batch_id = job_groups_cancelled.id AND + ON job_group_self_and_ancestors.batch_id = job_groups_cancelled.batch_id AND job_group_self_and_ancestors.ancestor_id = job_groups_cancelled.job_group_id WHERE job_groups.batch_id = job_group_self_and_ancestors.batch_id AND job_groups.job_group_id = job_group_self_and_ancestors.job_group_id diff --git a/batch/batch/driver/instance_collection/pool.py b/batch/batch/driver/instance_collection/pool.py index 39ff5046b60..24d02a53af5 100644 --- a/batch/batch/driver/instance_collection/pool.py +++ b/batch/batch/driver/instance_collection/pool.py @@ -344,7 +344,7 @@ async def regions_to_ready_cores_mcpu_from_estimated_job_queue(self) -> List[Tup SELECT 1 AS cancelled FROM 
job_group_self_and_ancestors INNER JOIN job_groups_cancelled - ON job_group_self_and_ancestors.batch_id = job_groups_cancelled.id AND + ON job_group_self_and_ancestors.batch_id = job_groups_cancelled.batch_id AND job_group_self_and_ancestors.ancestor_id = job_groups_cancelled.job_group_id WHERE jobs.batch_id = job_group_self_and_ancestors.batch_id AND jobs.job_group_id = job_group_self_and_ancestors.job_group_id @@ -622,7 +622,7 @@ async def user_runnable_jobs(user): SELECT 1 AS cancelled FROM job_group_self_and_ancestors INNER JOIN job_groups_cancelled - ON job_group_self_and_ancestors.batch_id = job_groups_cancelled.id AND + ON job_group_self_and_ancestors.batch_id = job_groups_cancelled.batch_id AND job_group_self_and_ancestors.ancestor_id = job_groups_cancelled.job_group_id WHERE job_groups.batch_id = job_group_self_and_ancestors.batch_id AND job_groups.job_group_id = job_group_self_and_ancestors.job_group_id diff --git a/batch/batch/driver/main.py b/batch/batch/driver/main.py index ac6f99d756e..d9b18c98805 100644 --- a/batch/batch/driver/main.py +++ b/batch/batch/driver/main.py @@ -1034,7 +1034,7 @@ async def check(tx): SELECT 1 AS cancelled FROM job_group_self_and_ancestors INNER JOIN job_groups_cancelled - ON job_group_self_and_ancestors.batch_id = job_groups_cancelled.id AND + ON job_group_self_and_ancestors.batch_id = job_groups_cancelled.batch_id AND job_group_self_and_ancestors.ancestor_id = job_groups_cancelled.job_group_id WHERE job_groups.batch_id = job_group_self_and_ancestors.batch_id AND job_groups.job_group_id = job_group_self_and_ancestors.job_group_id @@ -1312,7 +1312,7 @@ async def cancel_fast_failing_job_groups(app): SELECT 1 AS cancelled FROM job_group_self_and_ancestors INNER JOIN job_groups_cancelled - ON job_group_self_and_ancestors.batch_id = job_groups_cancelled.id AND + ON job_group_self_and_ancestors.batch_id = job_groups_cancelled.batch_id AND job_group_self_and_ancestors.ancestor_id = job_groups_cancelled.job_group_id WHERE 
job_groups.batch_id = job_group_self_and_ancestors.batch_id AND job_groups.job_group_id = job_group_self_and_ancestors.job_group_id diff --git a/batch/batch/front_end/front_end.py b/batch/batch/front_end/front_end.py index 7cc6121b187..8827d55f159 100644 --- a/batch/batch/front_end/front_end.py +++ b/batch/batch/front_end/front_end.py @@ -911,7 +911,7 @@ async def _create_job_group( SELECT 1 AS cancelled FROM job_group_self_and_ancestors INNER JOIN job_groups_cancelled - ON job_group_self_and_ancestors.batch_id = job_groups_cancelled.id AND + ON job_group_self_and_ancestors.batch_id = job_groups_cancelled.batch_id AND job_group_self_and_ancestors.ancestor_id = job_groups_cancelled.job_group_id WHERE job_group_self_and_ancestors.batch_id = %s AND job_group_self_and_ancestors.job_group_id = %s; """, @@ -1984,7 +1984,7 @@ async def _get_job_group(app, batch_id: int, job_group_id: int) -> GetJobGroupRe SELECT 1 AS cancelled FROM job_group_self_and_ancestors INNER JOIN job_groups_cancelled - ON job_group_self_and_ancestors.batch_id = job_groups_cancelled.id AND + ON job_group_self_and_ancestors.batch_id = job_groups_cancelled.batch_id AND job_group_self_and_ancestors.ancestor_id = job_groups_cancelled.job_group_id WHERE job_groups.batch_id = job_group_self_and_ancestors.batch_id AND job_groups.job_group_id = job_group_self_and_ancestors.job_group_id @@ -2086,7 +2086,7 @@ async def close_batch(request, userdata): SELECT 1 AS cancelled FROM job_group_self_and_ancestors INNER JOIN job_groups_cancelled - ON job_group_self_and_ancestors.batch_id = job_groups_cancelled.id AND + ON job_group_self_and_ancestors.batch_id = job_groups_cancelled.batch_id AND job_group_self_and_ancestors.ancestor_id = job_groups_cancelled.job_group_id WHERE job_groups.batch_id = job_group_self_and_ancestors.batch_id AND job_groups.job_group_id = job_group_self_and_ancestors.job_group_id diff --git a/batch/batch/front_end/query/query_v1.py b/batch/batch/front_end/query/query_v1.py index 
c5cbefdf2f3..4fa36cbe43a 100644 --- a/batch/batch/front_end/query/query_v1.py +++ b/batch/batch/front_end/query/query_v1.py @@ -106,7 +106,7 @@ def parse_list_batches_query_v1(user: str, q: str, last_batch_id: Optional[int]) SELECT 1 AS cancelled FROM job_group_self_and_ancestors INNER JOIN job_groups_cancelled - ON job_group_self_and_ancestors.batch_id = job_groups_cancelled.id AND + ON job_group_self_and_ancestors.batch_id = job_groups_cancelled.batch_id AND job_group_self_and_ancestors.ancestor_id = job_groups_cancelled.job_group_id WHERE job_groups.batch_id = job_group_self_and_ancestors.batch_id AND job_groups.job_group_id = job_group_self_and_ancestors.job_group_id @@ -171,7 +171,7 @@ def parse_list_job_groups_query_v1( SELECT 1 AS cancelled FROM job_group_self_and_ancestors INNER JOIN job_groups_cancelled - ON job_group_self_and_ancestors.batch_id = job_groups_cancelled.id AND + ON job_group_self_and_ancestors.batch_id = job_groups_cancelled.batch_id AND job_group_self_and_ancestors.ancestor_id = job_groups_cancelled.job_group_id WHERE job_groups.batch_id = job_group_self_and_ancestors.batch_id AND job_groups.job_group_id = job_group_self_and_ancestors.job_group_id diff --git a/batch/sql/estimated-current.sql b/batch/sql/estimated-current.sql index 3b7fe1e5ac2..20b10cc8151 100644 --- a/batch/sql/estimated-current.sql +++ b/batch/sql/estimated-current.sql @@ -612,7 +612,7 @@ BEGIN SET job_group_cancelled = EXISTS (SELECT TRUE FROM job_group_self_and_ancestors - INNER JOIN job_groups_cancelled ON job_group_self_and_ancestors.batch_id = job_groups_cancelled.id AND + INNER JOIN job_groups_cancelled ON job_group_self_and_ancestors.batch_id = job_groups_cancelled.batch_id AND job_group_self_and_ancestors.ancestor_id = job_groups_cancelled.job_group_id WHERE batch_id = NEW.batch_id AND job_group_self_and_ancestors.job_group_id = NEW.job_group_id LOCK IN SHARE MODE); @@ -671,7 +671,7 @@ BEGIN SET cur_job_group_cancelled = EXISTS (SELECT TRUE FROM 
job_group_self_and_ancestors - INNER JOIN job_groups_cancelled ON job_group_self_and_ancestors.batch_id = job_groups_cancelled.id AND + INNER JOIN job_groups_cancelled ON job_group_self_and_ancestors.batch_id = job_groups_cancelled.batch_id AND job_group_self_and_ancestors.ancestor_id = job_groups_cancelled.job_group_id WHERE batch_id = OLD.batch_id AND job_group_self_and_ancestors.job_group_id = OLD.job_group_id LOCK IN SHARE MODE); @@ -1107,7 +1107,7 @@ BEGIN SET cur_cancelled = EXISTS (SELECT TRUE FROM job_group_self_and_ancestors - INNER JOIN job_groups_cancelled ON job_group_self_and_ancestors.batch_id = job_groups_cancelled.id AND + INNER JOIN job_groups_cancelled ON job_group_self_and_ancestors.batch_id = job_groups_cancelled.batch_id AND job_group_self_and_ancestors.ancestor_id = job_groups_cancelled.job_group_id WHERE batch_id = in_batch_id AND job_group_self_and_ancestors.job_group_id = in_job_group_id FOR UPDATE); @@ -1256,10 +1256,10 @@ BEGIN WHERE batch_id = in_batch_id AND job_id = in_job_id FOR UPDATE; - SELECT (jobs.cancelled OR job_groups_cancelled.id IS NOT NULL) AND NOT jobs.always_run + SELECT (jobs.cancelled OR job_groups_cancelled.batch_id IS NOT NULL) AND NOT jobs.always_run INTO cur_job_cancel FROM jobs - LEFT JOIN job_groups_cancelled ON job_groups_cancelled.id = jobs.batch_id + LEFT JOIN job_groups_cancelled ON job_groups_cancelled.batch_id = jobs.batch_id WHERE batch_id = in_batch_id AND job_id = in_job_id LOCK IN SHARE MODE; @@ -1376,10 +1376,10 @@ BEGIN WHERE batch_id = in_batch_id AND job_id = in_job_id FOR UPDATE; - SELECT (jobs.cancelled OR job_groups_cancelled.id IS NOT NULL) AND NOT jobs.always_run + SELECT (jobs.cancelled OR job_groups_cancelled.batch_id IS NOT NULL) AND NOT jobs.always_run INTO cur_job_cancel FROM jobs - LEFT JOIN job_groups_cancelled ON job_groups_cancelled.id = jobs.batch_id + LEFT JOIN job_groups_cancelled ON job_groups_cancelled.batch_id = jobs.batch_id WHERE batch_id = in_batch_id AND job_id = in_job_id LOCK 
IN SHARE MODE; @@ -1421,10 +1421,10 @@ BEGIN WHERE batch_id = in_batch_id AND job_id = in_job_id FOR UPDATE; - SELECT (jobs.cancelled OR job_groups_cancelled.id IS NOT NULL) AND NOT jobs.always_run + SELECT (jobs.cancelled OR job_groups_cancelled.batch_id IS NOT NULL) AND NOT jobs.always_run INTO cur_job_cancel FROM jobs - LEFT JOIN job_groups_cancelled ON job_groups_cancelled.id = jobs.batch_id + LEFT JOIN job_groups_cancelled ON job_groups_cancelled.batch_id = jobs.batch_id WHERE batch_id = in_batch_id AND job_id = in_job_id LOCK IN SHARE MODE; diff --git a/batch/sql/finalize-job-groups.sql b/batch/sql/finalize-job-groups.sql index 382b720c68a..2acd87262b6 100644 --- a/batch/sql/finalize-job-groups.sql +++ b/batch/sql/finalize-job-groups.sql @@ -10,7 +10,7 @@ BEGIN SET job_group_cancelled = EXISTS (SELECT TRUE FROM job_group_self_and_ancestors - INNER JOIN job_groups_cancelled ON job_group_self_and_ancestors.batch_id = job_groups_cancelled.id AND + INNER JOIN job_groups_cancelled ON job_group_self_and_ancestors.batch_id = job_groups_cancelled.batch_id AND job_group_self_and_ancestors.ancestor_id = job_groups_cancelled.job_group_id WHERE batch_id = NEW.batch_id AND job_group_self_and_ancestors.job_group_id = NEW.job_group_id LOCK IN SHARE MODE); @@ -142,7 +142,7 @@ BEGIN SET cur_job_group_cancelled = EXISTS (SELECT TRUE FROM job_group_self_and_ancestors - INNER JOIN job_groups_cancelled ON job_group_self_and_ancestors.batch_id = job_groups_cancelled.id AND + INNER JOIN job_groups_cancelled ON job_group_self_and_ancestors.batch_id = job_groups_cancelled.batch_id AND job_group_self_and_ancestors.ancestor_id = job_groups_cancelled.job_group_id WHERE batch_id = OLD.batch_id AND job_group_self_and_ancestors.job_group_id = OLD.job_group_id LOCK IN SHARE MODE); @@ -318,7 +318,7 @@ BEGIN SET cur_cancelled = EXISTS (SELECT TRUE FROM job_group_self_and_ancestors - INNER JOIN job_groups_cancelled ON job_group_self_and_ancestors.batch_id = job_groups_cancelled.id AND + INNER 
JOIN job_groups_cancelled ON job_group_self_and_ancestors.batch_id = job_groups_cancelled.batch_id AND job_group_self_and_ancestors.ancestor_id = job_groups_cancelled.job_group_id WHERE batch_id = in_batch_id AND job_group_self_and_ancestors.job_group_id = in_job_group_id FOR UPDATE); diff --git a/batch/sql/rename-job-groups-cancelled-column.sql b/batch/sql/rename-job-groups-cancelled-column.sql new file mode 100644 index 00000000000..38154f2b6e8 --- /dev/null +++ b/batch/sql/rename-job-groups-cancelled-column.sql @@ -0,0 +1,103 @@ +DELIMITER $$ + +CREATE PROCEDURE IF NOT EXISTS rename_job_groups_cancelled_column( + IN from_column VARCHAR(64), + IN to_column VARCHAR(64) +) +rename_column:BEGIN + DECLARE v_constraint_name VARCHAR(64); + DECLARE v_done INT DEFAULT FALSE; + DECLARE v_error_message TEXT; + DECLARE v_primary_exists INT DEFAULT FALSE; + DECLARE v_sqlstate CHAR(5); + + -- Dynamically fetch the names of contraints related to `from_column` + DECLARE v_cursor CURSOR FOR + SELECT CONSTRAINT_NAME + FROM information_schema.KEY_COLUMN_USAGE + WHERE TABLE_NAME = 'job_groups_cancelled' + AND COLUMN_NAME = from_column; + + -- Rollback handler if something goes wrong + DECLARE EXIT HANDLER FOR SQLEXCEPTION + BEGIN + GET DIAGNOSTICS CONDITION 1 + v_sqlstate = RETURNED_SQLSTATE, + v_error_message = MESSAGE_TEXT; + + SELECT CONCAT('Error SQLSTATE: ', v_sqlstate, ', Message: ', v_error_message) AS ErrorDetails; + + ROLLBACK; + END; + + DECLARE CONTINUE HANDLER FOR NOT FOUND SET v_done = TRUE; + + -- Check if `from_column` exists and `to_column` does not exist + IF NOT EXISTS (SELECT * FROM information_schema.COLUMNS + WHERE TABLE_NAME = 'job_groups_cancelled' + AND COLUMN_NAME = from_column) THEN + SELECT CONCAT('Error: Column ', from_column, ' does not exist.') AS ErrorDetails; + LEAVE rename_column; + END IF; + + IF EXISTS (SELECT * FROM information_schema.COLUMNS + WHERE TABLE_NAME = 'job_groups_cancelled' + AND COLUMN_NAME = to_column) THEN + SELECT CONCAT('Error: 
Column ', to_column, ' already exists.') AS ErrorDetails; + LEAVE rename_column; + END IF; + + START TRANSACTION; + + OPEN v_cursor; + + drop_fk_loop: LOOP + + FETCH v_cursor INTO v_constraint_name; + + IF v_done THEN + LEAVE drop_fk_loop; + END IF; + + SELECT v_constraint_name; + + IF v_constraint_name = 'PRIMARY' THEN + SET v_primary_exists = TRUE; + ELSE + SET @sql = CONCAT('ALTER TABLE job_groups_cancelled DROP FOREIGN KEY ', v_constraint_name); + PREPARE stmt FROM @sql; + EXECUTE stmt; + DEALLOCATE PREPARE stmt; + END IF; + + END LOOP; + + CLOSE v_cursor; + + -- drop primary key + IF v_primary_exists THEN + SET @sql = 'ALTER TABLE job_groups_cancelled DROP PRIMARY KEY'; + PREPARE stmt FROM @sql; + EXECUTE stmt; + DEALLOCATE PREPARE stmt; + END IF; + + -- rename column from `from_column` to `to_column` + SET @sql = CONCAT('ALTER TABLE job_groups_cancelled CHANGE COLUMN `', from_column, '` `', to_column, '` BIGINT NOT NULL'); + PREPARE stmt FROM @sql; + EXECUTE stmt; + DEALLOCATE PREPARE stmt; + + -- Recreate the primary and foreign key constraints using `to_column` + SET @sql = CONCAT('ALTER TABLE job_groups_cancelled ADD PRIMARY KEY (`', to_column, '`, `job_group_id`), ', + 'ADD FOREIGN KEY (`', to_column, '`) REFERENCES batches(`id`) ON DELETE CASCADE, ', + 'ADD FOREIGN KEY (`', to_column, '`, `job_group_id`) REFERENCES job_groups(`batch_id`, `job_group_id`) ON DELETE CASCADE'); + PREPARE stmt FROM @sql; + EXECUTE stmt; + DEALLOCATE PREPARE stmt; + + COMMIT; + +END $$ + +DELIMITER ; diff --git a/batch/sql/rename-job-groups-tables.sql b/batch/sql/rename-job-groups-tables.sql index f13602c2878..55d044c7b15 100644 --- a/batch/sql/rename-job-groups-tables.sql +++ b/batch/sql/rename-job-groups-tables.sql @@ -604,10 +604,10 @@ BEGIN WHERE batch_id = in_batch_id AND job_id = in_job_id FOR UPDATE; - SELECT (jobs.cancelled OR job_groups_cancelled.id IS NOT NULL) AND NOT jobs.always_run + SELECT (jobs.cancelled OR job_groups_cancelled.batch_id IS NOT NULL) AND NOT 
jobs.always_run INTO cur_job_cancel FROM jobs - LEFT JOIN job_groups_cancelled ON job_groups_cancelled.id = jobs.batch_id + LEFT JOIN job_groups_cancelled ON job_groups_cancelled.batch_id = jobs.batch_id WHERE batch_id = in_batch_id AND job_id = in_job_id LOCK IN SHARE MODE; @@ -669,10 +669,10 @@ BEGIN WHERE batch_id = in_batch_id AND job_id = in_job_id FOR UPDATE; - SELECT (jobs.cancelled OR job_groups_cancelled.id IS NOT NULL) AND NOT jobs.always_run + SELECT (jobs.cancelled OR job_groups_cancelled.batch_id IS NOT NULL) AND NOT jobs.always_run INTO cur_job_cancel FROM jobs - LEFT JOIN job_groups_cancelled ON job_groups_cancelled.id = jobs.batch_id + LEFT JOIN job_groups_cancelled ON job_groups_cancelled.batch_id = jobs.batch_id WHERE batch_id = in_batch_id AND job_id = in_job_id LOCK IN SHARE MODE; @@ -714,10 +714,10 @@ BEGIN WHERE batch_id = in_batch_id AND job_id = in_job_id FOR UPDATE; - SELECT (jobs.cancelled OR job_groups_cancelled.id IS NOT NULL) AND NOT jobs.always_run + SELECT (jobs.cancelled OR job_groups_cancelled.batch_id IS NOT NULL) AND NOT jobs.always_run INTO cur_job_cancel FROM jobs - LEFT JOIN job_groups_cancelled ON job_groups_cancelled.id = jobs.batch_id + LEFT JOIN job_groups_cancelled ON job_groups_cancelled.batch_id = jobs.batch_id WHERE batch_id = in_batch_id AND job_id = in_job_id LOCK IN SHARE MODE; From 45e0049b91199417198e13b834d4427e697f17db Mon Sep 17 00:00:00 2001 From: Ivan Date: Fri, 6 Sep 2024 16:23:33 -0400 Subject: [PATCH 2/5] Added rename-job-groups-cancelled-column script to batch migration. Refactored sql for simplification. 
--- .../rename-job-groups-cancelled-column.sql | 130 ++++-------------- build.yaml | 3 + 2 files changed, 30 insertions(+), 103 deletions(-) diff --git a/batch/sql/rename-job-groups-cancelled-column.sql b/batch/sql/rename-job-groups-cancelled-column.sql index 38154f2b6e8..4ded078a68e 100644 --- a/batch/sql/rename-job-groups-cancelled-column.sql +++ b/batch/sql/rename-job-groups-cancelled-column.sql @@ -1,103 +1,27 @@ -DELIMITER $$ - -CREATE PROCEDURE IF NOT EXISTS rename_job_groups_cancelled_column( - IN from_column VARCHAR(64), - IN to_column VARCHAR(64) -) -rename_column:BEGIN - DECLARE v_constraint_name VARCHAR(64); - DECLARE v_done INT DEFAULT FALSE; - DECLARE v_error_message TEXT; - DECLARE v_primary_exists INT DEFAULT FALSE; - DECLARE v_sqlstate CHAR(5); - - -- Dynamically fetch the names of contraints related to `from_column` - DECLARE v_cursor CURSOR FOR - SELECT CONSTRAINT_NAME - FROM information_schema.KEY_COLUMN_USAGE - WHERE TABLE_NAME = 'job_groups_cancelled' - AND COLUMN_NAME = from_column; - - -- Rollback handler if something goes wrong - DECLARE EXIT HANDLER FOR SQLEXCEPTION - BEGIN - GET DIAGNOSTICS CONDITION 1 - v_sqlstate = RETURNED_SQLSTATE, - v_error_message = MESSAGE_TEXT; - - SELECT CONCAT('Error SQLSTATE: ', v_sqlstate, ', Message: ', v_error_message) AS ErrorDetails; - - ROLLBACK; - END; - - DECLARE CONTINUE HANDLER FOR NOT FOUND SET v_done = TRUE; - - -- Check if `from_column` exists and `to_column` does not exist - IF NOT EXISTS (SELECT * FROM information_schema.COLUMNS - WHERE TABLE_NAME = 'job_groups_cancelled' - AND COLUMN_NAME = from_column) THEN - SELECT CONCAT('Error: Column ', from_column, ' does not exist.') AS ErrorDetails; - LEAVE rename_column; - END IF; - - IF EXISTS (SELECT * FROM information_schema.COLUMNS - WHERE TABLE_NAME = 'job_groups_cancelled' - AND COLUMN_NAME = to_column) THEN - SELECT CONCAT('Error: Column ', to_column, ' already exists.') AS ErrorDetails; - LEAVE rename_column; - END IF; - - START TRANSACTION; - - 
OPEN v_cursor; - - drop_fk_loop: LOOP - - FETCH v_cursor INTO v_constraint_name; - - IF v_done THEN - LEAVE drop_fk_loop; - END IF; - - SELECT v_constraint_name; - - IF v_constraint_name = 'PRIMARY' THEN - SET v_primary_exists = TRUE; - ELSE - SET @sql = CONCAT('ALTER TABLE job_groups_cancelled DROP FOREIGN KEY ', v_constraint_name); - PREPARE stmt FROM @sql; - EXECUTE stmt; - DEALLOCATE PREPARE stmt; - END IF; - - END LOOP; - - CLOSE v_cursor; - - -- drop primary key - IF v_primary_exists THEN - SET @sql = 'ALTER TABLE job_groups_cancelled DROP PRIMARY KEY'; - PREPARE stmt FROM @sql; - EXECUTE stmt; - DEALLOCATE PREPARE stmt; - END IF; - - -- rename column from `from_column` to `to_column` - SET @sql = CONCAT('ALTER TABLE job_groups_cancelled CHANGE COLUMN `', from_column, '` `', to_column, '` BIGINT NOT NULL'); - PREPARE stmt FROM @sql; - EXECUTE stmt; - DEALLOCATE PREPARE stmt; - - -- Recreate the primary and foreign key constraints using `to_column` - SET @sql = CONCAT('ALTER TABLE job_groups_cancelled ADD PRIMARY KEY (`', to_column, '`, `job_group_id`), ', - 'ADD FOREIGN KEY (`', to_column, '`) REFERENCES batches(`id`) ON DELETE CASCADE, ', - 'ADD FOREIGN KEY (`', to_column, '`, `job_group_id`) REFERENCES job_groups(`batch_id`, `job_group_id`) ON DELETE CASCADE'); - PREPARE stmt FROM @sql; - EXECUTE stmt; - DEALLOCATE PREPARE stmt; - - COMMIT; - -END $$ - -DELIMITER ; +/* +mysql> SELECT * FROM INFORMATION_SCHEMA.INNODB_SYS_FOREIGN \G + +(mysql 8.x or above) +mysql> SELECT * FROM INFORMATION_SCHEMA.INNODB_FOREIGN \G + +*************************** XXX row *************************** + ID: batches/job_groups_cancelled_ibfk_1 +FOR_NAME: batches/job_groups_cancelled +REF_NAME: batches/batches + N_COLS: 1 + TYPE: 33 +*************************** YYY row *************************** + ID: batches/job_groups_cancelled_ibfk_2 +FOR_NAME: batches/job_groups_cancelled +REF_NAME: batches/job_groups + N_COLS: 2 + TYPE: 33 +*/ + +ALTER TABLE job_groups_cancelled DROP FOREIGN 
KEY job_groups_cancelled_ibfk_1; +ALTER TABLE job_groups_cancelled DROP FOREIGN KEY job_groups_cancelled_ibfk_2; +ALTER TABLE job_groups_cancelled DROP PRIMARY KEY; +ALTER TABLE job_groups_cancelled CHANGE COLUMN `id` `batch_id` BIGINT NOT NULL; +ALTER TABLE job_groups_cancelled ADD PRIMARY KEY (`batch_id`, `job_group_id`), + ADD FOREIGN KEY (`batch_id`) REFERENCES batches(id) ON DELETE CASCADE, + ADD FOREIGN KEY (`batch_id`, `job_group_id`) REFERENCES job_groups (`batch_id`, `job_group_id`) ON DELETE CASCADE; diff --git a/build.yaml b/build.yaml index a12528f67e2..3a0838add1a 100644 --- a/build.yaml +++ b/build.yaml @@ -2294,6 +2294,9 @@ steps: - name: fix-mark-job-complete-deadlocks script: /io/sql/fix-mark-job-complete-deadlocks.sql online: true + - name: rename-job-groups-cancelled-column + script: /io/sql/rename-job-groups-cancelled-column.sql + online: false inputs: - from: /repo/batch/sql to: /io/sql From fec05d4b4064cd88156c0faf55979f67fd0e3bfa Mon Sep 17 00:00:00 2001 From: Ivan Date: Mon, 9 Sep 2024 16:53:21 -0400 Subject: [PATCH 3/5] revert files --- batch/batch/driver/canceller.py | 6 +++--- batch/batch/driver/job.py | 6 +++--- batch/sql/finalize-job-groups.sql | 6 +++--- batch/sql/rename-job-groups-tables.sql | 12 ++++++------ 4 files changed, 15 insertions(+), 15 deletions(-) diff --git a/batch/batch/driver/canceller.py b/batch/batch/driver/canceller.py index d438a8519bb..b02d6f24a2f 100644 --- a/batch/batch/driver/canceller.py +++ b/batch/batch/driver/canceller.py @@ -99,7 +99,7 @@ async def user_cancelled_ready_jobs(user, remaining) -> AsyncIterator[Dict[str, SELECT 1 AS cancelled FROM job_group_self_and_ancestors INNER JOIN job_groups_cancelled - ON job_group_self_and_ancestors.batch_id = job_groups_cancelled.id AND + ON job_group_self_and_ancestors.batch_id = job_groups_cancelled.batch_id AND job_group_self_and_ancestors.ancestor_id = job_groups_cancelled.job_group_id WHERE job_groups.batch_id = job_group_self_and_ancestors.batch_id AND 
job_groups.job_group_id = job_group_self_and_ancestors.job_group_id @@ -204,7 +204,7 @@ async def user_cancelled_creating_jobs(user, remaining) -> AsyncIterator[Dict[st SELECT 1 AS cancelled FROM job_group_self_and_ancestors INNER JOIN job_groups_cancelled - ON job_group_self_and_ancestors.batch_id = job_groups_cancelled.id AND + ON job_group_self_and_ancestors.batch_id = job_groups_cancelled.batch_id AND job_group_self_and_ancestors.ancestor_id = job_groups_cancelled.job_group_id WHERE job_groups.batch_id = job_group_self_and_ancestors.batch_id AND job_groups.job_group_id = job_group_self_and_ancestors.job_group_id @@ -311,7 +311,7 @@ async def user_cancelled_running_jobs(user, remaining) -> AsyncIterator[Dict[str SELECT 1 AS cancelled FROM job_group_self_and_ancestors INNER JOIN job_groups_cancelled - ON job_group_self_and_ancestors.batch_id = job_groups_cancelled.id AND + ON job_group_self_and_ancestors.batch_id = job_groups_cancelled.batch_id AND job_group_self_and_ancestors.ancestor_id = job_groups_cancelled.job_group_id WHERE job_groups.batch_id = job_group_self_and_ancestors.batch_id AND job_groups.job_group_id = job_group_self_and_ancestors.job_group_id diff --git a/batch/batch/driver/job.py b/batch/batch/driver/job.py index c6d129eef18..84237ca073f 100644 --- a/batch/batch/driver/job.py +++ b/batch/batch/driver/job.py @@ -36,7 +36,7 @@ async def notify_batch_job_complete(db: Database, client_session: httpx.ClientSe SELECT batches.*, cost_t.cost, cost_t.cost_breakdown, - job_groups_cancelled.id IS NOT NULL AS cancelled, + job_groups_cancelled.batch_id IS NOT NULL AS cancelled, job_groups_n_jobs_in_complete_states.n_completed, job_groups_n_jobs_in_complete_states.n_succeeded, job_groups_n_jobs_in_complete_states.n_failed, @@ -56,7 +56,7 @@ async def notify_batch_job_complete(db: Database, client_session: httpx.ClientSe GROUP BY batch_id ) AS cost_t ON TRUE LEFT JOIN job_groups_cancelled - ON batches.id = job_groups_cancelled.id + ON batches.id = 
job_groups_cancelled.batch_id WHERE batches.id = %s AND NOT deleted AND callback IS NOT NULL AND batches.`state` = 'complete'; """, @@ -123,7 +123,7 @@ async def notify_job_group_on_job_complete( SELECT 1 AS cancelled FROM job_group_self_and_ancestors AS self_and_ancestors INNER JOIN job_groups_cancelled - ON self_and_ancestors.batch_id = job_groups_cancelled.id AND + ON self_and_ancestors.batch_id = job_groups_cancelled.batch_id AND self_and_ancestors.ancestor_id = job_groups_cancelled.job_group_id WHERE self_and_ancestors.batch_id = job_group_self_and_ancestors.batch_id AND self_and_ancestors.job_group_id = job_group_self_and_ancestors.ancestor_id diff --git a/batch/sql/finalize-job-groups.sql b/batch/sql/finalize-job-groups.sql index 2acd87262b6..382b720c68a 100644 --- a/batch/sql/finalize-job-groups.sql +++ b/batch/sql/finalize-job-groups.sql @@ -10,7 +10,7 @@ BEGIN SET job_group_cancelled = EXISTS (SELECT TRUE FROM job_group_self_and_ancestors - INNER JOIN job_groups_cancelled ON job_group_self_and_ancestors.batch_id = job_groups_cancelled.batch_id AND + INNER JOIN job_groups_cancelled ON job_group_self_and_ancestors.batch_id = job_groups_cancelled.id AND job_group_self_and_ancestors.ancestor_id = job_groups_cancelled.job_group_id WHERE batch_id = NEW.batch_id AND job_group_self_and_ancestors.job_group_id = NEW.job_group_id LOCK IN SHARE MODE); @@ -142,7 +142,7 @@ BEGIN SET cur_job_group_cancelled = EXISTS (SELECT TRUE FROM job_group_self_and_ancestors - INNER JOIN job_groups_cancelled ON job_group_self_and_ancestors.batch_id = job_groups_cancelled.batch_id AND + INNER JOIN job_groups_cancelled ON job_group_self_and_ancestors.batch_id = job_groups_cancelled.id AND job_group_self_and_ancestors.ancestor_id = job_groups_cancelled.job_group_id WHERE batch_id = OLD.batch_id AND job_group_self_and_ancestors.job_group_id = OLD.job_group_id LOCK IN SHARE MODE); @@ -318,7 +318,7 @@ BEGIN SET cur_cancelled = EXISTS (SELECT TRUE FROM job_group_self_and_ancestors - INNER 
JOIN job_groups_cancelled ON job_group_self_and_ancestors.batch_id = job_groups_cancelled.batch_id AND + INNER JOIN job_groups_cancelled ON job_group_self_and_ancestors.batch_id = job_groups_cancelled.id AND job_group_self_and_ancestors.ancestor_id = job_groups_cancelled.job_group_id WHERE batch_id = in_batch_id AND job_group_self_and_ancestors.job_group_id = in_job_group_id FOR UPDATE); diff --git a/batch/sql/rename-job-groups-tables.sql b/batch/sql/rename-job-groups-tables.sql index 55d044c7b15..f13602c2878 100644 --- a/batch/sql/rename-job-groups-tables.sql +++ b/batch/sql/rename-job-groups-tables.sql @@ -604,10 +604,10 @@ BEGIN WHERE batch_id = in_batch_id AND job_id = in_job_id FOR UPDATE; - SELECT (jobs.cancelled OR job_groups_cancelled.batch_id IS NOT NULL) AND NOT jobs.always_run + SELECT (jobs.cancelled OR job_groups_cancelled.id IS NOT NULL) AND NOT jobs.always_run INTO cur_job_cancel FROM jobs - LEFT JOIN job_groups_cancelled ON job_groups_cancelled.batch_id = jobs.batch_id + LEFT JOIN job_groups_cancelled ON job_groups_cancelled.id = jobs.batch_id WHERE batch_id = in_batch_id AND job_id = in_job_id LOCK IN SHARE MODE; @@ -669,10 +669,10 @@ BEGIN WHERE batch_id = in_batch_id AND job_id = in_job_id FOR UPDATE; - SELECT (jobs.cancelled OR job_groups_cancelled.batch_id IS NOT NULL) AND NOT jobs.always_run + SELECT (jobs.cancelled OR job_groups_cancelled.id IS NOT NULL) AND NOT jobs.always_run INTO cur_job_cancel FROM jobs - LEFT JOIN job_groups_cancelled ON job_groups_cancelled.batch_id = jobs.batch_id + LEFT JOIN job_groups_cancelled ON job_groups_cancelled.id = jobs.batch_id WHERE batch_id = in_batch_id AND job_id = in_job_id LOCK IN SHARE MODE; @@ -714,10 +714,10 @@ BEGIN WHERE batch_id = in_batch_id AND job_id = in_job_id FOR UPDATE; - SELECT (jobs.cancelled OR job_groups_cancelled.batch_id IS NOT NULL) AND NOT jobs.always_run + SELECT (jobs.cancelled OR job_groups_cancelled.id IS NOT NULL) AND NOT jobs.always_run INTO cur_job_cancel FROM jobs - LEFT 
JOIN job_groups_cancelled ON job_groups_cancelled.batch_id = jobs.batch_id + LEFT JOIN job_groups_cancelled ON job_groups_cancelled.id = jobs.batch_id WHERE batch_id = in_batch_id AND job_id = in_job_id LOCK IN SHARE MODE; From 64dcea324adc5777834c8da481977d5349dda8aa Mon Sep 17 00:00:00 2001 From: Ivan Date: Mon, 9 Sep 2024 21:23:11 -0400 Subject: [PATCH 4/5] Recreate triggers and stored procedures after column name change. --- batch/sql/estimated-current.sql | 18 +- .../rename-job-groups-cancelled-column.sql | 494 ++++++++++++++++++ 2 files changed, 503 insertions(+), 9 deletions(-) diff --git a/batch/sql/estimated-current.sql b/batch/sql/estimated-current.sql index 20b10cc8151..3b7fe1e5ac2 100644 --- a/batch/sql/estimated-current.sql +++ b/batch/sql/estimated-current.sql @@ -612,7 +612,7 @@ BEGIN SET job_group_cancelled = EXISTS (SELECT TRUE FROM job_group_self_and_ancestors - INNER JOIN job_groups_cancelled ON job_group_self_and_ancestors.batch_id = job_groups_cancelled.batch_id AND + INNER JOIN job_groups_cancelled ON job_group_self_and_ancestors.batch_id = job_groups_cancelled.id AND job_group_self_and_ancestors.ancestor_id = job_groups_cancelled.job_group_id WHERE batch_id = NEW.batch_id AND job_group_self_and_ancestors.job_group_id = NEW.job_group_id LOCK IN SHARE MODE); @@ -671,7 +671,7 @@ BEGIN SET cur_job_group_cancelled = EXISTS (SELECT TRUE FROM job_group_self_and_ancestors - INNER JOIN job_groups_cancelled ON job_group_self_and_ancestors.batch_id = job_groups_cancelled.batch_id AND + INNER JOIN job_groups_cancelled ON job_group_self_and_ancestors.batch_id = job_groups_cancelled.id AND job_group_self_and_ancestors.ancestor_id = job_groups_cancelled.job_group_id WHERE batch_id = OLD.batch_id AND job_group_self_and_ancestors.job_group_id = OLD.job_group_id LOCK IN SHARE MODE); @@ -1107,7 +1107,7 @@ BEGIN SET cur_cancelled = EXISTS (SELECT TRUE FROM job_group_self_and_ancestors - INNER JOIN job_groups_cancelled ON job_group_self_and_ancestors.batch_id = 
job_groups_cancelled.batch_id AND + INNER JOIN job_groups_cancelled ON job_group_self_and_ancestors.batch_id = job_groups_cancelled.id AND job_group_self_and_ancestors.ancestor_id = job_groups_cancelled.job_group_id WHERE batch_id = in_batch_id AND job_group_self_and_ancestors.job_group_id = in_job_group_id FOR UPDATE); @@ -1256,10 +1256,10 @@ BEGIN WHERE batch_id = in_batch_id AND job_id = in_job_id FOR UPDATE; - SELECT (jobs.cancelled OR job_groups_cancelled.batch_id IS NOT NULL) AND NOT jobs.always_run + SELECT (jobs.cancelled OR job_groups_cancelled.id IS NOT NULL) AND NOT jobs.always_run INTO cur_job_cancel FROM jobs - LEFT JOIN job_groups_cancelled ON job_groups_cancelled.batch_id = jobs.batch_id + LEFT JOIN job_groups_cancelled ON job_groups_cancelled.id = jobs.batch_id WHERE batch_id = in_batch_id AND job_id = in_job_id LOCK IN SHARE MODE; @@ -1376,10 +1376,10 @@ BEGIN WHERE batch_id = in_batch_id AND job_id = in_job_id FOR UPDATE; - SELECT (jobs.cancelled OR job_groups_cancelled.batch_id IS NOT NULL) AND NOT jobs.always_run + SELECT (jobs.cancelled OR job_groups_cancelled.id IS NOT NULL) AND NOT jobs.always_run INTO cur_job_cancel FROM jobs - LEFT JOIN job_groups_cancelled ON job_groups_cancelled.batch_id = jobs.batch_id + LEFT JOIN job_groups_cancelled ON job_groups_cancelled.id = jobs.batch_id WHERE batch_id = in_batch_id AND job_id = in_job_id LOCK IN SHARE MODE; @@ -1421,10 +1421,10 @@ BEGIN WHERE batch_id = in_batch_id AND job_id = in_job_id FOR UPDATE; - SELECT (jobs.cancelled OR job_groups_cancelled.batch_id IS NOT NULL) AND NOT jobs.always_run + SELECT (jobs.cancelled OR job_groups_cancelled.id IS NOT NULL) AND NOT jobs.always_run INTO cur_job_cancel FROM jobs - LEFT JOIN job_groups_cancelled ON job_groups_cancelled.batch_id = jobs.batch_id + LEFT JOIN job_groups_cancelled ON job_groups_cancelled.id = jobs.batch_id WHERE batch_id = in_batch_id AND job_id = in_job_id LOCK IN SHARE MODE; diff --git a/batch/sql/rename-job-groups-cancelled-column.sql 
b/batch/sql/rename-job-groups-cancelled-column.sql index 4ded078a68e..9c6c9419956 100644 --- a/batch/sql/rename-job-groups-cancelled-column.sql +++ b/batch/sql/rename-job-groups-cancelled-column.sql @@ -25,3 +25,497 @@ ALTER TABLE job_groups_cancelled CHANGE COLUMN `id` `batch_id` BIGINT NOT NULL; ALTER TABLE job_groups_cancelled ADD PRIMARY KEY (`batch_id`, `job_group_id`), ADD FOREIGN KEY (`batch_id`) REFERENCES batches(id) ON DELETE CASCADE, ADD FOREIGN KEY (`batch_id`, `job_group_id`) REFERENCES job_groups (`batch_id`, `job_group_id`) ON DELETE CASCADE; + +DELIMITER $$ + +DROP TRIGGER IF EXISTS jobs_before_insert $$ +CREATE TRIGGER jobs_before_insert BEFORE INSERT ON jobs +FOR EACH ROW +BEGIN + DECLARE job_group_cancelled BOOLEAN; + + SET job_group_cancelled = EXISTS (SELECT TRUE + FROM job_group_self_and_ancestors + INNER JOIN job_groups_cancelled ON job_group_self_and_ancestors.batch_id = job_groups_cancelled.batch_id AND + job_group_self_and_ancestors.ancestor_id = job_groups_cancelled.job_group_id + WHERE job_group_self_and_ancestors.batch_id = NEW.batch_id AND job_group_self_and_ancestors.job_group_id = NEW.job_group_id + LOCK IN SHARE MODE); + + IF job_group_cancelled THEN + SIGNAL SQLSTATE '45000' SET MESSAGE_TEXT = "job group has already been cancelled"; + END IF; +END $$ + +DROP TRIGGER IF EXISTS jobs_after_update $$ +CREATE TRIGGER jobs_after_update AFTER UPDATE ON jobs +FOR EACH ROW +BEGIN + DECLARE cur_user VARCHAR(100); + DECLARE cur_job_group_cancelled BOOLEAN; + DECLARE cur_n_tokens INT; + DECLARE rand_token INT; + + DECLARE always_run boolean; + DECLARE cores_mcpu bigint; + + DECLARE was_marked_cancelled boolean; + DECLARE was_cancelled boolean; + DECLARE was_cancellable boolean; + + DECLARE now_marked_cancelled boolean; + DECLARE now_cancelled boolean; + DECLARE now_cancellable boolean; + + DECLARE was_ready boolean; + DECLARE now_ready boolean; + + DECLARE was_running boolean; + DECLARE now_running boolean; + + DECLARE was_creating boolean; + DECLARE now_creating boolean;
+ + DECLARE delta_n_ready_cancellable_jobs int; + DECLARE delta_ready_cancellable_cores_mcpu bigint; + DECLARE delta_n_ready_jobs int; + DECLARE delta_ready_cores_mcpu bigint; + DECLARE delta_n_cancelled_ready_jobs int; + + DECLARE delta_n_running_cancellable_jobs int; + DECLARE delta_running_cancellable_cores_mcpu bigint; + DECLARE delta_n_running_jobs int; + DECLARE delta_running_cores_mcpu bigint; + DECLARE delta_n_cancelled_running_jobs int; + + DECLARE delta_n_creating_cancellable_jobs int; + DECLARE delta_n_creating_jobs int; + DECLARE delta_n_cancelled_creating_jobs int; + + SELECT user INTO cur_user FROM batches WHERE id = NEW.batch_id; + + SET cur_job_group_cancelled = EXISTS (SELECT TRUE + FROM job_group_self_and_ancestors + INNER JOIN job_groups_cancelled ON job_group_self_and_ancestors.batch_id = job_groups_cancelled.batch_id AND + job_group_self_and_ancestors.ancestor_id = job_groups_cancelled.job_group_id + WHERE job_group_self_and_ancestors.batch_id = OLD.batch_id AND job_group_self_and_ancestors.job_group_id = OLD.job_group_id + LOCK IN SHARE MODE); + + SELECT n_tokens INTO cur_n_tokens FROM globals LOCK IN SHARE MODE; + SET rand_token = FLOOR(RAND() * cur_n_tokens); + + SET always_run = old.always_run; # always_run is immutable + SET cores_mcpu = old.cores_mcpu; # cores_mcpu is immutable + + SET was_marked_cancelled = old.cancelled OR cur_job_group_cancelled; + SET was_cancelled = NOT always_run AND was_marked_cancelled; + SET was_cancellable = NOT always_run AND NOT was_marked_cancelled; + + SET now_marked_cancelled = new.cancelled or cur_job_group_cancelled; + SET now_cancelled = NOT always_run AND now_marked_cancelled; + SET now_cancellable = NOT always_run AND NOT now_marked_cancelled; + + # NB: was_cancelled => now_cancelled b/c you cannot be uncancelled + + SET was_ready = old.state = 'Ready'; + SET now_ready = new.state = 'Ready'; + SET was_running = old.state = 'Running'; + SET now_running = new.state = 'Running'; + SET was_creating = old.state = 'Creating'; + SET
now_creating = new.state = 'Creating'; + + SET delta_n_ready_cancellable_jobs = (-1 * was_ready * was_cancellable ) + (now_ready * now_cancellable ) ; + SET delta_n_ready_jobs = (-1 * was_ready * (NOT was_cancelled)) + (now_ready * (NOT now_cancelled)); + SET delta_n_cancelled_ready_jobs = (-1 * was_ready * was_cancelled ) + (now_ready * now_cancelled ) ; + + SET delta_n_running_cancellable_jobs = (-1 * was_running * was_cancellable ) + (now_running * now_cancellable ) ; + SET delta_n_running_jobs = (-1 * was_running * (NOT was_cancelled)) + (now_running * (NOT now_cancelled)); + SET delta_n_cancelled_running_jobs = (-1 * was_running * was_cancelled ) + (now_running * now_cancelled ) ; + + SET delta_n_creating_cancellable_jobs = (-1 * was_creating * was_cancellable ) + (now_creating * now_cancellable ) ; + SET delta_n_creating_jobs = (-1 * was_creating * (NOT was_cancelled)) + (now_creating * (NOT now_cancelled)); + SET delta_n_cancelled_creating_jobs = (-1 * was_creating * was_cancelled ) + (now_creating * now_cancelled ) ; + + SET delta_ready_cancellable_cores_mcpu = delta_n_ready_cancellable_jobs * cores_mcpu; + SET delta_ready_cores_mcpu = delta_n_ready_jobs * cores_mcpu; + + SET delta_running_cancellable_cores_mcpu = delta_n_running_cancellable_jobs * cores_mcpu; + SET delta_running_cores_mcpu = delta_n_running_jobs * cores_mcpu; + + INSERT INTO job_group_inst_coll_cancellable_resources (batch_id, update_id, job_group_id, inst_coll, token, + n_ready_cancellable_jobs, + ready_cancellable_cores_mcpu, + n_creating_cancellable_jobs, + n_running_cancellable_jobs, + running_cancellable_cores_mcpu) + SELECT NEW.batch_id, NEW.update_id, job_group_self_and_ancestors.ancestor_id, NEW.inst_coll, rand_token, + delta_n_ready_cancellable_jobs, + delta_ready_cancellable_cores_mcpu, + delta_n_creating_cancellable_jobs, + delta_n_running_cancellable_jobs, + delta_running_cancellable_cores_mcpu + FROM job_group_self_and_ancestors + WHERE job_group_self_and_ancestors.batch_id = 
NEW.batch_id AND job_group_self_and_ancestors.job_group_id = NEW.job_group_id + ON DUPLICATE KEY UPDATE + n_ready_cancellable_jobs = n_ready_cancellable_jobs + delta_n_ready_cancellable_jobs, + ready_cancellable_cores_mcpu = ready_cancellable_cores_mcpu + delta_ready_cancellable_cores_mcpu, + n_creating_cancellable_jobs = n_creating_cancellable_jobs + delta_n_creating_cancellable_jobs, + n_running_cancellable_jobs = n_running_cancellable_jobs + delta_n_running_cancellable_jobs, + running_cancellable_cores_mcpu = running_cancellable_cores_mcpu + delta_running_cancellable_cores_mcpu; + + INSERT INTO user_inst_coll_resources (user, inst_coll, token, + n_ready_jobs, + n_running_jobs, + n_creating_jobs, + ready_cores_mcpu, + running_cores_mcpu, + n_cancelled_ready_jobs, + n_cancelled_running_jobs, + n_cancelled_creating_jobs + ) + VALUES (cur_user, NEW.inst_coll, rand_token, + delta_n_ready_jobs, + delta_n_running_jobs, + delta_n_creating_jobs, + delta_ready_cores_mcpu, + delta_running_cores_mcpu, + delta_n_cancelled_ready_jobs, + delta_n_cancelled_running_jobs, + delta_n_cancelled_creating_jobs + ) + ON DUPLICATE KEY UPDATE + n_ready_jobs = n_ready_jobs + delta_n_ready_jobs, + n_running_jobs = n_running_jobs + delta_n_running_jobs, + n_creating_jobs = n_creating_jobs + delta_n_creating_jobs, + ready_cores_mcpu = ready_cores_mcpu + delta_ready_cores_mcpu, + running_cores_mcpu = running_cores_mcpu + delta_running_cores_mcpu, + n_cancelled_ready_jobs = n_cancelled_ready_jobs + delta_n_cancelled_ready_jobs, + n_cancelled_running_jobs = n_cancelled_running_jobs + delta_n_cancelled_running_jobs, + n_cancelled_creating_jobs = n_cancelled_creating_jobs + delta_n_cancelled_creating_jobs; +END $$ + +DROP PROCEDURE IF EXISTS cancel_batch $$ +CREATE PROCEDURE cancel_batch( + IN in_batch_id VARCHAR(100) +) +BEGIN + DECLARE cur_user VARCHAR(100); + DECLARE cur_batch_state VARCHAR(40); + DECLARE cur_cancelled BOOLEAN; + DECLARE cur_n_cancelled_ready_jobs INT; + DECLARE 
cur_cancelled_ready_cores_mcpu BIGINT; + DECLARE cur_n_cancelled_running_jobs INT; + DECLARE cur_cancelled_running_cores_mcpu BIGINT; + DECLARE cur_n_n_cancelled_creating_jobs INT; + + START TRANSACTION; + + SELECT user, `state` INTO cur_user, cur_batch_state FROM batches + WHERE id = in_batch_id + FOR UPDATE; + + SET cur_cancelled = EXISTS (SELECT TRUE + FROM job_groups_cancelled + WHERE batch_id = in_batch_id + FOR UPDATE); + + IF cur_batch_state = 'running' AND NOT cur_cancelled THEN + INSERT INTO user_inst_coll_resources (user, inst_coll, token, + n_ready_jobs, ready_cores_mcpu, + n_running_jobs, running_cores_mcpu, + n_creating_jobs, + n_cancelled_ready_jobs, n_cancelled_running_jobs, n_cancelled_creating_jobs) + SELECT user, inst_coll, 0, + -1 * (@n_ready_cancellable_jobs := COALESCE(SUM(n_ready_cancellable_jobs), 0)), + -1 * (@ready_cancellable_cores_mcpu := COALESCE(SUM(ready_cancellable_cores_mcpu), 0)), + -1 * (@n_running_cancellable_jobs := COALESCE(SUM(n_running_cancellable_jobs), 0)), + -1 * (@running_cancellable_cores_mcpu := COALESCE(SUM(running_cancellable_cores_mcpu), 0)), + -1 * (@n_creating_cancellable_jobs := COALESCE(SUM(n_creating_cancellable_jobs), 0)), + COALESCE(SUM(n_ready_cancellable_jobs), 0), + COALESCE(SUM(n_running_cancellable_jobs), 0), + COALESCE(SUM(n_creating_cancellable_jobs), 0) + FROM job_group_inst_coll_cancellable_resources + JOIN batches ON batches.id = job_group_inst_coll_cancellable_resources.batch_id + INNER JOIN batch_updates ON job_group_inst_coll_cancellable_resources.batch_id = batch_updates.batch_id AND + job_group_inst_coll_cancellable_resources.update_id = batch_updates.update_id + WHERE job_group_inst_coll_cancellable_resources.batch_id = in_batch_id AND batch_updates.committed + GROUP BY user, inst_coll + ON DUPLICATE KEY UPDATE + n_ready_jobs = n_ready_jobs - @n_ready_cancellable_jobs, + ready_cores_mcpu = ready_cores_mcpu - @ready_cancellable_cores_mcpu, + n_running_jobs = n_running_jobs - 
@n_running_cancellable_jobs, + running_cores_mcpu = running_cores_mcpu - @running_cancellable_cores_mcpu, + n_creating_jobs = n_creating_jobs - @n_creating_cancellable_jobs, + n_cancelled_ready_jobs = n_cancelled_ready_jobs + @n_ready_cancellable_jobs, + n_cancelled_running_jobs = n_cancelled_running_jobs + @n_running_cancellable_jobs, + n_cancelled_creating_jobs = n_cancelled_creating_jobs + @n_creating_cancellable_jobs; + + # there are no cancellable jobs left, they have been cancelled + DELETE FROM job_group_inst_coll_cancellable_resources WHERE batch_id = in_batch_id; + + # cancel root job group only + INSERT INTO job_groups_cancelled (batch_id, job_group_id) VALUES (in_batch_id, 0); + END IF; + + COMMIT; +END $$ + +DROP PROCEDURE IF EXISTS cancel_job_group $$ +CREATE PROCEDURE cancel_job_group( + IN in_batch_id VARCHAR(100), + IN in_job_group_id INT +) +BEGIN + DECLARE cur_user VARCHAR(100); + DECLARE cur_job_group_state VARCHAR(40); + DECLARE cur_cancelled BOOLEAN; + + START TRANSACTION; + + SELECT user, `state` INTO cur_user, cur_job_group_state + FROM job_groups + WHERE batch_id = in_batch_id AND job_group_id = in_job_group_id + FOR UPDATE; + + SET cur_cancelled = EXISTS (SELECT TRUE + FROM job_group_self_and_ancestors + INNER JOIN job_groups_cancelled ON job_group_self_and_ancestors.batch_id = job_groups_cancelled.batch_id AND + job_group_self_and_ancestors.ancestor_id = job_groups_cancelled.job_group_id + WHERE job_group_self_and_ancestors.batch_id = in_batch_id AND job_group_self_and_ancestors.job_group_id = in_job_group_id + FOR UPDATE); + + IF NOT cur_cancelled THEN + INSERT INTO user_inst_coll_resources (user, inst_coll, token, + n_ready_jobs, ready_cores_mcpu, + n_running_jobs, running_cores_mcpu, + n_creating_jobs, + n_cancelled_ready_jobs, n_cancelled_running_jobs, n_cancelled_creating_jobs) + SELECT user, inst_coll, 0, + -1 * (@n_ready_cancellable_jobs := COALESCE(SUM(n_ready_cancellable_jobs), 0)), + -1 * (@ready_cancellable_cores_mcpu :=
COALESCE(SUM(ready_cancellable_cores_mcpu), 0)), + -1 * (@n_running_cancellable_jobs := COALESCE(SUM(n_running_cancellable_jobs), 0)), + -1 * (@running_cancellable_cores_mcpu := COALESCE(SUM(running_cancellable_cores_mcpu), 0)), + -1 * (@n_creating_cancellable_jobs := COALESCE(SUM(n_creating_cancellable_jobs), 0)), + COALESCE(SUM(n_ready_cancellable_jobs), 0), + COALESCE(SUM(n_running_cancellable_jobs), 0), + COALESCE(SUM(n_creating_cancellable_jobs), 0) + FROM job_group_inst_coll_cancellable_resources + INNER JOIN batches ON job_group_inst_coll_cancellable_resources.batch_id = batches.id + INNER JOIN batch_updates ON job_group_inst_coll_cancellable_resources.batch_id = batch_updates.batch_id AND + job_group_inst_coll_cancellable_resources.update_id = batch_updates.update_id + WHERE job_group_inst_coll_cancellable_resources.batch_id = in_batch_id AND + job_group_inst_coll_cancellable_resources.job_group_id = in_job_group_id AND + batch_updates.committed + GROUP BY user, inst_coll + FOR UPDATE + ON DUPLICATE KEY UPDATE + n_ready_jobs = n_ready_jobs - @n_ready_cancellable_jobs, + ready_cores_mcpu = ready_cores_mcpu - @ready_cancellable_cores_mcpu, + n_running_jobs = n_running_jobs - @n_running_cancellable_jobs, + running_cores_mcpu = running_cores_mcpu - @running_cancellable_cores_mcpu, + n_creating_jobs = n_creating_jobs - @n_creating_cancellable_jobs, + n_cancelled_ready_jobs = n_cancelled_ready_jobs + @n_ready_cancellable_jobs, + n_cancelled_running_jobs = n_cancelled_running_jobs + @n_running_cancellable_jobs, + n_cancelled_creating_jobs = n_cancelled_creating_jobs + @n_creating_cancellable_jobs; + + INSERT INTO job_group_inst_coll_cancellable_resources (batch_id, update_id, job_group_id, inst_coll, token, + n_ready_cancellable_jobs, + ready_cancellable_cores_mcpu, + n_creating_cancellable_jobs, + n_running_cancellable_jobs, + running_cancellable_cores_mcpu) + SELECT batch_id, update_id, ancestor_id, inst_coll, 0, + -1 * (@jg_n_ready_cancellable_jobs := 
old_n_ready_cancellable_jobs), + -1 * (@jg_ready_cancellable_cores_mcpu := old_ready_cancellable_cores_mcpu), + -1 * (@jg_n_creating_cancellable_jobs := old_n_creating_cancellable_jobs), + -1 * (@jg_n_running_cancellable_jobs := old_n_running_cancellable_jobs), + -1 * (@jg_running_cancellable_cores_mcpu := old_running_cancellable_cores_mcpu) + FROM job_group_self_and_ancestors + INNER JOIN LATERAL ( + SELECT update_id, inst_coll, COALESCE(SUM(n_ready_cancellable_jobs), 0) AS old_n_ready_cancellable_jobs, + COALESCE(SUM(ready_cancellable_cores_mcpu), 0) AS old_ready_cancellable_cores_mcpu, + COALESCE(SUM(n_creating_cancellable_jobs), 0) AS old_n_creating_cancellable_jobs, + COALESCE(SUM(n_running_cancellable_jobs), 0) AS old_n_running_cancellable_jobs, + COALESCE(SUM(running_cancellable_cores_mcpu), 0) AS old_running_cancellable_cores_mcpu + FROM job_group_inst_coll_cancellable_resources + WHERE job_group_inst_coll_cancellable_resources.batch_id = job_group_self_and_ancestors.batch_id AND + job_group_inst_coll_cancellable_resources.job_group_id = job_group_self_and_ancestors.job_group_id + GROUP BY update_id, inst_coll + FOR UPDATE + ) AS t ON TRUE + WHERE job_group_self_and_ancestors.batch_id = in_batch_id AND job_group_self_and_ancestors.job_group_id = in_job_group_id + ON DUPLICATE KEY UPDATE + n_ready_cancellable_jobs = n_ready_cancellable_jobs - @jg_n_ready_cancellable_jobs, + ready_cancellable_cores_mcpu = ready_cancellable_cores_mcpu - @jg_ready_cancellable_cores_mcpu, + n_creating_cancellable_jobs = n_creating_cancellable_jobs - @jg_n_creating_cancellable_jobs, + n_running_cancellable_jobs = n_running_cancellable_jobs - @jg_n_running_cancellable_jobs, + running_cancellable_cores_mcpu = running_cancellable_cores_mcpu - @jg_running_cancellable_cores_mcpu; + + # Group cancellation, like any operation, must be O(1) time. 
The number of descendant groups is unbounded, + # so we neither delete rows from job_group_inst_coll_cancellable_resources nor update job_groups_cancelled. + # The former is handled by main.py. In the latter case, group cancellation state is implicitly defined by an + # upwards traversal on the ancestor tree. + + INSERT INTO job_groups_cancelled (batch_id, job_group_id) + VALUES (in_batch_id, in_job_group_id); + END IF; + + COMMIT; +END $$ + +DROP PROCEDURE IF EXISTS schedule_job $$ +CREATE PROCEDURE schedule_job( + IN in_batch_id BIGINT, + IN in_job_id INT, + IN in_attempt_id VARCHAR(40), + IN in_instance_name VARCHAR(100) +) +BEGIN + DECLARE cur_job_state VARCHAR(40); + DECLARE cur_cores_mcpu INT; + DECLARE cur_job_cancel BOOLEAN; + DECLARE cur_instance_state VARCHAR(40); + DECLARE cur_attempt_id VARCHAR(40); + DECLARE delta_cores_mcpu INT; + DECLARE cur_instance_is_pool BOOLEAN; + + START TRANSACTION; + + SELECT state, cores_mcpu, attempt_id + INTO cur_job_state, cur_cores_mcpu, cur_attempt_id + FROM jobs + WHERE batch_id = in_batch_id AND job_id = in_job_id + FOR UPDATE; + + SELECT (jobs.cancelled OR job_groups_cancelled.batch_id IS NOT NULL) AND NOT jobs.always_run + INTO cur_job_cancel + FROM jobs + LEFT JOIN job_groups_cancelled ON job_groups_cancelled.batch_id = jobs.batch_id + WHERE jobs.batch_id = in_batch_id AND job_id = in_job_id + LOCK IN SHARE MODE; + + SELECT is_pool + INTO cur_instance_is_pool + FROM instances + LEFT JOIN inst_colls ON instances.inst_coll = inst_colls.name + WHERE instances.name = in_instance_name; + + CALL add_attempt(in_batch_id, in_job_id, in_attempt_id, in_instance_name, cur_cores_mcpu, delta_cores_mcpu); + + IF cur_instance_is_pool THEN + IF delta_cores_mcpu = 0 THEN + SET delta_cores_mcpu = cur_cores_mcpu; + ELSE + SET delta_cores_mcpu = 0; + END IF; + END IF; + + SELECT state INTO cur_instance_state FROM instances WHERE name = in_instance_name LOCK IN SHARE MODE; + + IF (cur_job_state = 'Ready' OR cur_job_state = 'Creating') AND
NOT cur_job_cancel AND cur_instance_state = 'active' THEN + UPDATE jobs SET state = 'Running', attempt_id = in_attempt_id WHERE batch_id = in_batch_id AND job_id = in_job_id; + COMMIT; + SELECT 0 as rc, in_instance_name, delta_cores_mcpu; + ELSE + COMMIT; + SELECT 1 as rc, + cur_job_state, + cur_job_cancel, + cur_instance_state, + in_instance_name, + cur_attempt_id, + delta_cores_mcpu, + 'job not Ready or cancelled or instance not active, but attempt already exists' as message; + END IF; +END $$ + +DROP PROCEDURE IF EXISTS mark_job_creating $$ +CREATE PROCEDURE mark_job_creating( + IN in_batch_id BIGINT, + IN in_job_id INT, + IN in_attempt_id VARCHAR(40), + IN in_instance_name VARCHAR(100), + IN new_start_time BIGINT +) +BEGIN + DECLARE cur_job_state VARCHAR(40); + DECLARE cur_job_cancel BOOLEAN; + DECLARE cur_cores_mcpu INT; + DECLARE cur_instance_state VARCHAR(40); + DECLARE delta_cores_mcpu INT; + + START TRANSACTION; + + SELECT state, cores_mcpu + INTO cur_job_state, cur_cores_mcpu + FROM jobs + WHERE batch_id = in_batch_id AND job_id = in_job_id + FOR UPDATE; + + SELECT (jobs.cancelled OR job_groups_cancelled.batch_id IS NOT NULL) AND NOT jobs.always_run + INTO cur_job_cancel + FROM jobs + LEFT JOIN job_groups_cancelled ON job_groups_cancelled.batch_id = jobs.batch_id + WHERE jobs.batch_id = in_batch_id AND job_id = in_job_id + LOCK IN SHARE MODE; + + CALL add_attempt(in_batch_id, in_job_id, in_attempt_id, in_instance_name, cur_cores_mcpu, delta_cores_mcpu); + + UPDATE attempts SET start_time = new_start_time, rollup_time = new_start_time + WHERE batch_id = in_batch_id AND job_id = in_job_id AND attempt_id = in_attempt_id; + + SELECT state INTO cur_instance_state FROM instances WHERE name = in_instance_name LOCK IN SHARE MODE; + + IF cur_job_state = 'Ready' AND NOT cur_job_cancel AND cur_instance_state = 'pending' THEN + UPDATE jobs SET state = 'Creating', attempt_id = in_attempt_id WHERE batch_id = in_batch_id AND job_id = in_job_id; + END IF; + + COMMIT; + SELECT
0 as rc, delta_cores_mcpu; +END $$ + +DROP PROCEDURE IF EXISTS mark_job_started $$ +CREATE PROCEDURE mark_job_started( + IN in_batch_id BIGINT, + IN in_job_id INT, + IN in_attempt_id VARCHAR(40), + IN in_instance_name VARCHAR(100), + IN new_start_time BIGINT +) +BEGIN + DECLARE cur_job_state VARCHAR(40); + DECLARE cur_job_cancel BOOLEAN; + DECLARE cur_cores_mcpu INT; + DECLARE cur_instance_state VARCHAR(40); + DECLARE delta_cores_mcpu INT; + + START TRANSACTION; + + SELECT state, cores_mcpu + INTO cur_job_state, cur_cores_mcpu + FROM jobs + WHERE batch_id = in_batch_id AND job_id = in_job_id + FOR UPDATE; + + SELECT (jobs.cancelled OR job_groups_cancelled.batch_id IS NOT NULL) AND NOT jobs.always_run + INTO cur_job_cancel + FROM jobs + LEFT JOIN job_groups_cancelled ON job_groups_cancelled.batch_id = jobs.batch_id + WHERE jobs.batch_id = in_batch_id AND job_id = in_job_id + LOCK IN SHARE MODE; + + CALL add_attempt(in_batch_id, in_job_id, in_attempt_id, in_instance_name, cur_cores_mcpu, delta_cores_mcpu); + + UPDATE attempts SET start_time = new_start_time, rollup_time = new_start_time + WHERE batch_id = in_batch_id AND job_id = in_job_id AND attempt_id = in_attempt_id; + + SELECT state INTO cur_instance_state FROM instances WHERE name = in_instance_name LOCK IN SHARE MODE; + + IF cur_job_state = 'Ready' AND NOT cur_job_cancel AND cur_instance_state = 'active' THEN + UPDATE jobs SET state = 'Running', attempt_id = in_attempt_id WHERE batch_id = in_batch_id AND job_id = in_job_id; + END IF; + + COMMIT; + SELECT 0 as rc, delta_cores_mcpu; +END $$ + +DELIMITER ; From c6e3c660035379e6fe3f96fb4385f8b3c7e8d436 Mon Sep 17 00:00:00 2001 From: Ivan Date: Tue, 10 Sep 2024 15:51:24 -0400 Subject: [PATCH 5/5] Replaced job_groups_cancelled.id by job_groups_cancelled.batch_id, including those referenced by alias.
--- batch/batch/driver/main.py | 2 +- batch/batch/front_end/front_end.py | 18 +++++++++--------- batch/batch/front_end/query/query_v2.py | 2 +- 3 files changed, 11 insertions(+), 11 deletions(-) diff --git a/batch/batch/driver/main.py b/batch/batch/driver/main.py index d9b18c98805..9e5d878fab4 100644 --- a/batch/batch/driver/main.py +++ b/batch/batch/driver/main.py @@ -1471,7 +1471,7 @@ async def delete_prev_cancelled_job_group_cancellable_resources_records(db: Data 1 FROM job_group_self_and_ancestors AS descendant INNER JOIN job_groups_cancelled AS cancelled - ON descendant.batch_id = cancelled.id + ON descendant.batch_id = cancelled.batch_id AND descendant.ancestor_id = cancelled.job_group_id WHERE descendant.batch_id = group_resources.batch_id AND descendant.job_group_id = group_resources.job_group_id diff --git a/batch/batch/front_end/front_end.py b/batch/batch/front_end/front_end.py index 8827d55f159..2aefa5dd4e0 100644 --- a/batch/batch/front_end/front_end.py +++ b/batch/batch/front_end/front_end.py @@ -1857,10 +1857,10 @@ async def update(tx: Transaction): SELECT cancelled_t.cancelled IS NOT NULL AS cancelled FROM batches LEFT JOIN ( - SELECT id, 1 AS cancelled + SELECT batch_id, 1 AS cancelled FROM job_groups_cancelled - WHERE id = %s AND job_group_id = %s -) AS cancelled_t ON batches.id = cancelled_t.id + WHERE batch_id = %s AND job_group_id = %s +) AS cancelled_t ON batches.id = cancelled_t.batch_id WHERE batches.id = %s AND batches.user = %s AND NOT deleted FOR UPDATE; """, @@ -1936,10 +1936,10 @@ async def _get_batch(app, batch_id): LEFT JOIN job_groups_n_jobs_in_complete_states ON job_groups.batch_id = job_groups_n_jobs_in_complete_states.id AND job_groups.job_group_id = job_groups_n_jobs_in_complete_states.job_group_id LEFT JOIN ( - SELECT id, 1 AS cancelled + SELECT batch_id, 1 AS cancelled FROM job_groups_cancelled - WHERE id = %s AND job_group_id = %s -) AS cancelled_t ON batches.id = cancelled_t.id + WHERE batch_id = %s AND job_group_id = %s +) AS 
cancelled_t ON batches.id = cancelled_t.batch_id LEFT JOIN LATERAL ( SELECT COALESCE(SUM(`usage` * rate), 0) AS cost, JSON_OBJECTAGG(resources.resource, COALESCE(`usage` * rate, 0)) AS cost_breakdown FROM ( @@ -2129,10 +2129,10 @@ async def commit_update(request: web.Request, userdata): FROM batches LEFT JOIN batch_updates ON batches.id = batch_updates.batch_id LEFT JOIN ( - SELECT id, 1 AS cancelled + SELECT batch_id, 1 AS cancelled FROM job_groups_cancelled - WHERE id = %s AND job_group_id = %s -) AS cancelled_t ON batches.id = cancelled_t.id + WHERE batch_id = %s AND job_group_id = %s +) AS cancelled_t ON batches.id = cancelled_t.batch_id WHERE batches.user = %s AND batches.id = %s AND batch_updates.update_id = %s AND NOT deleted; """, (batch_id, ROOT_JOB_GROUP_ID, user, batch_id, update_id), diff --git a/batch/batch/front_end/query/query_v2.py b/batch/batch/front_end/query/query_v2.py index 2eba30a95fa..974292f64e6 100644 --- a/batch/batch/front_end/query/query_v2.py +++ b/batch/batch/front_end/query/query_v2.py @@ -145,7 +145,7 @@ def parse_list_batches_query_v2(user: str, q: str, last_batch_id: Optional[int]) ON job_groups.batch_id = job_groups_n_jobs_in_complete_states.id AND job_groups.job_group_id = job_groups_n_jobs_in_complete_states.job_group_id LEFT JOIN (SELECT *, 1 AS cancelled FROM job_groups_cancelled) AS cancelled_t - ON job_groups.batch_id = cancelled_t.id + ON job_groups.batch_id = cancelled_t.batch_id AND job_groups.job_group_id = cancelled_t.job_group_id INNER JOIN LATERAL ( WITH resource_costs AS (