Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Make the e2e example work with the Vertex AI model registry/deployer #3248

Draft
wants to merge 2 commits into
base: feature/vertex-ai-deployer-model-registry
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 3 additions & 6 deletions examples/e2e/configs/deployer_config.yaml
Original file line number | Diff line number | Diff line change
Expand Up @@ -18,14 +18,11 @@
# environment configuration
settings:
docker:
parent_image: public.ecr.aws/f7g3v0f1/zenml:stefan
python_package_installer: uv
required_integrations:
- aws
- evidently
- kubeflow
- kubernetes
- mlflow
- sklearn
- slack
- pandas

# configuration of steps
steps:
Expand Down
9 changes: 3 additions & 6 deletions examples/e2e/configs/inference_config.yaml
Original file line number | Diff line number | Diff line change
Expand Up @@ -18,14 +18,11 @@
# environment configuration
settings:
docker:
parent_image: public.ecr.aws/f7g3v0f1/zenml:stefan
python_package_installer: uv
required_integrations:
- aws
- evidently
- kubeflow
- kubernetes
- mlflow
- sklearn
- slack
- pandas

# configuration of steps
steps:
Expand Down
9 changes: 3 additions & 6 deletions examples/e2e/configs/train_config.yaml
Original file line number | Diff line number | Diff line change
Expand Up @@ -18,14 +18,11 @@
# environment configuration
settings:
docker:
parent_image: public.ecr.aws/f7g3v0f1/zenml:stefan
python_package_installer: uv
required_integrations:
- aws
- evidently
- kubeflow
- kubernetes
- mlflow
- sklearn
- slack
- pandas

# configuration of steps
steps:
Expand Down
2 changes: 2 additions & 0 deletions examples/e2e/pipelines/training.py
Original file line number | Diff line number | Diff line change
Expand Up @@ -25,6 +25,7 @@
hp_tuning_select_best_model,
hp_tuning_single_search,
model_evaluator,
model_register,
model_trainer,
notify_on_failure,
notify_on_success,
Expand Down Expand Up @@ -109,6 +110,7 @@ def e2e_use_case_training(
model=best_model,
target=target,
)
model_register(after=["model_trainer"])
model_evaluator(
model=model,
dataset_trn=dataset_trn,
Expand Down
2 changes: 1 addition & 1 deletion examples/e2e/steps/__init__.py
Original file line number | Diff line number | Diff line change
Expand Up @@ -30,5 +30,5 @@
compute_performance_metrics_on_current_data,
promote_with_metric_compare,
)
from .training import model_evaluator, model_trainer
from .training import model_evaluator, model_register, model_trainer
from .deployment import deployment_deploy
81 changes: 62 additions & 19 deletions examples/e2e/steps/deployment/deployment_deploy.py
Original file line number | Diff line number | Diff line change
Expand Up @@ -22,21 +22,27 @@

from zenml import ArtifactConfig, get_step_context, step
from zenml.client import Client
from zenml.integrations.mlflow.services.mlflow_deployment import (
MLFlowDeploymentService,
)
from zenml.integrations.mlflow.steps.mlflow_deployer import (
mlflow_model_registry_deployer_step,
)
from zenml.logger import get_logger
from zenml.services.service import BaseDeploymentService

mlflow_enabled = False
try:
from zenml.integrations.mlflow.steps.mlflow_deployer import (
mlflow_model_registry_deployer_step,
)

mlflow_enabled = True
except ImportError:
pass


logger = get_logger(__name__)


@step
def deployment_deploy() -> (
Annotated[
Optional[MLFlowDeploymentService],
Optional[BaseDeploymentService],
ArtifactConfig(name="mlflow_deployment", is_deployment_artifact=True),
]
):
Expand All @@ -58,20 +64,57 @@ def deployment_deploy() -> (
Returns:
The predictions as pandas series
"""
### ADD YOUR OWN CODE HERE - THIS IS JUST AN EXAMPLE ###
if Client().active_stack.orchestrator.flavor == "local":
model = get_step_context().model

# deploy predictor service
deployment_service = mlflow_model_registry_deployer_step.entrypoint(
registry_model_name=model.name,
registry_model_version=model.run_metadata[
"model_registry_version"
],
replace_existing=True,
model_deployer = Client().active_stack.model_deployer
current_model = get_step_context().model

if mlflow_enabled:
if Client().active_stack.orchestrator.flavor == "local":
logger.warning(
"Skipping deployment as the orchestrator is not local."
)
deployment_service = None
else:
# deploy predictor service
deployment_service = (
mlflow_model_registry_deployer_step.entrypoint(
registry_model_name=current_model.name,
registry_model_version=current_model.run_metadata[
"model_registry_version"
],
replace_existing=True,
)
)
elif model_deployer is not None:
from zenml.integrations.gcp.model_deployers import (
VertexModelDeployer,
)
from zenml.integrations.gcp.services.vertex_deployment import (
VertexDeploymentConfig,
VertexDeploymentService,
)

if isinstance(model_deployer, VertexModelDeployer):

vertex_deployment_config = VertexDeploymentConfig(
location=model_deployer.config.location,
name="zenml-vertex-e2e",
model_name=current_model.name,
model_version=current_model.version,
model_id=f"{current_model.name}_{current_model.version}",
)
deployment_service = model_deployer.deploy_model(
config=vertex_deployment_config,
service_type=VertexDeploymentService.SERVICE_TYPE,
)

logger.info(
f"The deployed service info: {model_deployer.get_model_server_info(deployment_service)}"
)
else:
logger.warning("Skipping deployment as the orchestrator is not local.")
logger.warning(
"Skipping deployment as no model deployer is available."
)
deployment_service = None

### YOUR CODE ENDS HERE ###
return deployment_service
11 changes: 6 additions & 5 deletions examples/e2e/steps/inference/inference_predict.py
Original file line number | Diff line number | Diff line change
Expand Up @@ -22,10 +22,8 @@
from typing_extensions import Annotated

from zenml import get_step_context, step
from zenml.integrations.mlflow.services.mlflow_deployment import (
MLFlowDeploymentService,
)
from zenml.logger import get_logger
from zenml.services.service import BaseDeploymentService

logger = get_logger(__name__)

Expand Down Expand Up @@ -56,12 +54,15 @@ def inference_predict(
model = get_step_context().model

# get predictor
predictor_service: Optional[MLFlowDeploymentService] = model.load_artifact(
predictor_service: Optional[BaseDeploymentService] = model.load_artifact(
"mlflow_deployment"
)
if predictor_service is not None:
# run prediction from service
predictions = predictor_service.predict(request=dataset_inf)

# For MLFLow:
# predictions = predictor_service.predict(request=dataset_inf)
predictions = predictor_service.predict(instances=dataset_inf.to_dict("records"))
else:
logger.warning(
"Predicting from loaded model instead of deployment service "
Expand Down
23 changes: 12 additions & 11 deletions examples/e2e/steps/promotion/promote_with_metric_compare.py
Original file line number | Diff line number | Diff line change
Expand Up @@ -90,28 +90,29 @@ def promote_with_metric_compare(
logger.info(f"Current model version was promoted to '{target_env}'.")

# Promote in Model Registry
latest_version_model_registry_number = latest_version.run_metadata[
latest_version_model_registry_number = latest_version.run_metadata.get(
"model_registry_version"
]
)
if current_version_number is None:
current_version_model_registry_number = (
latest_version_model_registry_number
)
else:
current_version_model_registry_number = (
current_version.run_metadata["model_registry_version"]
current_version.run_metadata.get("model_registry_version")
)
if current_version_model_registry_number:
promote_in_model_registry(
latest_version=latest_version_model_registry_number,
current_version=current_version_model_registry_number,
model_name=mlflow_model_name,
target_env=target_env.capitalize(),
)
promote_in_model_registry(
latest_version=latest_version_model_registry_number,
current_version=current_version_model_registry_number,
model_name=mlflow_model_name,
target_env=target_env.capitalize(),
)
promoted_version = latest_version_model_registry_number
else:
promoted_version = current_version.run_metadata[
promoted_version = current_version.run_metadata.get(
"model_registry_version"
]
)

logger.info(
f"Current model version in `{target_env}` is `{promoted_version}` registered in Model Registry"
Expand Down
1 change: 1 addition & 0 deletions examples/e2e/steps/training/__init__.py
Original file line number | Diff line number | Diff line change
Expand Up @@ -17,4 +17,5 @@


from .model_evaluator import model_evaluator
from .model_register import model_register
from .model_trainer import model_trainer
23 changes: 20 additions & 3 deletions examples/e2e/steps/training/model_evaluator.py
Original file line number | Diff line number | Diff line change
Expand Up @@ -16,20 +16,36 @@
#


import mlflow
import pandas as pd
from sklearn.base import ClassifierMixin

from zenml import step
from zenml.client import Client
from zenml.logger import get_logger

mlflow_enabled = False
try:
import mlflow

from zenml.integrations.mlflow.experiment_trackers import (
MLFlowExperimentTracker,
)

mlflow_enabled = True
except ImportError:
pass

logger = get_logger(__name__)

experiment_tracker = Client().active_stack.experiment_tracker

if mlflow_enabled:
if not experiment_tracker or not isinstance(
experiment_tracker, MLFlowExperimentTracker
):
mlflow_enabled = False

@step(experiment_tracker=experiment_tracker.name)
@step(experiment_tracker=experiment_tracker.name if experiment_tracker else None)
def model_evaluator(
model: ClassifierMixin,
dataset_trn: pd.DataFrame,
Expand Down Expand Up @@ -88,7 +104,8 @@ def model_evaluator(
dataset_tst[target],
)
logger.info(f"Test accuracy={tst_acc*100:.2f}%")
mlflow.log_metric("testing_accuracy_score", tst_acc)
if mlflow_enabled:
mlflow.log_metric("testing_accuracy_score", tst_acc)

messages = []
if trn_acc < min_train_accuracy:
Expand Down
Loading
Loading