From 407b5c3e5123a4c63d8c480c954fd0575052f388 Mon Sep 17 00:00:00 2001 From: Patricia Grubel Date: Tue, 23 Jan 2024 16:13:33 -0700 Subject: [PATCH 01/12] Add ignores for tarred files --- .gitignore | 2 ++ examples/.gitignore | 1 - 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/.gitignore b/.gitignore index f5d43efb0..5fa2d5ea6 100644 --- a/.gitignore +++ b/.gitignore @@ -5,6 +5,8 @@ poetry.lock *.pyc *.egg-info *.out +*.tgz +*.tar.gz .python-version .DS_Store .idea diff --git a/examples/.gitignore b/examples/.gitignore index aa1ec1ea0..e69de29bb 100644 --- a/examples/.gitignore +++ b/examples/.gitignore @@ -1 +0,0 @@ -*.tgz From 29b254bea5248af6fec7133e6caaa28bf3091f59 Mon Sep 17 00:00:00 2001 From: Patricia Grubel Date: Tue, 23 Jan 2024 17:12:31 -0700 Subject: [PATCH 02/12] Start remove workflow functionality --- beeflow/client/bee_client.py | 34 ++++++++++++++++++++++++++++++++++ 1 file changed, 34 insertions(+) diff --git a/beeflow/client/bee_client.py b/beeflow/client/bee_client.py index ac22c5e57..2fd309033 100644 --- a/beeflow/client/bee_client.py +++ b/beeflow/client/bee_client.py @@ -26,6 +26,7 @@ from beeflow.common.parser import CwlParser from beeflow.common.wf_data import generate_workflow_id from beeflow.client import core +from beeflow.wf_manager.resources import wf_utils # Length of a shortened workflow ID short_id_len = 6 #noqa: Not a constant @@ -340,6 +341,39 @@ def package(wf_path: pathlib.Path = typer.Argument(..., return package_path +@app.command() +def remove(wf_id: str = typer.Argument(..., callback=match_short_id)): + """Remove a cancelled or archived workflow with a workflow ID.""" + long_wf_id = wf_id + + # Check status of workflow for Archived or Canncelled + try: + conn = _wfm_conn() + resp = conn.get(_resource(long_wf_id), timeout=60) + except requests.exceptions.ConnectionError: + error_exit('Could not reach WF Manager.') + + if resp.status_code != requests.codes.okay: # pylint: disable=no-member + error_exit('Could not successfully query workflow manager') + + tasks_status = resp.json()['tasks_status'] + wf_status = resp.json()['wf_status'] + print(f"Workflow Status is {wf_status}") + if wf_status == 'Cancelled' or wf_status == 'Archived': + response = input(f"All stored information for workflow {_short_id(wf_id)} will be removed. Continue to remove? yes(y)/no(n): " ) + if response in ("n", "no"): + sys.exit("Workflow not removed.") + elif response in ("y", "yes"): + bee_workdir = wf_utils.get_bee_workdir() + workflow_dir = f"{bee_workdir}/workflows/{wf_id}" + print(f"Removing {_short_id(wf_id)} {workflow_dir}") + shutil.rmtree(workflow_dir) + else: + print(f"{_short_id(wf_id)} may still be running; it must be cancelled before attempting removal.") + + sys.exit() + + def unpackage(package_path, dest_path): """Unpackage a workflow tarball for parsing.""" package_str = str(package_path) From 4fbfcddf29941179c333cd0faeb595257c118026 Mon Sep 17 00:00:00 2001 From: Patricia Grubel Date: Tue, 30 Jan 2024 14:14:05 -0700 Subject: [PATCH 03/12] Add remove workflow option to client --- beeflow/client/bee_client.py | 20 ++++++---- beeflow/client/core.py | 2 +- beeflow/wf_manager/resources/wf_actions.py | 43 +++++++++++++++------- 3 files changed, 44 insertions(+), 21 deletions(-) diff --git a/beeflow/client/bee_client.py b/beeflow/client/bee_client.py index 2fd309033..7dc87943b 100644 --- a/beeflow/client/bee_client.py +++ b/beeflow/client/bee_client.py @@ -359,17 +359,22 @@ def remove(wf_id: str = typer.Argument(..., callback=match_short_id)): tasks_status = resp.json()['tasks_status'] wf_status = resp.json()['wf_status'] print(f"Workflow Status is {wf_status}") - if wf_status == 'Cancelled' or wf_status == 'Archived': + if wf_status in ('Cancelled', 'Archived'): response = input(f"All stored information for workflow {_short_id(wf_id)} will be removed. Continue to remove? yes(y)/no(n): " ) if response in ("n", "no"): sys.exit("Workflow not removed.") elif response in ("y", "yes"): - bee_workdir = wf_utils.get_bee_workdir() - workflow_dir = f"{bee_workdir}/workflows/{wf_id}" - print(f"Removing {_short_id(wf_id)} {workflow_dir}") - shutil.rmtree(workflow_dir) + try: + conn = _wfm_conn() + resp = conn.delete(_resource(long_wf_id), json={'option': 'remove'}, timeout=60) + except requests.exceptions.ConnectionError: + error_exit('Could not reach WF Manager.') + if resp.status_code != requests.codes.accepted: # pylint: disable=no-member + error_exit('WF Manager could not remove workflow.') + typer.secho("Workflow removed!", fg=typer.colors.GREEN) + logging.info(f'Remove workflow: {resp.text}') else: - print(f"{_short_id(wf_id)} may still be running; it must be cancelled before attempting removal.") + print(f"{_short_id(wf_id)} may still be running.\nThe workflow must be cancelled before attempting removal.") sys.exit() @@ -501,7 +506,8 @@ def cancel(wf_id: str = typer.Argument(..., callback=match_short_id)): long_wf_id = wf_id try: conn = _wfm_conn() - resp = conn.delete(_resource(long_wf_id), timeout=60) + resp = conn.delete(_resource(long_wf_id), json={'option': 'cancel'}, timeout=60) + except requests.exceptions.ConnectionError: error_exit('Could not reach WF Manager.') if resp.status_code != requests.codes.accepted: # pylint: disable=no-member diff --git a/beeflow/client/core.py b/beeflow/client/core.py index 5042dce6e..db6967bad 100644 --- a/beeflow/client/core.py +++ b/beeflow/client/core.py @@ -402,7 +402,7 @@ def start(foreground: bool = typer.Option(False, '--foreground', '-F', sys.exit(1) print('Starting beeflow...') if not foreground: - print(f'Check "{beeflow_log}" or run `beeflow core status` for more information.') + print(f'Run `beeflow core status` for more information.') # Create the log path if it doesn't exist yet path = paths.log_path() os.makedirs(path, exist_ok=True) diff --git a/beeflow/wf_manager/resources/wf_actions.py b/beeflow/wf_manager/resources/wf_actions.py index 441b519f2..1d3a1d65e 100644 --- a/beeflow/wf_manager/resources/wf_actions.py +++ b/beeflow/wf_manager/resources/wf_actions.py @@ -1,4 +1,6 @@ """This module contains the workflow action endpoints.""" +import shutil +import os from flask import make_response, jsonify from flask_restful import Resource, reqparse @@ -53,23 +55,38 @@ def get(wf_id): wf_status=wf_status, status='ok'), 200) return resp - @staticmethod - def delete(wf_id): + def delete(self, wf_id): """Cancel the workflow. Lets current tasks finish running.""" + self.reqparse.add_argument('option', type=str, location='json') + option = self.reqparse.parse_args()['option'] db = connect_db(wfm_db, db_path) - wfi = wf_utils.get_workflow_interface(wf_id) - # Remove all tasks currently in the database - if wfi.workflow_loaded(): - wfi.finalize_workflow() - wf_utils.update_wf_status(wf_id, 'Cancelled') - db.workflows.update_workflow_state(wf_id, 'Cancelled') - log.info("Workflow cancelled") - log.info("Shutting down gdb") - pid = db.workflows.get_gdb_pid(wf_id) - dep_manager.kill_gdb(pid) - resp = make_response(jsonify(status='Cancelled'), 202) + if option == "cancel": + wfi = wf_utils.get_workflow_interface(wf_id) + # Remove all tasks currently in the database + if wfi.workflow_loaded(): + wfi.finalize_workflow() + wf_utils.update_wf_status(wf_id, 'Cancelled') + db.workflows.update_workflow_state(wf_id, 'Cancelled') + log.info("Workflow cancelled") + log.info("Shutting down gdb") + pid = db.workflows.get_gdb_pid(wf_id) + dep_manager.kill_gdb(pid) + resp = make_response(jsonify(status='Cancelled'), 202) + elif option == "remove": + log.info(f"Removing workflow {wf_id}.") + db.workflows.delete_workflow(wf_id) + resp = make_response(jsonify(status='Removed'), 202) + bee_workdir = wf_utils.get_bee_workdir() + workflow_dir = f"{bee_workdir}/workflows/{wf_id}" + print(f"Removing {wf_id} {workflow_dir}") + shutil.rmtree(workflow_dir, ignore_errors=True) + archive_path = f"{bee_workdir}/archives/{wf_id}" + print(f"Removing {wf_id} {archive_path}") + if os.path.exists(archive_path): + os.remove(archive_path) return resp + def patch(self, wf_id): """Pause or resume workflow.""" db = connect_db(wfm_db, db_path) From 881d415499c6c38fce2fe31de1822a75032876de Mon Sep 17 00:00:00 2001 From: Patricia Grubel Date: Thu, 1 Feb 2024 16:29:10 -0700 Subject: [PATCH 04/12] Make method for getting workflow status from database and prevent cancel workflow when in Intializing state --- beeflow/client/bee_client.py | 58 +++++++++++++++++++++--------------- 1 file changed, 34 insertions(+), 24 deletions(-) diff --git a/beeflow/client/bee_client.py b/beeflow/client/bee_client.py index 7dc87943b..31380ebf9 100644 --- a/beeflow/client/bee_client.py +++ b/beeflow/client/bee_client.py @@ -168,6 +168,20 @@ def match_short_id(wf_id): return None +def get_wf_status(wf_id): + """ Get workflow status.""" + try: + conn = _wfm_conn() + resp = conn.get(_resource(wf_id), timeout=60) + except requests.exceptions.ConnectionError: + error_exit('Could not reach WF Manager.') + + if resp.status_code != requests.codes.okay: # pylint: disable=no-member + error_exit('Could not successfully query workflow manager') + + tasks_status = resp.json()['tasks_status'] + return resp.json()['wf_status'] + app = typer.Typer(no_args_is_help=True, add_completion=False, cls=NaturalOrderGroup) app.add_typer(core.app, name='core') app.add_typer(config_driver.app, name='config') @@ -346,21 +360,12 @@ def remove(wf_id: str = typer.Argument(..., callback=match_short_id)): """Remove a cancelled or archived workflow with a workflow ID.""" long_wf_id = wf_id - # Check status of workflow for Archived or Canncelled - try: - conn = _wfm_conn() - resp = conn.get(_resource(long_wf_id), timeout=60) - except requests.exceptions.ConnectionError: - error_exit('Could not reach WF Manager.') - - if resp.status_code != requests.codes.okay: # pylint: disable=no-member - error_exit('Could not successfully query workflow manager') - - tasks_status = resp.json()['tasks_status'] - wf_status = resp.json()['wf_status'] + wf_status = get_wf_status(wf_id) print(f"Workflow Status is {wf_status}") if wf_status in ('Cancelled', 'Archived'): - response = input(f"All stored information for workflow {_short_id(wf_id)} will be removed. Continue to remove? yes(y)/no(n): " ) + verfiy = """f"All stored information for workflow {_short_id(wf_id)} will be removed." + Continue to remove? yes(y)/no(n): """ + response = input(verify) if response in ("n", "no"): sys.exit("Workflow not removed.") elif response in ("y", "yes"): @@ -374,7 +379,8 @@ def remove(wf_id: str = typer.Argument(..., callback=match_short_id)): typer.secho("Workflow removed!", fg=typer.colors.GREEN) logging.info(f'Remove workflow: {resp.text}') else: - print(f"{_short_id(wf_id)} may still be running.\nThe workflow must be cancelled before attempting removal.") + print(f"{_short_id(wf_id)} may still be running.") + print("The workflow must be cancelled before attempting removal.") sys.exit() @@ -504,16 +510,20 @@ def resume(wf_id: str = typer.Argument(..., callback=match_short_id)): def cancel(wf_id: str = typer.Argument(..., callback=match_short_id)): """Cancel a workflow.""" long_wf_id = wf_id - try: - conn = _wfm_conn() - resp = conn.delete(_resource(long_wf_id), json={'option': 'cancel'}, timeout=60) - - except requests.exceptions.ConnectionError: - error_exit('Could not reach WF Manager.') - if resp.status_code != requests.codes.accepted: # pylint: disable=no-member - error_exit('WF Manager could not cancel workflow.') - typer.secho("Workflow cancelled!", fg=typer.colors.GREEN) - logging.info(f'Cancel workflow: {resp.text}') + wf_status = get_wf_status(wf_id) + if wf_status == "Initializing": + print(f"Workflow is {wf_status}, please retry cancel later.") + else: + try: + conn = _wfm_conn() + resp = conn.delete(_resource(long_wf_id), json={'option': 'cancel'}, timeout=60) + + except requests.exceptions.ConnectionError: + error_exit('Could not reach WF Manager.') + if resp.status_code != requests.codes.accepted: # pylint: disable=no-member + error_exit('WF Manager could not cancel workflow.') + typer.secho("Workflow cancelled!", fg=typer.colors.GREEN) + logging.info(f'Cancel workflow: {resp.text}') @app.command() From 9e43a02fa81a3283844e543e6a22345cb2e1e30b Mon Sep 17 00:00:00 2001 From: Patricia Grubel Date: Fri, 2 Feb 2024 12:53:50 -0700 Subject: [PATCH 05/12] Fix some pylama complaints, prevent cancel errors and add remove command to documentation --- beeflow/client/bee_client.py | 50 ++++++++++++++++++------------------ beeflow/client/core.py | 2 +- docs/sphinx/commands.rst | 5 ++++ 3 files changed, 31 insertions(+), 26 deletions(-) diff --git a/beeflow/client/bee_client.py b/beeflow/client/bee_client.py index 31380ebf9..958fc8f01 100644 --- a/beeflow/client/bee_client.py +++ b/beeflow/client/bee_client.py @@ -26,7 +26,6 @@ from beeflow.common.parser import CwlParser from beeflow.common.wf_data import generate_workflow_id from beeflow.client import core -from beeflow.wf_manager.resources import wf_utils # Length of a shortened workflow ID short_id_len = 6 #noqa: Not a constant @@ -169,18 +168,17 @@ def match_short_id(wf_id): def get_wf_status(wf_id): - """ Get workflow status.""" - try: - conn = _wfm_conn() - resp = conn.get(_resource(wf_id), timeout=60) - except requests.exceptions.ConnectionError: - error_exit('Could not reach WF Manager.') + """ Get workflow status.""" + try: + conn = _wfm_conn() + resp = conn.get(_resource(wf_id), timeout=60) + except requests.exceptions.ConnectionError: + error_exit('Could not reach WF Manager.') - if resp.status_code != requests.codes.okay: # pylint: disable=no-member - error_exit('Could not successfully query workflow manager') + if resp.status_code != requests.codes.okay: # pylint: disable=no-member + error_exit('Could not successfully query workflow manager') - tasks_status = resp.json()['tasks_status'] - return resp.json()['wf_status'] + return resp.json()['wf_status'] app = typer.Typer(no_args_is_help=True, add_completion=False, cls=NaturalOrderGroup) app.add_typer(core.app, name='core') @@ -363,21 +361,21 @@ def remove(wf_id: str = typer.Argument(..., callback=match_short_id)): wf_status = get_wf_status(wf_id) print(f"Workflow Status is {wf_status}") if wf_status in ('Cancelled', 'Archived'): - verfiy = """f"All stored information for workflow {_short_id(wf_id)} will be removed." - Continue to remove? yes(y)/no(n): """ + verify = f"All stored information for workflow {_short_id(wf_id)} will be removed." + verify += "\nContinue to remove? yes(y)/no(n): """ response = input(verify) if response in ("n", "no"): sys.exit("Workflow not removed.") elif response in ("y", "yes"): - try: - conn = _wfm_conn() - resp = conn.delete(_resource(long_wf_id), json={'option': 'remove'}, timeout=60) - except requests.exceptions.ConnectionError: - error_exit('Could not reach WF Manager.') - if resp.status_code != requests.codes.accepted: # pylint: disable=no-member - error_exit('WF Manager could not remove workflow.') - typer.secho("Workflow removed!", fg=typer.colors.GREEN) - logging.info(f'Remove workflow: {resp.text}') + try: + conn = _wfm_conn() + resp = conn.delete(_resource(long_wf_id), json={'option': 'remove'}, timeout=60) + except requests.exceptions.ConnectionError: + error_exit('Could not reach WF Manager.') + if resp.status_code != requests.codes.accepted: # pylint: disable=no-member + error_exit('WF Manager could not remove workflow.') + typer.secho("Workflow removed!", fg=typer.colors.GREEN) + logging.info(f'Remove workflow: {resp.text}') else: print(f"{_short_id(wf_id)} may still be running.") print("The workflow must be cancelled before attempting removal.") @@ -511,9 +509,7 @@ def cancel(wf_id: str = typer.Argument(..., callback=match_short_id)): """Cancel a workflow.""" long_wf_id = wf_id wf_status = get_wf_status(wf_id) - if wf_status == "Initializing": - print(f"Workflow is {wf_status}, please retry cancel later.") - else: + if wf_status == "Running": try: conn = _wfm_conn() resp = conn.delete(_resource(long_wf_id), json={'option': 'cancel'}, timeout=60) @@ -524,6 +520,10 @@ def cancel(wf_id: str = typer.Argument(..., callback=match_short_id)): error_exit('WF Manager could not cancel workflow.') typer.secho("Workflow cancelled!", fg=typer.colors.GREEN) logging.info(f'Cancel workflow: {resp.text}') + elif wf_status == "Intializing": + print(f"Workflow is {wf_status}, try cancel later.") + else: + print(f"Workflow is {wf_status} cannot cancel.") @app.command() diff --git a/beeflow/client/core.py b/beeflow/client/core.py index db6967bad..b53d5d228 100644 --- a/beeflow/client/core.py +++ b/beeflow/client/core.py @@ -402,7 +402,7 @@ def start(foreground: bool = typer.Option(False, '--foreground', '-F', sys.exit(1) print('Starting beeflow...') if not foreground: - print(f'Run `beeflow core status` for more information.') + print('Run `beeflow core status` for more information.') # Create the log path if it doesn't exist yet path = paths.log_path() os.makedirs(path, exist_ok=True) diff --git a/docs/sphinx/commands.rst b/docs/sphinx/commands.rst index 9689f6acd..997714e6f 100644 --- a/docs/sphinx/commands.rst +++ b/docs/sphinx/commands.rst @@ -80,6 +80,11 @@ Arguments: ``beeflow cancel``: Cancel a workflow. +Arguments: + WF_ID [required] + +``beeflow remove``: Remove cancelled or archived workflow and it's information. + Arguments: WF_ID [required] From e9744769f99ffeeb8982ad96e0d893f749db0fa6 Mon Sep 17 00:00:00 2001 From: Patricia Grubel Date: Fri, 2 Feb 2024 13:26:25 -0700 Subject: [PATCH 06/12] Fix pylama issues --- beeflow/client/bee_client.py | 3 ++- beeflow/wf_manager/resources/wf_actions.py | 1 - 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/beeflow/client/bee_client.py b/beeflow/client/bee_client.py index 958fc8f01..b269846b4 100644 --- a/beeflow/client/bee_client.py +++ b/beeflow/client/bee_client.py @@ -168,7 +168,7 @@ def match_short_id(wf_id): def get_wf_status(wf_id): - """ Get workflow status.""" + """Get workflow status.""" try: conn = _wfm_conn() resp = conn.get(_resource(wf_id), timeout=60) @@ -180,6 +180,7 @@ def get_wf_status(wf_id): return resp.json()['wf_status'] + app = typer.Typer(no_args_is_help=True, add_completion=False, cls=NaturalOrderGroup) app.add_typer(core.app, name='core') app.add_typer(config_driver.app, name='config') diff --git a/beeflow/wf_manager/resources/wf_actions.py b/beeflow/wf_manager/resources/wf_actions.py index 1d3a1d65e..90e377fcc 100644 --- a/beeflow/wf_manager/resources/wf_actions.py +++ b/beeflow/wf_manager/resources/wf_actions.py @@ -86,7 +86,6 @@ def delete(self, wf_id): os.remove(archive_path) return resp - def patch(self, wf_id): """Pause or resume workflow.""" db = connect_db(wfm_db, db_path) From 272ed1508c89e005dfb321eb03c4f88461dc44b3 Mon Sep 17 00:00:00 2001 From: Patricia Grubel Date: Fri, 2 Feb 2024 14:47:26 -0700 Subject: [PATCH 07/12] Add unit test for remove wf and fix cancel unit test --- beeflow/tests/test_wf_manager.py | 25 ++++++++++++++++++++++++- 1 file changed, 24 insertions(+), 1 deletion(-) diff --git a/beeflow/tests/test_wf_manager.py b/beeflow/tests/test_wf_manager.py index 48ee9df09..0cb50a531 100644 --- a/beeflow/tests/test_wf_manager.py +++ b/beeflow/tests/test_wf_manager.py @@ -210,11 +210,34 @@ def test_cancel_workflow(client, mocker, setup_teardown_workflow, temp_db): temp_db.workflows.add_task(124, WF_ID, 'task', "RUNNING") mocker.patch('beeflow.wf_manager.resources.wf_actions.dep_manager.kill_gdb', return_value=None) - request = {'wf_id': WF_ID} + request = {'wf_id': WF_ID, 'option': 'cancel'} resp = client().delete(f'/bee_wfm/v1/jobs/{WF_ID}', json=request) assert resp.json['status'] == 'Cancelled' assert resp.status_code == 202 +def test_remove_workflow(client, mocker, setup_teardown_workflow, temp_db): + """Test cancelling a workflow.""" + mocker.patch('beeflow.wf_manager.resources.wf_utils.get_workflow_interface', + return_value=MockWFI()) + mocker.patch('beeflow.wf_manager.resources.wf_utils.get_db_path', temp_db.db_file) + mocker.patch('beeflow.wf_manager.resources.wf_actions.db_path', temp_db.db_file) + + wf_name = 'wf' + wf_status = 'Archived' + bolt_port = 3030 + gdb_pid = 12345 + + temp_db.workflows.init_workflow(WF_ID, wf_name, wf_status, 'dir', bolt_port, gdb_pid) + temp_db.workflows.add_task(123, WF_ID, 'task', "WAITING") + temp_db.workflows.add_task(124, WF_ID, 'task', "RUNNING") + mocker.patch('beeflow.wf_manager.resources.wf_actions.dep_manager.kill_gdb', return_value=None) + + request = {'wf_id': WF_ID, 'option': 'remove'} + resp = client().delete(f'/bee_wfm/v1/jobs/{WF_ID}', json=request) + assert resp.json['status'] == 'Removed' + assert resp.status_code == 202 + + def test_pause_workflow(client, mocker, setup_teardown_workflow, temp_db): """Test pausing a workflow.""" From 8dc95941d25530632247b98eef9aafa8d6d7778e Mon Sep 17 00:00:00 2001 From: Patricia Grubel Date: Fri, 2 Feb 2024 14:51:28 -0700 Subject: [PATCH 08/12] Fix pylama complaint --- beeflow/tests/test_wf_manager.py | 1 - 1 file changed, 1 deletion(-) diff --git a/beeflow/tests/test_wf_manager.py b/beeflow/tests/test_wf_manager.py index 0cb50a531..5135a06bb 100644 --- a/beeflow/tests/test_wf_manager.py +++ b/beeflow/tests/test_wf_manager.py @@ -238,7 +238,6 @@ def test_remove_workflow(client, mocker, setup_teardown_workflow, temp_db): assert resp.status_code == 202 - def test_pause_workflow(client, mocker, setup_teardown_workflow, temp_db): """Test pausing a workflow.""" mocker.patch('beeflow.wf_manager.resources.wf_utils.get_workflow_interface', From 12405f3a3afddcd42ff5a431bfd97b87a09f2b5b Mon Sep 17 00:00:00 2001 From: Patricia Grubel Date: Fri, 2 Feb 2024 14:56:54 -0700 Subject: [PATCH 09/12] Fix another pylama complaint --- beeflow/tests/test_wf_manager.py | 1 + 1 file changed, 1 insertion(+) diff --git a/beeflow/tests/test_wf_manager.py b/beeflow/tests/test_wf_manager.py index 5135a06bb..c63842bb8 100644 --- a/beeflow/tests/test_wf_manager.py +++ b/beeflow/tests/test_wf_manager.py @@ -215,6 +215,7 @@ def test_cancel_workflow(client, mocker, setup_teardown_workflow, temp_db): assert resp.json['status'] == 'Cancelled' assert resp.status_code == 202 + def test_remove_workflow(client, mocker, setup_teardown_workflow, temp_db): """Test cancelling a workflow.""" mocker.patch('beeflow.wf_manager.resources.wf_utils.get_workflow_interface', From f6685e0d59074c607fed1cedcf6a7f1590e3800a Mon Sep 17 00:00:00 2001 From: Patricia Grubel Date: Mon, 5 Feb 2024 10:26:10 -0700 Subject: [PATCH 10/12] Correct a comment --- beeflow/tests/test_wf_manager.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/beeflow/tests/test_wf_manager.py b/beeflow/tests/test_wf_manager.py index c63842bb8..7d238e874 100644 --- a/beeflow/tests/test_wf_manager.py +++ b/beeflow/tests/test_wf_manager.py @@ -217,7 +217,7 @@ def test_cancel_workflow(client, mocker, setup_teardown_workflow, temp_db): def test_remove_workflow(client, mocker, setup_teardown_workflow, temp_db): - """Test cancelling a workflow.""" + """Test removing a workflow.""" mocker.patch('beeflow.wf_manager.resources.wf_utils.get_workflow_interface', return_value=MockWFI()) mocker.patch('beeflow.wf_manager.resources.wf_utils.get_db_path', temp_db.db_file) From 827152e05ca18b05afde7453b66c303e94b54c85 Mon Sep 17 00:00:00 2001 From: Patricia Grubel Date: Mon, 5 Feb 2024 10:30:32 -0700 Subject: [PATCH 11/12] Correct description of delete action --- beeflow/wf_manager/resources/wf_actions.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/beeflow/wf_manager/resources/wf_actions.py b/beeflow/wf_manager/resources/wf_actions.py index 90e377fcc..dae324b21 100644 --- a/beeflow/wf_manager/resources/wf_actions.py +++ b/beeflow/wf_manager/resources/wf_actions.py @@ -56,7 +56,7 @@ def get(wf_id): return resp def delete(self, wf_id): - """Cancel the workflow. Lets current tasks finish running.""" + """Cancel or delete the workflow. For cancel, current tasks finish running.""" self.reqparse.add_argument('option', type=str, location='json') option = self.reqparse.parse_args()['option'] db = connect_db(wfm_db, db_path) From b59d931559ec80d265b7d531a7ffd4573309b679 Mon Sep 17 00:00:00 2001 From: Patricia Grubel Date: Wed, 21 Feb 2024 09:53:14 -0700 Subject: [PATCH 12/12] Fix remove archive, and exit match_short_id when there are no workflows --- .gitignore | 1 + beeflow/client/bee_client.py | 2 +- beeflow/wf_manager/resources/wf_actions.py | 4 +--- pyproject.toml | 3 +-- 4 files changed, 4 insertions(+), 6 deletions(-) diff --git a/.gitignore b/.gitignore index 5fa2d5ea6..7f43286ce 100644 --- a/.gitignore +++ b/.gitignore @@ -7,6 +7,7 @@ poetry.lock *.out *.tgz *.tar.gz +*.log .python-version .DS_Store .idea diff --git a/beeflow/client/bee_client.py b/beeflow/client/bee_client.py index b269846b4..16654c4ce 100644 --- a/beeflow/client/bee_client.py +++ b/beeflow/client/bee_client.py @@ -162,7 +162,7 @@ def match_short_id(wf_id): long_wf_id = matched_ids[0] return long_wf_id else: - print("There are currently no workflows.") + sys.exit("There are currently no workflows.") return None diff --git a/beeflow/wf_manager/resources/wf_actions.py b/beeflow/wf_manager/resources/wf_actions.py index dae324b21..49b555483 100644 --- a/beeflow/wf_manager/resources/wf_actions.py +++ b/beeflow/wf_manager/resources/wf_actions.py @@ -78,10 +78,8 @@ def delete(self, wf_id): resp = make_response(jsonify(status='Removed'), 202) bee_workdir = wf_utils.get_bee_workdir() workflow_dir = f"{bee_workdir}/workflows/{wf_id}" - print(f"Removing {wf_id} {workflow_dir}") shutil.rmtree(workflow_dir, ignore_errors=True) - archive_path = f"{bee_workdir}/archives/{wf_id}" - print(f"Removing {wf_id} {archive_path}") + archive_path = f"{bee_workdir}/archives/{wf_id}.tgz" if os.path.exists(archive_path): os.remove(archive_path) return resp diff --git a/pyproject.toml b/pyproject.toml index d96bd9a85..88b7fceb9 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -48,11 +48,10 @@ beecloud = 'beeflow.cloud_launcher:main' [tool.poetry.dependencies] # Python version (>=3.8.3, <3.11) -python = ">=3.8.3,<=3.11" +python = ">=3.8.3,<=3.12.2" # Package dependencies Flask = { version = "^2.0" } -Jinja2 = { version = "<3.1" } neo4j = { version = "^1.7.4" } PyYAML = { version = "^6.0.1" } flask_restful = "0.3.9"