Skip to content

Commit

Permalink
add PipesEMRClient
Browse files Browse the repository at this point in the history
  • Loading branch information
danielgafni committed Aug 29, 2024
1 parent e380677 commit 813f817
Show file tree
Hide file tree
Showing 6 changed files with 136 additions and 24 deletions.
11 changes: 6 additions & 5 deletions pyright/alt-1/requirements-pinned.txt
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ bleach==6.1.0
boto3==1.35.7
boto3-stubs-lite==1.35.8
botocore==1.35.7
botocore-stubs==1.35.7
botocore-stubs==1.35.8
buildkite-test-collector==0.1.9
cachetools==5.5.0
caio==0.9.17
Expand Down Expand Up @@ -75,7 +75,7 @@ dbt-snowflake==1.8.3
debugpy==1.8.5
decopatch==1.4.10
decorator==5.1.1
deepdiff==8.0.0
deepdiff==8.0.1
defusedxml==0.7.1
deltalake==0.19.1
dill==0.3.8
Expand Down Expand Up @@ -108,7 +108,7 @@ graphene==3.3
graphql-core==3.2.3
graphql-relay==3.2.0
greenlet==3.0.3
grpcio==1.66.0
grpcio==1.66.1
grpcio-health-checking==1.62.3
grpcio-status==1.62.3
grpcio-tools==1.62.3
Expand Down Expand Up @@ -171,6 +171,7 @@ multidict==6.0.5
multimethod==1.10
mypy==1.11.2
mypy-boto3-ecs==1.35.2
mypy-boto3-emr==1.35.0
mypy-boto3-glue==1.35.3
mypy-extensions==1.0.0
mypy-protobuf==3.6.0
Expand All @@ -185,7 +186,7 @@ numpy==2.1.0
oauth2client==4.1.3
oauthlib==3.2.2
objgraph==3.6.1
orderly-set==5.2.1
orderly-set==5.2.2
orjson==3.10.7
overrides==7.7.0
packaging==24.1
Expand Down Expand Up @@ -329,7 +330,7 @@ wcwidth==0.2.13
webcolors==24.8.0
webencodings==0.5.1
websocket-client==1.8.0
websockets==13.0
websockets==13.0.1
wheel==0.44.0
wrapt==1.16.0
yarl==1.9.4
Expand Down
33 changes: 17 additions & 16 deletions pyright/master/requirements-pinned.txt
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ bokeh==3.5.2
boto3==1.35.8
boto3-stubs-lite==1.35.8
botocore==1.35.8
botocore-stubs==1.35.7
botocore-stubs==1.35.8
buildkite-test-collector==0.1.9
cachecontrol==0.14.0
cached-property==1.5.2
Expand Down Expand Up @@ -185,7 +185,7 @@ dbt-semantic-interfaces==0.5.1
debugpy==1.8.5
decopatch==1.4.10
decorator==5.1.1
deepdiff==8.0.0
deepdiff==8.0.1
defusedxml==0.7.1
deltalake==0.17.4
deprecated==1.2.14
Expand Down Expand Up @@ -255,7 +255,7 @@ graphql-relay==3.2.0
graphviz==0.20.3
great-expectations==0.17.11
greenlet==3.0.3
grpcio==1.66.0
grpcio==1.66.1
grpcio-health-checking==1.62.3
grpcio-status==1.62.3
grpcio-tools==1.62.3
Expand Down Expand Up @@ -291,7 +291,7 @@ jiter==0.5.0
jmespath==1.0.1
joblib==1.4.2
json5==0.9.25
jsondiff==2.2.0
jsondiff==2.2.1
jsonpatch==1.33
jsonpath-ng==1.6.1
jsonpointer==3.0.0
Expand Down Expand Up @@ -358,6 +358,7 @@ msgpack==1.0.8
multidict==6.0.5
multimethod==1.10
mypy-boto3-ecs==1.35.2
mypy-boto3-emr==1.35.0
mypy-boto3-glue==1.35.3
mypy-extensions==1.0.0
mypy-protobuf==3.6.0
Expand Down Expand Up @@ -395,16 +396,16 @@ onnxruntime==1.19.0
openai==1.42.0
openapi-schema-validator==0.6.2
openapi-spec-validator==0.7.1
opentelemetry-api==1.26.0
opentelemetry-exporter-otlp==1.26.0
opentelemetry-exporter-otlp-proto-common==1.26.0
opentelemetry-exporter-otlp-proto-grpc==1.26.0
opentelemetry-exporter-otlp-proto-http==1.26.0
opentelemetry-proto==1.26.0
opentelemetry-sdk==1.26.0
opentelemetry-semantic-conventions==0.47b0
opentelemetry-api==1.27.0
opentelemetry-exporter-otlp==1.27.0
opentelemetry-exporter-otlp-proto-common==1.27.0
opentelemetry-exporter-otlp-proto-grpc==1.27.0
opentelemetry-exporter-otlp-proto-http==1.27.0
opentelemetry-proto==1.27.0
opentelemetry-sdk==1.27.0
opentelemetry-semantic-conventions==0.48b0
ordered-set==4.1.0
orderly-set==5.2.1
orderly-set==5.2.2
orjson==3.10.7
outcome==1.3.0.post0
overrides==7.7.0
Expand Down Expand Up @@ -436,7 +437,7 @@ ply==3.11
polars==1.6.0
portalocker==2.10.1
prison==0.2.1
progressbar2==4.4.2
progressbar2==4.5.0
prometheus-client==0.20.0
prometheus-flask-exporter==0.23.1
prompt-toolkit==3.0.47
Expand Down Expand Up @@ -634,14 +635,14 @@ uvicorn==0.30.6
uvloop==0.20.0
vine==5.1.0
virtualenv==20.25.0
wandb==0.17.7
wandb==0.17.8
watchdog==5.0.0
watchfiles==0.24.0
wcwidth==0.2.13
webcolors==24.8.0
webencodings==0.5.1
websocket-client==1.8.0
websockets==13.0
websockets==13.0.1
werkzeug==2.2.3
wheel==0.44.0
widgetsnbextension==4.0.13
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,9 @@
from dagster_aws.pipes.clients import PipesECSClient, PipesGlueClient, PipesLambdaClient
from dagster_aws.pipes.clients import (
PipesECSClient,
PipesEMRClient,
PipesGlueClient,
PipesLambdaClient,
)
from dagster_aws.pipes.context_injectors import (
PipesLambdaEventContextInjector,
PipesS3ContextInjector,
Expand All @@ -13,6 +18,7 @@
"PipesGlueClient",
"PipesLambdaClient",
"PipesECSClient",
"PipesEMRClient",
"PipesS3ContextInjector",
"PipesLambdaEventContextInjector",
"PipesS3MessageReader",
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from dagster_aws.pipes.clients.ecs import PipesECSClient
from dagster_aws.pipes.clients.emr import PipesEMRClient
from dagster_aws.pipes.clients.glue import PipesGlueClient
from dagster_aws.pipes.clients.lambda_ import PipesLambdaClient

__all__ = ["PipesGlueClient", "PipesLambdaClient", "PipesECSClient"]
__all__ = ["PipesGlueClient", "PipesLambdaClient", "PipesECSClient", "PipesEMRClient"]
103 changes: 103 additions & 0 deletions python_modules/libraries/dagster-aws/dagster_aws/pipes/clients/emr.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
from typing import TYPE_CHECKING, Optional

import boto3
from dagster._annotations import experimental
from dagster._core.execution.context.compute import OpExecutionContext
from dagster._core.pipes.client import PipesContextInjector, PipesMessageReader
from mypy_boto3_emr.type_defs import (
DescribeClusterOutputTypeDef,
RunJobFlowInputRequestTypeDef,
RunJobFlowOutputTypeDef,
)

from dagster_aws.pipes.clients.base import PipesBaseClient, class_docstring, run_docstring
from dagster_aws.pipes.message_readers import PipesCloudWatchMessageReader

if TYPE_CHECKING:
from mypy_boto3_emr.literals import ClusterStateType


AWS_SERVICE_NAME = "EMR"


@experimental
@class_docstring(AWS_SERVICE_NAME)
class PipesEMRClient(
PipesBaseClient[
RunJobFlowInputRequestTypeDef, RunJobFlowOutputTypeDef, DescribeClusterOutputTypeDef
]
):
AWS_SERVICE_NAME = AWS_SERVICE_NAME

def __init__(
self,
client=None,
context_injector: Optional[PipesContextInjector] = None,
message_reader: Optional[PipesMessageReader] = None,
forward_termination: bool = True,
):
super().__init__(
client=client or boto3.client("emr"),
context_injector=context_injector,
message_reader=message_reader,
forward_termination=forward_termination,
)

@classmethod
def _is_dagster_maintained(cls) -> bool:
return True

# is there a better way to do this?
# is it possible to automatically inject the class attributes into the method docs?
run = run_docstring(AWS_SERVICE_NAME, "run_job_flow")(PipesBaseClient.run)

def _start(
self, context: OpExecutionContext, params: "RunJobFlowInputRequestTypeDef"
) -> "RunJobFlowOutputTypeDef":
response = self._client.run_job_flow(**params)
cluster_id = response["JobFlowId"]
context.log.info(
f"[pipes] {self.AWS_SERVICE_NAME} job started with cluster id {cluster_id}"
)
return response

def _wait_for_completion(
self, context: OpExecutionContext, start_response: RunJobFlowOutputTypeDef
) -> "DescribeClusterOutputTypeDef":
cluster_id = start_response["JobFlowId"]
self._client.get_waiter("cluster_running").wait(ClusterId=cluster_id)
context.log.info(f"[pipes] {self.AWS_SERVICE_NAME} job {cluster_id} running")
# now wait for the job to complete
self._client.get_waiter("cluster_terminated").wait(ClusterId=cluster_id)

response = self._client.describe_cluster(ClusterId=cluster_id)

state: ClusterStateType = response["Cluster"]["Status"]["State"]

context.log.info(
f"[pipes] {self.AWS_SERVICE_NAME} job {cluster_id} completed with state: {state}"
)

if state == "FAILED":
context.log.error(f"[pipes] {self.AWS_SERVICE_NAME} job {cluster_id} failed")
raise Exception(f"[pipes] {self.AWS_SERVICE_NAME} job {cluster_id} failed")

return response

def _read_messages(self, context: OpExecutionContext, response: DescribeClusterOutputTypeDef):
if isinstance(self._message_reader, PipesCloudWatchMessageReader):
# we can get cloudwatch logs from the known log group
log_group, log_stream = response["Cluster"]["LogUri"].split(":", 1) # pyright: ignore (reportTypedDictNotRequiredAccess)
context.log.info(f"[pipes] Reading logs from {log_group}/{log_stream}")
self._message_reader.consume_cloudwatch_logs(
log_group,
log_stream,
start_time=int(
response["Cluster"]["Status"]["Timeline"]["CreationDateTime"].timestamp() * 1000 # pyright: ignore (reportTypedDictNotRequiredAccess)
),
)

def _terminate(self, context: OpExecutionContext, start_response: RunJobFlowOutputTypeDef):
cluster_id = start_response["JobFlowId"]
context.log.info(f"[pipes] Terminating {self.AWS_SERVICE_NAME} job {cluster_id}")
self._client.terminate_job_flows(JobFlowIds=[cluster_id])
2 changes: 1 addition & 1 deletion python_modules/libraries/dagster-aws/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ def get_version() -> str:
python_requires=">=3.8,<3.13",
install_requires=[
"boto3",
"boto3-stubs-lite[ecs,glue]",
"boto3-stubs-lite[ecs,glue,emr]",
f"dagster{pin}",
"packaging",
"requests",
Expand Down

0 comments on commit 813f817

Please sign in to comment.