Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

More defensive approach to value exporting #48

Merged
merged 1 commit into from
Dec 4, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 30 additions & 21 deletions src/neptune_mlflow_exporter/impl/components/exporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import mlflow
from mlflow.entities import Experiment
from mlflow.entities import Run as MlflowRun
from neptune.utils import stringify_unsupported

from neptune_mlflow_exporter.impl.artifact_strategy import choose_upload_strategy

Expand All @@ -42,39 +43,47 @@ def export_experiment_metadata(neptune_run: NeptuneRun, experiment: Experiment)
neptune_run["experiment/name"] = experiment.name

# https://stackoverflow.com/questions/9744775/how-to-convert-integer-timestamp-into-a-datetime
neptune_run["experiment/creation_time"] = datetime.fromtimestamp(experiment.creation_time / 1e3)
neptune_run["experiment/last_update_time"] = datetime.fromtimestamp(experiment.last_update_time / 1e3)
if experiment.creation_time is not None:
neptune_run["experiment/creation_time"] = datetime.fromtimestamp(experiment.creation_time / 1e3)

if experiment.last_update_time is not None:
neptune_run["experiment/last_update_time"] = datetime.fromtimestamp(experiment.last_update_time / 1e3)

@staticmethod
def export_run_info(neptune_run: NeptuneRun, mlflow_run: MlflowRun) -> None:
info = dict(mlflow_run.info)

info["start_time"] = datetime.fromtimestamp(mlflow_run.info.start_time / 1e3)
info["end_time"] = datetime.fromtimestamp(mlflow_run.info.end_time / 1e3)
if mlflow_run.info.start_time is not None:
info["start_time"] = datetime.fromtimestamp(mlflow_run.info.start_time / 1e3)

if mlflow_run.info.end_time is not None:
info["end_time"] = datetime.fromtimestamp(mlflow_run.info.end_time / 1e3)

neptune_run["run_info"] = info
neptune_run["run_info"] = stringify_unsupported(info)
neptune_run["sys/name"] = info["run_name"]

def export_run_data(self, neptune_run: NeptuneRun, mlflow_run: MlflowRun) -> None:
data_dict = mlflow_run.data.to_dictionary()
metric_keys = data_dict["metrics"].keys()
del data_dict["metrics"]

if "metrics" in data_dict:
metric_keys = data_dict["metrics"].keys()
del data_dict["metrics"]

for key in metric_keys:
metrics = self.mlflow_client.get_metric_history(
run_id=mlflow_run.info.run_id,
key=key,
)
metric_values = list(map(lambda metric: metric.value, metrics))
metric_timestamps = list(
map(lambda metric: metric.timestamp / 1e3 if metric.timestamp else None, metrics)
)
metric_steps = list(map(lambda metric: metric.step, metrics))

neptune_run[f"run_data/metrics/{key}"].extend(
metric_values, steps=metric_steps, timestamps=metric_timestamps
)
neptune_run["run_data"] = data_dict

for key in metric_keys:
metrics = self.mlflow_client.get_metric_history(
run_id=mlflow_run.info.run_id,
key=key,
)
metric_values = list(map(lambda metric: metric.value, metrics))
metric_timestamps = list(map(lambda metric: metric.timestamp / 1e3, metrics))
metric_steps = list(map(lambda metric: metric.step, metrics))

neptune_run[f"run_data/metrics/{key}"].extend(
metric_values, steps=metric_steps, timestamps=metric_timestamps
)

def export_artifacts(
self, neptune_run: NeptuneRun, mlflow_run: MlflowRun, max_artifact_size: int, tracking_uri: Optional[str]
) -> None:
Expand Down
Loading