[pipes] databricks unstructured log forwarding #16674
Conversation
```python
    def download_stdout_chunk(self, params: PipesParams) -> Optional[str]:
        raise NotImplementedError()
```
`NotImplementedError` instead of abstract methods because we don't want to require every `PipesBlobStoreMessageReader` to implement these.
I think we need to reassess the approach here a bit per my inline comment.
```python
    def download_stdout_chunk(self, params: PipesParams) -> Optional[str]:
        raise NotImplementedError()

    def stdout_log_exists(self, params: PipesParams) -> bool:
        raise NotImplementedError()

    def download_stderr_chunk(self, params: PipesParams) -> Optional[str]:
        raise NotImplementedError()

    def stderr_log_exists(self, params: PipesParams) -> bool:
        raise NotImplementedError()
```
This structure seems totally off. We should be composing this behavior in classes which implement exactly what they need here. This turns the message reader class into a sort of chimera that is doing a bunch of things at the same time.
Agree. Refactored to extract a separate `PipesBlobStoreStdioReader`.
Looking much better. Still think we should encapsulate the threads unless there is a compelling reason not to.
@alangenfeld you want to take a pass too?
```diff
+        stdout_thread = None
+        stderr_thread = None
         try:
-            thread = Thread(
-                target=self._reader_thread,
-                args=(
-                    handler,
-                    params,
-                    is_task_complete,
-                ),
-                daemon=True,
-            )
-            thread.start()
+            messages_thread = Thread(
+                target=self._messages_thread, args=(handler, params, is_task_complete)
+            )
+            messages_thread.start()
+            if self.stdout_reader:
+                stdout_thread = self.stdout_reader.start_thread(params, is_task_complete)
+            if self.stderr_reader:
+                stderr_thread = self.stderr_reader.start_thread(params, is_task_complete)
             yield params
         finally:
+            self.wait_for_stdio_logs(params)
             is_task_complete.set()
-            if thread:
-                thread.join()
+            if messages_thread:
+                messages_thread.join()
+            if stdout_thread:
+                stdout_thread.join()
+            if stderr_thread:
+                stderr_thread.join()
```
Hmm, seems like the `PipesBlobStoreStdioReader` should encapsulate the thread completely?
updated accordingly
```diff
 class PipesS3MessageReader(PipesBlobStoreMessageReader):
-    def __init__(self, *, interval: float = 10, bucket: str, client: boto3.client):
-        super().__init__(interval=interval)
+    def __init__(
```
We should add a docblock, since this just got complicated, in particular explaining the behavior if the readers are `None`.
Added a docstring here, and modified the docstring of the `PipesBlobStoreMessageReader` base class with a description of the behavior for null readers.
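For reference, a sketch of what such a docstring might say (wording assumed, not copied from the PR):

```python
class PipesS3MessageReader:  # the real class subclasses PipesBlobStoreMessageReader
    """Message reader that reads Pipes messages (and optionally stdout/stderr) from S3.

    If ``stdout_reader`` or ``stderr_reader`` is None, the corresponding stream is not
    forwarded from the external process; only Pipes messages are read. Passing a reader
    enables a background thread that periodically downloads new chunks of that stream
    and relays them to the orchestration process.
    """
```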
I'd just repeat that information here
```diff
@@ -231,3 +264,46 @@ def no_messages_debug_text(self) -> str:
             " PipesDbfsMessageWriter to be explicitly passed to open_dagster_pipes in the external"
             " process."
         )
+
+
+class PipesDbfsStdioReader(PipesBlobStoreStdioReader):
```
Should it be `PipesDbfsTextIOReader`? Why `Stdio`?
`Stdio` because it derives from `PipesBlobStoreStdioReader`, the purpose of which is to read stdout/stderr streams. It wouldn't be `TextIO` because it doesn't operate on an arbitrary `TextIO` object (it does write to one, which should typically be `sys.stdout` or `sys.stderr`).
Let's try the "NoOp" reader on for size.
```diff
-            time.sleep(5)
+            time.sleep(30)  # 30 seconds to make sure logs are flushed
```
Can we make these named constants with comments, etc.?
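Something along these lines would address this; the constant names here are illustrative (the values are the ones from the sleeps above):

```python
import time

# Polling interval while waiting for the Databricks run to reach a terminal state.
DATABRICKS_RUN_POLL_INTERVAL = 5

# Grace period after run completion so Databricks can flush the final stdout/stderr
# chunks to DBFS before the stdio readers are stopped.
DBFS_LOG_FLUSH_GRACE_PERIOD = 30


def wait_for_log_flush() -> None:
    # Called after the run finishes, before tearing down the stdio readers.
    time.sleep(DBFS_LOG_FLUSH_GRACE_PERIOD)
```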
One last change, but it is important
```python
class PipesBlobStoreStdioReader(ABC):
    def __init__(self, *, interval: float = 10, target_stream: TextIO):
        self.interval = interval
        self.target_stream = target_stream
        self.thread: Optional[Thread] = None

    @abstractmethod
    def download_log_chunk(self, params: PipesParams) -> Optional[str]: ...

    @abstractmethod
    def log_exists(self, params: PipesParams) -> bool: ...

    def start(self, params: PipesParams, is_task_complete: Event) -> None:
        self.thread = Thread(target=self._reader_thread, args=(params, is_task_complete))
        self.thread.start()

    def stop(self) -> None:
        if self.thread:
            self.thread.join()

    def _reader_thread(
        self,
        params: PipesParams,
        is_task_complete: Event,
    ) -> None:
        start_or_last_download = datetime.datetime.now()
        while True:
            now = datetime.datetime.now()
            if (now - start_or_last_download).seconds > self.interval or is_task_complete.is_set():
                start_or_last_download = now
                chunk = self.download_log_chunk(params)
                if chunk:
                    self.target_stream.write(chunk)
                elif is_task_complete.is_set():
                    break
            time.sleep(self.interval)


class PipesNoOpStdioReader(PipesBlobStoreStdioReader):
```
This should inherit from a common pure interface, not the `PipesBlobStoreStdioReader` that has a bunch of stuff in it that will never get called.
I really feel that this is starting to overcomplicate the class hierarchy. We've dropped a few null checks, where IMO a `None` value is more intuitively obvious for the functionality (i.e. this doesn't read stdout/stderr at all), in favor of a special NoOp class, and now a one-off interface just to abstract out the common part between the NoOp and the (already abstract) variant that actually works?
I feel like I might be missing a broader vision here; do you see this interface being used elsewhere? And what should the name be? I would guess `PipesBlobStoreStdioReader` would be the name of the interface, but then the working variant becomes... `PipesBlobStoreThreadedStdioReader`?
Also, looking closely, I don't think in the current implementation there is anything in `PipesBlobStoreStdioReader` that never gets called for the no-op. Instead the no-op just always returns `None` for the log chunk.
Another point: I think we're probably going to want to restructure so there is just a top-level "Reader" class that composes a list of component readers (messages, stdout, stderr, maybe even other logs), rather than the current scenario where message reading is in the top level but stderr/stdout is a component. So maybe hold off on more refactoring here for the moment?
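A rough sketch of the kind of composition described here, with hypothetical names (this is not code from the PR):

```python
from abc import ABC, abstractmethod
from threading import Event
from typing import Any, Mapping, Sequence


class PipesComponentReader(ABC):
    """One component of a composite reader, e.g. messages, stdout, or stderr."""

    @abstractmethod
    def start(self, params: Mapping[str, Any], is_task_complete: Event) -> None: ...

    @abstractmethod
    def stop(self) -> None: ...


class PipesCompositeReader:
    """Top-level reader that simply fans start/stop out to its component readers."""

    def __init__(self, component_readers: Sequence[PipesComponentReader]):
        self.component_readers = component_readers

    def start(self, params: Mapping[str, Any], is_task_complete: Event) -> None:
        for reader in self.component_readers:
            reader.start(params, is_task_complete)

    def stop(self) -> None:
        for reader in self.component_readers:
            reader.stop()
```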
I think you are overthinking this. Just add:

```python
class IPipesBlobStoreStdioReader(ABC):
    @abstractmethod
    def start(self, params: PipesParams, is_task_complete: Event) -> None: ...

    @abstractmethod
    def stop(self) -> None: ...
```

And have both classes inherit from it.
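Under that suggestion, the no-op variant reduces to something like this sketch (`PipesParams` is aliased locally here to keep the example self-contained; in dagster it comes from `dagster_pipes`):

```python
from abc import ABC, abstractmethod
from threading import Event
from typing import Any, Mapping

PipesParams = Mapping[str, Any]  # stand-in for the alias defined in dagster_pipes


class IPipesBlobStoreStdioReader(ABC):
    @abstractmethod
    def start(self, params: PipesParams, is_task_complete: Event) -> None: ...

    @abstractmethod
    def stop(self) -> None: ...


class PipesNoOpStdioReader(IPipesBlobStoreStdioReader):
    """Stdio reader that does nothing; used when stdout/stderr forwarding is disabled."""

    def start(self, params: PipesParams, is_task_complete: Event) -> None:
        pass

    def stop(self) -> None:
        pass
```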
OK, updated to make the base class more generic.
```python
    def __init__(
        self,
        interval: float = 10,
        stdout_reader: Optional["PipesBlobStoreStdioReader"] = None,
        stderr_reader: Optional["PipesBlobStoreStdioReader"] = None,
```
As users might be extending this, I recommend putting check calls on `stdout_reader` and `stderr_reader`.
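For example, a sketch using dagster's internal `check` helpers (the surrounding class bodies are stand-ins, not the real classes):

```python
from typing import Optional

import dagster._check as check  # dagster's internal param-checking helpers


class PipesBlobStoreStdioReader:  # stand-in for the real ABC, for illustration only
    pass


class PipesBlobStoreMessageReader:  # sketch of the suggested validation in __init__
    def __init__(
        self,
        interval: float = 10,
        stdout_reader: Optional[PipesBlobStoreStdioReader] = None,
        stderr_reader: Optional[PipesBlobStoreStdioReader] = None,
    ):
        self.interval = interval
        # opt_inst_param raises a readable error if the value is not None and not an
        # instance of the expected class, surfacing mistakes from subclasses early.
        self.stdout_reader = check.opt_inst_param(
            stdout_reader, "stdout_reader", PipesBlobStoreStdioReader
        )
        self.stderr_reader = check.opt_inst_param(
            stderr_reader, "stderr_reader", PipesBlobStoreStdioReader
        )
```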
## Summary & Motivation This adds stdout/stderr forwarding to the dagster-databricks pipes integration. It was a long road getting here with several dead ends. The current approach in this PR is to modify `PipesBlobStoreMessageReader` with `forward_{stdout,stderr}` boolean params and corresponding hooks for downloading stdout, stderr chunks. If `forward_{stdout,stderr}` is enabled, threads will be launched for the streams (alongside the message chunk thread) that periodically download stderr/stdout chunks and write them to the corresponding orchestration process streams. In the `PipesDbfsMessageReader`, instead of using an incrementing counter (as is used for messages), the stdout/stderr chunk downloaders are written to track a string index in the file corresponding to the stream. We repeatedly download the full file. Every time the file is downloaded, we only forward starting from the offset index. This approach of repeatedly downloading the full file and applying the offset only on the orchestration end can surely be improved to just download starting from the offset, but I did not implement that yet (there are some concerns around getting the indexing right given that the files are stored as base64, I think with padding). While it is possible that other integrations would need to make some changes on the pipes end, I didn't need to make any for databricks, because a DBFS location for `stdout`/`stderr` is configured when launching the job, so we don't need to do anything in the orchestration process. This introduces a potential asymmetry between the `PipesMessageReader` and `PipesMessageWriter`-- probably we will end up with just a `PipesReader`. There are definitely some other rough patches here: - Databricks does not let you directly configure the directory where stdout/stderr are written. Instead you set a root directory, and then logs get stored in `<root>/<cluster-id>/driver/{stdout,stderr}`. This introduces a difficulty because the cluster id does not exist until the job is launched (and you can't set it manually). Because the message reader gets set up before the job is launched, the message reader doesn't know where to look. - I got around this by setting the log root to a temporary directory and polling that directory for the first child to appear, which will be where the logs are stored. This is not ideal because users may want to retain the logs in DBFS. - Another approach would be to send the cluster id back in the new `opened` message, but to then thread this into the message reader requires additional plumbing work. For those who want to play with this, the workflow is to repeatedly run `dagster_databricks_tests/test_pipes.py::test_pipes_client`. This requires you to have `DATABRICKS_HOST` and `DATABRICKS_TOKEN` set in your env. `DATABRICKS_HOST` should be `https://dbc-07902917-6487.cloud.databricks.com`. `DATABRICKS_TOKEN` should be set to a value you generate by going to User Settings > Developer > Access Tokens in the Databricks UI. ## How I Tested These Changes Tested via `capsys` to make sure logs are forwarded.
Summary & Motivation
This adds stdout/stderr forwarding to the dagster-databricks pipes integration. It was a long road getting here with several dead ends.
The current approach in this PR is to modify
PipesBlobStoreMessageReader
withforward_{stdout,stderr}
boolean params and corresponding hooks for downloading stdout, stderr chunks. Ifforward_{stdout,stderr}
is enabled, threads will be launched for the streams (alongside the message chunk thread) that periodically download stderr/stdout chunks and write them to the corresponding orchestration process streams.In the
PipesDbfsMessageReader
, instead of using an incrementing counter (as is used for messages), the stdout/stderr chunk downloaders are written to track a string index in the file corresponding to the stream. We repeatedly download the full file. Every time the file is downloaded, we only forward starting from the offset index. This approach of repeatedly downloading the full file and applying the offset only on the orchestration end can surely be improved to just download starting from the offset, but I did not implement that yet (there are some concerns around getting the indexing right given that the files are stored as base64, I think with padding).While it is possible that other integrations would need to make some changes on the pipes end, I didn't need to make any for databricks, because a DBFS location for
stdout
/stderr
is configured when launching the job, so we don't need to do anything in the orchestration process. This introduces a potential asymmetry between thePipesMessageReader
andPipesMessageWriter
-- probably we will end up with just aPipesReader
.There are definitely some other rough patches here:
<root>/<cluster-id>/driver/{stdout,stderr}
. This introduces a difficulty because the cluster id does not exist until the job is launched (and you can't set it manually). Because the message reader gets set up before the job is launched, the message reader doesn't know where to look.opened
message, but to then thread this into the message reader requires additional plumbing work.For those who want to play with this, the workflow is to repeatedly run
dagster_databricks_tests/test_pipes.py::test_pipes_client
. This requires you to haveDATABRICKS_HOST
andDATABRICKS_TOKEN
set in your env.DATABRICKS_HOST
should behttps://dbc-07902917-6487.cloud.databricks.com
.DATABRICKS_TOKEN
should be set to a value you generate by going to User Settings > Developer > Access Tokens in the Databricks UI.How I Tested These Changes
Tested via
capsys
to make sure logs are forwarded.