Skip to content

Commit

Permalink
update import and lint in mlflow.py
Browse files Browse the repository at this point in the history
  • Loading branch information
yoonhyejin committed Dec 23, 2024
1 parent b50a148 commit 338afb4
Showing 1 changed file with 16 additions and 10 deletions.
26 changes: 16 additions & 10 deletions metadata-ingestion/src/datahub/ingestion/source/mlflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
import time

from mlflow import MlflowClient
from mlflow.entities import Run
from mlflow.entities.model_registry import ModelVersion, RegisteredModel, Experiment
from mlflow.entities import Run, Experiment
from mlflow.entities.model_registry import ModelVersion, RegisteredModel
from mlflow.store.entities import PagedList
from pydantic.fields import Field

Expand Down Expand Up @@ -45,7 +45,14 @@
DataProcessInstanceRunResultClass,
DataProcessInstanceOutputClass,
)
from datahub.metadata.urns import DatasetUrn, DataPlatformUrn, MlModelUrn, MlModelGroupUrn, DataProcessInstanceUrn, DataPlatformInstanceUrn
from datahub.metadata.urns import (
DatasetUrn,
DataPlatformUrn,
MlModelUrn,
MlModelGroupUrn,
DataProcessInstanceUrn,
DataPlatformInstanceUrn,
)
from datahub.api.entities.dataprocess.dataprocess_instance import (
DataProcessInstance,
InstanceRunResult,
Expand Down Expand Up @@ -213,14 +220,16 @@ def _get_experiment_container_workunit(
experiment_container = Container(
key=ContainerKeyWithId(
platform=str(DataPlatformUrn.create_from_id("mlflow")),
id=experiment.name
id=experiment.name,
),
subtype="ML Experiment",
name=experiment.name,
description=experiment.tags.get("mlflow.note.content"),
) # TODO: this generates a urn as guid, should we change this to use experiment.id?

print("experiment.key.id:", experiment.key.id) # this should be same as container key as urn
print(
"experiment.key.id:", experiment.key.id
) # this should be same as container key as urn
print("experiment.key.as_urn(): ", experiment.key.as_urn())

workunits = [mcp.as_workunit() for mcp in experiment.generate_mcp()]
Expand Down Expand Up @@ -298,9 +307,7 @@ def _get_run_workunits(
workunits.append(
MetadataChangeProposalWrapper(
entityUrn=str(data_process_instance.urn),
aspect=DataProcessInstanceOutputClass(
outputs=[model_version_urn]
),
aspect=DataProcessInstanceOutputClass(outputs=[model_version_urn]),
).as_workunit()
)

Expand All @@ -323,7 +330,6 @@ def _get_run_workunits(
) # TODO: this should be SUCCESS, SKIPPED, FAILURE, UP_FOR_RETRY
duration_millis = run.info.end_time - run.info.start_time


if run.info.end_time:
workunits.append(
MetadataChangeProposalWrapper(
Expand Down Expand Up @@ -357,7 +363,7 @@ def _get_run_workunits(
created=AuditStampClass(
time=created_time,
actor=created_actor,
)
),
),
).as_workunit()
)
Expand Down

0 comments on commit 338afb4

Please sign in to comment.