From 1da3aa4b67bf6eb90d1a240f2d39cb48c0067985 Mon Sep 17 00:00:00 2001 From: amesar Date: Sat, 20 Jul 2024 17:16:01 -0400 Subject: [PATCH] Issue #191: added support to export nested runs for Databricks MLflow only --- mlflow_export_import/common/click_options.py | 2 +- .../experiment/export_experiment.py | 88 ++++++++++--------- .../experiment/nested_runs_utils.py | 27 ++++++ 3 files changed, 76 insertions(+), 41 deletions(-) create mode 100644 mlflow_export_import/experiment/nested_runs_utils.py diff --git a/mlflow_export_import/common/click_options.py b/mlflow_export_import/common/click_options.py index 1c7ca04..303a8c6 100644 --- a/mlflow_export_import/common/click_options.py +++ b/mlflow_export_import/common/click_options.py @@ -98,7 +98,7 @@ def opt_run_ids(function): def opt_check_nested_runs(function): function = click.option("--check-nested-runs", - help="Check if run is parent of nested run", + help="Check if run in the 'run-ids' option is a parent of nested runs and export all the nested runs.", type=bool, default=False, show_default=True diff --git a/mlflow_export_import/experiment/export_experiment.py b/mlflow_export_import/experiment/export_experiment.py index 3e528fc..ac1656e 100644 --- a/mlflow_export_import/experiment/export_experiment.py +++ b/mlflow_export_import/experiment/export_experiment.py @@ -12,7 +12,8 @@ opt_notebook_formats, opt_export_permissions, opt_run_start_time, - opt_export_deleted_runs + opt_export_deleted_runs, + opt_check_nested_runs ) from mlflow_export_import.common.iterators import SearchRunsIterator from mlflow_export_import.common import utils, io_utils, mlflow_utils @@ -20,6 +21,7 @@ from mlflow_export_import.common.timestamp_utils import fmt_ts_millis, utc_str_to_millis from mlflow_export_import.client.client_utils import create_mlflow_client, create_dbx_client from mlflow_export_import.run.export_run import export_run +from . import nested_runs_utils _logger = utils.getLogger(__name__) @@ -31,6 +33,7 @@ def export_experiment( export_permissions = False, run_start_time = None, export_deleted_runs = False, + check_nested_runs = False, notebook_formats = None, mlflow_client = None ): @@ -39,12 +42,14 @@ def export_experiment( :param: output_dir: Output directory. :param: run_ids: List of run IDs to export. If None then export all run IDs. :param: export_permissions - Export Databricks permissions. + :param: export_deleted_runs - Export deleted runs. + :param: check_nested_runs - Check if run in the 'run-ids' option is a parent of nested runs and + export all the nested runs. :param: run_start_time - Only export runs started after this UTC time (inclusive). Format: YYYY-MM-DD. :param: notebook_formats: List of notebook formats to export. Values are SOURCE, HTML, JUPYTER or DBC. :param: mlflow_client: MLflow client. :return: Number of successful and number of failed runs. """ - mlflow_client = mlflow_client or create_mlflow_client() dbx_client = create_dbx_client(mlflow_client) @@ -62,10 +67,9 @@ def export_experiment( failed_run_ids = [] num_runs_exported = 0 if run_ids: - for _,run_id in enumerate(run_ids): - _export_run(mlflow_client, run_id, exp.experiment_id, output_dir, ok_run_ids, failed_run_ids, - run_start_time, run_start_time_str, export_deleted_runs, notebook_formats) - num_runs_exported += 1 + runs = _get_runs(mlflow_client, run_ids, exp, failed_run_ids) + if check_nested_runs: # ZZ + runs = nested_runs_utils.get_nested_runs(mlflow_client, runs) # else: kwargs = {} if run_start_time: @@ -73,10 +77,12 @@ def export_experiment( if export_deleted_runs: from mlflow.entities import ViewType kwargs["view_type"] = ViewType.ALL - for _,run in enumerate(SearchRunsIterator(mlflow_client, exp.experiment_id, **kwargs)): - _export_run(mlflow_client, run, exp.experiment_id, output_dir, ok_run_ids, failed_run_ids, - run_start_time, run_start_time_str, export_deleted_runs, notebook_formats) - num_runs_exported += 1 + runs = SearchRunsIterator(mlflow_client, exp.experiment_id, **kwargs) + + for run in runs: + _export_run(mlflow_client, run, output_dir, ok_run_ids, failed_run_ids, + run_start_time, run_start_time_str, export_deleted_runs, notebook_formats) + num_runs_exported += 1 info_attr = { "num_total_runs": (num_runs_exported), @@ -105,42 +111,20 @@ def export_experiment( return len(ok_run_ids), len(failed_run_ids) -def _export_run(mlflow_client, run_or_run_id, experiment_id, output_dir, +def _export_run(mlflow_client, run, output_dir, ok_run_ids, failed_run_ids, run_start_time, run_start_time_str, export_deleted_runs, notebook_formats ): - - if isinstance(run_or_run_id, str): - try: - run = mlflow_client.get_run(run_or_run_id) - except Exception as e: - err_msg = { "run_id": run_or_run_id, "experiment_id": experiment_id, "Exception": e } - _logger.warning(f"Run export failed: {err_msg}") - failed_run_ids.append(run_or_run_id) - return - else: - run = run_or_run_id - if run_start_time and run.info.start_time < run_start_time: - msg = { - "run_id": {run.info.run_id}, - "experiment_id": {run.info.experiment_id}, - "start_time": fmt_ts_millis(run.info.start_time), - "run_start_time": run_start_time_str - } - _logger.info(f"Not exporting run: {msg}") - return - - if run.info.experiment_id != experiment_id: + if run_start_time and run.info.start_time < run_start_time: msg = { - "run_id": {run.info.run_id}, - "run.experiment_id": {run.info.experiment_id}, - "experiment_id": {experiment_id} + "run_id": {run.info.run_id}, + "experiment_id": {run.info.experiment_id}, + "start_time": fmt_ts_millis(run.info.start_time), + "run_start_time": run_start_time_str } - _logger.warning(f"Not exporting run since it doesn't belong to experiment: {msg}") - failed_run_ids.append(run.info.run_id) + _logger.info(f"Not exporting run: {msg}") return - is_success = export_run( run_id = run.info.run_id, output_dir = os.path.join(output_dir, run.info.run_id), @@ -154,6 +138,28 @@ def _export_run(mlflow_client, run_or_run_id, experiment_id, output_dir, failed_run_ids.append(run.info.run_id) +def _get_runs(mlflow_client, run_ids, exp, failed_run_ids): + runs = [] + for run_id in run_ids: + try: + run = mlflow_client.get_run(run_id) + if run.info.experiment_id == exp.experiment_id: + runs.append(run) + else: + msg = { + "run_id": {run.info.run_id}, + "run.experiment_id": {run.info.experiment_id}, + "experiment_id": {exp.experiment_id} + } + _logger.warning(f"Not exporting run since it doesn't belong to experiment: {msg}") + failed_run_ids.append(run.info.run_id) + except Exception as e: + msg = { "run_id": run_id, "experiment_id": exp.experiment_id, "Exception": e } + _logger.warning(f"Run export failed: {msg}") + failed_run_ids.append(run_id) + return runs + + @click.command() @opt_experiment @opt_output_dir @@ -161,9 +167,10 @@ def _export_run(mlflow_client, run_or_run_id, experiment_id, output_dir, @opt_export_permissions @opt_run_start_time @opt_export_deleted_runs +@opt_check_nested_runs @opt_notebook_formats -def main(experiment, output_dir, run_ids, export_permissions, run_start_time, export_deleted_runs, notebook_formats): +def main(experiment, output_dir, run_ids, export_permissions, run_start_time, export_deleted_runs, check_nested_runs, notebook_formats): _logger.info("Options:") for k,v in locals().items(): _logger.info(f" {k}: {v}") @@ -178,6 +185,7 @@ def main(experiment, output_dir, run_ids, export_permissions, run_start_time, ex export_permissions = export_permissions, run_start_time = run_start_time, export_deleted_runs = export_deleted_runs, + check_nested_runs = check_nested_runs, notebook_formats = utils.string_to_list(notebook_formats) ) diff --git a/mlflow_export_import/experiment/nested_runs_utils.py b/mlflow_export_import/experiment/nested_runs_utils.py new file mode 100644 index 0000000..c09bc7f --- /dev/null +++ b/mlflow_export_import/experiment/nested_runs_utils.py @@ -0,0 +1,27 @@ +from mlflow_export_import.common import utils +from mlflow_export_import.common.iterators import SearchRunsIterator + +_logger = utils.getLogger(__name__) + + +def get_nested_runs(client, runs): + """ + Return set of run_ids and their nested run descendants from list of run IDs. + """ + if utils.calling_databricks(): + return get_by_rootRunId(client, runs) + else: + _logger.warning(f"OSS MLflow nested run export not yet supported") + return runs + + +def get_by_rootRunId(client, runs): + """ + Return list of nested run descendants (includes the root run). + Unlike Databricks MLflow, OSS MLflow does not add the 'mlflow.rootRunId' tag to child runs. + """ + descendant_runs= [] + for run in runs: + filter = f"tags.mlflow.rootRunId = '{run.info.run_id}'" + descendant_runs += list(SearchRunsIterator(client, run.info.experiment_id, filter=filter)) + return descendant_runs