Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🐛 Replace unarchiving function #6959

Open
wants to merge 15 commits into
base: master
Choose a base branch
from
2 changes: 2 additions & 0 deletions .github/workflows/ci-testing-deploy.yml
Original file line number Diff line number Diff line change
Expand Up @@ -1304,6 +1304,8 @@ jobs:
cache-dependency-glob: "**/dynamic-sidecar/requirements/ci.txt"
- name: show system version
run: ./ci/helpers/show_system_versions.bash
- name: install 7zip
run: sudo ./ci/github/helpers/install_7zip.bash
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why is this done here and not in the dynamic-sidecar.bash script?

- name: install
run: ./ci/github/unit-testing/dynamic-sidecar.bash install
- name: typecheck
Expand Down
12 changes: 12 additions & 0 deletions ci/github/helpers/install_7zip.bash
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
#!/bin/bash
#
# Installs the latest version of 7zip plugin
#

# http://redsymbol.net/articles/unofficial-bash-strict-mode/
set -o errexit # abort on nonzero exitstatus
set -o nounset # abort on unbound variable
set -o pipefail # don't hide errors within pipes
IFS=$'\n\t'

exec "$( dirname -- "$0"; )"/../../../scripts/install_7zip.bash
28 changes: 28 additions & 0 deletions scripts/install_7zip.bash
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
#!/bin/bash
#
# Installs 7zip
#

# http://redsymbol.net/articles/unofficial-bash-strict-mode/
set -o errexit # abort on nonzero exitstatus
set -o nounset # abort on unbound variable
set -o pipefail # don't hide errors within pipes
IFS=$'\n\t'


SEVEN_ZIP_VERSION="2409"
## 7z compression
echo "create install dir"
rm -rf /tmp/7zip
mkdir -p /tmp/7zip
cd /tmp/7zip

curl -LO https://www.7-zip.org/a/7z${SEVEN_ZIP_VERSION}-linux-x64.tar.xz
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Q: why not to have our own image with this already in place instead of downloading and installing it every time?

tar -xvf 7z${SEVEN_ZIP_VERSION}-linux-x64.tar.xz
cp 7zz /usr/bin/7z

echo "remove install dir"
rm -rf /tmp/7zip

echo "test installation"
7z --help
5 changes: 5 additions & 0 deletions services/dynamic-sidecar/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ RUN \
apt-get update && \
apt-get install -y --no-install-recommends\
curl \
xz-utils \
gnupg \
lsb-release \
&& mkdir -p /etc/apt/keyrings \
Expand All @@ -56,6 +57,10 @@ RUN \
RUN \
--mount=type=bind,source=scripts/install_rclone.bash,target=install_rclone.bash \
./install_rclone.bash
# install 7zip
RUN \
--mount=type=bind,source=scripts/install_7zip.bash,target=install_7zip.bash \
./install_7zip.bash

RUN AWS_CLI_VERSION="2.11.11" \
&& curl "https://awscli.amazonaws.com/awscli-exe-linux-x86_64-${AWS_CLI_VERSION}.zip" -o "awscliv2.zip" \
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,3 +29,7 @@ class ContainerExecCommandFailedError(BaseDynamicSidecarError):
"Command '{command}' exited with code '{exit_code}'"
"and output: '{command_result}'"
)


class SevenZipError(BaseDynamicSidecarError):
msg_template = "Could not finish command: '{command}'\nReason: {command_result}"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
msg_template = "Could not finish command: '{command}'\nReason: {command_result}"
msg_template = "Could not complete 7zip command: '{command}'\nReason: {command_result}"

Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ def _close_transport(proc: Process):

async def async_command(
command: str,
timeout: float | None = None,
timeout: float | None = None, # noqa: ASYNC109
pipe_as_input: str | None = None,
env_vars: dict[str, str] | None = None,
) -> CommandResult:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
from models_library.projects_nodes_io import NodeIDStr
from models_library.services_types import ServicePortKey
from pydantic import ByteSize
from servicelib.archiving_utils import PrunableFolder, archive_dir, unarchive_dir
from servicelib.archiving_utils import PrunableFolder, archive_dir
from servicelib.async_utils import run_sequentially_in_context
from servicelib.file_utils import remove_directory
from servicelib.logging_utils import log_context
Expand All @@ -36,6 +36,7 @@

from ..core.settings import ApplicationSettings, get_settings
from ..modules.notifications import PortNotifier
from .seven_zip_wrapper import unarchive_zip_to
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

would it not make sense to put that one also in servicelib?
Also if you ensure compatible interfaces, that would even call for a common interface?



class PortTypeName(str, Enum):
Expand All @@ -46,7 +47,7 @@ class PortTypeName(str, Enum):
_FILE_TYPE_PREFIX = "data:"
_KEY_VALUE_FILE_NAME = "key_values.json"

logger = logging.getLogger(__name__)
_logger = logging.getLogger(__name__)

# OUTPUTS section

Expand Down Expand Up @@ -95,7 +96,7 @@ async def upload_outputs( # pylint:disable=too-many-statements # noqa: PLR0915
port_notifier: PortNotifier,
) -> None:
# pylint: disable=too-many-branches
logger.debug("uploading data to simcore...")
_logger.debug("uploading data to simcore...")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for this kind of logs, I guess you could use the log decorator and spare 1 line of code

start_time = time.perf_counter()

settings: ApplicationSettings = get_settings()
Expand Down Expand Up @@ -138,7 +139,7 @@ async def upload_outputs( # pylint:disable=too-many-statements # noqa: PLR0915
if is_file_type(port.property_type):
src_folder = outputs_path / port.key
files_and_folders_list = list(src_folder.rglob("*"))
logger.debug("Discovered files to upload %s", files_and_folders_list)
_logger.debug("Discovered files to upload %s", files_and_folders_list)

if not files_and_folders_list:
ports_values[port.key] = (None, None)
Expand Down Expand Up @@ -213,9 +214,9 @@ async def _archive_dir_notified(
if port.key in data and data[port.key] is not None:
ports_values[port.key] = (data[port.key], None)
else:
logger.debug("Port %s not found in %s", port.key, data)
_logger.debug("Port %s not found in %s", port.key, data)
else:
logger.debug("No file %s to fetch port values from", data_file)
_logger.debug("No file %s to fetch port values from", data_file)

if archiving_tasks:
await limited_gather(*archiving_tasks, limit=4)
Expand All @@ -228,8 +229,8 @@ async def _archive_dir_notified(

elapsed_time = time.perf_counter() - start_time
total_bytes = sum(_get_size_of_value(x) for x in ports_values.values())
logger.info("Uploaded %s bytes in %s seconds", total_bytes, elapsed_time)
logger.debug(_CONTROL_TESTMARK_DY_SIDECAR_NODEPORT_UPLOADED_MESSAGE)
_logger.info("Uploaded %s bytes in %s seconds", total_bytes, elapsed_time)
_logger.debug(_CONTROL_TESTMARK_DY_SIDECAR_NODEPORT_UPLOADED_MESSAGE)


# INPUTS section
Expand All @@ -243,14 +244,28 @@ def _is_zip_file(file_path: Path) -> bool:
_shutil_move = aiofiles.os.wrap(shutil.move)


async def _move_file_to_input_port(
final_path: Path, downloaded_file: Path, dest_folder: PrunableFolder
) -> None:
Comment on lines +247 to +249
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this function named _move_file_to_input_port takes arguments that do not have any port in their name. even the function does not check anything port related...
actually I don't even really understand what dest_folder is? was that the original folder where the downloaded_file is living?

so this funciton

  • moves downloaded_file into final_path
  • then prunes dest_folder exluding final_path
    maybe a bit of refactoring is in order? at least with naming the arguments?

with log_context(_logger, logging.DEBUG, f"moving {downloaded_file}"):
final_path = final_path / downloaded_file.name
final_path.parent.mkdir(exist_ok=True, parents=True)

await _shutil_move(downloaded_file, final_path)

# NOTE: after the port content changes, make sure old files
# which are no longer part of the port, are removed
dest_folder.prune(exclude={final_path})


async def _get_data_from_port(
port: Port, *, target_dir: Path, progress_bar: ProgressBarData
) -> tuple[Port, ItemConcreteValue | None, ByteSize]:
async with progress_bar.sub_progress(
steps=2 if is_file_type(port.property_type) else 1,
description=IDStr("getting data"),
) as sub_progress:
with log_context(logger, logging.DEBUG, f"getting {port.key=}"):
with log_context(_logger, logging.DEBUG, f"getting {port.key=}"):
port_data = await port.get(sub_progress)

if is_file_type(port.property_type):
Expand All @@ -261,42 +276,36 @@ async def _get_data_from_port(
if not downloaded_file or not downloaded_file.exists():
# the link may be empty
# remove files all files from disk when disconnecting port
logger.debug("removing contents of dir %s", final_path)
await remove_directory(
final_path, only_children=True, ignore_errors=True
)
with log_context(
_logger, logging.DEBUG, f"removing contents of dir '{final_path}'"
):
await remove_directory(
final_path, only_children=True, ignore_errors=True
)
return port, None, ByteSize(0)

transferred_bytes = downloaded_file.stat().st_size

# in case of valid file, it is either uncompressed and/or moved to the final directory
with log_context(logger, logging.DEBUG, "creating directory"):
with log_context(_logger, logging.DEBUG, "creating directory"):
final_path.mkdir(exist_ok=True, parents=True)
port_data = f"{final_path}"
dest_folder = PrunableFolder(final_path)

if _is_zip_file(downloaded_file):
# unzip updated data to dest_path
logger.debug("unzipping %s", downloaded_file)
unarchived: set[Path] = await unarchive_dir(
archive_to_extract=downloaded_file,
destination_folder=final_path,
progress_bar=sub_progress,
)

dest_folder.prune(exclude=unarchived)

logger.debug("all unzipped in %s", final_path)
with log_context(
_logger,
logging.DEBUG,
f"unzipping '{downloaded_file}' to {final_path}",
):
unarchived: set[Path] = await unarchive_zip_to(
downloaded_file, final_path, sub_progress
)
dest_folder.prune(exclude=unarchived)
else:
logger.debug("moving %s", downloaded_file)
final_path = final_path / Path(downloaded_file).name
await _shutil_move(str(downloaded_file), final_path)

# NOTE: after the download the current value of the port
# makes sure previously downloaded files are removed
dest_folder.prune(exclude={final_path})
await _move_file_to_input_port(final_path, downloaded_file, dest_folder)

logger.debug("all moved to %s", final_path)
_logger.debug("all moved to %s", final_path)
else:
transferred_bytes = sys.getsizeof(port_data)

Expand All @@ -312,7 +321,7 @@ async def download_target_ports(
progress_bar: ProgressBarData,
port_notifier: PortNotifier | None,
) -> ByteSize:
logger.debug("retrieving data from simcore...")
_logger.debug("retrieving data from simcore...")
start_time = time.perf_counter()

settings: ApplicationSettings = get_settings()
Expand Down Expand Up @@ -386,7 +395,7 @@ async def _get_date_from_port_notified(
data_file.write_text(json.dumps(data))

elapsed_time = time.perf_counter() - start_time
logger.info(
_logger.info(
"Downloaded %s in %s seconds",
total_transfered_bytes.human_readable(decimal=True),
elapsed_time,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
import asyncio
import logging
import re
from pathlib import Path

from models_library.basic_types import IDStr
from servicelib.progress_bar import ProgressBarData

from ..core.errors import SevenZipError
from ..core.utils import async_command

_logger = logging.getLogger(__name__)


async def _get_file_count(zip_path: Path) -> int:
result = await async_command(f"7z l {zip_path}")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

are there no long option names?

if not result.success:
raise SevenZipError(command=result.command, command_result=result.message)

match = re.search(r"\s*(\d+)\s*files", result.message)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you can create a variable that contains the compiled pattern for added performance, and also use named groups for added understanding. r"\s*(?P<file_count>\d+)\s*files" and then you can use match.group('file_count')
what also is possible is to use the parse library that makes it a bit more understandable as well. https://github.com/r1chardj0n3s/parse
and question, after file_count is there something no space? why not use + ?

return int(match.group().replace("files", "").strip() if match else "0")


async def unarchive_zip_to(
zip_path: Path,
output_dir: Path,
progress_bar: ProgressBarData | None = None,
) -> set[Path]:
if not progress_bar:
progress_bar = ProgressBarData(
num_steps=1, description=IDStr(f"extracting {zip_path.name}")
)

file_count = await _get_file_count(zip_path)

command = f"7z x {zip_path} -o{output_dir} -bb1"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You should be careful not to end up with segmented paths in the CLI ...

Normally subprocess api allows you to pass the command as a list of arguments. Here I read that it has to be a str. So perhaps the way to go is to add quotes around the paths? (double check)

Something like

command = f"7z x \"{zip_path}\" -o \"{output_dir}\" -bb1

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

and long option names? 'x' '-o' --> '--output' ?
and it is indeed usually better to use the list of strings if possible

process = await asyncio.create_subprocess_shell(
command, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
)
assert process.stdout # nosec
assert process.stderr # nosec

async with progress_bar.sub_progress(
steps=file_count, description=IDStr("...")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not the first time I see using IDStr for descriptions ...
NOTE that:

  1. Calling IDStr("...") is equivalent to calling str("..."). Therefore in this context: it basically does nothing. To perform a validation you need to create a TypeAdapter(IDStgr).validate_python("...")` etc
  2. If you want to annotate a field . Then note that IDStr is intended for annotation of String IDentifiers (e.g. a prj_123456) and not descriptions! There are other constraints for e.g. ShortTruncatedStr or LongTruncatedStr

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@pcrespov I think this is a flaw in the ProgressBar class. I guess I did it wrong at the time. but indeed I would just remove that IDStr from there it makes no sense. But probably should go in a separate PR, maybe an issue for this would be nice.

) as sub_prog:

while True:
line = await process.stdout.readline()
if not line:
break

line_decoded = line.decode().strip()
if line_decoded.startswith("- "): # check file entry
await sub_prog.update(1)
Comment on lines +53 to +54
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ohla.. hope that is tested


await process.wait()
if process.returncode != 0:
stderr = await process.stderr.read()
raise SevenZipError(command=command, command_result=stderr.decode().strip())

return {x for x in output_dir.rglob("*") if x.is_file()}
Loading
Loading