♻️Legacy archives are always deleted as the owner of the project (ITI…
GitHK authored Oct 3, 2023
1 parent e4cef38 commit 295f20a
Showing 4 changed files with 83 additions and 54 deletions.
11 changes: 8 additions & 3 deletions packages/simcore-sdk/src/simcore_sdk/node_data/data_manager.py
@@ -13,6 +13,7 @@

from ..node_ports_common import filemanager
from ..node_ports_common.constants import SIMCORE_LOCATION
from ..node_ports_common.dbmanager import DBManager
from ..node_ports_common.file_io_utils import LogRedirectCB

_logger = logging.getLogger(__name__)
@@ -155,15 +156,20 @@ async def _state_metadata_entry_exists(


async def _delete_legacy_archive(
user_id: UserID, project_id: ProjectID, node_uuid: NodeID, path: Path
project_id: ProjectID, node_uuid: NodeID, path: Path
) -> None:
"""removes the .zip state archive from storage"""
s3_object = __create_s3_object_key(
project_id, node_uuid, __get_s3_name(path, is_archive=True)
)
_logger.debug("Deleting s3_object='%s' is archive", s3_object)

# NOTE: if the service is opened by a user the project was shared with,
# that user does not have permission to delete the node's state archive.
# Resolving the project's owner and deleting as the owner guarantees the
# delete operation is always permitted.
owner_id = await DBManager().get_project_owner_user_id(project_id)
await filemanager.delete_file(
user_id=user_id, store_id=SIMCORE_LOCATION, s3_object=s3_object
user_id=owner_id, store_id=SIMCORE_LOCATION, s3_object=s3_object
)
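
The net effect for callers is that they no longer pass a user_id into the cleanup, so a user who merely has the project shared with them can still trigger it. A minimal sketch of a caller, assuming only the helper shown above (the coroutine name and its arguments are illustrative, not part of this commit):

    async def _remove_state_archive(
        project_id: ProjectID, node_uuid: NodeID, state_path: Path
    ) -> None:
        # The delete runs with the project owner's identity, which the helper
        # resolves internally, so it succeeds even when the current user only
        # has the project shared with them.
        await _delete_legacy_archive(
            project_id=project_id,
            node_uuid=node_uuid,
            path=state_path,
        )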


Expand Down Expand Up @@ -203,7 +209,6 @@ async def push(

with log_context(_logger, logging.INFO, "removing legacy data archive"):
await _delete_legacy_archive(
user_id=user_id,
project_id=project_id,
node_uuid=node_uuid,
path=source_path,
79 changes: 47 additions & 32 deletions packages/simcore-sdk/src/simcore_sdk/node_ports_common/dbmanager.py
@@ -2,22 +2,26 @@
import logging
import os
import socket
from typing import Any

import aiopg.sa
import sqlalchemy as sa
import tenacity
from aiopg.sa.engine import Engine
from aiopg.sa.result import RowProxy
from models_library.projects import ProjectID
from models_library.users import UserID
from servicelib.common_aiopg_utils import DataSourceName, create_pg_engine
from servicelib.retry_policies import PostgresRetryPolicyUponInitialization
from simcore_postgres_database.models.comp_tasks import comp_tasks
from simcore_postgres_database.models.projects import projects
from simcore_postgres_database.utils_aiopg import (
close_engine,
raise_if_migration_not_ready,
)
from sqlalchemy import and_

from .exceptions import NodeNotFound
from .exceptions import NodeNotFound, ProjectNotFoundError
from .settings import NodePortsSettings

log = logging.getLogger(__name__)
@@ -110,45 +114,56 @@ async def write_ports_configuration(
log.debug(message)

node_configuration = json.loads(json_configuration)
async with DBContextManager(self._db_engine) as engine:
async with engine.acquire() as connection:
# update the necessary parts
await connection.execute(
# FIXME: E1120:No value for argument 'dml' in method call
# pylint: disable=E1120
comp_tasks.update()
.where(
and_(
comp_tasks.c.node_id == node_uuid,
comp_tasks.c.project_id == project_id,
)
)
.values(
schema=node_configuration["schema"],
inputs=node_configuration["inputs"],
outputs=node_configuration["outputs"],
run_hash=node_configuration.get("run_hash"),
async with DBContextManager(
self._db_engine
) as engine, engine.acquire() as connection:
# update the necessary parts
await connection.execute(
comp_tasks.update()
.where(
and_(
comp_tasks.c.node_id == node_uuid,
comp_tasks.c.project_id == project_id,
)
)
.values(
schema=node_configuration["schema"],
inputs=node_configuration["inputs"],
outputs=node_configuration["outputs"],
run_hash=node_configuration.get("run_hash"),
)
)

async def get_ports_configuration_from_node_uuid(
self, project_id: str, node_uuid: str
) -> str:
log.debug(
"Getting ports configuration of node %s from comp_tasks table", node_uuid
)
async with DBContextManager(self._db_engine) as engine:
async with engine.acquire() as connection:
node: RowProxy = await _get_node_from_db(
project_id, node_uuid, connection
)
node_json_config = json.dumps(
{
"schema": node.schema,
"inputs": node.inputs,
"outputs": node.outputs,
"run_hash": node.run_hash,
}
)
async with DBContextManager(
self._db_engine
) as engine, engine.acquire() as connection:
node: RowProxy = await _get_node_from_db(project_id, node_uuid, connection)
node_json_config = json.dumps(
{
"schema": node.schema,
"inputs": node.inputs,
"outputs": node.outputs,
"run_hash": node.run_hash,
}
)
log.debug("Found and converted to json")
return node_json_config

async def get_project_owner_user_id(self, project_id: ProjectID) -> UserID:
async with DBContextManager(
self._db_engine
) as engine, engine.acquire() as connection:
prj_owner: Any | None = await connection.scalar(
sa.select(projects.c.prj_owner).where(
projects.c.uuid == f"{project_id}"
)
)
if prj_owner is None:
raise ProjectNotFoundError(project_id)
return UserID(prj_owner)
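
A short usage sketch for the new lookup, assuming only the method and the ProjectNotFoundError added in this commit (the wrapper coroutine itself is hypothetical):

    from models_library.projects import ProjectID
    from models_library.users import UserID

    from simcore_sdk.node_ports_common.dbmanager import DBManager
    from simcore_sdk.node_ports_common.exceptions import ProjectNotFoundError

    async def resolve_project_owner(project_id: ProjectID) -> UserID | None:
        # DBManager() builds its own engine from settings, as in the
        # data_manager call above; None signals a missing project row.
        try:
            return await DBManager().get_project_owner_user_id(project_id)
        except ProjectNotFoundError:
            return None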
Original file line number Diff line number Diff line change
@@ -5,13 +5,11 @@
#
#

from typing import Optional


class NodeportsException(Exception):
"""Basic exception for errors raised in nodeports"""

def __init__(self, msg: Optional[str] = None):
def __init__(self, msg: str | None = None):
super().__init__(msg or "An error occurred in simcore")


@@ -26,23 +24,23 @@ def __init__(self, obj):
class UnboundPortError(NodeportsException, IndexError):
"""Accessed port is not configured"""

def __init__(self, port_index, msg: Optional[str] = None):
def __init__(self, port_index, msg: str | None = None):
super().__init__(f"No port bound at index {port_index}")
self.port_index = port_index


class InvalidKeyError(NodeportsException):
"""Accessed key does not exist"""

def __init__(self, item_key: str, msg: Optional[str] = None):
def __init__(self, item_key: str, msg: str | None = None):
super().__init__(f"No port bound with key {item_key}")
self.item_key = item_key


class InvalidItemTypeError(NodeportsException):
"""Item type incorrect"""

def __init__(self, item_type: str, item_value: str, msg: Optional[str] = None):
def __init__(self, item_type: str, item_value: str, msg: str | None = None):
super().__init__(
msg
or f"Invalid item type, value [{item_value}] does not qualify as type [{item_type}]"
@@ -54,7 +52,7 @@ def __init__(self, item_type: str, item_value: str, msg: Optional[str] = None):
class InvalidProtocolError(NodeportsException):
"""Invalid protocol used"""

def __init__(self, dct, msg: Optional[str] = None):
def __init__(self, dct, msg: str | None = None):
super().__init__(f"Invalid protocol used: {dct} [{msg}]")
self.dct = dct

Expand All @@ -70,7 +68,7 @@ class StorageServerIssue(NodeportsException):
class S3TransferError(NodeportsException):
"""S3 transfer error"""

def __init__(self, msg: Optional[str] = None):
def __init__(self, msg: str | None = None):
super().__init__(msg or "Error while transferring to/from S3 storage")


@@ -126,6 +124,14 @@ def __init__(self, node_uuid):
super().__init__(f"the node id {node_uuid} was not found")


class ProjectNotFoundError(NodeportsException):
"""The given node_uuid was not found"""

def __init__(self, project_id):
self.project_id = project_id
super().__init__(f"the {project_id=} was not found")


class SymlinkToSymlinkIsNotUploadableException(NodeportsException):
"""Not possible to upload a symlink to a symlink"""

Original file line number Diff line number Diff line change
@@ -51,7 +51,7 @@ def _empty_path(path: Path) -> None:
def _get_file_hashes_in_path(path_to_hash: Path) -> set[tuple[Path, str]]:
def _hash_path(path: Path):
sha256_hash = hashlib.sha256()
with open(path, "rb") as f:
with Path.open(path, "rb") as f:
# Read and update hash string value in blocks of 4K
for byte_block in iter(lambda: f.read(4096), b""):
sha256_hash.update(byte_block)
@@ -156,7 +156,7 @@ async def test_valid_upload_download(
mock_io_log_redirect_cb: LogRedirectCB,
):
async with ProgressBarData(steps=2) as progress_bar:
await data_manager._push_directory(
await data_manager._push_directory( # noqa: SLF001
user_id=user_id,
project_id=project_id,
node_uuid=node_uuid,
@@ -166,13 +166,15 @@ async def test_valid_upload_download(
r_clone_settings=r_clone_settings,
)
# pylint: disable=protected-access
assert progress_bar._continuous_progress_value == pytest.approx(1.0)
assert progress_bar._continuous_progress_value == pytest.approx( # noqa: SLF001
1.0
)

uploaded_hashes = _get_file_hashes_in_path(content_path)

_empty_path(content_path)

await data_manager._pull_directory(
await data_manager._pull_directory( # noqa: SLF001
user_id=user_id,
project_id=project_id,
node_uuid=node_uuid,
@@ -181,7 +183,9 @@ async def test_valid_upload_download(
r_clone_settings=r_clone_settings,
progress_bar=progress_bar,
)
assert progress_bar._continuous_progress_value == pytest.approx(2.0)
assert progress_bar._continuous_progress_value == pytest.approx( # noqa: SLF001
2.0
)

downloaded_hashes = _get_file_hashes_in_path(content_path)

@@ -207,7 +211,7 @@ async def test_valid_upload_download_saved_to(
mock_io_log_redirect_cb: LogRedirectCB,
):
async with ProgressBarData(steps=2) as progress_bar:
await data_manager._push_directory(
await data_manager._push_directory( # noqa: SLF001
user_id=user_id,
project_id=project_id,
node_uuid=node_uuid,
@@ -227,7 +231,7 @@ async def test_valid_upload_download_saved_to(

new_destination = random_tmp_dir_generator(is_file=content_path.is_file())

await data_manager._pull_directory(
await data_manager._pull_directory( # noqa: SLF001
user_id=user_id,
project_id=project_id,
node_uuid=node_uuid,
@@ -290,7 +294,7 @@ async def test_delete_legacy_archive(
)

assert (
await data_manager._state_metadata_entry_exists(
await data_manager._state_metadata_entry_exists( # noqa: SLF001
user_id=user_id,
project_id=project_id,
node_uuid=node_uuid,
@@ -300,15 +304,14 @@
is True
)

await data_manager._delete_legacy_archive(
user_id=user_id,
await data_manager._delete_legacy_archive( # noqa: SLF001
project_id=project_id,
node_uuid=node_uuid,
path=content_path,
)

assert (
await data_manager._state_metadata_entry_exists(
await data_manager._state_metadata_entry_exists( # noqa: SLF001
user_id=user_id,
project_id=project_id,
node_uuid=node_uuid,
