Skip to content

Commit

Permalink
simplify outputs parsing
Browse files Browse the repository at this point in the history
  • Loading branch information
Andrei Neagu committed Dec 19, 2024
1 parent da30d13 commit 9ca1090
Showing 1 changed file with 15 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
_FILE_COUNT_RE: Final[str] = r" (\d+)\s*files"
_PROGRESS_PERCENT_RE: Final[str] = r" (?:100|\d?\d)% "
_ALL_DONE_RE: Final[str] = r"Everything is Ok"
# NOTE: the size of `chunk_to_emit` should not be too big nor too small otherwise it might skip some updates
# NOTE: the size of `chunk_to_emit` should in theory contain everything that above regexes capture
_DEFAULT_CHUNK_SIZE: Final[NonNegativeInt] = 20


Expand Down Expand Up @@ -115,45 +115,36 @@ async def _output_reader(
output_handlers: list[Callable[[str], Awaitable[None]]] | None,
chunk_size: NonNegativeInt = _DEFAULT_CHUNK_SIZE,
) -> str:
command_output = ""
# NOTE: we do not read line by line but chunk by chunk otherwise we'd miss progress updates
# the key is to read the smallest possible chunks of data so that the progress can be properly parsed
if output_handlers is None:
output_handlers = []

# Initialize buffer to store lookbehind window
lookbehind_buffer = ""
command_output = ""

undecodable_chunk: bytes | None = None
lookbehind_buffer = "" # store the last chunk

while True:
read_chunk = await stream.read(chunk_size)
if not read_chunk:
# Process remaining buffer if any
if lookbehind_buffer and output_handlers:
if lookbehind_buffer:
await asyncio.gather(
*[handler(lookbehind_buffer) for handler in output_handlers]
)
break

try:
if undecodable_chunk:
chunk = (undecodable_chunk + read_chunk).decode("utf-8")
undecodable_chunk = None
else:
chunk = read_chunk.decode("utf-8")
except UnicodeDecodeError:
undecodable_chunk = read_chunk
continue
# `errors=replace`: avoids getting stuck when can't parse utf-8
chunk = read_chunk.decode("utf-8", errors="replace")

command_output += chunk

# Combine lookbehind buffer with new chunk
chunk_to_emit = lookbehind_buffer + chunk

if output_handlers:
await asyncio.gather(
*[handler(chunk_to_emit) for handler in output_handlers]
)

# Keep last window_size characters for next iteration
lookbehind_buffer = chunk_to_emit[-chunk_size:]
lookbehind_buffer = chunk_to_emit[-len(chunk) :]

await asyncio.gather(*[handler(chunk_to_emit) for handler in output_handlers])

return command_output

Expand Down Expand Up @@ -258,6 +249,7 @@ async def unarchive_dir(
num_steps=1, description=IDStr(f"extracting {archive_to_extract.name}")
)

# get archive information
archive_info_parser = ArchiveInfoParser()
await _run_cli_command(
f"7z l {archive_to_extract}",
Expand All @@ -279,6 +271,7 @@ async def unarchive_dir(
)
)

# extract archive
async def progress_handler(byte_progress: NonNegativeInt) -> None:
if tqdm_progress.update(byte_progress) and log_cb:
with log_catch(_logger, reraise=False):
Expand Down

0 comments on commit 9ca1090

Please sign in to comment.