Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add typehints, missing documentation & formatting #6

Merged
merged 3 commits into from
Jul 25, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
309 changes: 214 additions & 95 deletions streamcapture/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
`monkeypatch` optional parameter to the constructor. When enabled, the workaround
overwrites `stream.write(...)` by an implementation that sends everything to `os.write(self.fd,...)`.
This workaround is enabled when `monkeypatch=True` and disabled when `monkeypatch=False`.
The default is `monkeypatch=None`, in which case monkeypatching is enabled only when
The default is `monkeypatch=None`, in which case monkeypatching is enabled only when
`platform.system()=='Windows'`.

When writing to multiple streams and file descriptors, sometimes the order in which the writes
Expand Down Expand Up @@ -99,111 +99,230 @@
import sys, streamcapture
writer = streamcapture.Writer(open('logfile.txt','wb'),2)
with streamcapture.StreamCapture(sys.stdout,writer), streamcapture.StreamCapture(sys.stderr,writer):
print("This goes to stdout and is captured to logfile.txt")
print("This goes to stderr and is also captured to logfile.txt",file=sys.stderr)
print("This goes to stdout and is captured to logfile.txt")
print("This goes to stderr and is also captured to logfile.txt",file=sys.stderr)
```

In the above example, writer will be closed twice: once from the `StreamCapture(sys.stdout,...)`
object, and once from the `StreamCapture(sys.stderr,...)` object. Correspondingly, the `count` parameter
of the `streamcapture.Writer` was set to `2`, so that the underlying stream is only closed after 2
calls to `writer.close()`.
"""
import io
import os, threading, platform
from types import TracebackType
from typing import Optional, Callable, Union, Type, TextIO

import os, sys, threading, platform, select

class Writer:
def __init__(self,stream,count = None,lock_write = False):
"""`Writer` constructor."""
(self.stream,self.lock_write) = (stream,lock_write)
if count is None:
(self.count,self.increment) = (0,1)
else:
(self.count,self.increment) = (count,0)
self.lock = threading.Lock()
self._write = self.locked_write if lock_write else stream.write
def write_from(self,data,cap):
self._write(data)
def writer_open(self):
with self.lock:
self.count += self.increment
def close(self):
"""When one is done using a `Writer`, one calls `Writer.close()`. This acquires `Writer.lock` so it is
thread-safe. Each time `Writer.close()` is called, `Writer.count` is decremented. When `Writer.count`
reaches `0`, `stream.close()` is called."""
with self.lock:
self.count -= 1
if self.count>0:
return
self.stream.close()
def locked_write(self,z):
with self.lock:
self.stream.write(z)
stream: io.IOBase
count: int
increment: int
lock: threading.Lock
_write: Callable[[bytes], int]

def __init__(self, stream: io.IOBase, count: Optional[int] = None, lock_write: bool = False):
"""`Writer` constructor.

Wrapper of a stream to which bytes may be written. Introduces an optional lock for which write which
may be enabled through `lock_write`.

:param stream: The stream to wrap.
:param count: The starting number of users of this writer.
:param lock_write: Grab the lock before each write operation.
"""
(self.stream, self.lock_write) = (stream, lock_write)
if count is None:
(self.count, self.increment) = (0, 1)
else:
(self.count, self.increment) = (count, 0)
self.lock = threading.Lock()
self._write = self.locked_write if lock_write else stream.write # type: ignore[assignment]

def write_from(self, data: bytes, cap: 'FDCapture') -> int:
"""Perform a write operation.

:param data: The bytes to write.
:param cap: Unused. Remains for legacy purposes.

:return: The amount of bytes written.
"""
return self._write(data)

def writer_open(self) -> None:
"""Register that the writer is used."""
with self.lock:
self.count += self.increment

def close(self) -> None:
"""Closes the writer and the underlying stream

When one is done using a `Writer`, one calls `Writer.close()`. This acquires `Writer.lock` so it is
thread-safe. Each time `Writer.close()` is called, `Writer.count` is decremented. When `Writer.count`
reaches `0`, `stream.close()` is called.
"""
with self.lock:
self.count -= 1
if self.count > 0:
return
self.stream.close()

def locked_write(self, z: bytes) -> int:
"""Perform the write operation in a thread-safe manner.

:param z: Bytes to write.
:return: Return the amount of bytes written
"""
with self.lock:
written = self.stream.write(z)
return written


class FDCapture:
def __init__(self,fd,writer,echo=True,magic=b'\x04\x81\x00\xff'):
"""`FDCapture` constructor."""
if(hasattr(writer,'writer_open')):
writer.writer_open()
(self.active, self.writer, self.fd, self.echo, self.magic) = (True,writer,fd,echo,magic)
self.write = (lambda data: self.writer.write_from(data,self)) if hasattr(writer,'write_from') else writer.write
(self.pipe_read_fd, self.pipe_write_fd) = os.pipe()
self.dup_fd = os.dup(fd)
os.dup2(self.pipe_write_fd,fd)
self.thread = threading.Thread(target=self.printer)
self.thread.start()
def printer(self):
"""This is the thread that listens to the pipe output and passes it to the writer stream."""
try:
looping = True
while looping:
data = os.read(self.pipe_read_fd,100000)
foo = data.split(self.magic)

if len(foo)>=2:
looping = False

for segment in foo:
if len(segment) == 0:
# Pipe is closed
looping = False
break
self.write(segment)
if self.echo:
os.write(self.dup_fd,segment)
finally:
os.close(self.pipe_read_fd)
def close(self):
"""When you want to "uncapture" a stream, use this method."""
if not self.active:
return
self.active = False
os.write(self.fd,self.magic)
self.thread.join()
os.dup2(self.dup_fd,self.fd)
os.close(self.pipe_write_fd)
os.close(self.dup_fd)

def __enter__(self):
return self
def __exit__(self,a,b,c):
self.close()
"""Redirect all output from a file descriptor and write it to `writer`."""

active: bool
writer: Union[io.IOBase, Writer]
fd: int
echo: bool
magic: bytes
write: Callable[[bytes], int]

pipe_read_fd: int
pipe_write_fd: int
dup_fd: int
"""Placeholder filedescriptor where the stream originally wrote to."""
thread: threading.Thread

def __init__(
self,
fd: int,
writer: Union[io.IOBase, Writer],
echo: bool,
magic: bytes = b"\x04\x81\x00\xff",
):
"""`FDCapture` constructor.

:param fd: The filedescriptor to capture.
:param writer: Any bytes received from `fd` are written to this writer.
:param echo: Enable to also write bytes received to `fd` as well.
:param magic: The magic packet which denotes that the capturing process should stop.
"""
if hasattr(writer, "writer_open"):
writer.writer_open()
(self.active, self.writer, self.fd, self.echo, self.magic) = (True, writer, fd, echo, magic)
self.write = (
(lambda data: self.writer.write_from(data, self)) # type: ignore[union-attr, assignment]
if hasattr(writer, "write_from")
else writer.write
)
(self.pipe_read_fd, self.pipe_write_fd) = os.pipe()
self.dup_fd = os.dup(fd)
os.dup2(self.pipe_write_fd, fd)
self.thread = threading.Thread(target=self.printer)
self.thread.start()

def printer(self):
"""This is the thread that listens to the pipe output and passes it to the writer stream."""
try:
looping = True
while looping:
data = os.read(self.pipe_read_fd, 100000)
foo = data.split(self.magic)

# magic segment was found in data
if len(foo) >= 2:
looping = False

for segment in foo:
# Pipe is closed
if len(segment) == 0:
looping = False
break
self.write(segment)
if self.echo:
os.write(self.dup_fd, segment)
finally:
os.close(self.pipe_read_fd)

def close(self):
"""When you want to "uncapture" a stream, use this method."""
if not self.active:
return
self.active = False

os.write(self.fd, self.magic)
self.thread.join()
os.dup2(self.dup_fd, self.fd)
os.close(self.pipe_write_fd)
os.close(self.dup_fd)

def __enter__(self):
return self

def __exit__(
self,
exc_type: Optional[Type[BaseException]],
exc_val: Optional[BaseException],
exc_tb: Optional[TracebackType],
) -> None:
self.close()


class StreamCapture:
def __init__(self,stream,writer,echo=True,monkeypatch=None):
"""The `StreamCapture` constructor."""
self.fdcapture = FDCapture(stream.fileno(),writer,echo)
self.stream = stream
self.monkeypatch = platform.system()=='Windows' if monkeypatch is None else monkeypatch
if self.monkeypatch:
self.oldwrite = stream.write
stream.buffer.write = lambda z: os.write(stream.fileno(), z)
def close(self):
"""When you want to "uncapture" a stream, use this method."""
self.stream.flush()
self.fdcapture.close()
if self.monkeypatch:
self.stream.write = self.oldwrite
def __enter__(self):
return self
def __exit__(self,a,b,c):
self.close()
"""Interface for users to redirect a stream to another `io.IOBase`"""

fdcapture: FDCapture
stream: Union[io.IOBase, TextIO]
monkeypatch: bool
oldwrite: Optional[Callable[[Union[bytes, str]], None]]

def __init__(
self,
stream_to_redirect: Union[io.IOBase, TextIO],
writer: io.IOBase,
echo: bool = True,
monkeypatch: Optional[bool] = None,
) -> None:
"""The `StreamCapture` constructor.

:param stream_to_redirect: Stream which will be redirected.
:param writer: The stream will be redirected to this writer. It must derive from io.IOBase.
:param echo: If the redirected stream should also write any output to the original stream.
:param monkeypatch: If monkeypatching is necessary. Default is None which will perform
the monkeypatch in case this is run on Windows. Otherwise, the value of monkeypatch
is used.
"""
self.fdcapture = FDCapture(stream_to_redirect.fileno(), writer, echo)
self.stream = stream_to_redirect
self.monkeypatch = platform.system() == "Windows" if monkeypatch is None else monkeypatch
if self.monkeypatch:
self.oldwrite = stream_to_redirect.write # type: ignore[assignment]
stream_to_redirect.write = lambda z: os.write( # type: ignore[method-assign]
stream_to_redirect.fileno(), z.encode() if hasattr(z, "encode") else z
)
else:
self.oldwrite = None

def close(self) -> None:
"""When you want to "uncapture" a stream, use this method."""
self.stream.flush()
self.fdcapture.close()
if self.monkeypatch:
self.stream.write = self.oldwrite # type: ignore[assignment,method-assign]

def __enter__(self):
"""Start the stream redirect as a contextmanager."""
return self

def __exit__(
self,
exc_type: Optional[Type[BaseException]],
exc_val: Optional[BaseException],
exc_tb: Optional[TracebackType],
) -> None:
"""Stop the stream redirect as a contextmanager.

Same as running StreamCapture.close()
"""
self.close()