Skip to content

Commit

Permalink
restructure model
Browse files Browse the repository at this point in the history
  • Loading branch information
Andrei Neagu committed Dec 19, 2024
1 parent 571a411 commit da30d13
Showing 1 changed file with 79 additions and 74 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,80 +27,6 @@

_logger = logging.getLogger(__name__)


async def _run_cli_command(
command: str,
*,
output_handlers: list[Callable[[str], Awaitable[None]]] | None = None,
) -> None:
"""
Raises:
ArchiveError: when it fails to execute the command
"""

process = await asyncio.create_subprocess_shell(
command,
stdin=asyncio.subprocess.PIPE,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.STDOUT,
)

async def read_stream(
stream, chunk_size: NonNegativeInt = _DEFAULT_CHUNK_SIZE
) -> str:
command_output = ""

# Initialize buffer to store lookbehind window
lookbehind_buffer = ""

undecodable_chunk: bytes | None = None

while True:
read_chunk = await stream.read(chunk_size)
if not read_chunk:
# Process remaining buffer if any
if lookbehind_buffer and output_handlers:
await asyncio.gather(
*[handler(lookbehind_buffer) for handler in output_handlers]
)
break

try:
if undecodable_chunk:
chunk = (undecodable_chunk + read_chunk).decode("utf-8")
undecodable_chunk = None
else:
chunk = read_chunk.decode("utf-8")
except UnicodeDecodeError:
undecodable_chunk = read_chunk
continue

command_output += chunk

# Combine lookbehind buffer with new chunk
chunk_to_emit = lookbehind_buffer + chunk

if output_handlers:
await asyncio.gather(
*[handler(chunk_to_emit) for handler in output_handlers]
)

# Keep last window_size characters for next iteration
lookbehind_buffer = chunk_to_emit[-chunk_size:]

return command_output

# Wait for the process to complete and all output to be processed
command_output, _ = await asyncio.gather(
asyncio.create_task(read_stream(process.stdout)),
process.wait(),
)

if process.returncode != os.EX_OK:
msg = f"Could not run '{command}' error: '{command_output}'"
raise ArchiveError(msg)


_TOTAL_BYTES_RE: Final[str] = r" (\d+)\s*bytes "
_FILE_COUNT_RE: Final[str] = r" (\d+)\s*files"
_PROGRESS_PERCENT_RE: Final[str] = r" (?:100|\d?\d)% "
Expand Down Expand Up @@ -183,6 +109,85 @@ async def parse_chunk(self, chunk: str) -> None:
self.finished_emitted = True


async def _output_reader(
stream,
*,
output_handlers: list[Callable[[str], Awaitable[None]]] | None,
chunk_size: NonNegativeInt = _DEFAULT_CHUNK_SIZE,
) -> str:
command_output = ""

# Initialize buffer to store lookbehind window
lookbehind_buffer = ""

undecodable_chunk: bytes | None = None

while True:
read_chunk = await stream.read(chunk_size)
if not read_chunk:
# Process remaining buffer if any
if lookbehind_buffer and output_handlers:
await asyncio.gather(
*[handler(lookbehind_buffer) for handler in output_handlers]
)
break

try:
if undecodable_chunk:
chunk = (undecodable_chunk + read_chunk).decode("utf-8")
undecodable_chunk = None
else:
chunk = read_chunk.decode("utf-8")
except UnicodeDecodeError:
undecodable_chunk = read_chunk
continue

command_output += chunk

# Combine lookbehind buffer with new chunk
chunk_to_emit = lookbehind_buffer + chunk

if output_handlers:
await asyncio.gather(
*[handler(chunk_to_emit) for handler in output_handlers]
)

# Keep last window_size characters for next iteration
lookbehind_buffer = chunk_to_emit[-chunk_size:]

return command_output


async def _run_cli_command(
command: str,
*,
output_handlers: list[Callable[[str], Awaitable[None]]] | None = None,
) -> None:
"""
Raises:
ArchiveError: when it fails to execute the command
"""

process = await asyncio.create_subprocess_shell(
command,
stdin=asyncio.subprocess.PIPE,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.STDOUT,
)

# Wait for the process to complete and all output to be processed
command_output, _ = await asyncio.gather(
asyncio.create_task(
_output_reader(process.stdout, output_handlers=output_handlers)
),
process.wait(),
)

if process.returncode != os.EX_OK:
msg = f"Could not run '{command}' error: '{command_output}'"
raise ArchiveError(msg)


async def archive_dir(
dir_to_compress: Path,
destination: Path,
Expand Down

0 comments on commit da30d13

Please sign in to comment.