From 5570c0c70ed889d1e4eeb362c88aa98b8229f152 Mon Sep 17 00:00:00 2001 From: Hamza Tahir Date: Fri, 20 Dec 2024 12:39:35 +0100 Subject: [PATCH 01/26] Create Sagemaker pipeline schedules if specified --- .../orchestrators/sagemaker_orchestrator.py | 146 ++++++++++++++---- 1 file changed, 112 insertions(+), 34 deletions(-) diff --git a/src/zenml/integrations/aws/orchestrators/sagemaker_orchestrator.py b/src/zenml/integrations/aws/orchestrators/sagemaker_orchestrator.py index f832647a97..6564414df4 100644 --- a/src/zenml/integrations/aws/orchestrators/sagemaker_orchestrator.py +++ b/src/zenml/integrations/aws/orchestrators/sagemaker_orchestrator.py @@ -15,6 +15,7 @@ import os import re +from datetime import datetime from typing import ( TYPE_CHECKING, Any, @@ -245,12 +246,8 @@ def prepare_or_run_pipeline( Yields: A dictionary of metadata related to the pipeline run. """ - if deployment.schedule: - logger.warning( - "The Sagemaker Orchestrator currently does not support the " - "use of schedules. The `schedule` will be ignored " - "and the pipeline will be run immediately." - ) + # Get the session and client + session = self._get_sagemaker_session() # sagemaker requires pipelineName to use alphanum and hyphens only unsanitized_orchestrator_run_name = get_orchestrator_run_name( @@ -459,7 +456,7 @@ def prepare_or_run_pipeline( sagemaker_steps.append(sagemaker_step) - # construct the pipeline from the sagemaker_steps + # Create the pipeline pipeline = Pipeline( name=orchestrator_run_name, steps=sagemaker_steps, @@ -479,38 +476,119 @@ def prepare_or_run_pipeline( if settings.pipeline_tags else None, ) - execution = pipeline.start() - logger.warning( - "Steps can take 5-15 minutes to start running " - "when using the Sagemaker Orchestrator." - ) - # Yield metadata based on the generated execution object - yield from self.compute_metadata( - execution=execution, settings=settings - ) + # Handle scheduling if specified + if deployment.schedule: + if settings.synchronous: + logger.warning( + "The 'synchronous' setting is ignored for scheduled pipelines since " + "they run independently of the deployment process." + ) - # mainly for testing purposes, we wait for the pipeline to finish - if settings.synchronous: - logger.info( - "Executing synchronously. Waiting for pipeline to finish... \n" - "At this point you can `Ctrl-C` out without cancelling the " - "execution." + events_client = session.boto_session.client("events") + rule_name = f"zenml-{deployment.pipeline_configuration.name}" + + # Determine first execution time based on schedule type + if deployment.schedule.cron_expression: + schedule_expr = f"cron({deployment.schedule.cron_expression})" + next_execution = ( + None # Exact time calculation would require cron parsing + ) + elif deployment.schedule.interval_second: + minutes = ( + deployment.schedule.interval_second.total_seconds() / 60 + ) + schedule_expr = f"rate({int(minutes)} minutes)" + next_execution = ( + datetime.utcnow() + deployment.schedule.interval_second + ) + elif deployment.schedule.run_once_start_time: + schedule_expr = f"at({deployment.schedule.run_once_start_time.strftime('%Y-%m-%dT%H:%M:%S')})" + next_execution = deployment.schedule.run_once_start_time + + events_client.put_rule( + Name=rule_name, + ScheduleExpression=schedule_expr, + State="ENABLED" + ) + + # Add the SageMaker pipeline as target + events_client.put_targets( + Rule=rule_name, + Targets=[ + { + "Id": f"zenml-target-{deployment.pipeline_configuration.name}", + "Arn": f"arn:aws:sagemaker:{session.boto_region_name}:{session.boto_session.client('sts').get_caller_identity()['Account']}:pipeline/{orchestrator_run_name}", + "RoleArn": self.config.execution_role, + } + ], ) - try: - execution.wait( - delay=POLLING_DELAY, max_attempts=MAX_POLLING_ATTEMPTS + + logger.info( + f"Successfully scheduled pipeline with rule: {rule_name}\n" + f"Schedule type: {schedule_expr}\n" + + ( + f"First execution will occur at: {next_execution.strftime('%Y-%m-%d %H:%M:%S')} UTC" + if next_execution + else f"Using cron expression: {deployment.schedule.cron_expression}" ) - logger.info("Pipeline completed successfully.") - except WaiterError: - raise RuntimeError( - "Timed out while waiting for pipeline execution to " - "finish. For long-running pipelines we recommend " - "configuring your orchestrator for asynchronous execution. " - "The following command does this for you: \n" - f"`zenml orchestrator update {self.name} " - f"--synchronous=False`" + + ( + f" (and every {int(minutes)} minutes after)" + if deployment.schedule.interval_second + else "" ) + ) + + # Yield metadata about the schedule + yield { + "schedule_rule_name": rule_name, + "schedule_type": ( + "cron" + if deployment.schedule.cron_expression + else "rate" + if deployment.schedule.interval_second + else "one-time" + ), + "schedule_expression": schedule_expr, + "pipeline_name": orchestrator_run_name, + "next_execution_time": next_execution.isoformat() + if next_execution + else None, + } + else: + # Execute the pipeline immediately if no schedule is specified + execution = pipeline.start() + logger.warning( + "Steps can take 5-15 minutes to start running " + "when using the Sagemaker Orchestrator." + ) + + # Yield metadata based on the generated execution object + yield from self.compute_metadata( + execution=execution, settings=settings + ) + + # mainly for testing purposes, we wait for the pipeline to finish + if settings.synchronous: + logger.info( + "Executing synchronously. Waiting for pipeline to finish... \n" + "At this point you can `Ctrl-C` out without cancelling the " + "execution." + ) + try: + execution.wait( + delay=POLLING_DELAY, max_attempts=MAX_POLLING_ATTEMPTS + ) + logger.info("Pipeline completed successfully.") + except WaiterError: + raise RuntimeError( + "Timed out while waiting for pipeline execution to " + "finish. For long-running pipelines we recommend " + "configuring your orchestrator for asynchronous execution. " + "The following command does this for you: \n" + f"`zenml orchestrator update {self.name} " + f"--synchronous=False`" + ) def get_pipeline_run_metadata( self, run_id: UUID From 6f7bf288a177fa1bf070f0abf65956e1086741a3 Mon Sep 17 00:00:00 2001 From: Hamza Tahir Date: Fri, 20 Dec 2024 13:31:33 +0100 Subject: [PATCH 02/26] Add property to check if orchestrator is schedulable --- .../flavors/sagemaker_orchestrator_flavor.py | 9 +++++++ .../orchestrators/sagemaker_orchestrator.py | 26 ++++++++++--------- 2 files changed, 23 insertions(+), 12 deletions(-) diff --git a/src/zenml/integrations/aws/flavors/sagemaker_orchestrator_flavor.py b/src/zenml/integrations/aws/flavors/sagemaker_orchestrator_flavor.py index 2898f24cc6..ea17571d77 100644 --- a/src/zenml/integrations/aws/flavors/sagemaker_orchestrator_flavor.py +++ b/src/zenml/integrations/aws/flavors/sagemaker_orchestrator_flavor.py @@ -132,6 +132,15 @@ class SagemakerOrchestratorSettings(BaseSettings): ("processor_role", "execution_role"), ("processor_tags", "tags") ) + @property + def is_schedulable(self) -> bool: + """Whether the orchestrator is schedulable or not. + + Returns: + Whether the orchestrator is schedulable or not. + """ + return True + @model_validator(mode="before") def validate_model(cls, data: Dict[str, Any]) -> Dict[str, Any]: """Check if model is configured correctly. diff --git a/src/zenml/integrations/aws/orchestrators/sagemaker_orchestrator.py b/src/zenml/integrations/aws/orchestrators/sagemaker_orchestrator.py index 6564414df4..2d75dfe8b4 100644 --- a/src/zenml/integrations/aws/orchestrators/sagemaker_orchestrator.py +++ b/src/zenml/integrations/aws/orchestrators/sagemaker_orchestrator.py @@ -490,22 +490,24 @@ def prepare_or_run_pipeline( # Determine first execution time based on schedule type if deployment.schedule.cron_expression: - schedule_expr = f"cron({deployment.schedule.cron_expression})" - next_execution = ( - None # Exact time calculation would require cron parsing - ) + # AWS EventBridge requires cron expressions in format: cron(0 12 * * ? *) + # Strip any "cron(" prefix if it exists + cron_exp = deployment.schedule.cron_expression.replace("cron(", "").replace(")", "") + schedule_expr = f"cron({cron_exp})" + next_execution = None elif deployment.schedule.interval_second: - minutes = ( - deployment.schedule.interval_second.total_seconds() / 60 - ) - schedule_expr = f"rate({int(minutes)} minutes)" - next_execution = ( - datetime.utcnow() + deployment.schedule.interval_second - ) + minutes = max(1, int(deployment.schedule.interval_second.total_seconds() / 60)) + schedule_expr = f"rate({minutes} minutes)" + next_execution = datetime.utcnow() + deployment.schedule.interval_second elif deployment.schedule.run_once_start_time: - schedule_expr = f"at({deployment.schedule.run_once_start_time.strftime('%Y-%m-%dT%H:%M:%S')})" + # Format for specific date/time: cron(Minutes Hours Day-of-month Month ? Year) + # Example: cron(0 12 1 1 ? 2024) + dt = deployment.schedule.run_once_start_time + schedule_expr = f"cron({dt.minute} {dt.hour} {dt.day} {dt.month} ? {dt.year})" next_execution = deployment.schedule.run_once_start_time + logger.info(f"Creating EventBridge rule with schedule expression: {schedule_expr}") + events_client.put_rule( Name=rule_name, ScheduleExpression=schedule_expr, From 0212388d2db16523350abdd2d868bbfac898b15a Mon Sep 17 00:00:00 2001 From: Hamza Tahir Date: Fri, 20 Dec 2024 13:54:05 +0100 Subject: [PATCH 03/26] Add EventBridge rule for SageMaker pipeline execution --- .../orchestrators/sagemaker_orchestrator.py | 94 +++++++++++++++++-- 1 file changed, 88 insertions(+), 6 deletions(-) diff --git a/src/zenml/integrations/aws/orchestrators/sagemaker_orchestrator.py b/src/zenml/integrations/aws/orchestrators/sagemaker_orchestrator.py index 2d75dfe8b4..b0f2cef247 100644 --- a/src/zenml/integrations/aws/orchestrators/sagemaker_orchestrator.py +++ b/src/zenml/integrations/aws/orchestrators/sagemaker_orchestrator.py @@ -13,6 +13,7 @@ # permissions and limitations under the License. """Implementation of the SageMaker orchestrator.""" +import json import os import re from datetime import datetime @@ -492,13 +493,92 @@ def prepare_or_run_pipeline( if deployment.schedule.cron_expression: # AWS EventBridge requires cron expressions in format: cron(0 12 * * ? *) # Strip any "cron(" prefix if it exists - cron_exp = deployment.schedule.cron_expression.replace("cron(", "").replace(")", "") + cron_exp = deployment.schedule.cron_expression.replace( + "cron(", "" + ).replace(")", "") schedule_expr = f"cron({cron_exp})" next_execution = None elif deployment.schedule.interval_second: - minutes = max(1, int(deployment.schedule.interval_second.total_seconds() / 60)) + minutes = max( + 1, + int( + deployment.schedule.interval_second.total_seconds() + / 60 + ), + ) + schedule_expr = f"rate({minutes} minutes)" + next_execution = ( + datetime.utcnow() + deployment.schedule.interval_second + ) + elif deployment.schedule.run_once_start_time: + # Format for specific date/time: cron(Minutes Hours Day-of-month Month ? Year) + # Example: cron(0 12 1 1 ? 2024) + dt = deployment.schedule.run_once_start_time + schedule_expr = f"cron({dt.minute} {dt.hour} {dt.day} {dt.month} ? {dt.year})" + next_execution = deployment.schedule.run_once_start_time + + logger.info( + f"Creating EventBridge rule with schedule expression: {schedule_expr}" + ) + + # Create IAM policy for EventBridge to trigger SageMaker pipeline + iam_client = session.boto_session.client("iam") + + # Create the policy document + policy_document = { + "Version": "2012-10-17", + "Statement": [ + { + "Effect": "Allow", + "Action": ["sagemaker:StartPipelineExecution"], + "Resource": f"arn:aws:sagemaker:{session.boto_region_name}:{session.boto_session.client('sts').get_caller_identity()['Account']}:pipeline/{orchestrator_run_name}", + } + ], + } + + # Create or update the role policy + try: + role_name = self.config.execution_role.split("/")[ + -1 + ] # Extract role name from ARN + policy_name = f"zenml-eventbridge-{orchestrator_run_name}" + + iam_client.put_role_policy( + RoleName=role_name, + PolicyName=policy_name, + PolicyDocument=json.dumps(policy_document), + ) + + logger.info(f"Created/Updated IAM policy: {policy_name}") + except Exception as e: + logger.error(f"Failed to create/update IAM policy: {e}") + raise + + # Create the EventBridge rule + events_client = session.boto_session.client("events") + rule_name = f"zenml-{deployment.pipeline_configuration.name}" + + # Determine first execution time based on schedule type + if deployment.schedule.cron_expression: + # AWS EventBridge requires cron expressions in format: cron(0 12 * * ? *) + # Strip any "cron(" prefix if it exists + cron_exp = deployment.schedule.cron_expression.replace( + "cron(", "" + ).replace(")", "") + schedule_expr = f"cron({cron_exp})" + next_execution = None + elif deployment.schedule.interval_second: + minutes = max( + 1, + int( + deployment.schedule.interval_second.total_seconds() + / 60 + ), + ) schedule_expr = f"rate({minutes} minutes)" - next_execution = datetime.utcnow() + deployment.schedule.interval_second + next_execution = ( + datetime.utcnow() + deployment.schedule.interval_second + ) elif deployment.schedule.run_once_start_time: # Format for specific date/time: cron(Minutes Hours Day-of-month Month ? Year) # Example: cron(0 12 1 1 ? 2024) @@ -506,15 +586,17 @@ def prepare_or_run_pipeline( schedule_expr = f"cron({dt.minute} {dt.hour} {dt.day} {dt.month} ? {dt.year})" next_execution = deployment.schedule.run_once_start_time - logger.info(f"Creating EventBridge rule with schedule expression: {schedule_expr}") + logger.info( + f"Creating EventBridge rule with schedule expression: {schedule_expr}" + ) events_client.put_rule( Name=rule_name, ScheduleExpression=schedule_expr, - State="ENABLED" + State="ENABLED", ) - # Add the SageMaker pipeline as target + # Add the SageMaker pipeline as target with the role events_client.put_targets( Rule=rule_name, Targets=[ From 21253127c5abc668594e04ea6d109a9a32228767 Mon Sep 17 00:00:00 2001 From: Hamza Tahir Date: Fri, 20 Dec 2024 15:05:35 +0100 Subject: [PATCH 04/26] Update IAM policy and trust relationship for EventBridge --- .../orchestrators/sagemaker_orchestrator.py | 40 ++++++++++++++----- 1 file changed, 31 insertions(+), 9 deletions(-) diff --git a/src/zenml/integrations/aws/orchestrators/sagemaker_orchestrator.py b/src/zenml/integrations/aws/orchestrators/sagemaker_orchestrator.py index b0f2cef247..d0bd3a1fb0 100644 --- a/src/zenml/integrations/aws/orchestrators/sagemaker_orchestrator.py +++ b/src/zenml/integrations/aws/orchestrators/sagemaker_orchestrator.py @@ -521,10 +521,13 @@ def prepare_or_run_pipeline( f"Creating EventBridge rule with schedule expression: {schedule_expr}" ) - # Create IAM policy for EventBridge to trigger SageMaker pipeline + # Create IAM policy and trust relationship for EventBridge iam_client = session.boto_session.client("iam") + role_name = self.config.execution_role.split("/")[ + -1 + ] # Extract role name from ARN - # Create the policy document + # Create the policy document (existing) policy_document = { "Version": "2012-10-17", "Statement": [ @@ -536,22 +539,41 @@ def prepare_or_run_pipeline( ], } - # Create or update the role policy + # Create the trust relationship document + trust_relationship = { + "Version": "2012-10-17", + "Statement": [ + { + "Effect": "Allow", + "Principal": {"Service": "events.amazonaws.com"}, + "Action": "sts:AssumeRole", + } + ], + } + try: - role_name = self.config.execution_role.split("/")[ - -1 - ] # Extract role name from ARN + # Update the role policy (existing) policy_name = f"zenml-eventbridge-{orchestrator_run_name}" - iam_client.put_role_policy( RoleName=role_name, PolicyName=policy_name, PolicyDocument=json.dumps(policy_document), ) - logger.info(f"Created/Updated IAM policy: {policy_name}") + + # Update the trust relationship + iam_client.update_assume_role_policy( + RoleName=role_name, + PolicyDocument=json.dumps(trust_relationship), + ) + logger.info( + f"Updated trust relationship for role: {role_name}" + ) + except Exception as e: - logger.error(f"Failed to create/update IAM policy: {e}") + logger.error( + f"Failed to update IAM policy or trust relationship: {e}" + ) raise # Create the EventBridge rule From 966f71223582373f648ca75c77c921c11bd41e9f Mon Sep 17 00:00:00 2001 From: Hamza Tahir Date: Fri, 20 Dec 2024 15:10:49 +0100 Subject: [PATCH 05/26] Refactor schedule metadata generation for Sagemaker orchestrator --- .../orchestrators/sagemaker_orchestrator.py | 62 ++++++++++++++----- 1 file changed, 47 insertions(+), 15 deletions(-) diff --git a/src/zenml/integrations/aws/orchestrators/sagemaker_orchestrator.py b/src/zenml/integrations/aws/orchestrators/sagemaker_orchestrator.py index d0bd3a1fb0..7c864770a8 100644 --- a/src/zenml/integrations/aws/orchestrators/sagemaker_orchestrator.py +++ b/src/zenml/integrations/aws/orchestrators/sagemaker_orchestrator.py @@ -646,21 +646,21 @@ def prepare_or_run_pipeline( ) # Yield metadata about the schedule - yield { - "schedule_rule_name": rule_name, - "schedule_type": ( - "cron" - if deployment.schedule.cron_expression - else "rate" - if deployment.schedule.interval_second - else "one-time" - ), - "schedule_expression": schedule_expr, - "pipeline_name": orchestrator_run_name, - "next_execution_time": next_execution.isoformat() - if next_execution - else None, - } + schedule_type = ( + "cron" + if deployment.schedule.cron_expression + else "rate" + if deployment.schedule.interval_second + else "one-time" + ) + + yield self.compute_schedule_metadata( + rule_name=rule_name, + schedule_expr=schedule_expr, + pipeline_name=orchestrator_run_name, + next_execution=next_execution, + schedule_type=schedule_type, + ) else: # Execute the pipeline immediately if no schedule is specified execution = pipeline.start() @@ -896,3 +896,35 @@ def _compute_orchestrator_run_id( f"There was an issue while extracting the pipeline run ID: {e}" ) return None + + def compute_schedule_metadata( + self, + rule_name: str, + schedule_expr: str, + pipeline_name: str, + next_execution: Optional[datetime], + schedule_type: str, + ) -> Dict[str, MetadataType]: + """Generate metadata for scheduled pipeline executions. + + Args: + rule_name: The name of the EventBridge rule + schedule_expr: The schedule expression (cron or rate) + pipeline_name: Name of the SageMaker pipeline + next_execution: Next scheduled execution time + schedule_type: Type of schedule (cron/rate/one-time) + + Returns: + A dictionary of metadata related to the schedule. + """ + metadata: Dict[str, MetadataType] = { + "schedule_rule_name": rule_name, + "schedule_type": schedule_type, + "schedule_expression": schedule_expr, + "pipeline_name": pipeline_name, + } + + if next_execution: + metadata["next_execution_time"] = next_execution.isoformat() + + return metadata From dd7110c9fa3d3e7866a956facdb51a41b79720c7 Mon Sep 17 00:00:00 2001 From: Hamza Tahir Date: Fri, 20 Dec 2024 15:17:54 +0100 Subject: [PATCH 06/26] Add scheduling support for SageMaker orchestrator --- .../component-guide/orchestrators/custom.md | 2 +- .../orchestrators/sagemaker.md | 100 +++++++++++++++++- .../build-pipelines/schedule-a-pipeline.md | 2 +- 3 files changed, 97 insertions(+), 7 deletions(-) diff --git a/docs/book/component-guide/orchestrators/custom.md b/docs/book/component-guide/orchestrators/custom.md index 539aecdd6b..0aa68ac8b0 100644 --- a/docs/book/component-guide/orchestrators/custom.md +++ b/docs/book/component-guide/orchestrators/custom.md @@ -141,7 +141,7 @@ To see a full end-to-end worked example of a custom orchestrator, [see here](htt There are some additional optional features that your orchestrator can implement: -* **Running pipelines on a schedule**: if your orchestrator supports running pipelines on a schedule, make sure to handle `deployment.schedule` if it exists. If your orchestrator does not support schedules, you should either log a warning and or even raise an exception in case the user tries to schedule a pipeline. +* **Running pipelines on a schedule**: if your orchestrator supports running pipelines on a schedule, make sure to handle `deployment.schedule` if it exists. If your orchestrator ules, you should either log a warning and or even raise an exception in case the user tries to schedule a pipeline. * **Specifying hardware resources**: If your orchestrator supports setting resources like CPUs, GPUs or memory for the pipeline or specific steps, make sure to handle the values defined in `step.config.resource_settings`. See the code sample below for additional helper methods to check whether any resources are required from your orchestrator. ### Code sample diff --git a/docs/book/component-guide/orchestrators/sagemaker.md b/docs/book/component-guide/orchestrators/sagemaker.md index 6464333934..74dfcdc335 100644 --- a/docs/book/component-guide/orchestrators/sagemaker.md +++ b/docs/book/component-guide/orchestrators/sagemaker.md @@ -153,10 +153,6 @@ Alternatively, for a more detailed view of log messages during SageMaker pipelin ![SageMaker CloudWatch Logs](../../.gitbook/assets/sagemaker-cloudwatch-logs.png) -### Run pipelines on a schedule - -The ZenML Sagemaker orchestrator doesn't currently support running pipelines on a schedule. We maintain a public roadmap for ZenML, which you can find [here](https://zenml.io/roadmap). We welcome community contributions (see more [here](https://github.com/zenml-io/zenml/blob/main/CONTRIBUTING.md)) so if you want to enable scheduling for Sagemaker, please [do let us know](https://zenml.io/slack)! - ### Configuration at pipeline or step level When running your ZenML pipeline with the Sagemaker orchestrator, the configuration set when configuring the orchestrator as a ZenML component will be used by default. However, it is possible to provide additional configuration at the pipeline or step level. This allows you to run whole pipelines or individual steps with alternative configurations. For example, this allows you to run the training process with a heavier, GPU-enabled instance type, while running other steps with lighter instances. @@ -339,4 +335,98 @@ This approach allows for more granular tagging, giving you flexibility in how yo Note that if you wish to use this orchestrator to run steps on a GPU, you will need to follow [the instructions on this page](../../how-to/pipeline-development/training-with-gpus/README.md) to ensure that it works. It requires adding some extra settings customization and is essential to enable CUDA for the GPU to give its full acceleration. -
ZenML Scarf
+### Scheduling Pipelines + +The SageMaker orchestrator supports running pipelines on a schedule using AWS EventBridge. You can configure schedules in three ways: + +* Using a cron expression +* Using a fixed interval +* Running once at a specific time + +```python +from zenml import pipeline +from datetime import datetime, timedelta + +# Using a cron expression (runs daily at 2 AM UTC) +@pipeline(schedule=Schedule(cron_expression="0 2 * * *")) +def my_scheduled_pipeline(): + # Your pipeline steps here + pass + +# Using an interval (runs every 2 hours) +@pipeline(schedule=Schedule(interval_second=timedelta(hours=2))) +def my_interval_pipeline(): + # Your pipeline steps here + pass + +# Running once at a specific time +@pipeline(schedule=Schedule(run_once_start_time=datetime(2024, 12, 31, 23, 59))) +def my_one_time_pipeline(): + # Your pipeline steps here + pass +``` + +When you deploy a scheduled pipeline, ZenML will: +1. Create an EventBridge rule with the specified schedule +2. Configure the necessary IAM permissions +3. Set up the SageMaker pipeline as the target + +#### Required IAM Permissions + +When using scheduled pipelines, you need to ensure your IAM role has the correct permissions and trust relationships: + +1. Trust Relationships +Your execution role needs to trust both SageMaker and EventBridge services. Add this trust relationship to your role: + +```json +{ + "Version": "2012-10-17", + "Statement": [ + { + "Effect": "Allow", + "Principal": { + "Service": [ + "sagemaker.amazonaws.com", + "events.amazonaws.com" + ] + }, + "Action": "sts:AssumeRole" + } + ] +} +``` + +2. Required IAM Policies +In addition to the basic SageMaker permissions, you'll need: + +```json +{ + "Version": "2012-10-17", + "Statement": [ + { + "Effect": "Allow", + "Action": [ + "events:PutRule", + "events:PutTargets", + "events:DeleteRule", + "events:RemoveTargets", + "events:DescribeRule", + "events:ListTargetsByRule" + ], + "Resource": "arn:aws:events:*:*:rule/zenml-*" + }, + { + "Effect": "Allow", + "Action": [ + "iam:GetRole", + "iam:GetRolePolicy", + "iam:PutRolePolicy", + "iam:UpdateAssumeRolePolicy" + ], + "Resource": "arn:aws:iam::*:role/*" + } + ] +} +``` + +
ZenML Scarf
\ No newline at end of file diff --git a/docs/book/how-to/pipeline-development/build-pipelines/schedule-a-pipeline.md b/docs/book/how-to/pipeline-development/build-pipelines/schedule-a-pipeline.md index be725e386f..f922339393 100644 --- a/docs/book/how-to/pipeline-development/build-pipelines/schedule-a-pipeline.md +++ b/docs/book/how-to/pipeline-development/build-pipelines/schedule-a-pipeline.md @@ -18,7 +18,7 @@ Schedules don't work for all orchestrators. Here is a list of all supported orch | [KubernetesOrchestrator](../../../component-guide/orchestrators/kubernetes.md) | ✅ | | [LocalOrchestrator](../../../component-guide/orchestrators/local.md) | ⛔️ | | [LocalDockerOrchestrator](../../../component-guide/orchestrators/local-docker.md) | ⛔️ | -| [SagemakerOrchestrator](../../../component-guide/orchestrators/sagemaker.md) | ⛔️ | +| [SagemakerOrchestrator](../../../component-guide/orchestrators/sagemaker.md) | ✅ | | [SkypilotAWSOrchestrator](../../../component-guide/orchestrators/skypilot-vm.md) | ⛔️ | | [SkypilotAzureOrchestrator](../../../component-guide/orchestrators/skypilot-vm.md) | ⛔️ | | [SkypilotGCPOrchestrator](../../../component-guide/orchestrators/skypilot-vm.md) | ⛔️ | From fcf934e67b5d3536b573965e177da66b0f63ff50 Mon Sep 17 00:00:00 2001 From: Hamza Tahir Date: Fri, 20 Dec 2024 15:21:54 +0100 Subject: [PATCH 07/26] Remove trust relationship logic in Sagemaker orchestrator --- .../orchestrators/sagemaker_orchestrator.py | 21 ------------------- 1 file changed, 21 deletions(-) diff --git a/src/zenml/integrations/aws/orchestrators/sagemaker_orchestrator.py b/src/zenml/integrations/aws/orchestrators/sagemaker_orchestrator.py index 7c864770a8..0efb112fe7 100644 --- a/src/zenml/integrations/aws/orchestrators/sagemaker_orchestrator.py +++ b/src/zenml/integrations/aws/orchestrators/sagemaker_orchestrator.py @@ -539,18 +539,6 @@ def prepare_or_run_pipeline( ], } - # Create the trust relationship document - trust_relationship = { - "Version": "2012-10-17", - "Statement": [ - { - "Effect": "Allow", - "Principal": {"Service": "events.amazonaws.com"}, - "Action": "sts:AssumeRole", - } - ], - } - try: # Update the role policy (existing) policy_name = f"zenml-eventbridge-{orchestrator_run_name}" @@ -561,15 +549,6 @@ def prepare_or_run_pipeline( ) logger.info(f"Created/Updated IAM policy: {policy_name}") - # Update the trust relationship - iam_client.update_assume_role_policy( - RoleName=role_name, - PolicyDocument=json.dumps(trust_relationship), - ) - logger.info( - f"Updated trust relationship for role: {role_name}" - ) - except Exception as e: logger.error( f"Failed to update IAM policy or trust relationship: {e}" From 358b3e811c22c1ecec51cd7550afd278d63096f6 Mon Sep 17 00:00:00 2001 From: Hamza Tahir Date: Fri, 20 Dec 2024 15:25:34 +0100 Subject: [PATCH 08/26] Handle unsupported schedule in custom orchestrator --- docs/book/component-guide/orchestrators/custom.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/book/component-guide/orchestrators/custom.md b/docs/book/component-guide/orchestrators/custom.md index 0aa68ac8b0..539aecdd6b 100644 --- a/docs/book/component-guide/orchestrators/custom.md +++ b/docs/book/component-guide/orchestrators/custom.md @@ -141,7 +141,7 @@ To see a full end-to-end worked example of a custom orchestrator, [see here](htt There are some additional optional features that your orchestrator can implement: -* **Running pipelines on a schedule**: if your orchestrator supports running pipelines on a schedule, make sure to handle `deployment.schedule` if it exists. If your orchestrator ules, you should either log a warning and or even raise an exception in case the user tries to schedule a pipeline. +* **Running pipelines on a schedule**: if your orchestrator supports running pipelines on a schedule, make sure to handle `deployment.schedule` if it exists. If your orchestrator does not support schedules, you should either log a warning and or even raise an exception in case the user tries to schedule a pipeline. * **Specifying hardware resources**: If your orchestrator supports setting resources like CPUs, GPUs or memory for the pipeline or specific steps, make sure to handle the values defined in `step.config.resource_settings`. See the code sample below for additional helper methods to check whether any resources are required from your orchestrator. ### Code sample From 7ce08b63d2418e8311db475b857f7a5664d64751 Mon Sep 17 00:00:00 2001 From: Hamza Tahir Date: Fri, 20 Dec 2024 16:40:16 +0100 Subject: [PATCH 09/26] Refactor yield statement to use 'yield from' syntax --- .../integrations/aws/orchestrators/sagemaker_orchestrator.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/zenml/integrations/aws/orchestrators/sagemaker_orchestrator.py b/src/zenml/integrations/aws/orchestrators/sagemaker_orchestrator.py index 0efb112fe7..72bfa14c9e 100644 --- a/src/zenml/integrations/aws/orchestrators/sagemaker_orchestrator.py +++ b/src/zenml/integrations/aws/orchestrators/sagemaker_orchestrator.py @@ -633,7 +633,7 @@ def prepare_or_run_pipeline( else "one-time" ) - yield self.compute_schedule_metadata( + yield from self.compute_schedule_metadata( rule_name=rule_name, schedule_expr=schedule_expr, pipeline_name=orchestrator_run_name, From 72bdae1aababf3bb64eb6639128263b767aea33b Mon Sep 17 00:00:00 2001 From: Hamza Tahir Date: Fri, 20 Dec 2024 16:47:53 +0100 Subject: [PATCH 10/26] Ensure IAM permissions for scheduled SageMaker pipelines --- .../orchestrators/sagemaker.md | 42 +++++++++++-------- 1 file changed, 25 insertions(+), 17 deletions(-) diff --git a/docs/book/component-guide/orchestrators/sagemaker.md b/docs/book/component-guide/orchestrators/sagemaker.md index 74dfcdc335..ecac6529c8 100644 --- a/docs/book/component-guide/orchestrators/sagemaker.md +++ b/docs/book/component-guide/orchestrators/sagemaker.md @@ -373,10 +373,10 @@ When you deploy a scheduled pipeline, ZenML will: #### Required IAM Permissions -When using scheduled pipelines, you need to ensure your IAM role has the correct permissions and trust relationships: +When using scheduled pipelines, you need to ensure your IAM role has the correct permissions and trust relationships. Here's a detailed breakdown of why each permission is needed: -1. Trust Relationships -Your execution role needs to trust both SageMaker and EventBridge services. Add this trust relationship to your role: +1. **Trust Relationships** +Your execution role needs to trust both SageMaker and EventBridge services to allow them to assume the role: ```json { @@ -386,8 +386,8 @@ Your execution role needs to trust both SageMaker and EventBridge services. Add "Effect": "Allow", "Principal": { "Service": [ - "sagemaker.amazonaws.com", - "events.amazonaws.com" + "sagemaker.amazonaws.com", // Required for SageMaker execution + "events.amazonaws.com" // Required for EventBridge to trigger pipelines ] }, "Action": "sts:AssumeRole" @@ -396,8 +396,8 @@ Your execution role needs to trust both SageMaker and EventBridge services. Add } ``` -2. Required IAM Policies -In addition to the basic SageMaker permissions, you'll need: +2. **Required IAM Policies** +In addition to the basic SageMaker permissions, the AWS credentials used by the service connector (or provided directly to the orchestrator) need the following permissions to create and manage scheduled pipelines: ```json { @@ -406,22 +406,22 @@ In addition to the basic SageMaker permissions, you'll need: { "Effect": "Allow", "Action": [ - "events:PutRule", - "events:PutTargets", - "events:DeleteRule", - "events:RemoveTargets", - "events:DescribeRule", - "events:ListTargetsByRule" + "events:PutRule", // Required to create schedule rules + "events:PutTargets", // Required to set pipeline as target + "events:DeleteRule", // Required for cleanup + "events:RemoveTargets", // Required for cleanup + "events:DescribeRule", // Required to verify rule creation + "events:ListTargetsByRule" // Required to verify target setup ], "Resource": "arn:aws:events:*:*:rule/zenml-*" }, { "Effect": "Allow", "Action": [ - "iam:GetRole", - "iam:GetRolePolicy", - "iam:PutRolePolicy", - "iam:UpdateAssumeRolePolicy" + "iam:GetRole", // Required to verify role exists + "iam:GetRolePolicy", // Required to check existing policies + "iam:PutRolePolicy", // Required to add new policies + "iam:UpdateAssumeRolePolicy" // Required to update trust relationships ], "Resource": "arn:aws:iam::*:role/*" } @@ -429,4 +429,12 @@ In addition to the basic SageMaker permissions, you'll need: } ``` +These permissions enable: +* Creation and management of EventBridge rules for scheduling +* Setting up trust relationships between services +* Managing IAM policies required for the scheduled execution +* Cleanup of resources when schedules are removed + +Without these permissions, the scheduling functionality will fail with access denied errors. +
ZenML Scarf
\ No newline at end of file From abf46101d4de831931002e2cbbbf87237efb7cc9 Mon Sep 17 00:00:00 2001 From: Hamza Tahir Date: Fri, 20 Dec 2024 16:48:00 +0100 Subject: [PATCH 11/26] Update authentication instructions for SageMaker orchestrator --- docs/book/component-guide/orchestrators/sagemaker.md | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/docs/book/component-guide/orchestrators/sagemaker.md b/docs/book/component-guide/orchestrators/sagemaker.md index ecac6529c8..2da138af2d 100644 --- a/docs/book/component-guide/orchestrators/sagemaker.md +++ b/docs/book/component-guide/orchestrators/sagemaker.md @@ -59,7 +59,7 @@ There are three ways you can authenticate your orchestrator and link it to the I {% tabs %} {% tab title="Authentication via Service Connector" %} -The recommended way to authenticate your SageMaker orchestrator is by registering an [AWS Service Connector](../../how-to/infrastructure-deployment/auth-management/aws-service-connector.md) and connecting it to your SageMaker orchestrator: +The recommended way to authenticate your SageMaker orchestrator is by registering an [AWS Service Connector](../../how-to/infrastructure-deployment/auth-management/aws-service-connector.md) and connecting it to your SageMaker orchestrator. If you plan to use scheduled pipelines, ensure the credentials used by the service connector have the necessary EventBridge and IAM permissions listed in the [Required IAM Permissions](#required-iam-permissions) section: ```shell zenml service-connector register --type aws -i @@ -72,7 +72,7 @@ zenml stack register -o ... --set {% endtab %} {% tab title="Explicit Authentication" %} -Instead of creating a service connector, you can also configure your AWS authentication credentials directly in the orchestrator: +Instead of creating a service connector, you can also configure your AWS authentication credentials directly in the orchestrator. If you plan to use scheduled pipelines, ensure these credentials have the necessary EventBridge and IAM permissions listed in the [Required IAM Permissions](#required-iam-permissions) section: ```shell zenml orchestrator register \ @@ -88,7 +88,7 @@ See the [`SagemakerOrchestratorConfig` SDK Docs](https://sdkdocs.zenml.io/latest {% endtab %} {% tab title="Implicit Authentication" %} -If you neither connect your orchestrator to a service connector nor configure credentials explicitly, ZenML will try to implicitly authenticate to AWS via the `default` profile in your local [AWS configuration file](https://docs.aws.amazon.com/cli/latest/userguide/cli-configure-files.html). +If you neither connect your orchestrator to a service connector nor configure credentials explicitly, ZenML will try to implicitly authenticate to AWS via the `default` profile in your local [AWS configuration file](https://docs.aws.amazon.com/cli/latest/userguide/cli-configure-files.html). If you plan to use scheduled pipelines, ensure this profile has the necessary EventBridge and IAM permissions listed in the [Required IAM Permissions](#required-iam-permissions) section: ```shell zenml orchestrator register \ From 67705c261eeae0789a516d39066a7605ea46ad63 Mon Sep 17 00:00:00 2001 From: Hamza Tahir Date: Fri, 20 Dec 2024 21:35:39 +0100 Subject: [PATCH 12/26] Refactor Sagemaker orchestrator metadata handling --- .../orchestrators/sagemaker_orchestrator.py | 142 ++++++++++++------ 1 file changed, 99 insertions(+), 43 deletions(-) diff --git a/src/zenml/integrations/aws/orchestrators/sagemaker_orchestrator.py b/src/zenml/integrations/aws/orchestrators/sagemaker_orchestrator.py index 72bfa14c9e..1abae0806d 100644 --- a/src/zenml/integrations/aws/orchestrators/sagemaker_orchestrator.py +++ b/src/zenml/integrations/aws/orchestrators/sagemaker_orchestrator.py @@ -521,7 +521,7 @@ def prepare_or_run_pipeline( f"Creating EventBridge rule with schedule expression: {schedule_expr}" ) - # Create IAM policy and trust relationship for EventBridge + # Create IAM policy for EventBridge iam_client = session.boto_session.client("iam") role_name = self.config.execution_role.split("/")[ -1 @@ -633,12 +633,17 @@ def prepare_or_run_pipeline( else "one-time" ) - yield from self.compute_schedule_metadata( - rule_name=rule_name, - schedule_expr=schedule_expr, - pipeline_name=orchestrator_run_name, - next_execution=next_execution, - schedule_type=schedule_type, + schedule_metadata = { + "rule_name": rule_name, + "schedule_type": schedule_type, + "schedule_expr": schedule_expr, + "pipeline_name": orchestrator_run_name, + "next_execution": next_execution, + } + + yield from self.compute_metadata( + execution=schedule_metadata, + settings=settings, ) else: # Execute the pipeline immediately if no schedule is specified @@ -757,7 +762,7 @@ def compute_metadata( """Generate run metadata based on the generated Sagemaker Execution. Args: - execution: The corresponding _PipelineExecution object. + execution: The corresponding _PipelineExecution object or schedule metadata dict. settings: The Sagemaker orchestrator settings. Yields: @@ -766,19 +771,40 @@ def compute_metadata( # Metadata metadata: Dict[str, MetadataType] = {} - # Orchestrator Run ID - if run_id := self._compute_orchestrator_run_id(execution): - metadata[METADATA_ORCHESTRATOR_RUN_ID] = run_id + # Handle schedule metadata if execution is a dict + if isinstance(execution, dict): + metadata.update( + { + "schedule_rule_name": execution["rule_name"], + "schedule_type": execution["schedule_type"], + "schedule_expression": execution["schedule_expr"], + "pipeline_name": execution["pipeline_name"], + } + ) + + if next_execution := execution.get("next_execution"): + metadata["next_execution_time"] = next_execution.isoformat() + + # Add orchestrator metadata using the same pattern as execution metadata + if orchestrator_url := self._compute_schedule_url(execution): + metadata[METADATA_ORCHESTRATOR_URL] = Uri(orchestrator_url) - # URL to the Sagemaker's pipeline view - if orchestrator_url := self._compute_orchestrator_url(execution): - metadata[METADATA_ORCHESTRATOR_URL] = Uri(orchestrator_url) + if logs_url := self._compute_schedule_logs_url( + execution, settings + ): + metadata[METADATA_ORCHESTRATOR_LOGS_URL] = Uri(logs_url) + else: + # Handle execution metadata + if run_id := self._compute_orchestrator_run_id(execution): + metadata[METADATA_ORCHESTRATOR_RUN_ID] = run_id + + if orchestrator_url := self._compute_orchestrator_url(execution): + metadata[METADATA_ORCHESTRATOR_URL] = Uri(orchestrator_url) - # URL to the corresponding CloudWatch page - if logs_url := self._compute_orchestrator_logs_url( - execution, settings - ): - metadata[METADATA_ORCHESTRATOR_LOGS_URL] = Uri(logs_url) + if logs_url := self._compute_orchestrator_logs_url( + execution, settings + ): + metadata[METADATA_ORCHESTRATOR_LOGS_URL] = Uri(logs_url) yield metadata @@ -876,34 +902,64 @@ def _compute_orchestrator_run_id( ) return None - def compute_schedule_metadata( - self, - rule_name: str, - schedule_expr: str, - pipeline_name: str, - next_execution: Optional[datetime], - schedule_type: str, - ) -> Dict[str, MetadataType]: - """Generate metadata for scheduled pipeline executions. + @staticmethod + def _compute_schedule_url(schedule_info: Dict[str, Any]) -> Optional[str]: + """Generate the SageMaker Console URL for a scheduled pipeline. Args: - rule_name: The name of the EventBridge rule - schedule_expr: The schedule expression (cron or rate) - pipeline_name: Name of the SageMaker pipeline - next_execution: Next scheduled execution time - schedule_type: Type of schedule (cron/rate/one-time) + schedule_info: Dictionary containing schedule information. Returns: - A dictionary of metadata related to the schedule. + The URL to the pipeline in the SageMaker console. """ - metadata: Dict[str, MetadataType] = { - "schedule_rule_name": rule_name, - "schedule_type": schedule_type, - "schedule_expression": schedule_expr, - "pipeline_name": pipeline_name, - } + try: + # Get the Sagemaker session + session = boto3.Session(region_name=schedule_info["region"]) + sagemaker_client = session.client("sagemaker") + + # List the Studio domains and get the Studio Domain ID + domains_response = sagemaker_client.list_domains() + studio_domain_id = domains_response["Domains"][0]["DomainId"] + + return ( + f"https://studio-{studio_domain_id}.studio.{schedule_info['region']}." + f"sagemaker.aws/pipelines/view/{schedule_info['pipeline_name']}" + ) + except Exception as e: + logger.warning( + f"There was an issue while extracting the pipeline url: {e}" + ) + return None + + @staticmethod + def _compute_schedule_logs_url( + schedule_info: Dict[str, Any], + settings: SagemakerOrchestratorSettings, + ) -> Optional[str]: + """Generate the CloudWatch URL for a scheduled pipeline. - if next_execution: - metadata["next_execution_time"] = next_execution.isoformat() + Args: + schedule_info: Dictionary containing schedule information. + settings: The Sagemaker orchestrator settings. - return metadata + Returns: + The URL to query the pipeline logs in CloudWatch. + """ + try: + use_training_jobs = True + if settings.use_training_step is not None: + use_training_jobs = settings.use_training_step + + job_type = "Training" if use_training_jobs else "Processing" + + return ( + f"https://{schedule_info['region']}.console.aws.amazon.com/" + f"cloudwatch/home?region={schedule_info['region']}#logsV2:" + f"log-groups/log-group/$252Faws$252Fsagemaker$252F{job_type}Jobs" + f"$3FlogStreamNameFilter$3Dpipelines-" + ) + except Exception as e: + logger.warning( + f"There was an issue while extracting the logs url: {e}" + ) + return None From d190f31a3056173b5716199d46f3ea457035f8a0 Mon Sep 17 00:00:00 2001 From: Hamza Tahir Date: Fri, 20 Dec 2024 21:43:33 +0100 Subject: [PATCH 13/26] Add unit tests for SageMaker orchestrator metadata --- .../test_sagemaker_orchestrator.py | 121 ++++++++++++++++++ 1 file changed, 121 insertions(+) diff --git a/tests/integration/integrations/aws/orchestrators/test_sagemaker_orchestrator.py b/tests/integration/integrations/aws/orchestrators/test_sagemaker_orchestrator.py index 788dcb0506..8a92768cba 100644 --- a/tests/integration/integrations/aws/orchestrators/test_sagemaker_orchestrator.py +++ b/tests/integration/integrations/aws/orchestrators/test_sagemaker_orchestrator.py @@ -13,8 +13,22 @@ # permissions and limitations under the License. +from datetime import datetime, timedelta +from unittest.mock import MagicMock, patch + +from zenml.constants import ( + METADATA_ORCHESTRATOR_LOGS_URL, + METADATA_ORCHESTRATOR_RUN_ID, + METADATA_ORCHESTRATOR_URL, +) from zenml.enums import StackComponentType from zenml.integrations.aws.flavors import SagemakerOrchestratorFlavor +from zenml.integrations.aws.flavors.sagemaker_orchestrator_flavor import ( + SagemakerOrchestratorSettings, +) +from zenml.integrations.aws.orchestrators.sagemaker_orchestrator import ( + SagemakerOrchestrator, +) def test_sagemaker_orchestrator_flavor_attributes(): @@ -23,3 +37,110 @@ def test_sagemaker_orchestrator_flavor_attributes(): flavor = SagemakerOrchestratorFlavor() assert flavor.type == StackComponentType.ORCHESTRATOR assert flavor.name == "sagemaker" + + +def test_compute_schedule_metadata(): + """Tests that schedule metadata is computed correctly.""" + # Setup + orchestrator = SagemakerOrchestrator( + name="test_orchestrator", + id="test-id", + config={}, + flavor="sagemaker", + type="orchestrator", + user="test-user", + workspace="test-workspace", + created="2023-01-01", + updated="2023-01-01", + ) + settings = SagemakerOrchestratorSettings() + + # Mock schedule info + next_execution = datetime.utcnow() + timedelta(hours=1) + schedule_info = { + "rule_name": "test-rule", + "schedule_type": "rate", + "schedule_expr": "rate(1 hour)", + "pipeline_name": "test-pipeline", + "next_execution": next_execution, + "region": "us-west-2", + "account_id": "123456789012", + } + + # Mock boto3 session and SageMaker client + mock_sagemaker_client = MagicMock() + mock_sagemaker_client.list_domains.return_value = { + "Domains": [{"DomainId": "d-test123"}] + } + + with patch("boto3.Session") as mock_session: + mock_session.return_value.client.return_value = mock_sagemaker_client + + # Get metadata + metadata = next( + orchestrator.compute_metadata( + execution=schedule_info, + settings=settings, + ) + ) + + # Verify schedule-specific metadata + assert metadata["schedule_rule_name"] == "test-rule" + assert metadata["schedule_type"] == "rate" + assert metadata["schedule_expression"] == "rate(1 hour)" + assert metadata["pipeline_name"] == "test-pipeline" + assert metadata["next_execution_time"] == next_execution.isoformat() + + # Verify orchestrator metadata + assert metadata[METADATA_ORCHESTRATOR_URL] == ( + "https://studio-d-test123.studio.us-west-2.sagemaker.aws/pipelines/view/test-pipeline" + ) + assert metadata[METADATA_ORCHESTRATOR_LOGS_URL].startswith( + "https://us-west-2.console.aws.amazon.com/cloudwatch/home" + ) + + +def test_compute_schedule_metadata_error_handling(): + """Tests error handling in schedule metadata computation.""" + orchestrator = SagemakerOrchestrator( + name="test_orchestrator", + id="test-id", + config={}, + flavor="sagemaker", + type="orchestrator", + user="test-user", + workspace="test-workspace", + created="2023-01-01", + updated="2023-01-01", + ) + settings = SagemakerOrchestratorSettings() + + # Invalid schedule info missing required fields + schedule_info = { + "rule_name": "test-rule", + "schedule_type": "rate", # Add minimum required fields + "schedule_expr": "rate(1 hour)", + "pipeline_name": "test-pipeline", + } + + with patch("boto3.Session") as mock_session: + mock_session.side_effect = Exception("Failed to create session") + + # Get metadata - should not raise exception + metadata = next( + orchestrator.compute_metadata( + execution=schedule_info, + settings=settings, + ) + ) + + # Basic metadata should still be present + assert metadata["schedule_rule_name"] == "test-rule" + assert metadata["schedule_type"] == "rate" + assert metadata["schedule_expression"] == "rate(1 hour)" + assert metadata["pipeline_name"] == "test-pipeline" + + # URLs should be None due to error + assert METADATA_ORCHESTRATOR_RUN_ID not in metadata + assert METADATA_ORCHESTRATOR_URL not in metadata + assert METADATA_ORCHESTRATOR_LOGS_URL not in metadata From f1cabc726cf307b1d346884973fcb910ba7085e4 Mon Sep 17 00:00:00 2001 From: Hamza Tahir Date: Fri, 20 Dec 2024 21:45:04 +0100 Subject: [PATCH 14/26] Add exception handling for pipeline preparation errors --- .../integrations/aws/orchestrators/sagemaker_orchestrator.py | 1 + 1 file changed, 1 insertion(+) diff --git a/src/zenml/integrations/aws/orchestrators/sagemaker_orchestrator.py b/src/zenml/integrations/aws/orchestrators/sagemaker_orchestrator.py index 1abae0806d..eb86affa36 100644 --- a/src/zenml/integrations/aws/orchestrators/sagemaker_orchestrator.py +++ b/src/zenml/integrations/aws/orchestrators/sagemaker_orchestrator.py @@ -243,6 +243,7 @@ def prepare_or_run_pipeline( `boto3.Session` object. TypeError: If the network_config passed is not compatible with the AWS SageMaker NetworkConfig class. + Exception: If there is an error during pipeline preparation or execution. Yields: A dictionary of metadata related to the pipeline run. From 726b47aa02f46b991bbe266c0dd0e8cda6e72ca2 Mon Sep 17 00:00:00 2001 From: Hamza Tahir Date: Fri, 20 Dec 2024 22:06:00 +0100 Subject: [PATCH 15/26] Add timezone information to first execution message --- .../integrations/aws/orchestrators/sagemaker_orchestrator.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/zenml/integrations/aws/orchestrators/sagemaker_orchestrator.py b/src/zenml/integrations/aws/orchestrators/sagemaker_orchestrator.py index eb86affa36..69a8b4a830 100644 --- a/src/zenml/integrations/aws/orchestrators/sagemaker_orchestrator.py +++ b/src/zenml/integrations/aws/orchestrators/sagemaker_orchestrator.py @@ -614,7 +614,8 @@ def prepare_or_run_pipeline( f"Successfully scheduled pipeline with rule: {rule_name}\n" f"Schedule type: {schedule_expr}\n" + ( - f"First execution will occur at: {next_execution.strftime('%Y-%m-%d %H:%M:%S')} UTC" + f"First execution will occur at: {next_execution.strftime('%Y-%m-%d %H:%M:%S')} " + f"({next_execution.astimezone().tzinfo})" if next_execution else f"Using cron expression: {deployment.schedule.cron_expression}" ) From 80c8e8e422e5e2aa7573cd8f8b898cccccedc750 Mon Sep 17 00:00:00 2001 From: Hamza Tahir Date: Fri, 20 Dec 2024 22:11:54 +0100 Subject: [PATCH 16/26] Add timezone support to AWS SageMaker orchestrator --- .../orchestrators/sagemaker_orchestrator.py | 46 +++++++++++++++---- .../test_sagemaker_orchestrator.py | 9 ++-- 2 files changed, 43 insertions(+), 12 deletions(-) diff --git a/src/zenml/integrations/aws/orchestrators/sagemaker_orchestrator.py b/src/zenml/integrations/aws/orchestrators/sagemaker_orchestrator.py index 69a8b4a830..711c7a8156 100644 --- a/src/zenml/integrations/aws/orchestrators/sagemaker_orchestrator.py +++ b/src/zenml/integrations/aws/orchestrators/sagemaker_orchestrator.py @@ -16,7 +16,7 @@ import json import os import re -from datetime import datetime +from datetime import datetime, timezone from typing import ( TYPE_CHECKING, Any, @@ -512,14 +512,28 @@ def prepare_or_run_pipeline( datetime.utcnow() + deployment.schedule.interval_second ) elif deployment.schedule.run_once_start_time: - # Format for specific date/time: cron(Minutes Hours Day-of-month Month ? Year) - # Example: cron(0 12 1 1 ? 2024) - dt = deployment.schedule.run_once_start_time + # Convert local time to UTC for EventBridge + dt = deployment.schedule.run_once_start_time.astimezone( + timezone.utc + ) schedule_expr = f"cron({dt.minute} {dt.hour} {dt.day} {dt.month} ? {dt.year})" next_execution = deployment.schedule.run_once_start_time logger.info( - f"Creating EventBridge rule with schedule expression: {schedule_expr}" + f"Creating EventBridge rule with schedule expression: {schedule_expr}\n" + f"Note: AWS EventBridge schedules are always executed in UTC timezone.\n" + + ( + f"First execution will occur at: {next_execution.strftime('%Y-%m-%d %H:%M:%S')} " + f"({next_execution.astimezone().tzinfo}) / " + f"{next_execution.astimezone(timezone.utc).strftime('%Y-%m-%d %H:%M:%S')} (UTC)" + if next_execution + else f"Using cron expression: {deployment.schedule.cron_expression}" + ) + + ( + f" (and every {int(minutes)} minutes after)" + if deployment.schedule.interval_second + else "" + ) ) # Create IAM policy for EventBridge @@ -582,14 +596,28 @@ def prepare_or_run_pipeline( datetime.utcnow() + deployment.schedule.interval_second ) elif deployment.schedule.run_once_start_time: - # Format for specific date/time: cron(Minutes Hours Day-of-month Month ? Year) - # Example: cron(0 12 1 1 ? 2024) - dt = deployment.schedule.run_once_start_time + # Convert local time to UTC for EventBridge + dt = deployment.schedule.run_once_start_time.astimezone( + timezone.utc + ) schedule_expr = f"cron({dt.minute} {dt.hour} {dt.day} {dt.month} ? {dt.year})" next_execution = deployment.schedule.run_once_start_time logger.info( - f"Creating EventBridge rule with schedule expression: {schedule_expr}" + f"Creating EventBridge rule with schedule expression: {schedule_expr}\n" + f"Note: AWS EventBridge schedules are always executed in UTC timezone.\n" + + ( + f"First execution will occur at: {next_execution.strftime('%Y-%m-%d %H:%M:%S')} " + f"({next_execution.astimezone().tzinfo}) / " + f"{next_execution.astimezone(timezone.utc).strftime('%Y-%m-%d %H:%M:%S')} (UTC)" + if next_execution + else f"Using cron expression: {deployment.schedule.cron_expression}" + ) + + ( + f" (and every {int(minutes)} minutes after)" + if deployment.schedule.interval_second + else "" + ) ) events_client.put_rule( diff --git a/tests/integration/integrations/aws/orchestrators/test_sagemaker_orchestrator.py b/tests/integration/integrations/aws/orchestrators/test_sagemaker_orchestrator.py index 8a92768cba..5cc06497a2 100644 --- a/tests/integration/integrations/aws/orchestrators/test_sagemaker_orchestrator.py +++ b/tests/integration/integrations/aws/orchestrators/test_sagemaker_orchestrator.py @@ -13,7 +13,7 @@ # permissions and limitations under the License. -from datetime import datetime, timedelta +from datetime import datetime, timedelta, timezone from unittest.mock import MagicMock, patch from zenml.constants import ( @@ -55,8 +55,8 @@ def test_compute_schedule_metadata(): ) settings = SagemakerOrchestratorSettings() - # Mock schedule info - next_execution = datetime.utcnow() + timedelta(hours=1) + # Mock schedule info with timezone-aware datetime in UTC + next_execution = datetime.now(timezone.utc) + timedelta(hours=1) schedule_info = { "rule_name": "test-rule", "schedule_type": "rate", @@ -91,6 +91,9 @@ def test_compute_schedule_metadata(): assert metadata["pipeline_name"] == "test-pipeline" assert metadata["next_execution_time"] == next_execution.isoformat() + # Verify that boto3 Session was created with correct region + mock_session.assert_called_once_with(region_name="us-west-2") + # Verify orchestrator metadata assert metadata[METADATA_ORCHESTRATOR_URL] == ( "https://studio-d-test123.studio.us-west-2.sagemaker.aws/pipelines/view/test-pipeline" From da8fd35018f8607a8110ef769777183e9507e67b Mon Sep 17 00:00:00 2001 From: Hamza Tahir Date: Fri, 20 Dec 2024 22:19:31 +0100 Subject: [PATCH 17/26] Update error handling in SagemakerOrchestrator --- .../aws/orchestrators/sagemaker_orchestrator.py | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/src/zenml/integrations/aws/orchestrators/sagemaker_orchestrator.py b/src/zenml/integrations/aws/orchestrators/sagemaker_orchestrator.py index 711c7a8156..11ee935313 100644 --- a/src/zenml/integrations/aws/orchestrators/sagemaker_orchestrator.py +++ b/src/zenml/integrations/aws/orchestrators/sagemaker_orchestrator.py @@ -31,7 +31,7 @@ import boto3 import sagemaker -from botocore.exceptions import WaiterError +from botocore.exceptions import BotoCoreError, ClientError, WaiterError from sagemaker.network import NetworkConfig from sagemaker.processing import ProcessingInput, ProcessingOutput from sagemaker.workflow.execution_variables import ExecutionVariables @@ -243,7 +243,9 @@ def prepare_or_run_pipeline( `boto3.Session` object. TypeError: If the network_config passed is not compatible with the AWS SageMaker NetworkConfig class. - Exception: If there is an error during pipeline preparation or execution. + KeyError: If required fields are missing from schedule_info. + ClientError: If there's an AWS API error + BotoCoreError: If there's an error in the AWS SDK Yields: A dictionary of metadata related to the pipeline run. @@ -564,9 +566,15 @@ def prepare_or_run_pipeline( ) logger.info(f"Created/Updated IAM policy: {policy_name}") - except Exception as e: + except (ClientError, BotoCoreError) as e: logger.error( - f"Failed to update IAM policy or trust relationship: {e}" + f"Failed to update IAM policy: {e}. " + f"Please ensure you have sufficient IAM permissions." + ) + raise + except KeyError as e: + logger.error( + f"Missing required field for IAM policy creation: {e}" ) raise From ab0c06dcf3fd1d8ec7e924d7c7672e2a37b15e76 Mon Sep 17 00:00:00 2001 From: Hamza Tahir Date: Fri, 20 Dec 2024 22:21:42 +0100 Subject: [PATCH 18/26] Update error handling messages for AWS in Sagemaker orchestrator --- .../integrations/aws/orchestrators/sagemaker_orchestrator.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/zenml/integrations/aws/orchestrators/sagemaker_orchestrator.py b/src/zenml/integrations/aws/orchestrators/sagemaker_orchestrator.py index 11ee935313..29e0389494 100644 --- a/src/zenml/integrations/aws/orchestrators/sagemaker_orchestrator.py +++ b/src/zenml/integrations/aws/orchestrators/sagemaker_orchestrator.py @@ -244,8 +244,8 @@ def prepare_or_run_pipeline( TypeError: If the network_config passed is not compatible with the AWS SageMaker NetworkConfig class. KeyError: If required fields are missing from schedule_info. - ClientError: If there's an AWS API error - BotoCoreError: If there's an error in the AWS SDK + ClientError: If there's an AWS API error. + BotoCoreError: If there's an error in the AWS SDK. Yields: A dictionary of metadata related to the pipeline run. From 3c12d4594d0d853e046bf714bfccbb1a462e93c6 Mon Sep 17 00:00:00 2001 From: Hamza Tahir Date: Fri, 20 Dec 2024 22:37:35 +0100 Subject: [PATCH 19/26] Refactor error handling in SagemakerOrchestrator --- .../aws/orchestrators/sagemaker_orchestrator.py | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/src/zenml/integrations/aws/orchestrators/sagemaker_orchestrator.py b/src/zenml/integrations/aws/orchestrators/sagemaker_orchestrator.py index 29e0389494..367d2b060a 100644 --- a/src/zenml/integrations/aws/orchestrators/sagemaker_orchestrator.py +++ b/src/zenml/integrations/aws/orchestrators/sagemaker_orchestrator.py @@ -243,9 +243,6 @@ def prepare_or_run_pipeline( `boto3.Session` object. TypeError: If the network_config passed is not compatible with the AWS SageMaker NetworkConfig class. - KeyError: If required fields are missing from schedule_info. - ClientError: If there's an AWS API error. - BotoCoreError: If there's an error in the AWS SDK. Yields: A dictionary of metadata related to the pipeline run. @@ -567,16 +564,17 @@ def prepare_or_run_pipeline( logger.info(f"Created/Updated IAM policy: {policy_name}") except (ClientError, BotoCoreError) as e: - logger.error( + logger.warning( f"Failed to update IAM policy: {e}. " - f"Please ensure you have sufficient IAM permissions." + f"Please ensure your execution role has sufficient permissions " + f"to start pipeline executions." ) - raise except KeyError as e: - logger.error( - f"Missing required field for IAM policy creation: {e}" + logger.warning( + f"Missing required field for IAM policy creation: {e}. " + f"Please ensure your execution role has sufficient permissions " + f"to start pipeline executions." ) - raise # Create the EventBridge rule events_client = session.boto_session.client("events") From 51a499a884f4dc3467d5f825bfffc1376ee89ece Mon Sep 17 00:00:00 2001 From: Hamza Tahir Date: Fri, 20 Dec 2024 22:43:11 +0100 Subject: [PATCH 20/26] Handle insufficient permissions creating EventBridge rules --- .../orchestrators/sagemaker_orchestrator.py | 42 +++++++++++-------- 1 file changed, 24 insertions(+), 18 deletions(-) diff --git a/src/zenml/integrations/aws/orchestrators/sagemaker_orchestrator.py b/src/zenml/integrations/aws/orchestrators/sagemaker_orchestrator.py index 367d2b060a..5c9b0f54a5 100644 --- a/src/zenml/integrations/aws/orchestrators/sagemaker_orchestrator.py +++ b/src/zenml/integrations/aws/orchestrators/sagemaker_orchestrator.py @@ -240,7 +240,8 @@ def prepare_or_run_pipeline( Raises: RuntimeError: If a connector is used that does not return a - `boto3.Session` object. + `boto3.Session` object, or if there are insufficient permissions + to create EventBridge rules. TypeError: If the network_config passed is not compatible with the AWS SageMaker NetworkConfig class. @@ -626,23 +627,28 @@ def prepare_or_run_pipeline( ) ) - events_client.put_rule( - Name=rule_name, - ScheduleExpression=schedule_expr, - State="ENABLED", - ) - - # Add the SageMaker pipeline as target with the role - events_client.put_targets( - Rule=rule_name, - Targets=[ - { - "Id": f"zenml-target-{deployment.pipeline_configuration.name}", - "Arn": f"arn:aws:sagemaker:{session.boto_region_name}:{session.boto_session.client('sts').get_caller_identity()['Account']}:pipeline/{orchestrator_run_name}", - "RoleArn": self.config.execution_role, - } - ], - ) + try: + events_client.put_rule( + Name=rule_name, + ScheduleExpression=schedule_expr, + State="ENABLED", + ) + # Add the SageMaker pipeline as target with the role + events_client.put_targets( + Rule=rule_name, + Targets=[ + { + "Id": f"zenml-target-{deployment.pipeline_configuration.name}", + "Arn": f"arn:aws:sagemaker:{session.boto_region_name}:{session.boto_session.client('sts').get_caller_identity()['Account']}:pipeline/{orchestrator_run_name}", + "RoleArn": self.config.execution_role, + } + ], + ) + except (ClientError, BotoCoreError) as e: + raise RuntimeError( + f"Failed to create EventBridge target. Please ensure you have " + f"sufficient permissions to create and manage EventBridge targets: {str(e)}" + ) from e logger.info( f"Successfully scheduled pipeline with rule: {rule_name}\n" From 7d65a20e0fc8e7d3f3ea791d77084e74a5dda19c Mon Sep 17 00:00:00 2001 From: Hamza Tahir Date: Fri, 20 Dec 2024 22:44:09 +0100 Subject: [PATCH 21/26] Update error message for EventBridge creation failure --- .../integrations/aws/orchestrators/sagemaker_orchestrator.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/zenml/integrations/aws/orchestrators/sagemaker_orchestrator.py b/src/zenml/integrations/aws/orchestrators/sagemaker_orchestrator.py index 5c9b0f54a5..6e82623382 100644 --- a/src/zenml/integrations/aws/orchestrators/sagemaker_orchestrator.py +++ b/src/zenml/integrations/aws/orchestrators/sagemaker_orchestrator.py @@ -646,8 +646,8 @@ def prepare_or_run_pipeline( ) except (ClientError, BotoCoreError) as e: raise RuntimeError( - f"Failed to create EventBridge target. Please ensure you have " - f"sufficient permissions to create and manage EventBridge targets: {str(e)}" + f"Failed to create EventBridge rule or target. Please ensure you have " + f"sufficient permissions to create and manage EventBridge rules and targets: {str(e)}" ) from e logger.info( From e2ecaf3dbd186d2a360a5fd885d213beb96ab8c2 Mon Sep 17 00:00:00 2001 From: Hamza Tahir Date: Fri, 20 Dec 2024 22:51:10 +0100 Subject: [PATCH 22/26] Remove logging in SagemakerOrchestrator class --- .../aws/orchestrators/sagemaker_orchestrator.py | 17 ----------------- 1 file changed, 17 deletions(-) diff --git a/src/zenml/integrations/aws/orchestrators/sagemaker_orchestrator.py b/src/zenml/integrations/aws/orchestrators/sagemaker_orchestrator.py index 6e82623382..7413902984 100644 --- a/src/zenml/integrations/aws/orchestrators/sagemaker_orchestrator.py +++ b/src/zenml/integrations/aws/orchestrators/sagemaker_orchestrator.py @@ -519,23 +519,6 @@ def prepare_or_run_pipeline( schedule_expr = f"cron({dt.minute} {dt.hour} {dt.day} {dt.month} ? {dt.year})" next_execution = deployment.schedule.run_once_start_time - logger.info( - f"Creating EventBridge rule with schedule expression: {schedule_expr}\n" - f"Note: AWS EventBridge schedules are always executed in UTC timezone.\n" - + ( - f"First execution will occur at: {next_execution.strftime('%Y-%m-%d %H:%M:%S')} " - f"({next_execution.astimezone().tzinfo}) / " - f"{next_execution.astimezone(timezone.utc).strftime('%Y-%m-%d %H:%M:%S')} (UTC)" - if next_execution - else f"Using cron expression: {deployment.schedule.cron_expression}" - ) - + ( - f" (and every {int(minutes)} minutes after)" - if deployment.schedule.interval_second - else "" - ) - ) - # Create IAM policy for EventBridge iam_client = session.boto_session.client("iam") role_name = self.config.execution_role.split("/")[ From a5fd82bac4a73b352c75845ef89e6acfb3c7ee0d Mon Sep 17 00:00:00 2001 From: Hamza Tahir Date: Fri, 20 Dec 2024 23:02:53 +0100 Subject: [PATCH 23/26] Refactor orchestrator metadata computation logic --- .../orchestrators/sagemaker_orchestrator.py | 71 ------------------- .../test_sagemaker_orchestrator.py | 21 ------ 2 files changed, 92 deletions(-) diff --git a/src/zenml/integrations/aws/orchestrators/sagemaker_orchestrator.py b/src/zenml/integrations/aws/orchestrators/sagemaker_orchestrator.py index 7413902984..f751db42db 100644 --- a/src/zenml/integrations/aws/orchestrators/sagemaker_orchestrator.py +++ b/src/zenml/integrations/aws/orchestrators/sagemaker_orchestrator.py @@ -809,15 +809,6 @@ def compute_metadata( if next_execution := execution.get("next_execution"): metadata["next_execution_time"] = next_execution.isoformat() - - # Add orchestrator metadata using the same pattern as execution metadata - if orchestrator_url := self._compute_schedule_url(execution): - metadata[METADATA_ORCHESTRATOR_URL] = Uri(orchestrator_url) - - if logs_url := self._compute_schedule_logs_url( - execution, settings - ): - metadata[METADATA_ORCHESTRATOR_LOGS_URL] = Uri(logs_url) else: # Handle execution metadata if run_id := self._compute_orchestrator_run_id(execution): @@ -926,65 +917,3 @@ def _compute_orchestrator_run_id( f"There was an issue while extracting the pipeline run ID: {e}" ) return None - - @staticmethod - def _compute_schedule_url(schedule_info: Dict[str, Any]) -> Optional[str]: - """Generate the SageMaker Console URL for a scheduled pipeline. - - Args: - schedule_info: Dictionary containing schedule information. - - Returns: - The URL to the pipeline in the SageMaker console. - """ - try: - # Get the Sagemaker session - session = boto3.Session(region_name=schedule_info["region"]) - sagemaker_client = session.client("sagemaker") - - # List the Studio domains and get the Studio Domain ID - domains_response = sagemaker_client.list_domains() - studio_domain_id = domains_response["Domains"][0]["DomainId"] - - return ( - f"https://studio-{studio_domain_id}.studio.{schedule_info['region']}." - f"sagemaker.aws/pipelines/view/{schedule_info['pipeline_name']}" - ) - except Exception as e: - logger.warning( - f"There was an issue while extracting the pipeline url: {e}" - ) - return None - - @staticmethod - def _compute_schedule_logs_url( - schedule_info: Dict[str, Any], - settings: SagemakerOrchestratorSettings, - ) -> Optional[str]: - """Generate the CloudWatch URL for a scheduled pipeline. - - Args: - schedule_info: Dictionary containing schedule information. - settings: The Sagemaker orchestrator settings. - - Returns: - The URL to query the pipeline logs in CloudWatch. - """ - try: - use_training_jobs = True - if settings.use_training_step is not None: - use_training_jobs = settings.use_training_step - - job_type = "Training" if use_training_jobs else "Processing" - - return ( - f"https://{schedule_info['region']}.console.aws.amazon.com/" - f"cloudwatch/home?region={schedule_info['region']}#logsV2:" - f"log-groups/log-group/$252Faws$252Fsagemaker$252F{job_type}Jobs" - f"$3FlogStreamNameFilter$3Dpipelines-" - ) - except Exception as e: - logger.warning( - f"There was an issue while extracting the logs url: {e}" - ) - return None diff --git a/tests/integration/integrations/aws/orchestrators/test_sagemaker_orchestrator.py b/tests/integration/integrations/aws/orchestrators/test_sagemaker_orchestrator.py index 5cc06497a2..9d4299b9fc 100644 --- a/tests/integration/integrations/aws/orchestrators/test_sagemaker_orchestrator.py +++ b/tests/integration/integrations/aws/orchestrators/test_sagemaker_orchestrator.py @@ -16,11 +16,6 @@ from datetime import datetime, timedelta, timezone from unittest.mock import MagicMock, patch -from zenml.constants import ( - METADATA_ORCHESTRATOR_LOGS_URL, - METADATA_ORCHESTRATOR_RUN_ID, - METADATA_ORCHESTRATOR_URL, -) from zenml.enums import StackComponentType from zenml.integrations.aws.flavors import SagemakerOrchestratorFlavor from zenml.integrations.aws.flavors.sagemaker_orchestrator_flavor import ( @@ -91,17 +86,6 @@ def test_compute_schedule_metadata(): assert metadata["pipeline_name"] == "test-pipeline" assert metadata["next_execution_time"] == next_execution.isoformat() - # Verify that boto3 Session was created with correct region - mock_session.assert_called_once_with(region_name="us-west-2") - - # Verify orchestrator metadata - assert metadata[METADATA_ORCHESTRATOR_URL] == ( - "https://studio-d-test123.studio.us-west-2.sagemaker.aws/pipelines/view/test-pipeline" - ) - assert metadata[METADATA_ORCHESTRATOR_LOGS_URL].startswith( - "https://us-west-2.console.aws.amazon.com/cloudwatch/home" - ) - def test_compute_schedule_metadata_error_handling(): """Tests error handling in schedule metadata computation.""" @@ -142,8 +126,3 @@ def test_compute_schedule_metadata_error_handling(): assert metadata["schedule_type"] == "rate" assert metadata["schedule_expression"] == "rate(1 hour)" assert metadata["pipeline_name"] == "test-pipeline" - - # URLs should be None due to error - assert METADATA_ORCHESTRATOR_RUN_ID not in metadata - assert METADATA_ORCHESTRATOR_URL not in metadata - assert METADATA_ORCHESTRATOR_LOGS_URL not in metadata From a005e5a2c862dd1c5b4f1d352eda0dc81ccb8427 Mon Sep 17 00:00:00 2001 From: Hamza Tahir Date: Sun, 22 Dec 2024 20:41:38 +0100 Subject: [PATCH 24/26] Update handling of scheduled pipeline updates in SageMaker.md --- docs/book/component-guide/orchestrators/sagemaker.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/docs/book/component-guide/orchestrators/sagemaker.md b/docs/book/component-guide/orchestrators/sagemaker.md index 2da138af2d..0602166b8d 100644 --- a/docs/book/component-guide/orchestrators/sagemaker.md +++ b/docs/book/component-guide/orchestrators/sagemaker.md @@ -371,6 +371,10 @@ When you deploy a scheduled pipeline, ZenML will: 2. Configure the necessary IAM permissions 3. Set up the SageMaker pipeline as the target +{% hint style="info" %} +If you run the same pipeline with a schedule multiple times, the existing schedule will be updated with the new settings rather than creating a new schedule. This allows you to modify schedules by simply running the pipeline again with new schedule parameters. +{% endhint %} + #### Required IAM Permissions When using scheduled pipelines, you need to ensure your IAM role has the correct permissions and trust relationships. Here's a detailed breakdown of why each permission is needed: From 8811f3f7abb37a8c3637f2dfb99e5f9fcf1b0318 Mon Sep 17 00:00:00 2001 From: Hamza Tahir Date: Mon, 23 Dec 2024 08:55:52 +0100 Subject: [PATCH 25/26] Add optional IAM permissions for policy updates --- .../orchestrators/sagemaker.md | 21 +++++++++++++------ 1 file changed, 15 insertions(+), 6 deletions(-) diff --git a/docs/book/component-guide/orchestrators/sagemaker.md b/docs/book/component-guide/orchestrators/sagemaker.md index 0602166b8d..869c988f76 100644 --- a/docs/book/component-guide/orchestrators/sagemaker.md +++ b/docs/book/component-guide/orchestrators/sagemaker.md @@ -418,14 +418,23 @@ In addition to the basic SageMaker permissions, the AWS credentials used by the "events:ListTargetsByRule" // Required to verify target setup ], "Resource": "arn:aws:events:*:*:rule/zenml-*" - }, + } + ] +} +``` + +The following IAM permissions are optional but recommended to allow automatic policy updates for the execution role: +```json +{ + "Version": "2012-10-17", + "Statement": [ { "Effect": "Allow", "Action": [ - "iam:GetRole", // Required to verify role exists - "iam:GetRolePolicy", // Required to check existing policies - "iam:PutRolePolicy", // Required to add new policies - "iam:UpdateAssumeRolePolicy" // Required to update trust relationships + "iam:GetRole", // For verifying role exists + "iam:GetRolePolicy", // For checking existing policies + "iam:PutRolePolicy", // For adding new policies + "iam:UpdateAssumeRolePolicy" // For updating trust relationships ], "Resource": "arn:aws:iam::*:role/*" } @@ -439,6 +448,6 @@ These permissions enable: * Managing IAM policies required for the scheduled execution * Cleanup of resources when schedules are removed -Without these permissions, the scheduling functionality will fail with access denied errors. +Without the EventBridge permissions, the scheduling functionality will fail. Without the IAM permissions, you'll need to manually ensure your execution role has the necessary permissions to start pipeline executions.
ZenML Scarf
\ No newline at end of file From 44fa6c5039dc3b5751a08e781e81605c6b05a631 Mon Sep 17 00:00:00 2001 From: Hamza Tahir Date: Mon, 23 Dec 2024 09:16:00 +0100 Subject: [PATCH 26/26] Remove redundant code for getting SageMaker session --- .../integrations/aws/orchestrators/sagemaker_orchestrator.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/src/zenml/integrations/aws/orchestrators/sagemaker_orchestrator.py b/src/zenml/integrations/aws/orchestrators/sagemaker_orchestrator.py index f751db42db..72066fae64 100644 --- a/src/zenml/integrations/aws/orchestrators/sagemaker_orchestrator.py +++ b/src/zenml/integrations/aws/orchestrators/sagemaker_orchestrator.py @@ -248,9 +248,6 @@ def prepare_or_run_pipeline( Yields: A dictionary of metadata related to the pipeline run. """ - # Get the session and client - session = self._get_sagemaker_session() - # sagemaker requires pipelineName to use alphanum and hyphens only unsanitized_orchestrator_run_name = get_orchestrator_run_name( pipeline_name=deployment.pipeline_configuration.name