🐛 unarchiving_utils are now based on 7zip cli #6959

Open · wants to merge 41 commits into base: master

Changes shown in the diff below are from 2 of the 41 commits.

Commits:
97fd911  move archive to port instead of failing  (Dec 12, 2024)
29533e9  Merge remote-tracking branch 'upstream/master' into pr-osparc-deflate…  (Dec 12, 2024)
ca87d0b  using log context and rename  (Dec 13, 2024)
7f3e41e  Merge remote-tracking branch 'upstream/master' into pr-osparc-deflate…  (Dec 13, 2024)
be58b0c  refactor to use less generic error UnsupportedArchiveFormat  (Dec 13, 2024)
53e0728  renaming error  (Dec 13, 2024)
65b5291  replace unarchiver  (Dec 16, 2024)
e6d118d  remove unused  (Dec 16, 2024)
2a2f917  remove comment  (Dec 16, 2024)
56d7fe1  refactor 7zip  (Dec 16, 2024)
9c82bc5  fixed broken install  (Dec 16, 2024)
5a4cfb5  Merge remote-tracking branch 'upstream/master' into pr-osparc-deflate…  (Dec 16, 2024)
0f315e0  refactor  (Dec 16, 2024)
7ede41a  mypy  (Dec 16, 2024)
9d67dd1  added extra test  (Dec 16, 2024)
0f1642d  Merge remote-tracking branch 'upstream/master' into pr-osparc-deflate…  (Dec 17, 2024)
71adc5b  move to folder  (Dec 17, 2024)
01ce053  small enhancements in tests  (Dec 17, 2024)
79c1648  restructure package  (Dec 17, 2024)
02c9497  modules renaming  (Dec 17, 2024)
c01e35a  refactor  (Dec 17, 2024)
13c1485  fix refactor  (Dec 17, 2024)
653e168  added decompress progress  (Dec 17, 2024)
6dfcd95  fixed issues with parsing progress  (Dec 17, 2024)
c3b0c1c  restructure tests with common utils  (Dec 19, 2024)
56d1871  added notes to deprecate  (Dec 19, 2024)
30346af  Merge remote-tracking branch 'upstream/master' into pr-osparc-deflate…  (Dec 19, 2024)
a9145e2  remove unwanted code  (Dec 19, 2024)
46a0c29  remove unused  (Dec 19, 2024)
c04999c  removed unused feature  (Dec 19, 2024)
d48de81  refactor to drop relative_paths since it's always True  (Dec 19, 2024)
9ff2374  added tqdm progress  (Dec 19, 2024)
0364e8b  refactor common code  (Dec 19, 2024)
7205c2f  connected progressbar  (Dec 19, 2024)
79cc7e8  added dropin interface  (Dec 19, 2024)
e69bc9a  remove old unused interface  (Dec 19, 2024)
e282c95  Merge remote-tracking branch 'upstream/master' into pr-osparc-deflate…  (Dec 19, 2024)
7279155  removes unused repro-zipfile  (Dec 19, 2024)
571a411  refactor imports  (Dec 19, 2024)
da30d13  restructure model  (Dec 19, 2024)
9ca1090  simplify outputs parsing  (Dec 19, 2024)
@@ -20,7 +20,12 @@
from models_library.projects_nodes_io import NodeIDStr
from models_library.services_types import ServicePortKey
from pydantic import ByteSize
from servicelib.archiving_utils import PrunableFolder, archive_dir, unarchive_dir
from servicelib.archiving_utils import (
ArchiveError,
PrunableFolder,
archive_dir,
unarchive_dir,
)
from servicelib.async_utils import run_sequentially_in_context
from servicelib.file_utils import remove_directory
from servicelib.logging_utils import log_context
@@ -46,7 +51,7 @@ class PortTypeName(str, Enum):
_FILE_TYPE_PREFIX = "data:"
_KEY_VALUE_FILE_NAME = "key_values.json"

logger = logging.getLogger(__name__)
_logger = logging.getLogger(__name__)

# OUTPUTS section

@@ -95,7 +100,7 @@ async def upload_outputs( # pylint:disable=too-many-statements # noqa: PLR0915
port_notifier: PortNotifier,
) -> None:
# pylint: disable=too-many-branches
logger.debug("uploading data to simcore...")
_logger.debug("uploading data to simcore...")
Member:

for this kind of log, I guess you could use the log decorator and spare one line of code
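
For context, a minimal self-contained sketch of the kind of decorator the reviewer is hinting at; servicelib ships its own logging helpers, so the version below is only illustrative and not servicelib's actual API:

```python
import functools
import logging

_logger = logging.getLogger(__name__)


def log_call(logger: logging.Logger, level: int = logging.DEBUG):
    """Log entry/exit of an async function, sparing the manual debug line."""

    def decorator(func):
        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            logger.log(level, "%s: started", func.__name__)
            try:
                return await func(*args, **kwargs)
            finally:
                logger.log(level, "%s: finished", func.__name__)

        return wrapper

    return decorator


@log_call(_logger)
async def upload_outputs(outputs_path, port_keys) -> None:
    ...  # body no longer needs an explicit `_logger.debug("uploading data to simcore...")`
```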

start_time = time.perf_counter()

settings: ApplicationSettings = get_settings()
@@ -138,7 +143,7 @@ async def upload_outputs( # pylint:disable=too-many-statements # noqa: PLR0915
if is_file_type(port.property_type):
src_folder = outputs_path / port.key
files_and_folders_list = list(src_folder.rglob("*"))
logger.debug("Discovered files to upload %s", files_and_folders_list)
_logger.debug("Discovered files to upload %s", files_and_folders_list)

if not files_and_folders_list:
ports_values[port.key] = (None, None)
@@ -213,9 +218,9 @@ async def _archive_dir_notified(
if port.key in data and data[port.key] is not None:
ports_values[port.key] = (data[port.key], None)
else:
logger.debug("Port %s not found in %s", port.key, data)
_logger.debug("Port %s not found in %s", port.key, data)
else:
logger.debug("No file %s to fetch port values from", data_file)
_logger.debug("No file %s to fetch port values from", data_file)

if archiving_tasks:
await limited_gather(*archiving_tasks, limit=4)
@@ -228,8 +233,8 @@ async def _archive_dir_notified(

elapsed_time = time.perf_counter() - start_time
total_bytes = sum(_get_size_of_value(x) for x in ports_values.values())
logger.info("Uploaded %s bytes in %s seconds", total_bytes, elapsed_time)
logger.debug(_CONTROL_TESTMARK_DY_SIDECAR_NODEPORT_UPLOADED_MESSAGE)
_logger.info("Uploaded %s bytes in %s seconds", total_bytes, elapsed_time)
_logger.debug(_CONTROL_TESTMARK_DY_SIDECAR_NODEPORT_UPLOADED_MESSAGE)


# INPUTS section
@@ -243,14 +248,32 @@ def _is_zip_file(file_path: Path) -> bool:
_shutil_move = aiofiles.os.wrap(shutil.move)


async def _move_file_to_input_port(
final_path: Path, downloaded_file: Path, dest_folder: PrunableFolder
) -> None:
Member (comment on lines +245 to +247):

this function named _move_file_to_input_port takes arguments that do not have any port in their name, and the function does not check anything port related...
actually I don't really understand what dest_folder is? was that the original folder where the downloaded_file was living?

so this function

  • moves downloaded_file into final_path
  • then prunes dest_folder excluding final_path

maybe a bit of refactoring is in order? at least with naming the arguments?
_logger.debug("moving %s", downloaded_file)
final_path = final_path / downloaded_file.name

# ensure parent exists
final_path.parent.mkdir(exist_ok=True, parents=True)

await _shutil_move(downloaded_file, final_path)

# NOTE: after the download the current value of the port
# makes sure previously downloaded files are removed
dest_folder.prune(exclude={final_path})

_logger.debug("file moved to %s", final_path)


async def _get_data_from_port(
port: Port, *, target_dir: Path, progress_bar: ProgressBarData
) -> tuple[Port, ItemConcreteValue | None, ByteSize]:
async with progress_bar.sub_progress(
steps=2 if is_file_type(port.property_type) else 1,
description=IDStr("getting data"),
) as sub_progress:
with log_context(logger, logging.DEBUG, f"getting {port.key=}"):
with log_context(_logger, logging.DEBUG, f"getting {port.key=}"):
port_data = await port.get(sub_progress)

if is_file_type(port.property_type):
Expand All @@ -261,7 +284,7 @@ async def _get_data_from_port(
if not downloaded_file or not downloaded_file.exists():
# the link may be empty
# remove files all files from disk when disconnecting port
logger.debug("removing contents of dir %s", final_path)
_logger.debug("removing contents of dir %s", final_path)
await remove_directory(
final_path, only_children=True, ignore_errors=True
)
@@ -270,33 +293,38 @@ async def _get_data_from_port(
transferred_bytes = downloaded_file.stat().st_size

# in case of valid file, it is either uncompressed and/or moved to the final directory
with log_context(logger, logging.DEBUG, "creating directory"):
with log_context(_logger, logging.DEBUG, "creating directory"):
final_path.mkdir(exist_ok=True, parents=True)
port_data = f"{final_path}"
dest_folder = PrunableFolder(final_path)

if _is_zip_file(downloaded_file):
# unzip updated data to dest_path
logger.debug("unzipping %s", downloaded_file)
unarchived: set[Path] = await unarchive_dir(
archive_to_extract=downloaded_file,
destination_folder=final_path,
progress_bar=sub_progress,
)

dest_folder.prune(exclude=unarchived)
_logger.debug("unzipping %s", downloaded_file)
try:
unarchived: set[Path] = await unarchive_dir(
archive_to_extract=downloaded_file,
destination_folder=final_path,
progress_bar=sub_progress,
)
dest_folder.prune(exclude=unarchived)

_logger.debug("all unzipped in %s", final_path)
except ArchiveError:
_logger.warning(
"Could not extract archive '%s' to '%s' moving it to: '%s'",
downloaded_file,
final_path,
final_path / downloaded_file.name,
)
await _move_file_to_input_port(
final_path, downloaded_file, dest_folder
)

logger.debug("all unzipped in %s", final_path)
else:
logger.debug("moving %s", downloaded_file)
final_path = final_path / Path(downloaded_file).name
await _shutil_move(str(downloaded_file), final_path)

# NOTE: after the download the current value of the port
# makes sure previously downloaded files are removed
dest_folder.prune(exclude={final_path})
await _move_file_to_input_port(final_path, downloaded_file, dest_folder)

logger.debug("all moved to %s", final_path)
_logger.debug("all moved to %s", final_path)
else:
transferred_bytes = sys.getsizeof(port_data)

@@ -312,7 +340,7 @@ async def download_target_ports(
progress_bar: ProgressBarData,
port_notifier: PortNotifier | None,
) -> ByteSize:
logger.debug("retrieving data from simcore...")
_logger.debug("retrieving data from simcore...")
start_time = time.perf_counter()

settings: ApplicationSettings = get_settings()
@@ -386,7 +414,7 @@ async def _get_date_from_port_notified(
data_file.write_text(json.dumps(data))

elapsed_time = time.perf_counter() - start_time
logger.info(
_logger.info(
"Downloaded %s in %s seconds",
total_transfered_bytes.human_readable(decimal=True),
elapsed_time,