[ext] databricks EXT integration #15955
Conversation
```python
class ExtBufferedFilesystemMessageWriterChannel(ExtBlobStoreMessageWriterChannel):
    def __init__(self, path: str, *, interval: float = 10):
        super().__init__(interval=interval)
        self._path = path

    def upload_messages_chunk(self, payload: IO, index: int) -> None:
        message_path = os.path.join(self._path, f"{index}.json")
        with open(message_path, "w") as f:
            f.write(payload.read())
```
dbfs exposes a traditional I/O interface in python?
yes it is mounted on /dbfs
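For illustration, a minimal sketch of what that mount enables (the path is a hypothetical example): because DBFS is FUSE-mounted at `/dbfs` on Databricks clusters, ordinary Python file I/O works against it with no special client.

```python
# DBFS is mounted at /dbfs, so standard file operations work directly.
# The path below is a hypothetical example, not one used by this PR.
with open("/dbfs/tmp/example.json", "w") as f:
    f.write('{"hello": "world"}')

with open("/dbfs/tmp/example.json") as f:
    print(f.read())
```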
```python
while True:
    run = self.client.jobs.get_run(run_id)
    if run.state.life_cycle_state in (
        jobs.RunLifeCycleState.TERMINATED,
        jobs.RunLifeCycleState.SKIPPED,
    ):
        if run.state.result_state == jobs.RunResultState.SUCCESS:
            return
        else:
            raise DagsterExternalExecutionError(
                f"Error running Databricks job: {run.state.state_message}"
            )
    elif run.state.life_cycle_state == jobs.RunLifeCycleState.INTERNAL_ERROR:
        raise DagsterExternalExecutionError(
            f"Error running Databricks job: {run.state.state_message}"
        )
    time.sleep(5)
```
I think we should log on every tick so the user is confident something is happening
## EXT Example

This package includes a prototype API for launching databricks jobs with
Dagster's EXT protocol. There are two ways to use the API:

### (1) `ExtDatabricks` resource

The `ExtDatabricks` resource provides a high-level API for launching
databricks jobs using Dagster's EXT protocol.

It takes a single `databricks.sdk.service.jobs.SubmitTask` specification. After
setting up EXT communications channels (which by default use DBFS), it injects
the information needed to connect to these channels from Databricks into the
task specification. It then launches a Databricks job by passing the
specification to `WorkspaceClient.jobs.submit`. It polls the job state and
exits gracefully on success or failure:
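The README's code example is not reproduced in this thread. Below is a sketch of what such usage could look like; the cluster id, script path, resource key, and constructor parameters are assumptions rather than confirmed API. The `run(task, ...)` entry point is grounded in the `def run(self, task: jobs.SubmitTask, ...)` signature shown later in this thread.

```python
from dagster import AssetExecutionContext, Definitions, asset
from dagster_databricks import ExtDatabricks
from databricks.sdk import WorkspaceClient
from databricks.sdk.service import jobs


@asset
def databricks_asset(context: AssetExecutionContext, ext: ExtDatabricks):
    # A plain databricks-sdk task specification; the cluster id and script
    # path are hypothetical placeholders for your own workspace.
    task = jobs.SubmitTask(
        task_key="my-ext-task",
        existing_cluster_id="1234-567890-abcde123",
        spark_python_task=jobs.SparkPythonTask(
            python_file="dbfs:/scripts/my_script.py",
        ),
    )
    # `run` injects the EXT connection info into the spec, submits the job,
    # and polls it until termination, raising on failure as shown above.
    ext.run(task=task, context=context)


defs = Definitions(
    assets=[databricks_asset],
    # Resource key and constructor parameter are assumptions.
    resources={"ext": ExtDatabricks(client=WorkspaceClient())},
)
```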
i think we should lowercase ext. It's not principled, but it looks cooler
I might be wrong/misguided on this
let's discuss in standup
We didn't end up discussing but I changed it. Personally am ambivalent.
```python
context_injector: Optional[ExtContextInjector] = None,
message_reader: Optional[ExtMessageReader] = None,
```
rebase on my PR
```python
with dbfs_tempdir(self.dbfs_client) as tempdir:
    self.tempdir = tempdir
    yield
```
i thought we were avoiding this sort of mutable state business
It was necessary given the existing setup of `ExtBlobStoreMessageReader`. Updated with some changes to make it unnecessary.
```python
dbfs_client.delete(tempdir, recursive=True)


class ExtDbfsContextInjector(ExtContextInjector):
```
It's probably not the most consequential thing, but I'm curious what the motivations would be for using this instead of `ExtEnvContextInjector`. Is it anything beyond concerns over env var size limits? Does Databricks call those limits out explicitly?
In our meeting with Enigma, their guy said that they frequently run into size limits when passing their "context" (in our ontology, extras) over CLI, thought it would happen with env vars too, and explicitly suggested a DBFS mechanism.
- I'm not a fan of the implications of a `setup` method on the message reader. Do you anticipate needing that to modify state?
- What is the plan for automated testing?
Internally, `ExtDatabricks` is using the `ext_protocol` context manager to set
up communications. If you have existing code to launch/poll the job you do not
want to change, or you just want more control than is permitted by
`ExtDatabricks`, you can use this lower-level API directly. All that is
necessary is that (1) your Databricks job is launched within the scope of the
`ext_protocol` context manager; (2) your job is launched on a cluster containing
the environment variables available on the yielded `ext_context`.
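A sketch of that lower-level usage follows, under stated assumptions: the `ext_protocol` import path, the `ExtDbfsMessageReader` class name, the injector's constructor parameter, and the env-var accessor are inferred from this thread, not confirmed API.

```python
from dagster import AssetExecutionContext, asset
from dagster._core.ext.utils import ext_protocol  # import path is an assumption
from dagster_databricks import ExtDbfsContextInjector, ExtDbfsMessageReader
from databricks.sdk import WorkspaceClient


@asset
def existing_databricks_asset(context: AssetExecutionContext):
    client = WorkspaceClient()
    with ext_protocol(
        context=context,
        context_injector=ExtDbfsContextInjector(client=client),
        message_reader=ExtDbfsMessageReader(client=client),
    ) as ext_context:
        # Hypothetical accessor name: whatever env vars the yielded context
        # exposes must be set on the cluster that runs the job.
        env_vars = ext_context.get_external_process_env_vars()
        # ...launch and poll the Databricks job with your existing code,
        # passing env_vars into the cluster spec...
```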
I would drop "internally". It makes people think they shouldn't use it. This is a first-class supported API. Actually just drop the first line.
If you have existing code to launch/poll the job you do not
want to change, or you just want more control than is permitted by
ExtDatabricks
, you can use ext_protocol
.
```python
tempdir: Optional[str] = None

def __init__(self, *, interval: int = 10, client: WorkspaceClient):
    super().__init__(interval=interval)
    self.dbfs_client = files.DbfsAPI(client.api_client)

@contextmanager
def setup(self) -> Iterator[ExtParams]:
    with dbfs_tempdir(self.dbfs_client) as tempdir:
        self.tempdir = tempdir
        yield {"path": tempdir}
```
why does this need to be a property? can it not be a local variable in setup
Sorry, the property was leftover from before; it is now passed down through params. Removed the property.
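A sketch of the resolved shape, mirroring the snippet above: the tempdir is scoped to `setup` and communicated only through the yielded params, with no instance attribute.

```python
@contextmanager
def setup(self) -> Iterator[ExtParams]:
    # The tempdir lives only inside this context manager; it reaches the
    # external process through the yielded params rather than via self.
    with dbfs_tempdir(self.dbfs_client) as tempdir:
        yield {"path": tempdir}
```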
With the modifications in the current state of the PR, it shouldn't need to modify state on the class; instead it yields params which get passed down.
Add our databricks test account secrets to BK and remove the skip on BK pytest marks.
req'ing changes based on error swallowing
```diff
-    @abstractmethod
-    def get_params(self) -> ExtParams:
+    @contextmanager
+    def setup(self) -> Iterator[ExtParams]:
```
please name it `get_params` or `with_params` or something like that
Renamed to get_params
```python
while True:
    run = self.client.jobs.get_run(run_id)
    context.log.info(f"Run state: {run.state.life_cycle_state}")
```
Let's make this a more informative message. Someone who is just viewing the run but has not read the code should be able to understand what is going on.

`f"Current run state of databricks run {run_id}: {run.state.life_cycle_state}"`

Would be cool to render a URL that points to Databricks as well, but that is probably context-specific?
Changed the message along the suggested lines. I'm not sure how to get the URL at present, and I'm trying to get this in for the release this AM.
```python
except IOError:
    return None
```
just silently swallow the error?
At minimum we should warn or something.
The error is swallowed because it's not an unexpected result. This is the polling mechanism for the presence of the next message chunk. If the chunk doesn't exist, it throws an `IOError`. I've added a comment explaining this.
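A sketch of the pattern being described, with the method name and signature pieced together from the snippets in this thread (illustrative, not the exact source):

```python
def download_messages_chunk(self, index: int, params: ExtParams) -> Optional[str]:
    message_path = os.path.join(params["path"], f"{index}.json")
    try:
        with open(message_path, "r") as f:
            return f.read()
    # An IOError here is expected when the next chunk has not been written
    # yet; returning None lets the reader poll again on the next tick.
    except IOError:
        return None
```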
Great. It is critical that you follow up quickly with real automated tests here.
```python
unmounted_path = _assert_env_param_type(params, "path", str, self.__class__)
path = os.path.join("/dbfs", unmounted_path.lstrip("/"))
with open(path, "r") as f:
    data = json.load(f)
```
nit: can just yield directly: `yield json.load(f)`
## Summary & Motivation - Add `dagster-ext` integration for Databricks. This is designed to interfere with the official databricks SDK as little as possible-- you pass in Databricks SDK data structures to `ExtDatabricks` and the only modification it makes is injecting the necessary environment variables. The rest of cluster config etc is left to the user. It is separate from the rest of the databricks integration. - Add example usage to `dagster_databricks/README.md` ## How I Tested These Changes New unit tests (they are skipped on BK though)
f"dagster-pyspark{pin}", | ||
"databricks-cli~=0.17", # TODO: Remove this dependency in the next minor release. | ||
"databricks_api", # TODO: Remove this dependency in the next minor release. | ||
"databricks-sdk<0.7", # Breaking changes occur in minor versions. | ||
"databricks-sdk<0.9", # Breaking changes occur in minor versions. |
```python
def run(
    self,
    task: jobs.SubmitTask,
```
Summary & Motivation
dagster-ext
integration for Databricks. This is designed to interfere with the official databricks SDK as little as possible-- you pass in Databricks SDK data structures toExtDatabricks
and the only modification it makes is injecting the necessary environment variables. The rest of cluster config etc is left to the user. It is separate from the rest of the databricks integration.dagster_databricks/README.md
How I Tested These Changes
New unit tests (they are skipped on BK though)