From 813f817060a94dae66cf884c4d65adba7417e028 Mon Sep 17 00:00:00 2001 From: Daniel Gafni Date: Thu, 29 Aug 2024 09:40:30 +0200 Subject: [PATCH] add PipesEMRClient --- pyright/alt-1/requirements-pinned.txt | 11 +- pyright/master/requirements-pinned.txt | 33 +++--- .../dagster-aws/dagster_aws/pipes/__init__.py | 8 +- .../dagster_aws/pipes/clients/__init__.py | 3 +- .../dagster_aws/pipes/clients/emr.py | 103 ++++++++++++++++++ python_modules/libraries/dagster-aws/setup.py | 2 +- 6 files changed, 136 insertions(+), 24 deletions(-) create mode 100644 python_modules/libraries/dagster-aws/dagster_aws/pipes/clients/emr.py diff --git a/pyright/alt-1/requirements-pinned.txt b/pyright/alt-1/requirements-pinned.txt index 438297a5a028f..cd0cd7fe60466 100644 --- a/pyright/alt-1/requirements-pinned.txt +++ b/pyright/alt-1/requirements-pinned.txt @@ -26,7 +26,7 @@ bleach==6.1.0 boto3==1.35.7 boto3-stubs-lite==1.35.8 botocore==1.35.7 -botocore-stubs==1.35.7 +botocore-stubs==1.35.8 buildkite-test-collector==0.1.9 cachetools==5.5.0 caio==0.9.17 @@ -75,7 +75,7 @@ dbt-snowflake==1.8.3 debugpy==1.8.5 decopatch==1.4.10 decorator==5.1.1 -deepdiff==8.0.0 +deepdiff==8.0.1 defusedxml==0.7.1 deltalake==0.19.1 dill==0.3.8 @@ -108,7 +108,7 @@ graphene==3.3 graphql-core==3.2.3 graphql-relay==3.2.0 greenlet==3.0.3 -grpcio==1.66.0 +grpcio==1.66.1 grpcio-health-checking==1.62.3 grpcio-status==1.62.3 grpcio-tools==1.62.3 @@ -171,6 +171,7 @@ multidict==6.0.5 multimethod==1.10 mypy==1.11.2 mypy-boto3-ecs==1.35.2 +mypy-boto3-emr==1.35.0 mypy-boto3-glue==1.35.3 mypy-extensions==1.0.0 mypy-protobuf==3.6.0 @@ -185,7 +186,7 @@ numpy==2.1.0 oauth2client==4.1.3 oauthlib==3.2.2 objgraph==3.6.1 -orderly-set==5.2.1 +orderly-set==5.2.2 orjson==3.10.7 overrides==7.7.0 packaging==24.1 @@ -329,7 +330,7 @@ wcwidth==0.2.13 webcolors==24.8.0 webencodings==0.5.1 websocket-client==1.8.0 -websockets==13.0 +websockets==13.0.1 wheel==0.44.0 wrapt==1.16.0 yarl==1.9.4 diff --git a/pyright/master/requirements-pinned.txt b/pyright/master/requirements-pinned.txt index 4a42c9139db0c..54e9b7b4527d0 100644 --- a/pyright/master/requirements-pinned.txt +++ b/pyright/master/requirements-pinned.txt @@ -60,7 +60,7 @@ bokeh==3.5.2 boto3==1.35.8 boto3-stubs-lite==1.35.8 botocore==1.35.8 -botocore-stubs==1.35.7 +botocore-stubs==1.35.8 buildkite-test-collector==0.1.9 cachecontrol==0.14.0 cached-property==1.5.2 @@ -185,7 +185,7 @@ dbt-semantic-interfaces==0.5.1 debugpy==1.8.5 decopatch==1.4.10 decorator==5.1.1 -deepdiff==8.0.0 +deepdiff==8.0.1 defusedxml==0.7.1 deltalake==0.17.4 deprecated==1.2.14 @@ -255,7 +255,7 @@ graphql-relay==3.2.0 graphviz==0.20.3 great-expectations==0.17.11 greenlet==3.0.3 -grpcio==1.66.0 +grpcio==1.66.1 grpcio-health-checking==1.62.3 grpcio-status==1.62.3 grpcio-tools==1.62.3 @@ -291,7 +291,7 @@ jiter==0.5.0 jmespath==1.0.1 joblib==1.4.2 json5==0.9.25 -jsondiff==2.2.0 +jsondiff==2.2.1 jsonpatch==1.33 jsonpath-ng==1.6.1 jsonpointer==3.0.0 @@ -358,6 +358,7 @@ msgpack==1.0.8 multidict==6.0.5 multimethod==1.10 mypy-boto3-ecs==1.35.2 +mypy-boto3-emr==1.35.0 mypy-boto3-glue==1.35.3 mypy-extensions==1.0.0 mypy-protobuf==3.6.0 @@ -395,16 +396,16 @@ onnxruntime==1.19.0 openai==1.42.0 openapi-schema-validator==0.6.2 openapi-spec-validator==0.7.1 -opentelemetry-api==1.26.0 -opentelemetry-exporter-otlp==1.26.0 -opentelemetry-exporter-otlp-proto-common==1.26.0 -opentelemetry-exporter-otlp-proto-grpc==1.26.0 -opentelemetry-exporter-otlp-proto-http==1.26.0 -opentelemetry-proto==1.26.0 -opentelemetry-sdk==1.26.0 -opentelemetry-semantic-conventions==0.47b0 +opentelemetry-api==1.27.0 +opentelemetry-exporter-otlp==1.27.0 +opentelemetry-exporter-otlp-proto-common==1.27.0 +opentelemetry-exporter-otlp-proto-grpc==1.27.0 +opentelemetry-exporter-otlp-proto-http==1.27.0 +opentelemetry-proto==1.27.0 +opentelemetry-sdk==1.27.0 +opentelemetry-semantic-conventions==0.48b0 ordered-set==4.1.0 -orderly-set==5.2.1 +orderly-set==5.2.2 orjson==3.10.7 outcome==1.3.0.post0 overrides==7.7.0 @@ -436,7 +437,7 @@ ply==3.11 polars==1.6.0 portalocker==2.10.1 prison==0.2.1 -progressbar2==4.4.2 +progressbar2==4.5.0 prometheus-client==0.20.0 prometheus-flask-exporter==0.23.1 prompt-toolkit==3.0.47 @@ -634,14 +635,14 @@ uvicorn==0.30.6 uvloop==0.20.0 vine==5.1.0 virtualenv==20.25.0 -wandb==0.17.7 +wandb==0.17.8 watchdog==5.0.0 watchfiles==0.24.0 wcwidth==0.2.13 webcolors==24.8.0 webencodings==0.5.1 websocket-client==1.8.0 -websockets==13.0 +websockets==13.0.1 werkzeug==2.2.3 wheel==0.44.0 widgetsnbextension==4.0.13 diff --git a/python_modules/libraries/dagster-aws/dagster_aws/pipes/__init__.py b/python_modules/libraries/dagster-aws/dagster_aws/pipes/__init__.py index e513f5cc16adf..da3ecd8049fcb 100644 --- a/python_modules/libraries/dagster-aws/dagster_aws/pipes/__init__.py +++ b/python_modules/libraries/dagster-aws/dagster_aws/pipes/__init__.py @@ -1,4 +1,9 @@ -from dagster_aws.pipes.clients import PipesECSClient, PipesGlueClient, PipesLambdaClient +from dagster_aws.pipes.clients import ( + PipesECSClient, + PipesEMRClient, + PipesGlueClient, + PipesLambdaClient, +) from dagster_aws.pipes.context_injectors import ( PipesLambdaEventContextInjector, PipesS3ContextInjector, @@ -13,6 +18,7 @@ "PipesGlueClient", "PipesLambdaClient", "PipesECSClient", + "PipesEMRClient", "PipesS3ContextInjector", "PipesLambdaEventContextInjector", "PipesS3MessageReader", diff --git a/python_modules/libraries/dagster-aws/dagster_aws/pipes/clients/__init__.py b/python_modules/libraries/dagster-aws/dagster_aws/pipes/clients/__init__.py index b7625af2e2cfb..5112d4761f094 100644 --- a/python_modules/libraries/dagster-aws/dagster_aws/pipes/clients/__init__.py +++ b/python_modules/libraries/dagster-aws/dagster_aws/pipes/clients/__init__.py @@ -1,5 +1,6 @@ from dagster_aws.pipes.clients.ecs import PipesECSClient +from dagster_aws.pipes.clients.emr import PipesEMRClient from dagster_aws.pipes.clients.glue import PipesGlueClient from dagster_aws.pipes.clients.lambda_ import PipesLambdaClient -__all__ = ["PipesGlueClient", "PipesLambdaClient", "PipesECSClient"] +__all__ = ["PipesGlueClient", "PipesLambdaClient", "PipesECSClient", "PipesEMRClient"] diff --git a/python_modules/libraries/dagster-aws/dagster_aws/pipes/clients/emr.py b/python_modules/libraries/dagster-aws/dagster_aws/pipes/clients/emr.py new file mode 100644 index 0000000000000..8243d0c075062 --- /dev/null +++ b/python_modules/libraries/dagster-aws/dagster_aws/pipes/clients/emr.py @@ -0,0 +1,103 @@ +from typing import TYPE_CHECKING, Optional + +import boto3 +from dagster._annotations import experimental +from dagster._core.execution.context.compute import OpExecutionContext +from dagster._core.pipes.client import PipesContextInjector, PipesMessageReader +from mypy_boto3_emr.type_defs import ( + DescribeClusterOutputTypeDef, + RunJobFlowInputRequestTypeDef, + RunJobFlowOutputTypeDef, +) + +from dagster_aws.pipes.clients.base import PipesBaseClient, class_docstring, run_docstring +from dagster_aws.pipes.message_readers import PipesCloudWatchMessageReader + +if TYPE_CHECKING: + from mypy_boto3_emr.literals import ClusterStateType + + +AWS_SERVICE_NAME = "EMR" + + +@experimental +@class_docstring(AWS_SERVICE_NAME) +class PipesEMRClient( + PipesBaseClient[ + RunJobFlowInputRequestTypeDef, RunJobFlowOutputTypeDef, DescribeClusterOutputTypeDef + ] +): + AWS_SERVICE_NAME = AWS_SERVICE_NAME + + def __init__( + self, + client=None, + context_injector: Optional[PipesContextInjector] = None, + message_reader: Optional[PipesMessageReader] = None, + forward_termination: bool = True, + ): + super().__init__( + client=client or boto3.client("emr"), + context_injector=context_injector, + message_reader=message_reader, + forward_termination=forward_termination, + ) + + @classmethod + def _is_dagster_maintained(cls) -> bool: + return True + + # is there a better way to do this? + # is it possible to automatically inject the class attributes into the method docs? + run = run_docstring(AWS_SERVICE_NAME, "run_job_flow")(PipesBaseClient.run) + + def _start( + self, context: OpExecutionContext, params: "RunJobFlowInputRequestTypeDef" + ) -> "RunJobFlowOutputTypeDef": + response = self._client.run_job_flow(**params) + cluster_id = response["JobFlowId"] + context.log.info( + f"[pipes] {self.AWS_SERVICE_NAME} job started with cluster id {cluster_id}" + ) + return response + + def _wait_for_completion( + self, context: OpExecutionContext, start_response: RunJobFlowOutputTypeDef + ) -> "DescribeClusterOutputTypeDef": + cluster_id = start_response["JobFlowId"] + self._client.get_waiter("cluster_running").wait(ClusterId=cluster_id) + context.log.info(f"[pipes] {self.AWS_SERVICE_NAME} job {cluster_id} running") + # now wait for the job to complete + self._client.get_waiter("cluster_terminated").wait(ClusterId=cluster_id) + + response = self._client.describe_cluster(ClusterId=cluster_id) + + state: ClusterStateType = response["Cluster"]["Status"]["State"] + + context.log.info( + f"[pipes] {self.AWS_SERVICE_NAME} job {cluster_id} completed with state: {state}" + ) + + if state == "FAILED": + context.log.error(f"[pipes] {self.AWS_SERVICE_NAME} job {cluster_id} failed") + raise Exception(f"[pipes] {self.AWS_SERVICE_NAME} job {cluster_id} failed") + + return response + + def _read_messages(self, context: OpExecutionContext, response: DescribeClusterOutputTypeDef): + if isinstance(self._message_reader, PipesCloudWatchMessageReader): + # we can get cloudwatch logs from the known log group + log_group, log_stream = response["Cluster"]["LogUri"].split(":", 1) # pyright: ignore (reportTypedDictNotRequiredAccess) + context.log.info(f"[pipes] Reading logs from {log_group}/{log_stream}") + self._message_reader.consume_cloudwatch_logs( + log_group, + log_stream, + start_time=int( + response["Cluster"]["Status"]["Timeline"]["CreationDateTime"].timestamp() * 1000 # pyright: ignore (reportTypedDictNotRequiredAccess) + ), + ) + + def _terminate(self, context: OpExecutionContext, start_response: RunJobFlowOutputTypeDef): + cluster_id = start_response["JobFlowId"] + context.log.info(f"[pipes] Terminating {self.AWS_SERVICE_NAME} job {cluster_id}") + self._client.terminate_job_flows(JobFlowIds=[cluster_id]) diff --git a/python_modules/libraries/dagster-aws/setup.py b/python_modules/libraries/dagster-aws/setup.py index 1808a13fcd820..b8d70c149379d 100644 --- a/python_modules/libraries/dagster-aws/setup.py +++ b/python_modules/libraries/dagster-aws/setup.py @@ -37,7 +37,7 @@ def get_version() -> str: python_requires=">=3.8,<3.13", install_requires=[ "boto3", - "boto3-stubs-lite[ecs,glue]", + "boto3-stubs-lite[ecs,glue,emr]", f"dagster{pin}", "packaging", "requests",