From fc596eb8795c75c30108c28e9022e0f92d603d76 Mon Sep 17 00:00:00 2001 From: Alexander Goscinski Date: Wed, 20 Nov 2024 12:00:44 +0100 Subject: [PATCH] fixing format, linter, mypy --- aiida_restapi/routers/nodes.py | 84 ++++++++++++++++++++-------------- 1 file changed, 49 insertions(+), 35 deletions(-) diff --git a/aiida_restapi/routers/nodes.py b/aiida_restapi/routers/nodes.py index ecec5ce..45b34fd 100644 --- a/aiida_restapi/routers/nodes.py +++ b/aiida_restapi/routers/nodes.py @@ -4,7 +4,7 @@ import os import tempfile from pathlib import Path -from typing import Any, List, Optional +from typing import TYPE_CHECKING, Any, Iterator, List, Optional, Union from aiida import orm from aiida.cmdline.utils.decorators import with_dbenv @@ -17,6 +17,9 @@ from .auth import get_current_active_user +if TYPE_CHECKING: + from aiida.engine.processes.ports import PortNamespace + router = APIRouter() @@ -131,7 +134,7 @@ async def create_upload_file( return models.Node.from_orm(orm_object) -from typing import Any +from typing import Any # noqa: E402 @router.post('/nodes/full_types', response_model=dict[str, Any]) @@ -142,9 +145,8 @@ async def get_full_types() -> dict[str, Any]: ### TODO move to identifiers -def construct_full_type(node_type, process_type): +def construct_full_type(node_type: str, process_type: str) -> str: """Return the full type, which uniquely identifies any `Node` with the given `node_type` and `process_type`. - :param node_type: the `node_type` of the `Node` :param process_type: the `process_type` of the `Node` :return: the full type, which is a unique identifier @@ -158,7 +160,7 @@ def construct_full_type(node_type, process_type): return f'{node_type}{FULL_TYPE_CONCATENATOR}{process_type}' -from collections.abc import MutableMapping +from collections.abc import MutableMapping # noqa: E402 FULL_TYPE_CONCATENATOR = '|' LIKE_OPERATOR_CHARACTER = '%' @@ -171,7 +173,7 @@ class Namespace(MutableMapping): namespace_separator = '.' # Very ugly ad-hoc mapping of `path` to `label` for the non-leaf entries in the nested `Namespace` mapping: - mapping_path_to_label = { + mapping_path_to_label: dict[Union[str, None], str] = { 'node': 'Node', 'node.data': 'Data', 'node.process': 'Process', @@ -201,24 +203,33 @@ class Namespace(MutableMapping): 'process.workflow.workchain.': 'process.workflow.workchain.WorkChainNode.|{plugin_name}.%', } - def __str__(self): + def __str__(self) -> str: import json return json.dumps(self.get_description(), sort_keys=True, indent=4) - def __init__(self, namespace, path=None, label=None, full_type=None, counter=None, is_leaf=True): + def __init__( + self, + namespace: str, + path: Optional[str] = None, + label: Optional[str] = None, + full_type: Optional[str] = None, + counter: Optional[int] = None, + is_leaf: bool = True, + ): """Construct a new node class namespace.""" self._namespace = namespace - self._path = path if path else namespace + self._path = path if path is not None else namespace self._full_type = self._infer_full_type(full_type) - self._subspaces = {} + self._subspaces: dict[str, PortNamespace] = {} self._is_leaf = is_leaf self._counter = counter - try: - self._label = label if label is not None else self.mapping_path_to_label[path] - except KeyError: - self._label = self._path.rpartition('.')[-1] + self._label: str + if label is not None: + self._label = label + else: + self._label = self.mapping_path_to_label.get(path, self._path.rpartition('.')[-1]) # Manual override for process subspaces that contain entries corresponding to nodes with "unregistered" process # types. In this case, the label should become `Unregistered` and the full type set to `None` because we cannot @@ -227,61 +238,64 @@ def __init__(self, namespace, path=None, label=None, full_type=None, counter=Non self._label = 'Unregistered' self._full_type = None - def _infer_full_type(self, full_type): + def _infer_full_type(self, full_type: Union[str, None]) -> Union[str, None]: """Infer the full type based on the current namespace path and the given full type of the leaf.""" from aiida.common.utils import strip_prefix - if full_type or self._path is None: + if full_type is None or self._path is None: return full_type + else: + # for type checker + full_type_: str = full_type full_type = strip_prefix(self._path, 'node.') - if full_type.startswith('process.'): + if full_type_.startswith('process.'): for basepath, full_type_template in self.process_full_type_mapping.items(): - if full_type.startswith(basepath): + if full_type_.startswith(basepath): plugin_name = strip_prefix(full_type, basepath) if plugin_name.startswith(DEFAULT_NAMESPACE_LABEL): temp_type_template = self.process_full_type_mapping_unplugged[basepath] plugin_name = strip_prefix(plugin_name, DEFAULT_NAMESPACE_LABEL + '.') - full_type = temp_type_template.format(plugin_name=plugin_name) + full_type_ = temp_type_template.format(plugin_name=plugin_name) else: - full_type = full_type_template.format(plugin_name=plugin_name) - return full_type + full_type_ = full_type_template.format(plugin_name=plugin_name) + return full_type_ - full_type += f'.{LIKE_OPERATOR_CHARACTER}{FULL_TYPE_CONCATENATOR}' + full_type_ += f'.{LIKE_OPERATOR_CHARACTER}{FULL_TYPE_CONCATENATOR}' - if full_type.startswith('process.'): - full_type += LIKE_OPERATOR_CHARACTER + if full_type_.startswith('process.'): + full_type_ += LIKE_OPERATOR_CHARACTER - return full_type + return full_type_ - def __iter__(self): + def __iter__(self) -> Iterator[PortNamespace]: return self._subspaces.__iter__() - def __len__(self): + def __len__(self) -> int: return len(self._subspaces) - def __delitem__(self, key): + def __delitem__(self, key: str) -> None: del self._subspaces[key] - def __getitem__(self, key): + def __getitem__(self, key: str) -> PortNamespace: return self._subspaces[key] - def __setitem__(self, key, port): + def __setitem__(self, key: str, port: PortNamespace) -> None: self._subspaces[key] = port @property - def is_leaf(self): + def is_leaf(self) -> bool: return self._is_leaf - def get_description(self): + def get_description(self) -> dict: """Return a dictionary with a description of the ports this namespace contains. Nested PortNamespaces will be properly recursed and Ports will print their properties in a list :returns: a dictionary of descriptions of the Ports contained within this PortNamespace """ - result = { + result: dict[str, Any] = { 'namespace': self._namespace, 'full_type': self._full_type, 'label': self._label, @@ -302,7 +316,7 @@ def get_description(self): return result - def create_namespace(self, name, **kwargs): + def create_namespace(self, name: str, **kwargs: Any) -> 'Namespace': """Create and return a new `Namespace` in this `Namespace`. If the name is namespaced, the sub `Namespaces` will be created recursively, except if one of the namespaces is @@ -356,7 +370,7 @@ def create_namespace(self, name, **kwargs): return self[port_name] -def get_node_namespace(user_pk=None, count_nodes=False): +def get_node_namespace(user_pk: Optional[int] = None, count_nodes: Optional[int] = False) -> 'Namespace': """Return the full namespace of all available nodes in the current database. :return: complete node `Namespace`