Skip to content

Commit

Permalink
Issue #191: added support to export nested runs for Databricks MLflow…
Browse files Browse the repository at this point in the history
… only
  • Loading branch information
amesar committed Jul 20, 2024
1 parent 302e303 commit 1da3aa4
Show file tree
Hide file tree
Showing 3 changed files with 76 additions and 41 deletions.
2 changes: 1 addition & 1 deletion mlflow_export_import/common/click_options.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ def opt_run_ids(function):

def opt_check_nested_runs(function):
function = click.option("--check-nested-runs",
help="Check if run is parent of nested run",
help="Check if run in the 'run-ids' option is a parent of nested runs and export all the nested runs.",
type=bool,
default=False,
show_default=True
Expand Down
88 changes: 48 additions & 40 deletions mlflow_export_import/experiment/export_experiment.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,16 @@
opt_notebook_formats,
opt_export_permissions,
opt_run_start_time,
opt_export_deleted_runs
opt_export_deleted_runs,
opt_check_nested_runs
)
from mlflow_export_import.common.iterators import SearchRunsIterator
from mlflow_export_import.common import utils, io_utils, mlflow_utils
from mlflow_export_import.common import ws_permissions_utils
from mlflow_export_import.common.timestamp_utils import fmt_ts_millis, utc_str_to_millis
from mlflow_export_import.client.client_utils import create_mlflow_client, create_dbx_client
from mlflow_export_import.run.export_run import export_run
from . import nested_runs_utils

_logger = utils.getLogger(__name__)

Expand All @@ -31,6 +33,7 @@ def export_experiment(
export_permissions = False,
run_start_time = None,
export_deleted_runs = False,
check_nested_runs = False,
notebook_formats = None,
mlflow_client = None
):
Expand All @@ -39,12 +42,14 @@ def export_experiment(
:param: output_dir: Output directory.
:param: run_ids: List of run IDs to export. If None then export all run IDs.
:param: export_permissions - Export Databricks permissions.
:param: export_deleted_runs - Export deleted runs.
:param: check_nested_runs - Check if run in the 'run-ids' option is a parent of nested runs and
export all the nested runs.
:param: run_start_time - Only export runs started after this UTC time (inclusive). Format: YYYY-MM-DD.
:param: notebook_formats: List of notebook formats to export. Values are SOURCE, HTML, JUPYTER or DBC.
:param: mlflow_client: MLflow client.
:return: Number of successful and number of failed runs.
"""

mlflow_client = mlflow_client or create_mlflow_client()
dbx_client = create_dbx_client(mlflow_client)

Expand All @@ -62,21 +67,22 @@ def export_experiment(
failed_run_ids = []
num_runs_exported = 0
if run_ids:
for _,run_id in enumerate(run_ids):
_export_run(mlflow_client, run_id, exp.experiment_id, output_dir, ok_run_ids, failed_run_ids,
run_start_time, run_start_time_str, export_deleted_runs, notebook_formats)
num_runs_exported += 1
runs = _get_runs(mlflow_client, run_ids, exp, failed_run_ids)
if check_nested_runs: # ZZ
runs = nested_runs_utils.get_nested_runs(mlflow_client, runs) #
else:
kwargs = {}
if run_start_time:
kwargs["filter"] = f"start_time > {run_start_time}"
if export_deleted_runs:
from mlflow.entities import ViewType
kwargs["view_type"] = ViewType.ALL
for _,run in enumerate(SearchRunsIterator(mlflow_client, exp.experiment_id, **kwargs)):
_export_run(mlflow_client, run, exp.experiment_id, output_dir, ok_run_ids, failed_run_ids,
run_start_time, run_start_time_str, export_deleted_runs, notebook_formats)
num_runs_exported += 1
runs = SearchRunsIterator(mlflow_client, exp.experiment_id, **kwargs)

for run in runs:
_export_run(mlflow_client, run, output_dir, ok_run_ids, failed_run_ids,
run_start_time, run_start_time_str, export_deleted_runs, notebook_formats)
num_runs_exported += 1

info_attr = {
"num_total_runs": (num_runs_exported),
Expand Down Expand Up @@ -105,42 +111,20 @@ def export_experiment(
return len(ok_run_ids), len(failed_run_ids)


def _export_run(mlflow_client, run_or_run_id, experiment_id, output_dir,
def _export_run(mlflow_client, run, output_dir,
ok_run_ids, failed_run_ids,
run_start_time, run_start_time_str,
export_deleted_runs, notebook_formats
):

if isinstance(run_or_run_id, str):
try:
run = mlflow_client.get_run(run_or_run_id)
except Exception as e:
err_msg = { "run_id": run_or_run_id, "experiment_id": experiment_id, "Exception": e }
_logger.warning(f"Run export failed: {err_msg}")
failed_run_ids.append(run_or_run_id)
return
else:
run = run_or_run_id
if run_start_time and run.info.start_time < run_start_time:
msg = {
"run_id": {run.info.run_id},
"experiment_id": {run.info.experiment_id},
"start_time": fmt_ts_millis(run.info.start_time),
"run_start_time": run_start_time_str
}
_logger.info(f"Not exporting run: {msg}")
return

if run.info.experiment_id != experiment_id:
if run_start_time and run.info.start_time < run_start_time:
msg = {
"run_id": {run.info.run_id},
"run.experiment_id": {run.info.experiment_id},
"experiment_id": {experiment_id}
"run_id": {run.info.run_id},
"experiment_id": {run.info.experiment_id},
"start_time": fmt_ts_millis(run.info.start_time),
"run_start_time": run_start_time_str
}
_logger.warning(f"Not exporting run since it doesn't belong to experiment: {msg}")
failed_run_ids.append(run.info.run_id)
_logger.info(f"Not exporting run: {msg}")
return

is_success = export_run(
run_id = run.info.run_id,
output_dir = os.path.join(output_dir, run.info.run_id),
Expand All @@ -154,16 +138,39 @@ def _export_run(mlflow_client, run_or_run_id, experiment_id, output_dir,
failed_run_ids.append(run.info.run_id)


def _get_runs(mlflow_client, run_ids, exp, failed_run_ids):
runs = []
for run_id in run_ids:
try:
run = mlflow_client.get_run(run_id)
if run.info.experiment_id == exp.experiment_id:
runs.append(run)
else:
msg = {
"run_id": {run.info.run_id},
"run.experiment_id": {run.info.experiment_id},
"experiment_id": {exp.experiment_id}
}
_logger.warning(f"Not exporting run since it doesn't belong to experiment: {msg}")
failed_run_ids.append(run.info.run_id)
except Exception as e:
msg = { "run_id": run_id, "experiment_id": exp.experiment_id, "Exception": e }
_logger.warning(f"Run export failed: {msg}")
failed_run_ids.append(run_id)
return runs


@click.command()
@opt_experiment
@opt_output_dir
@opt_run_ids
@opt_export_permissions
@opt_run_start_time
@opt_export_deleted_runs
@opt_check_nested_runs
@opt_notebook_formats

def main(experiment, output_dir, run_ids, export_permissions, run_start_time, export_deleted_runs, notebook_formats):
def main(experiment, output_dir, run_ids, export_permissions, run_start_time, export_deleted_runs, check_nested_runs, notebook_formats):
_logger.info("Options:")
for k,v in locals().items():
_logger.info(f" {k}: {v}")
Expand All @@ -178,6 +185,7 @@ def main(experiment, output_dir, run_ids, export_permissions, run_start_time, ex
export_permissions = export_permissions,
run_start_time = run_start_time,
export_deleted_runs = export_deleted_runs,
check_nested_runs = check_nested_runs,
notebook_formats = utils.string_to_list(notebook_formats)
)

Expand Down
27 changes: 27 additions & 0 deletions mlflow_export_import/experiment/nested_runs_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
from mlflow_export_import.common import utils
from mlflow_export_import.common.iterators import SearchRunsIterator

_logger = utils.getLogger(__name__)


def get_nested_runs(client, runs):
    """
    Expand a list of runs with their nested run descendants.

    :param client: MLflow client used to search for descendant runs.
    :param runs: List of run objects (not run IDs) to expand.
    :return: When calling Databricks MLflow, the result of get_by_rootRunId() for 'runs'.
        Otherwise the input 'runs' unchanged, after logging a warning.
    """
    if not utils.calling_databricks():
        # Descendant lookup is only wired up for Databricks; fall back to the input runs.
        _logger.warning("OSS MLflow nested run export not yet supported")
        return runs
    return get_by_rootRunId(client, runs)


def get_by_rootRunId(client, runs):
    """
    Return the runs whose 'mlflow.rootRunId' tag points at one of the given runs.

    For each run in 'runs', searches that run's experiment for runs tagged with
    'mlflow.rootRunId' equal to the run's ID, and concatenates the results in input order.
    Unlike Databricks MLflow, OSS MLflow does not add the 'mlflow.rootRunId' tag to child runs.

    :param client: MLflow client passed to SearchRunsIterator.
    :param runs: Iterable of run objects exposing 'info.run_id' and 'info.experiment_id'.
    :return: List of matching runs. NOTE(review): a root run appears in the result only
        if it carries the 'mlflow.rootRunId' tag itself - confirm this holds on Databricks.
    """
    descendant_runs = []
    for run in runs:
        # Named 'root_filter' so the builtin filter() is not shadowed.
        root_filter = f"tags.mlflow.rootRunId = '{run.info.run_id}'"
        descendant_runs.extend(
            SearchRunsIterator(client, run.info.experiment_id, filter=root_filter)
        )
    return descendant_runs

0 comments on commit 1da3aa4

Please sign in to comment.