Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issue700/remove archive workflows #774

Merged
merged 13 commits into from
Feb 21, 2024
Merged
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ poetry.lock
*.pyc
*.egg-info
*.out
*.tgz
*.tar.gz
*.log
.python-version
.DS_Store
.idea
Expand Down
71 changes: 61 additions & 10 deletions beeflow/client/bee_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -163,11 +163,25 @@ def match_short_id(wf_id):
long_wf_id = matched_ids[0]
return long_wf_id
else:
print("There are currently no workflows.")
sys.exit("There are currently no workflows.")

return None


def get_wf_status(wf_id):
"""Get workflow status."""
try:
conn = _wfm_conn()
resp = conn.get(_resource(wf_id), timeout=60)
except requests.exceptions.ConnectionError:
error_exit('Could not reach WF Manager.')

if resp.status_code != requests.codes.okay: # pylint: disable=no-member
error_exit('Could not successfully query workflow manager')

return resp.json()['wf_status']


app = typer.Typer(no_args_is_help=True, add_completion=False, cls=NaturalOrderGroup)
app.add_typer(core.app, name='core')
app.add_typer(config_driver.app, name='config')
Expand Down Expand Up @@ -354,6 +368,36 @@ def package(wf_path: pathlib.Path = typer.Argument(...,
return package_path


@app.command()
def remove(wf_id: str = typer.Argument(..., callback=match_short_id)):
"""Remove a cancelled or archived workflow with a workflow ID."""
long_wf_id = wf_id

wf_status = get_wf_status(wf_id)
print(f"Workflow Status is {wf_status}")
if wf_status in ('Cancelled', 'Archived'):
verify = f"All stored information for workflow {_short_id(wf_id)} will be removed."
verify += "\nContinue to remove? yes(y)/no(n): """
response = input(verify)
if response in ("n", "no"):
sys.exit("Workflow not removed.")
elif response in ("y", "yes"):
try:
conn = _wfm_conn()
resp = conn.delete(_resource(long_wf_id), json={'option': 'remove'}, timeout=60)
except requests.exceptions.ConnectionError:
error_exit('Could not reach WF Manager.')
if resp.status_code != requests.codes.accepted: # pylint: disable=no-member
error_exit('WF Manager could not remove workflow.')
typer.secho("Workflow removed!", fg=typer.colors.GREEN)
logging.info(f'Remove workflow: {resp.text}')
else:
print(f"{_short_id(wf_id)} may still be running.")
print("The workflow must be cancelled before attempting removal.")

sys.exit()


def unpackage(package_path, dest_path):
"""Unpackage a workflow tarball for parsing."""
package_str = str(package_path)
Expand Down Expand Up @@ -479,15 +523,22 @@ def resume(wf_id: str = typer.Argument(..., callback=match_short_id)):
def cancel(wf_id: str = typer.Argument(..., callback=match_short_id)):
"""Cancel a workflow."""
long_wf_id = wf_id
try:
conn = _wfm_conn()
resp = conn.delete(_resource(long_wf_id), timeout=60)
except requests.exceptions.ConnectionError:
error_exit('Could not reach WF Manager.')
if resp.status_code != requests.codes.accepted: # pylint: disable=no-member
error_exit('WF Manager could not cancel workflow.')
typer.secho("Workflow cancelled!", fg=typer.colors.GREEN)
logging.info(f'Cancel workflow: {resp.text}')
wf_status = get_wf_status(wf_id)
if wf_status == "Running":
try:
conn = _wfm_conn()
resp = conn.delete(_resource(long_wf_id), json={'option': 'cancel'}, timeout=60)

except requests.exceptions.ConnectionError:
error_exit('Could not reach WF Manager.')
if resp.status_code != requests.codes.accepted: # pylint: disable=no-member
error_exit('WF Manager could not cancel workflow.')
typer.secho("Workflow cancelled!", fg=typer.colors.GREEN)
logging.info(f'Cancel workflow: {resp.text}')
elif wf_status == "Intializing":
print(f"Workflow is {wf_status}, try cancel later.")
else:
print(f"Workflow is {wf_status} cannot cancel.")


@app.command()
Expand Down
2 changes: 1 addition & 1 deletion beeflow/client/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -402,7 +402,7 @@ def start(foreground: bool = typer.Option(False, '--foreground', '-F',
sys.exit(1)
print('Starting beeflow...')
if not foreground:
print(f'Check "{beeflow_log}" or run `beeflow core status` for more information.')
print('Run `beeflow core status` for more information.')
# Create the log path if it doesn't exist yet
path = paths.log_path()
os.makedirs(path, exist_ok=True)
Expand Down
25 changes: 24 additions & 1 deletion beeflow/tests/test_wf_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -210,12 +210,35 @@ def test_cancel_workflow(client, mocker, setup_teardown_workflow, temp_db):
temp_db.workflows.add_task(124, WF_ID, 'task', "RUNNING")
mocker.patch('beeflow.wf_manager.resources.wf_actions.dep_manager.kill_gdb', return_value=None)

request = {'wf_id': WF_ID}
request = {'wf_id': WF_ID, 'option': 'cancel'}
resp = client().delete(f'/bee_wfm/v1/jobs/{WF_ID}', json=request)
assert resp.json['status'] == 'Cancelled'
assert resp.status_code == 202


def test_remove_workflow(client, mocker, setup_teardown_workflow, temp_db):
"""Test removing a workflow."""
mocker.patch('beeflow.wf_manager.resources.wf_utils.get_workflow_interface',
return_value=MockWFI())
mocker.patch('beeflow.wf_manager.resources.wf_utils.get_db_path', temp_db.db_file)
mocker.patch('beeflow.wf_manager.resources.wf_actions.db_path', temp_db.db_file)

wf_name = 'wf'
wf_status = 'Archived'
bolt_port = 3030
gdb_pid = 12345

temp_db.workflows.init_workflow(WF_ID, wf_name, wf_status, 'dir', bolt_port, gdb_pid)
temp_db.workflows.add_task(123, WF_ID, 'task', "WAITING")
temp_db.workflows.add_task(124, WF_ID, 'task', "RUNNING")
mocker.patch('beeflow.wf_manager.resources.wf_actions.dep_manager.kill_gdb', return_value=None)

request = {'wf_id': WF_ID, 'option': 'remove'}
resp = client().delete(f'/bee_wfm/v1/jobs/{WF_ID}', json=request)
assert resp.json['status'] == 'Removed'
assert resp.status_code == 202


def test_pause_workflow(client, mocker, setup_teardown_workflow, temp_db):
"""Test pausing a workflow."""
mocker.patch('beeflow.wf_manager.resources.wf_utils.get_workflow_interface',
Expand Down
42 changes: 28 additions & 14 deletions beeflow/wf_manager/resources/wf_actions.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
"""This module contains the workflow action endpoints."""
import shutil
import os

from flask import make_response, jsonify
from flask_restful import Resource, reqparse
Expand Down Expand Up @@ -53,21 +55,33 @@ def get(wf_id):
wf_status=wf_status, status='ok'), 200)
return resp

@staticmethod
def delete(wf_id):
"""Cancel the workflow. Lets current tasks finish running."""
def delete(self, wf_id):
"""Cancel or delete the workflow. For cancel, current tasks finish running."""
self.reqparse.add_argument('option', type=str, location='json')
option = self.reqparse.parse_args()['option']
db = connect_db(wfm_db, db_path)
wfi = wf_utils.get_workflow_interface(wf_id)
# Remove all tasks currently in the database
if wfi.workflow_loaded():
wfi.finalize_workflow()
wf_utils.update_wf_status(wf_id, 'Cancelled')
db.workflows.update_workflow_state(wf_id, 'Cancelled')
log.info("Workflow cancelled")
log.info("Shutting down gdb")
pid = db.workflows.get_gdb_pid(wf_id)
dep_manager.kill_gdb(pid)
resp = make_response(jsonify(status='Cancelled'), 202)
if option == "cancel":
wfi = wf_utils.get_workflow_interface(wf_id)
# Remove all tasks currently in the database
if wfi.workflow_loaded():
wfi.finalize_workflow()
wf_utils.update_wf_status(wf_id, 'Cancelled')
db.workflows.update_workflow_state(wf_id, 'Cancelled')
log.info("Workflow cancelled")
log.info("Shutting down gdb")
pid = db.workflows.get_gdb_pid(wf_id)
dep_manager.kill_gdb(pid)
resp = make_response(jsonify(status='Cancelled'), 202)
elif option == "remove":
log.info(f"Removing workflow {wf_id}.")
db.workflows.delete_workflow(wf_id)
resp = make_response(jsonify(status='Removed'), 202)
bee_workdir = wf_utils.get_bee_workdir()
workflow_dir = f"{bee_workdir}/workflows/{wf_id}"
shutil.rmtree(workflow_dir, ignore_errors=True)
archive_path = f"{bee_workdir}/archives/{wf_id}.tgz"
if os.path.exists(archive_path):
os.remove(archive_path)
return resp

def patch(self, wf_id):
Expand Down
5 changes: 5 additions & 0 deletions docs/sphinx/commands.rst
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,11 @@ Arguments:

``beeflow cancel``: Cancel a workflow.

Arguments:
WF_ID [required]

``beeflow remove``: Remove cancelled or archived workflow and it's information.

Arguments:
WF_ID [required]

Expand Down
1 change: 0 additions & 1 deletion examples/.gitignore
Original file line number Diff line number Diff line change
@@ -1 +0,0 @@
*.tgz
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ beeflow = 'beeflow.client.bee_client:main'
beecloud = 'beeflow.cloud_launcher:main'

[tool.poetry.dependencies]

# Python version (>=3.8.3, <=3.12.2)
python = ">=3.8.3,<=3.12.2"

Expand Down
Loading