Skip to content

Commit

Permalink
Core & Internals: rse_settings dictionary type-hinted with typing.Typ…
Browse files Browse the repository at this point in the history
…edDict rucio#5972
  • Loading branch information
Kwaizer authored and bari12 committed Sep 11, 2023
1 parent 5763204 commit 2afb3b5
Show file tree
Hide file tree
Showing 3 changed files with 74 additions and 15 deletions.
57 changes: 57 additions & 0 deletions lib/rucio/common/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import Any, Optional, TypedDict, Union


class InternalType(object):
'''
Base for Internal representations of string types
Expand Down Expand Up @@ -99,3 +102,57 @@ class InternalScope(InternalType):
'''
def __init__(self, scope, vo='def', fromExternal=True):
super(InternalScope, self).__init__(value=scope, vo=vo, fromExternal=fromExternal)


class RSEDomainLANDict(TypedDict):
read: Optional[int]
write: Optional[int]
delete: Optional[int]


class RSEDomainWANDict(TypedDict):
read: Optional[int]
write: Optional[int]
delete: Optional[int]
third_party_copy_read: Optional[int]
third_party_copy_write: Optional[int]


class RSEDomainsDict(TypedDict):
lan: RSEDomainLANDict
wan: RSEDomainWANDict


class RSEProtocolDict(TypedDict):
auth_token: Optional[str] # FIXME: typing.NotRequired
hostname: str
scheme: str
port: int
prefix: str
impl: str
domains: RSEDomainsDict
extended_attributes: Optional[Union[str, dict[str, Any]]]


class RSESettingsDict(TypedDict):
availability_delete: bool
availability_read: bool
availability_write: bool
credentials: Optional[dict[str, Any]]
lfn2pfn_algorithm: str
qos_class: Optional[str]
staging_area: bool
rse_type: str
sign_url: Optional[str]
read_protocol: int
write_protocol: int
delete_protocol: int
third_party_copy_read_protocol: int
third_party_copy_write_protocol: int
id: str
rse: str
volatile: bool
verify_checksum: bool
deterministic: bool
domain: list[str]
protocols: list[RSEProtocolDict]
7 changes: 4 additions & 3 deletions lib/rucio/core/rse.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
from rucio.common import exception, utils
from rucio.common.cache import make_region_memcached
from rucio.common.config import get_lfn2pfn_algorithm_default, config_get_bool
from rucio.common import types
from rucio.common.utils import CHECKSUM_KEY, GLOBALLY_SUPPORTED_CHECKSUMS, Availability
from rucio.core.rse_counter import add_counter, get_counter
from rucio.db.sqla import models
Expand Down Expand Up @@ -1334,7 +1335,7 @@ def add_protocol(


@read_session
def get_rse_protocols(rse_id, schemes=None, *, session: "Session"):
def get_rse_protocols(rse_id, schemes=None, *, session: "Session") -> types.RSESettingsDict:
"""
Returns protocol information. Parameter combinations are: (operation OR default) XOR scheme.
Expand Down Expand Up @@ -1373,7 +1374,7 @@ def _format_get_rse_protocols(
rse_attributes: Optional[dict[str, Any]] = None,
*,
session: "Session"
) -> dict[str, Any]:
) -> types.RSESettingsDict:
_rse = rse
if rse_attributes:
lfn2pfn_algorithm = rse_attributes.get('lfn2pfn_algorithm')
Expand Down Expand Up @@ -1445,7 +1446,7 @@ def _format_get_rse_protocols(


@read_session
def get_rse_info(rse_id, *, session: "Session"):
def get_rse_info(rse_id, *, session: "Session") -> types.RSESettingsDict:
"""
For historical reasons, related to usage of rsemanager, "rse_info" is equivalent to
a cached call to get_rse_protocols without any schemes set.
Expand Down
25 changes: 13 additions & 12 deletions lib/rucio/rse/rsemanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,14 @@
from urllib.parse import urlparse

from rucio.common import exception, utils, constants
from rucio.common import types
from rucio.common.config import config_get_int
from rucio.common.constraints import STRING_TYPES
from rucio.common.logging import formatted_logger
from rucio.common.utils import make_valid_did, GLOBALLY_SUPPORTED_CHECKSUMS


def get_rse_info(rse=None, vo='def', rse_id=None, session=None):
def get_rse_info(rse=None, vo='def', rse_id=None, session=None) -> types.RSESettingsDict:
"""
Returns all protocol related RSE attributes.
Call with either rse and vo, or (in server mode) rse_id
Expand Down Expand Up @@ -66,7 +67,7 @@ def get_rse_info(rse=None, vo='def', rse_id=None, session=None):
return rse_info


def _get_possible_protocols(rse_settings, operation, scheme=None, domain=None, impl=None):
def _get_possible_protocols(rse_settings: types.RSESettingsDict, operation, scheme=None, domain=None, impl=None):
"""
Filter the list of available protocols or provided by the supported ones.
Expand Down Expand Up @@ -116,7 +117,7 @@ def _get_possible_protocols(rse_settings, operation, scheme=None, domain=None, i
return [c for c in candidates if c not in tbr]


def get_protocols_ordered(rse_settings, operation, scheme=None, domain='wan', impl=None):
def get_protocols_ordered(rse_settings: types.RSESettingsDict, operation, scheme=None, domain='wan', impl=None):
if operation not in utils.rse_supported_protocol_operations():
raise exception.RSEOperationNotSupported('Operation %s is not supported' % operation)

Expand All @@ -128,7 +129,7 @@ def get_protocols_ordered(rse_settings, operation, scheme=None, domain='wan', im
return candidates


def select_protocol(rse_settings, operation, scheme=None, domain='wan'):
def select_protocol(rse_settings: types.RSESettingsDict, operation, scheme=None, domain='wan'):
if operation not in utils.rse_supported_protocol_operations():
raise exception.RSEOperationNotSupported('Operation %s is not supported' % operation)

Expand All @@ -141,7 +142,7 @@ def select_protocol(rse_settings, operation, scheme=None, domain='wan'):
return min(candidates, key=lambda k: k['domains'][domain][operation])


def create_protocol(rse_settings, operation, scheme=None, domain='wan', auth_token=None, protocol_attr=None, logger=logging.log, impl=None):
def create_protocol(rse_settings: types.RSESettingsDict, operation, scheme=None, domain='wan', auth_token=None, protocol_attr=None, logger=logging.log, impl=None):
"""
Instantiates the protocol defined for the given operation.
Expand Down Expand Up @@ -191,7 +192,7 @@ def create_protocol(rse_settings, operation, scheme=None, domain='wan', auth_tok
return protocol


def lfns2pfns(rse_settings, lfns, operation='write', scheme=None, domain='wan', auth_token=None, logger=logging.log, impl=None):
def lfns2pfns(rse_settings: types.RSESettingsDict, lfns, operation='write', scheme=None, domain='wan', auth_token=None, logger=logging.log, impl=None):
"""
Convert the lfn to a pfn
Expand All @@ -209,7 +210,7 @@ def lfns2pfns(rse_settings, lfns, operation='write', scheme=None, domain='wan',
return create_protocol(rse_settings, operation, scheme, domain, auth_token=auth_token, logger=logger, impl=impl).lfns2pfns(lfns)


def parse_pfns(rse_settings, pfns, operation='read', domain='wan', auth_token=None):
def parse_pfns(rse_settings: types.RSESettingsDict, pfns, operation='read', domain='wan', auth_token=None):
"""
Checks if a PFN is feasible for a given RSE. If so it splits the pfn in its various components.
Expand All @@ -231,7 +232,7 @@ def parse_pfns(rse_settings, pfns, operation='read', domain='wan', auth_token=No
return create_protocol(rse_settings, operation, urlparse(pfns[0]).scheme, domain, auth_token=auth_token).parse_pfns(pfns)


def exists(rse_settings, files, domain='wan', scheme=None, impl=None, auth_token=None, vo='def', logger=logging.log):
def exists(rse_settings: types.RSESettingsDict, files, domain='wan', scheme=None, impl=None, auth_token=None, vo='def', logger=logging.log):
"""
Checks if a file is present at the connected storage.
Providing a list indicates the bulk mode.
Expand Down Expand Up @@ -292,7 +293,7 @@ def exists(rse_settings, files, domain='wan', scheme=None, impl=None, auth_token
return [gs, ret]


def upload(rse_settings, lfns, domain='wan', source_dir=None, force_pfn=None, force_scheme=None, transfer_timeout=None, delete_existing=False, sign_service=None, auth_token=None, vo='def', logger=logging.log, impl=None):
def upload(rse_settings: types.RSESettingsDict, lfns, domain='wan', source_dir=None, force_pfn=None, force_scheme=None, transfer_timeout=None, delete_existing=False, sign_service=None, auth_token=None, vo='def', logger=logging.log, impl=None):
"""
Uploads a file to the connected storage.
Providing a list indicates the bulk mode.
Expand Down Expand Up @@ -491,7 +492,7 @@ def upload(rse_settings, lfns, domain='wan', source_dir=None, force_pfn=None, fo
return {0: gs, 1: ret, 'success': gs, 'pfn': pfn}


def delete(rse_settings, lfns, domain='wan', auth_token=None, logger=logging.log, impl=None):
def delete(rse_settings: types.RSESettingsDict, lfns, domain='wan', auth_token=None, logger=logging.log, impl=None):
"""
Delete a file from the connected storage.
Providing a list indicates the bulk mode.
Expand Down Expand Up @@ -534,7 +535,7 @@ def delete(rse_settings, lfns, domain='wan', auth_token=None, logger=logging.log
return [gs, ret]


def rename(rse_settings, files, domain='wan', auth_token=None, logger=logging.log, impl=None):
def rename(rse_settings: types.RSESettingsDict, files, domain='wan', auth_token=None, logger=logging.log, impl=None):
"""
Rename files stored on the connected storage.
Providing a list indicates the bulk mode.
Expand Down Expand Up @@ -610,7 +611,7 @@ def rename(rse_settings, files, domain='wan', auth_token=None, logger=logging.lo
return [gs, ret]


def get_space_usage(rse_settings, scheme=None, domain='wan', auth_token=None, logger=logging.log, impl=None):
def get_space_usage(rse_settings: types.RSESettingsDict, scheme=None, domain='wan', auth_token=None, logger=logging.log, impl=None):
"""
Get RSE space usage information.
Expand Down

0 comments on commit 2afb3b5

Please sign in to comment.