[ext] databricks EXT integration (#15955)
## Summary & Motivation

- Add a `dagster-ext` integration for Databricks. It is designed to interfere
  with the official Databricks SDK as little as possible: you pass Databricks
  SDK data structures to `ExtDatabricks`, and the only modification it makes is
  injecting the necessary environment variables (see the sketch below). The rest
  of the cluster configuration is left to the user. It is separate from the rest
  of the Databricks integration.
- Add example usage to `dagster_databricks/README.md`.
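
For illustration, the kind of modification described above looks roughly like the sketch below. This is not the actual implementation in `dagster_databricks/ext.py`; the helper name and the use of `new_cluster.spark_env_vars` as the injection point are assumptions.

```
# Illustrative sketch only -- not the actual logic in `dagster_databricks/ext.py`.
# Assumes the EXT connection info is a plain dict of environment variables and
# that it is merged into the task's cluster spec via `spark_env_vars`.
from typing import Mapping

from databricks.sdk.service import jobs


def with_ext_env_vars(task: jobs.SubmitTask, env_vars: Mapping[str, str]) -> jobs.SubmitTask:
    """Return a copy of `task` with EXT environment variables merged into its cluster spec."""
    task_dict = task.as_dict()
    cluster = task_dict.setdefault("new_cluster", {})
    cluster["spark_env_vars"] = {**cluster.get("spark_env_vars", {}), **env_vars}
    return jobs.SubmitTask.from_dict(task_dict)
```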

## How I Tested These Changes

New unit tests (though they are skipped on Buildkite).
smackesey authored Sep 15, 2023
1 parent 1eeafa8 commit 59bcc1f
Showing 12 changed files with 495 additions and 18 deletions.
1 change: 1 addition & 0 deletions docs/sphinx/conf.py
@@ -17,6 +17,7 @@
    ### dagster packages
    "../../python_modules/automation",
    "../../python_modules/dagster",
    "../../python_modules/dagster-ext",
    "../../python_modules/dagster-graphql",
    "../../python_modules/dagit",
    "../../python_modules/dagster-webserver",
1 change: 1 addition & 0 deletions python_modules/automation/tox.ini
@@ -6,6 +6,7 @@ download = True
passenv = CI_PULL_REQUEST COVERALLS_REPO_TOKEN BUILDKITE*
deps =
  -e ../dagster[test]
  -e ../dagster-ext
  -e ../dagster-graphql
  -e ../libraries/dagster-managed-elements
  -e ../libraries/dagster-airbyte
40 changes: 38 additions & 2 deletions python_modules/dagster-ext/dagster_ext/__init__.py
@@ -411,6 +411,17 @@ def _upload_loop(self, is_task_complete: Event) -> None:
            time.sleep(1)


class ExtBufferedFilesystemMessageWriterChannel(ExtBlobStoreMessageWriterChannel):
    def __init__(self, path: str, *, interval: float = 10):
        super().__init__(interval=interval)
        self._path = path

    def upload_messages_chunk(self, payload: IO, index: int) -> None:
        message_path = os.path.join(self._path, f"{index}.json")
        with open(message_path, "w") as f:
            f.write(payload.read())


# ########################
# ##### IO - DEFAULT
# ########################
@@ -500,7 +511,6 @@ class ExtS3MessageWriter(ExtBlobStoreMessageWriter):
    # client is a boto3.client("s3") object
    def __init__(self, client: Any, *, interval: float = 10):
        super().__init__(interval=interval)
        self._interval = _assert_param_type(interval, float, self.__class__.__name__, "interval")
        # Not checking client type for now because it's a boto3.client object and we don't want to
        # depend on boto3.
        self._client = client
@@ -515,7 +525,7 @@ def make_channel(
            client=self._client,
            bucket=bucket,
            key_prefix=key_prefix,
            interval=self._interval,
            interval=self.interval,
        )


@@ -538,6 +548,32 @@ def upload_messages_chunk(self, payload: IO, index: int) -> None:
        )


# ########################
# ##### IO - DBFS
# ########################


class ExtDbfsContextLoader(ExtContextLoader):
    @contextmanager
    def load_context(self, params: ExtParams) -> Iterator[ExtContextData]:
        unmounted_path = _assert_env_param_type(params, "path", str, self.__class__)
        path = os.path.join("/dbfs", unmounted_path.lstrip("/"))
        with open(path, "r") as f:
            yield json.load(f)


class ExtDbfsMessageWriter(ExtBlobStoreMessageWriter):
    def make_channel(
        self,
        params: ExtParams,
    ) -> "ExtBufferedFilesystemMessageWriterChannel":
        unmounted_path = _assert_env_param_type(params, "path", str, self.__class__)
        return ExtBufferedFilesystemMessageWriterChannel(
            path=os.path.join("/dbfs", unmounted_path.lstrip("/")),
            interval=self.interval,
        )


# ########################
# ##### CONTEXT
# ########################
1 change: 0 additions & 1 deletion python_modules/dagster/dagster/_core/ext/client.py
@@ -21,7 +21,6 @@ def run(
        *,
        context: OpExecutionContext,
        extras: Optional[ExtExtras] = None,
        message_reader: Optional["ExtMessageReader"] = None,
    ) -> None:
        ...

20 changes: 10 additions & 10 deletions python_modules/dagster/dagster/_core/ext/utils.py
@@ -130,43 +130,43 @@ def read_messages(
        self,
        handler: "ExtMessageHandler",
    ) -> Iterator[ExtParams]:
        with self.setup():
        with self.get_params() as params:
            is_task_complete = Event()
            thread = None
            try:
                thread = Thread(
                    target=self._reader_thread,
                    args=(
                        handler,
                        params,
                        is_task_complete,
                    ),
                    daemon=True,
                )
                thread.start()
                yield self.get_params()
                yield params
            finally:
                is_task_complete.set()
                if thread:
                    thread.join()

    @contextmanager
    def setup(self) -> Iterator[None]:
        yield

    @abstractmethod
    def get_params(self) -> ExtParams:
    @contextmanager
    def get_params(self) -> Iterator[ExtParams]:
        ...

    @abstractmethod
    def download_messages_chunk(self, index: int) -> Optional[str]:
    def download_messages_chunk(self, index: int, params: ExtParams) -> Optional[str]:
        ...

    def _reader_thread(self, handler: "ExtMessageHandler", is_task_complete: Event) -> None:
    def _reader_thread(
        self, handler: "ExtMessageHandler", params: ExtParams, is_task_complete: Event
    ) -> None:
        start_or_last_download = datetime.datetime.now()
        while True:
            now = datetime.datetime.now()
            if (now - start_or_last_download).seconds > self.interval or is_task_complete.is_set():
                chunk = self.download_messages_chunk(self.counter)
                chunk = self.download_messages_chunk(self.counter, params)
                start_or_last_download = now
                if chunk:
                    for line in chunk.split("\n"):
10 changes: 6 additions & 4 deletions python_modules/libraries/dagster-aws/dagster_aws/ext.py
@@ -1,6 +1,7 @@
import random
import string
from typing import Optional
from contextlib import contextmanager
from typing import Iterator, Optional

import boto3
import dagster._check as check
@@ -18,10 +19,11 @@ def __init__(self, *, interval: int = 10, bucket: str, client: boto3.client):
        self.key_prefix = "".join(random.choices(string.ascii_letters, k=30))
        self.client = client

    def get_params(self) -> ExtParams:
        return {"bucket": self.bucket, "key_prefix": self.key_prefix}
    @contextmanager
    def get_params(self) -> Iterator[ExtParams]:
        yield {"bucket": self.bucket, "key_prefix": self.key_prefix}

    def download_messages_chunk(self, index: int) -> Optional[str]:
    def download_messages_chunk(self, index: int, params: ExtParams) -> Optional[str]:
        key = f"{self.key_prefix}/{index}.json"
        try:
            obj = self.client.get_object(Bucket=self.bucket, Key=key)
146 changes: 146 additions & 0 deletions python_modules/libraries/dagster-databricks/README.md
@@ -2,3 +2,149 @@

The docs for `dagster-databricks` can be found
[here](https://docs.dagster.io/_apidocs/libraries/dagster-databricks).

## ext example

This package includes a prototype API for launching Databricks jobs with
Dagster's EXT protocol. There are two ways to use the API:

### (1) `ExtDatabricks` resource

The `ExtDatabricks` resource provides a high-level API for launching
Databricks jobs using Dagster's EXT protocol.

`ExtDatabricks.run` takes a single `databricks.sdk.service.jobs.SubmitTask`
specification. After setting up EXT communications channels (which by default
use DBFS), it injects the information Databricks needs to connect to these
channels into the task specification. It then launches a Databricks job by
passing the specification to `WorkspaceClient.jobs.submit`, polls the job
state, and exits gracefully on success or failure:


```
import os

from dagster import AssetExecutionContext, Definitions, asset
from dagster_databricks import ExtDatabricks
from databricks.sdk import WorkspaceClient
from databricks.sdk.service import jobs


@asset
def databricks_asset(context: AssetExecutionContext, ext: ExtDatabricks):
    # task specification will be passed to databricks as-is, except for the
    # injection of environment variables
    task = jobs.SubmitTask.from_dict({
        "new_cluster": { ... },
        "libraries": [
            # must include dagster-ext-process
            {"pypi": {"package": "dagster-ext-process"}},
        ],
        "task_key": "some-key",
        "spark_python_task": {
            "python_file": "dbfs:/myscript.py",
            "source": jobs.Source.WORKSPACE,
        },
    })

    # Arbitrary json-serializable data you want access to from the `ExtContext`
    # in the databricks runtime. Assume `sample_rate` is a parameter used by
    # the target job's business logic.
    extras = {"sample_rate": 1.0}

    # synchronously execute the databricks job
    ext.run(
        task=task,
        context=context,
        extras=extras,
    )


client = WorkspaceClient(
    host=os.environ["DATABRICKS_HOST"],
    token=os.environ["DATABRICKS_TOKEN"],
)

defs = Definitions(
    assets=[databricks_asset],
    resources={"ext": ExtDatabricks(client)},
)
```

`ExtDatabricks.run` requires that the targeted Python script
(`dbfs:/myscript.py` above) already exist in DBFS. Here is what it might look
like:

```
### dbfs:/myscript.py

# `dagster_ext` must be available in the databricks python environment
from dagster_ext import ExtDbfsContextLoader, ExtDbfsMessageWriter, init_dagster_ext

# Sets up communication channels and downloads the context data sent from Dagster.
# Note that while other `context_loader` and `message_writer` settings are
# possible, it is recommended to use the below settings for Databricks.
context = init_dagster_ext(
    context_loader=ExtDbfsContextLoader(),
    message_writer=ExtDbfsMessageWriter(),
)

# Access the `extras` dict passed when launching the job from Dagster.
sample_rate = context.get_extra("sample_rate")

# Stream log messages back to Dagster.
context.log(f"Using sample rate: {sample_rate}")

# ... your code that computes and persists the asset

# Stream arbitrary metadata back to Dagster. This will be attached to the
# associated `AssetMaterialization`.
context.report_asset_metadata("some_metric", get_metric(), metadata_type="text")

# Stream data version back to Dagster. This will also be attached to the
# associated `AssetMaterialization`.
context.report_asset_data_version(get_data_version())
```
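
If the script is not already in DBFS, one way to upload it is with the Databricks SDK's DBFS API. The sketch below assumes `WorkspaceClient.dbfs.upload` is available in your `databricks-sdk` version and reuses the `dbfs:/myscript.py` path from the example above; the Databricks CLI (`databricks fs cp`) works just as well.

```
# Sketch: upload the EXT script to DBFS before launching the job.
# Assumes `WorkspaceClient.dbfs.upload` is available in your databricks-sdk version.
import os

from databricks.sdk import WorkspaceClient

client = WorkspaceClient(
    host=os.environ["DATABRICKS_HOST"],
    token=os.environ["DATABRICKS_TOKEN"],
)

# Copy the local script to dbfs:/myscript.py, replacing any existing file.
with open("myscript.py", "rb") as f:
    client.dbfs.upload("/myscript.py", f, overwrite=True)
```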

### (2) `ext_protocol` context manager

If you have existing code to launch/poll the job that you do not want to
change, or you just want more control than `ExtDatabricks` allows, you can use
`ext_protocol`. All that is necessary is that (1) your Databricks job is
launched within the scope of the `ext_protocol` context manager, and (2) the
job runs on a cluster where the environment variables exposed by the yielded
`ext_context` are set.

```
import os

from dagster import AssetExecutionContext, asset, ext_protocol
from dagster_databricks import ExtDbfsContextInjector, ExtDbfsMessageReader
from databricks.sdk import WorkspaceClient


@asset
def databricks_asset(context: AssetExecutionContext):
    client = WorkspaceClient(
        host=os.environ["DATABRICKS_HOST"],
        token=os.environ["DATABRICKS_TOKEN"],
    )

    # Arbitrary json-serializable data you want access to from the `ExtContext`
    # in the databricks runtime. Assume `sample_rate` is a parameter used by
    # the target job's business logic.
    extras = {"sample_rate": 1.0}

    # Sets up ext communications channels.
    with ext_protocol(
        context=context,
        extras=extras,
        context_injector=ExtDbfsContextInjector(client=client),
        message_reader=ExtDbfsMessageReader(client=client),
    ) as ext_context:
        # Dict[str, str] with environment variables containing ext comms info.
        env_vars = ext_context.get_external_process_env_vars()

        # Some function that handles launching/monitoring of the databricks job.
        # It must ensure that the `env_vars` are set on the executing cluster.
        custom_databricks_launch_code(env_vars)
```
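
For completeness, a hypothetical implementation of `custom_databricks_launch_code` might look like the sketch below. It assumes the environment variables can be applied through the cluster spec's `spark_env_vars` and that blocking on `WorkspaceClient.jobs.submit(...).result()` is an acceptable way to poll the run; adapt it to however you already launch jobs.

```
# Hypothetical sketch of `custom_databricks_launch_code` -- not part of this commit.
# Assumes env vars are applied via `spark_env_vars` and the run is polled by
# blocking on `jobs.submit(...).result()`.
from typing import Mapping

from databricks.sdk import WorkspaceClient
from databricks.sdk.service import jobs


def custom_databricks_launch_code(env_vars: Mapping[str, str]) -> None:
    client = WorkspaceClient()  # reads DATABRICKS_HOST / DATABRICKS_TOKEN from the env
    task = jobs.SubmitTask.from_dict({
        "new_cluster": {
            # ... your cluster config ...
            "spark_env_vars": dict(env_vars),  # make the ext comms info visible to the job
        },
        "libraries": [{"pypi": {"package": "dagster-ext-process"}}],
        "task_key": "some-key",
        "spark_python_task": {"python_file": "dbfs:/myscript.py"},
    })
    # Submit the run and block until it reaches a terminal state.
    client.jobs.submit(run_name="databricks_asset", tasks=[task]).result()
```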
python_modules/libraries/dagster-databricks/dagster_databricks/__init__.py
@@ -20,6 +20,12 @@
    DatabricksPySparkStepLauncher as DatabricksPySparkStepLauncher,
    databricks_pyspark_step_launcher as databricks_pyspark_step_launcher,
)
from .ext import (
    ExtDatabricks as ExtDatabricks,
    ExtDbfsContextInjector as ExtDbfsContextInjector,
    ExtDbfsMessageReader as ExtDbfsMessageReader,
    dbfs_tempdir as dbfs_tempdir,
)
from .ops import (
    create_databricks_run_now_op as create_databricks_run_now_op,
    create_databricks_submit_run_op as create_databricks_submit_run_op,

1 comment on commit 59bcc1f

@github-actions

Deploy preview for dagster-docs ready!

✅ Preview
https://dagster-docs-646mmwr6i-elementl.vercel.app
https://master.dagster.dagster-docs.io

Built with commit 59bcc1f.
This pull request is being automatically deployed with vercel-action
