From eafaca5c14a0e5483b13a6026649d98671e63407 Mon Sep 17 00:00:00 2001 From: Antoine Nguyen Date: Wed, 3 Apr 2024 17:10:34 +0200 Subject: [PATCH 1/8] Added annotations --- sievelib/commands.py | 123 ++++++++++++++++++----------- sievelib/factory.py | 66 +++++++++++----- sievelib/managesieve.py | 115 ++++++++++++++++----------- sievelib/parser.py | 38 ++++----- sievelib/tests/test_managesieve.py | 6 +- sievelib/tools.py | 2 +- 6 files changed, 221 insertions(+), 129 deletions(-) diff --git a/sievelib/commands.py b/sievelib/commands.py index 78af592..e062b41 100644 --- a/sievelib/commands.py +++ b/sievelib/commands.py @@ -20,21 +20,21 @@ """ +from collections.abc import Iterable, Iterator import sys -from collections.abc import Iterable +from typing import Any, NotRequired, Optional, TypedDict, Union + from . import tools class CommandError(Exception): """Base command exception class.""" - pass - class UnknownCommand(CommandError): """Specific exception raised when an unknown command is encountered""" - def __init__(self, name): + def __init__(self, name: str): self.name = name def __str__(self): @@ -78,22 +78,42 @@ def __str__(self): return "extension '{}' not loaded".format(self.name) +class CommandExtraArg(TypedDict): + """Type definition for command extra argument.""" + + type: Union[str, list[str]] + values: NotRequired[list[str]] + valid_for: NotRequired[list[str]] + + +class CommandArg(TypedDict): + """Type definition for command argument.""" + + name: str + type: list[str] + required: NotRequired[bool] + values: NotRequired[list[str]] + extra_arg: NotRequired[CommandExtraArg] + extension: NotRequired[str] + extension_values: NotRequired[dict[str, str]] + + # Statement elements (see RFC, section 8.3) # They are used in different commands. -comparator = { +comparator: CommandArg = { "name": "comparator", "type": ["tag"], "values": [":comparator"], "extra_arg": {"type": "string", "values": ['"i;octet"', '"i;ascii-casemap"']}, "required": False, } -address_part = { +address_part: CommandArg = { "name": "address-part", "values": [":localpart", ":domain", ":all"], "type": ["tag"], "required": False, } -match_type = { +match_type: CommandArg = { "name": "match-type", "values": [":is", ":contains", ":matches"], "extension_values": { @@ -111,7 +131,7 @@ def __str__(self): } -class Command(object): +class Command: """Generic command representation. A command is described as follow: @@ -125,33 +145,36 @@ class Command(object): """ - _type = None - variable_args_nb = False - non_deterministic_args = False - accept_children = False - must_follow = None - extension = None + args_definition: list[CommandArg] + _type: str + variable_args_nb: bool = False + non_deterministic_args: bool = False + accept_children: bool = False + must_follow: Optional[list[str]] = None + extension: Optional[str] = None - def __init__(self, parent=None): + def __init__(self, parent: Optional["Command"] = None): self.parent = parent - self.arguments = {} - self.extra_arguments = {} # to store tag arguments - self.children = [] + self.arguments: dict[str, Any] = {} + self.extra_arguments: dict[str, Any] = {} # to store tag arguments + self.children: list[Command] = [] self.nextargpos = 0 self.required_args = -1 self.rargs_cnt = 0 - self.curarg = None # for arguments that expect an argument :p (ex: :comparator) + self.curarg: Union[CommandArg, None] = ( + None # for arguments that expect an argument :p (ex: :comparator) + ) - self.name = self.__class__.__name__.replace("Command", "") + self.name: str = self.__class__.__name__.replace("Command", "") self.name = self.name.lower() - self.hash_comments = [] + self.hash_comments: list[bytes] = [] def __repr__(self): return "%s (type: %s)" % (self.name, self._type) - def tosieve(self, indentlevel=0, target=sys.stdout): + def tosieve(self, indentlevel: int = 0, target=sys.stdout): """Generate the sieve syntax corresponding to this command Recursive method. @@ -213,14 +236,16 @@ def tosieve(self, indentlevel=0, target=sys.stdout): ch.tosieve(indentlevel + 4, target=target) self.__print("}", indentlevel, target=target) - def __print(self, data, indentlevel, nocr=False, target=sys.stdout): + def __print( + self, data: str, indentlevel: int, nocr: bool = False, target=sys.stdout + ): text = "%s%s" % (" " * indentlevel, data) if nocr: target.write(text) else: target.write(text + "\n") - def __get_arg_type(self, arg): + def __get_arg_type(self, arg: str) -> Optional[list[str]]: """Return the type corresponding to the given name. :param arg: a defined argument name @@ -237,11 +262,11 @@ def complete_cb(self): """ pass - def get_expected_first(self): + def get_expected_first(self) -> Optional[list[str]]: """Return the first expected token for this command""" return None - def has_arguments(self): + def has_arguments(self) -> bool: return len(self.args_definition) != 0 def reassign_arguments(self): @@ -252,7 +277,7 @@ def reassign_arguments(self): """ raise NotImplementedError - def dump(self, indentlevel=0, target=sys.stdout): + def dump(self, indentlevel: int = 0, target=sys.stdout): """Display the command Pretty printing of this command and its eventual arguments and @@ -291,7 +316,7 @@ def dump(self, indentlevel=0, target=sys.stdout): for ch in self.children: ch.dump(indentlevel, target) - def walk(self): + def walk(self) -> Iterator["Command"]: """Walk through commands.""" yield self if self.has_arguments(): @@ -311,7 +336,7 @@ def walk(self): for node in ch.walk(): yield node - def addchild(self, child): + def addchild(self, child: "Command") -> bool: """Add a new child to the command A child corresponds to a command located into a block (this @@ -325,7 +350,9 @@ def addchild(self, child): self.children += [child] return True - def iscomplete(self, atype=None, avalue=None): + def iscomplete( + self, atype: Optional[str] = None, avalue: Optional[str] = None + ) -> bool: """Check if the command is complete Check if all required arguments have been encountered. For @@ -342,7 +369,7 @@ def iscomplete(self, atype=None, avalue=None): if arg.get("required", False): self.required_args += 1 return ( - not self.curarg + self.curarg is None or "extra_arg" not in self.curarg or ( "valid_for" in self.curarg["extra_arg"] @@ -352,20 +379,22 @@ def iscomplete(self, atype=None, avalue=None): ) ) and (self.rargs_cnt == self.required_args) - def get_type(self): + def get_type(self) -> str: """Return the command's type""" if self._type is None: raise NotImplementedError return self._type - def __is_valid_value_for_arg(self, arg, value, check_extension=True): + def __is_valid_value_for_arg( + self, arg: CommandArg, value: str, check_extension: bool = True + ) -> bool: """Check if value is allowed for arg Some commands only allow a limited set of values. The method always returns True for methods that do not provide such a set. - :param arg: the argument's name + :param arg: the argument :param value: the value to check :param check_extension: check if value requires an extension :return: True on succes, False otherwise @@ -386,7 +415,7 @@ def __is_valid_value_for_arg(self, arg, value, check_extension=True): return True return False - def __is_valid_type(self, typ, typlist): + def __is_valid_type(self, typ: str, typlist: list[str]) -> bool: """Check if type is valid based on input type list "string" is special because it can be used for stringlist @@ -399,7 +428,9 @@ def __is_valid_type(self, typ, typlist): return typ in typlist or (typ_is_str and str_list_in_typlist) - def check_next_arg(self, atype, avalue, add=True, check_extension=True): + def check_next_arg( + self, atype: str, avalue: str, add: bool = True, check_extension: bool = True + ) -> bool: """Argument validity checking This method is usually used by the parser to check if detected @@ -468,7 +499,7 @@ def check_next_arg(self, atype, avalue, add=True, check_extension=True): self.arguments[curarg["name"]] = avalue break - condition = atype in curarg["type"] and self.__is_valid_value_for_arg( + condition: bool = atype in curarg["type"] and self.__is_valid_value_for_arg( curarg, avalue, check_extension ) if condition: @@ -496,11 +527,11 @@ def check_next_arg(self, atype, avalue, add=True, check_extension=True): raise BadArgument(self.name, avalue, self.args_definition[pos]["type"]) return True - def __contains__(self, name): + def __contains__(self, name: str) -> bool: """Check if argument is provided with command.""" return name in self.arguments - def __getitem__(self, name): + def __getitem__(self, name: str) -> Any: """Shorcut to access a command argument :param name: the argument's name @@ -535,7 +566,7 @@ class RequireCommand(ControlCommand): {"name": "capabilities", "type": ["string", "stringlist"], "required": True} ] - loaded_extensions = [] + loaded_extensions: list[str] = [] def complete_cb(self): if type(self.arguments["capabilities"]) != list: @@ -553,7 +584,7 @@ class IfCommand(ControlCommand): args_definition = [{"name": "test", "type": ["test"], "required": True}] - def get_expected_first(self): + def get_expected_first(self) -> list[str]: return ["identifier"] @@ -562,7 +593,7 @@ class ElsifCommand(ControlCommand): must_follow = ["if", "elsif"] args_definition = [{"name": "test", "type": ["test"], "required": True}] - def get_expected_first(self): + def get_expected_first(self) -> list[str]: return ["identifier"] @@ -717,7 +748,7 @@ class AllofCommand(TestCommand): args_definition = [{"name": "tests", "type": ["testlist"], "required": True}] - def get_expected_first(self): + def get_expected_first(self) -> list[str]: return ["left_parenthesis"] @@ -727,7 +758,7 @@ class AnyofCommand(TestCommand): args_definition = [{"name": "tests", "type": ["testlist"], "required": True}] - def get_expected_first(self): + def get_expected_first(self) -> list[str]: return ["left_parenthesis"] @@ -1044,7 +1075,9 @@ def add_commands(cmds): globals()[command.__name__] = command -def get_command_instance(name, parent=None, checkexists=True): +def get_command_instance( + name: str, parent: Optional[Command] = None, checkexists: bool = True +) -> Command: """Try to guess and create the appropriate command instance Given a command name (encountered by the parser), construct the diff --git a/sievelib/factory.py b/sievelib/factory.py index f57ee1c..82acb03 100644 --- a/sievelib/factory.py +++ b/sievelib/factory.py @@ -10,6 +10,7 @@ import io import sys +from typing import NotRequired, Optional, TypedDict, Union from sievelib import commands @@ -18,6 +19,15 @@ class FilterAlreadyExists(Exception): pass +class Filter(TypedDict): + """Type definition for filter.""" + + name: str + content: commands.Command + enabled: bool + description: NotRequired[str] + + class FiltersSet: """A set of filters.""" @@ -39,8 +49,8 @@ def __init__( self.name = name self.filter_name_pretext = filter_name_pretext self.filter_desc_pretext = filter_desc_pretext - self.requires = [] - self.filters = [] + self.requires: list[str] = [] + self.filters: list[Filter] = [] def __str__(self): target = io.StringIO() @@ -49,7 +59,7 @@ def __str__(self): target.close() return ret - def __isdisabled(self, fcontent): + def __isdisabled(self, fcontent: commands.Command) -> bool: """Tells if a filter is disabled or not Simply checks if the filter is surrounded by a "if false" test. @@ -62,7 +72,7 @@ def __isdisabled(self, fcontent): return False return True - def from_parser_result(self, parser): + def from_parser_result(self, parser: "sievelib.parser.Parser"): cpt = 1 for f in parser.result: if isinstance(f, commands.RequireCommand): @@ -100,13 +110,13 @@ def require(self, name: str): if name not in self.requires: self.requires += [name] - def check_if_arg_is_extension(self, arg): + def check_if_arg_is_extension(self, arg: str): """Include extension if arg requires one.""" args_using_extensions = {":copy": "copy"} if arg in args_using_extensions: self.require(args_using_extensions[arg]) - def __gen_require_command(self): + def __gen_require_command(self) -> Union[commands.Command, None]: """Internal method to create a RequireCommand based on requirements Called just before this object is going to be dumped. @@ -117,7 +127,7 @@ def __gen_require_command(self): reqcmd.check_next_arg("stringlist", self.requires) return reqcmd - def __quote_if_necessary(self, value): + def __quote_if_necessary(self, value: str) -> str: """Add double quotes to the given string if necessary :param value: the string to check @@ -127,7 +137,9 @@ def __quote_if_necessary(self, value): return '"%s"' % value return value - def __build_condition(self, condition, parent, tag=None): + def __build_condition( + self, condition: list[str], parent: commands.Command, tag: Optional[str] = None + ) -> commands.Command: """Translate a condition to a valid sievelib Command. :param list condition: condition's definition @@ -144,7 +156,12 @@ def __build_condition(self, condition, parent, tag=None): cmd.check_next_arg("string", self.__quote_if_necessary(condition[2])) return cmd - def __create_filter(self, conditions, actions, matchtype="anyof"): + def __create_filter( + self, + conditions: list[tuple], + actions: list[tuple], + matchtype: str = "anyof", + ) -> commands.Command: """Create a new filter A filter is composed of: @@ -272,7 +289,7 @@ def __create_filter(self, conditions, actions, matchtype="anyof"): ifcontrol.addchild(action) return ifcontrol - def _unicode_filter_name(self, name): + def _unicode_filter_name(self, name) -> str: """Convert name to unicode if necessary.""" return name.decode("utf-8") if isinstance(name, bytes) else name @@ -284,7 +301,11 @@ def filter_exists(self, name: str) -> bool: return False def addfilter( - self, name: str, conditions: list, actions: list, matchtype: str = "anyof" + self, + name: str, + conditions: list[tuple], + actions: list[tuple], + matchtype: str = "anyof", ): """Add a new filter to this filters set @@ -306,7 +327,12 @@ def addfilter( ] def updatefilter( - self, oldname: str, newname: str, conditions, actions, matchtype: str = "anyof" + self, + oldname: str, + newname: str, + conditions: list[tuple], + actions: list[tuple], + matchtype: str = "anyof", ) -> bool: """Update a specific filter @@ -337,8 +363,12 @@ def updatefilter( return True def replacefilter( - self, oldname: str, sieve_filter, newname: str = None, description: str = None - ): + self, + oldname: str, + sieve_filter: commands.Command, + newname: Optional[str] = None, + description: Optional[str] = None, + ) -> bool: """replace a specific sieve_filter Instead of removing and re-creating the sieve_filter, we update the @@ -370,7 +400,7 @@ def replacefilter( return self.disablefilter(newname) return True - def getfilter(self, name: str): + def getfilter(self, name: str) -> Union[commands.Command, None]: """Search for a specific filter :param name: the filter's name @@ -384,7 +414,7 @@ def getfilter(self, name: str): return f["content"] return None - def get_filter_matchtype(self, name: str) -> str: + def get_filter_matchtype(self, name: str) -> Union[str, None]: """Retrieve matchtype of the given filter.""" flt = self.getfilter(name) if not flt: @@ -394,7 +424,7 @@ def get_filter_matchtype(self, name: str) -> str: return node.__class__.__name__.lower().replace("command", "") return None - def get_filter_conditions(self, name: str) -> list: + def get_filter_conditions(self, name: str) -> Union[list[str], None]: """Retrieve conditions of the given filter.""" flt = self.getfilter(name) if not flt: @@ -434,7 +464,7 @@ def get_filter_conditions(self, name: str) -> list: conditions.append(args) return conditions - def get_filter_actions(self, name: str) -> list: + def get_filter_actions(self, name: str) -> Union[list[str], None]: """Retrieve actions of the given filter.""" flt = self.getfilter(name) if not flt: diff --git a/sievelib/managesieve.py b/sievelib/managesieve.py index cce462a..af62431 100644 --- a/sievelib/managesieve.py +++ b/sievelib/managesieve.py @@ -12,6 +12,7 @@ import re import socket import ssl +from typing import Any, Optional from .digest_md5 import DigestMD5 from . import tools @@ -37,7 +38,7 @@ class Error(Exception): class Response(Exception): - def __init__(self, code, data): + def __init__(self, code: bytes, data: bytes): self.code = code self.data = data @@ -69,20 +70,21 @@ def check(cls, *args, **kwargs): return check -class Client(object): +class Client: read_size = 4096 read_timeout = 5 - def __init__(self, srvaddr, srvport=4190, debug=False): + def __init__(self, srvaddr: str, srvport: int = 4190, debug: bool = False): self.srvaddr = srvaddr self.srvport = srvport self.__debug = debug - self.sock = None - self.__read_buffer = b"" - self.authenticated = False - self.errcode = None + self.sock: socket.socket = None + self.__read_buffer: bytes = b"" + self.authenticated: bool = False + self.errcode: bytes = None + self.errmsg: bytes = b"" - self.__capabilities = {} + self.__capabilities: dict[str, str] = {} self.__respcode_expr = re.compile(rb"(OK|NO|BYE)\s*(.+)?") self.__error_expr = re.compile(rb'(\([\w/-]+\))?\s*(".+")') self.__size_expr = re.compile(rb"\{(\d+)\+?\}") @@ -98,7 +100,7 @@ def __dprint(self, message): return print("DEBUG: %s" % message) - def __read_block(self, size): + def __read_block(self, size: int) -> bytes: """Read a block of 'size' bytes from the server. An internal buffer is used to read data from the server. If @@ -128,7 +130,7 @@ def __read_block(self, size): self.__dprint(buf) return buf - def __read_line(self): + def __read_line(self) -> bytes: """Read one line from the server. An internal buffer is used to read data from the server @@ -175,7 +177,7 @@ def __read_line(self): raise Response(m.group(1), m.group(2)) return ret - def __read_response(self, nblines=-1): + def __read_response(self, nblines: int = -1) -> tuple[bytes, bytes, bytes]: """Read a response from the server. In the usual case, we read lines until we find one that looks @@ -211,7 +213,7 @@ def __read_response(self, nblines=-1): return (code, data, resp) - def __prepare_args(self, args): + def __prepare_args(self, args: list[Any]) -> list[bytes]: """Format command arguments before sending them. Command arguments of type string must be quoted, the only @@ -231,7 +233,7 @@ def __prepare_args(self, args): ret += [bytes(str(a).encode("utf-8"))] return ret - def __prepare_content(self, content): + def __prepare_content(self, content: str) -> bytes: """Format script content before sending it. Script length must be inserted before the content, @@ -240,13 +242,17 @@ def __prepare_content(self, content): :param content: script content as str or bytes :return: transformed script as bytes """ - if isinstance(content, str): - content = content.encode("utf-8") - return b"{%d+}%s%s" % (len(content), CRLF, content) + bcontent: bytes = content.encode("utf-8") + return b"{%d+}%s%s" % (len(bcontent), CRLF, bcontent) def __send_command( - self, name, args=None, withcontent=False, extralines=None, nblines=-1 - ): + self, + name: str, + args: Optional[list[bytes]] = None, + withcontent: bool = False, + extralines: Optional[list[bytes]] = None, + nblines: int = -1, + ) -> tuple[str, str, bytes]: """Send a command to the server. If args is not empty, we concatenate the given command with @@ -285,7 +291,7 @@ def __send_command( return (code, data, content) return (code, data) - def __get_capabilities(self): + def __get_capabilities(self) -> bool: code, data, capabilities = self.__read_response() if code == "NO": return False @@ -300,7 +306,7 @@ def __get_capabilities(self): ) return True - def __parse_error(self, text): + def __parse_error(self, text: bytes): """Parse an error received from the server. if text corresponds to a size indication, we grab the @@ -328,24 +334,24 @@ def __parse_error(self, text): self.errcode = b"" self.errmsg = m.group(2).strip(b'"') - def _plain_authentication(self, login, password, authz_id=b""): + def _plain_authentication( + self, login: bytes, password: bytes, authz_id: bytes = b"" + ) -> bool: """SASL PLAIN authentication :param login: username :param password: clear password :return: True on success, False otherwise. """ - if isinstance(login, str): - login = login.encode("utf-8") - if isinstance(password, str): - password = password.encode("utf-8") params = base64.b64encode(b"\0".join([authz_id, login, password])) code, data = self.__send_command("AUTHENTICATE", [b"PLAIN", params]) if code == "OK": return True return False - def _login_authentication(self, login, password, authz_id=""): + def _login_authentication( + self, login: bytes, password: bytes, authz_id: bytes = "" + ) -> bool: """SASL LOGIN authentication :param login: username @@ -353,8 +359,8 @@ def _login_authentication(self, login, password, authz_id=""): :return: True on success, False otherwise. """ extralines = [ - b'"%s"' % base64.b64encode(login.encode("utf-8")), - b'"%s"' % base64.b64encode(password.encode("utf-8")), + b'"%s"' % base64.b64encode(login), + b'"%s"' % base64.b64encode(password), ] code, data = self.__send_command( "AUTHENTICATE", [b"LOGIN"], extralines=extralines @@ -363,7 +369,9 @@ def _login_authentication(self, login, password, authz_id=""): return True return False - def _digest_md5_authentication(self, login, password, authz_id=""): + def _digest_md5_authentication( + self, login: bytes, password: bytes, authz_id: bytes = b"" + ) -> bool: """SASL DIGEST-MD5 authentication :param login: username @@ -390,7 +398,9 @@ def _digest_md5_authentication(self, login, password, authz_id=""): return True return False - def _oauthbearer_authentication(self, login, password, authz_id=""): + def _oauthbearer_authentication( + self, login: bytes, password: bytes, authz_id: bytes = b"" + ) -> bool: """ OAUTHBEARER authentication. @@ -409,7 +419,13 @@ def _oauthbearer_authentication(self, login, password, authz_id=""): return True return False - def __authenticate(self, login, password, authz_id=b"", authmech=None): + def __authenticate( + self, + login: str, + password: str, + authz_id: str = "", + authmech: Optional[str] = None, + ) -> bool: """AUTHENTICATE command Actually, it is just a wrapper to the real commands (one by @@ -439,7 +455,11 @@ def __authenticate(self, login, password, authz_id=b"", authmech=None): continue mech = mech.lower().replace("-", "_") auth_method = getattr(self, "_%s_authentication" % mech) - if auth_method(login, password, authz_id): + if auth_method( + login.encode("utf-8"), + password.encode("utf-8"), + authz_id.encode("utf-8"), + ): self.authenticated = True return True return False @@ -447,7 +467,7 @@ def __authenticate(self, login, password, authz_id=b"", authmech=None): self.errmsg = b"No suitable mechanism found" return False - def __starttls(self, keyfile=None, certfile=None): + def __starttls(self, keyfile=None, certfile=None) -> bool: """STARTTLS command See MANAGESIEVE specifications, section 2.2. @@ -472,7 +492,7 @@ def __starttls(self, keyfile=None, certfile=None): self.__get_capabilities() return True - def get_implementation(self): + def get_implementation(self) -> str: """Returns the IMPLEMENTATION value. It is read from server capabilities. (see the CAPABILITY @@ -482,7 +502,7 @@ def get_implementation(self): """ return self.__capabilities["IMPLEMENTATION"] - def get_sasl_mechanisms(self): + def get_sasl_mechanisms(self) -> list[str]: """Returns the supported authentication mechanisms. They're read from server capabilities. (see the CAPABILITY @@ -492,7 +512,7 @@ def get_sasl_mechanisms(self): """ return self.__capabilities["SASL"].split() - def has_tls_support(self): + def has_tls_support(self) -> bool: """Tells if the server has STARTTLS support or not. It is read from server capabilities. (see the CAPABILITY @@ -514,7 +534,14 @@ def get_sieve_capabilities(self): self.__capabilities["SIEVE"] = self.__capabilities["SIEVE"].split() return self.__capabilities["SIEVE"] - def connect(self, login, password, authz_id=b"", starttls=False, authmech=None): + def connect( + self, + login: str, + password: str, + authz_id: str = "", + starttls: bool = False, + authmech: Optional[str] = None, + ): """Establish a connection with the server. This function must be used. It read the server capabilities @@ -578,7 +605,7 @@ def havespace(self, scriptname: str, scriptsize: int) -> bool: return False @authentication_required - def listscripts(self): + def listscripts(self) -> tuple[str, list[str]]: """List available scripts. See MANAGESIEVE specifications, section 2.7 @@ -588,8 +615,8 @@ def listscripts(self): code, data, listing = self.__send_command("LISTSCRIPTS", withcontent=True) if code == "NO": return None - ret = [] - active_script = None + ret: list[str] = [] + active_script: str = None for l in listing.splitlines(): if self.__size_expr.match(l): continue @@ -636,8 +663,8 @@ def putscript(self, name: str, content: str) -> bool: :param content: script's content :rtype: boolean """ - content = self.__prepare_content(content) - code, data = self.__send_command("PUTSCRIPT", [name.encode("utf-8"), content]) + bcontent = self.__prepare_content(content) + code, data = self.__send_command("PUTSCRIPT", [name.encode("utf-8"), bcontent]) if code == "OK": return True return False @@ -727,8 +754,8 @@ def checkscript(self, content: str) -> bool: """ if "VERSION" not in self.__capabilities: raise NotImplementedError("server does not support CHECKSCRIPT command") - content = self.__prepare_content(content) - code, data = self.__send_command("CHECKSCRIPT", [content]) + bcontent = self.__prepare_content(content) + code, data = self.__send_command("CHECKSCRIPT", [bcontent]) if code == "OK": return True return False diff --git a/sievelib/parser.py b/sievelib/parser.py index 057b848..f24b041 100755 --- a/sievelib/parser.py +++ b/sievelib/parser.py @@ -7,6 +7,7 @@ This implementation is based on RFC 5228 (http://tools.ietf.org/html/rfc5228) """ +from collections.abc import Iterator import re import sys @@ -16,11 +17,11 @@ class ParseError(Exception): """Generic parsing error""" - def __init__(self, msg): + def __init__(self, msg: str): self.msg = msg def __str__(self): - return "parsing error: %s" % self.msg + return f"parsing error: {self.msg}" class Lexer: @@ -46,15 +47,15 @@ def __init__(self, definitions): self.regexp = re.compile(self.regexpString, re.MULTILINE) self.wsregexp = re.compile(rb"\s+", re.M) - def curlineno(self): + def curlineno(self) -> int: """Return the current line number""" return self.text[: self.pos].count(b"\n") + 1 - def curcolno(self): + def curcolno(self) -> int: """Return the current column number""" return self.pos - self.text.rfind(b"\n", 0, self.pos) - def scan(self, text): + def scan(self, text: bytes) -> Iterator[tuple[str, bytes]]: """Analyse some data Analyse the passed content. Each time a token is recognized, a @@ -79,7 +80,7 @@ def scan(self, text): m = self.wsregexp.search(token) if m is not None: token = token[: m.start()] - raise ParseError("unknown token %s" % token) + raise ParseError(f"unknown token {token}") yield (m.lastgroup, m.group(m.lastgroup)) self.pos += len(m.group(0)) @@ -110,7 +111,7 @@ class Parser: (b"number", rb"[0-9]+[KMGkmg]?"), ] - def __init__(self, debug=False): + def __init__(self, debug: bool = False): self.debug = debug self.lexer = Lexer(Parser.lrules) @@ -144,14 +145,14 @@ def __set_expected(self, *args, **kwargs): """ self.__expected = args - def __push_expected_bracket(self, ttype, tvalue): + def __push_expected_bracket(self, ttype: str, tvalue: bytes): """Append a new expected bracket. Next time a bracket is closed, it must match the one provided here. """ self.__expected_brackets.append((ttype, tvalue)) - def __pop_expected_bracket(self, ttype, tvalue): + def __pop_expected_bracket(self, ttype: str, tvalue): """Drop the last expected bracket. If the given bracket doesn't match the dropped expected bracket, @@ -166,7 +167,7 @@ def __pop_expected_bracket(self, ttype, tvalue): "unexpected closing bracket %s (expected %s)" % (tvalue, evalue) ) - def __up(self, onlyrecord=False): + def __up(self, onlyrecord: bool = False): """Return to the current command's parent This method should be called each time a command is @@ -225,7 +226,7 @@ def __up(self, onlyrecord=False): self.__set_expected("comma", "right_parenthesis") break - def __check_command_completion(self, testsemicolon=True): + def __check_command_completion(self, testsemicolon: bool = True) -> bool: """Check for command(s) completion This function should be called each time a new argument is @@ -268,7 +269,7 @@ def __check_command_completion(self, testsemicolon=True): break return True - def __stringlist(self, ttype, tvalue): + def __stringlist(self, ttype: str, tvalue: bytes) -> bool: """Specific method to parse the 'string-list' type Syntax: @@ -290,7 +291,7 @@ def __stringlist(self, ttype, tvalue): return self.__check_command_completion() return False - def __argument(self, ttype, tvalue): + def __argument(self, ttype: str, tvalue: bytes) -> bool: """Argument parsing method This method acts as an entry point for 'argument' parsing. @@ -327,7 +328,7 @@ def __argument(self, ttype, tvalue): return False - def __arguments(self, ttype, tvalue): + def __arguments(self, ttype: str, tvalue: bytes) -> bool: """Arguments parsing method Entry point for command arguments parsing. The parser must @@ -371,7 +372,7 @@ def __arguments(self, ttype, tvalue): return False - def __command(self, ttype, tvalue): + def __command(self, ttype: str, tvalue: bytes) -> bool: """Command parsing method Entry point for command parsing. Here is expected behaviour: @@ -432,7 +433,7 @@ def __command(self, ttype, tvalue): return True return False - def parse(self, text): + def parse(self, text: bytes) -> bool: """The parser entry point. Parse the provided text to check for its validity. @@ -452,7 +453,8 @@ def parse(self, text): self.__reset_parser() try: - tvalue = "" + ttype: str + tvalue: bytes = b"" for ttype, tvalue in self.lexer.scan(text): if ttype == "hash_comment": self.hash_comments += [tvalue.strip()] @@ -499,7 +501,7 @@ def parse(self, text): return False return True - def parse_file(self, name): + def parse_file(self, name: str) -> bool: """Parse the content of a file. See 'parse' method for information. diff --git a/sievelib/tests/test_managesieve.py b/sievelib/tests/test_managesieve.py index c31a6c3..0232ab1 100644 --- a/sievelib/tests/test_managesieve.py +++ b/sievelib/tests/test_managesieve.py @@ -50,7 +50,7 @@ def setUp(self): def authenticate(self, mock_socket): """Authenticate client.""" mock_socket.return_value.recv.side_effect = (AUTHENTICATION,) - self.client.connect(b"user", b"password") + self.client.connect("user", "password") def test_connection(self, mock_socket): """Test connection.""" @@ -62,7 +62,7 @@ def test_connection(self, mock_socket): def test_auth_oauthbearer(self, mock_socket): """Test OAUTHBEARER mechanism.""" mock_socket.return_value.recv.side_effect = (AUTHENTICATION,) - self.assertTrue(self.client.connect(b"user", b"token", authmech="OAUTHBEARER")) + self.assertTrue(self.client.connect("user", "token", authmech="OAUTHBEARER")) def test_capabilities(self, mock_socket): """Test capabilities command.""" @@ -145,7 +145,7 @@ def test_renamescript_simulated(self, mock_socket): CAPABILITIES_WITHOUT_VERSION + b'OK "Dovecot ready."\r\n' b'OK "Logged in."\r\n', ) - self.client.connect(b"user", b"password") + self.client.connect("user", "password") mock_socket.return_value.recv.side_effect = ( LISTSCRIPTS, GETSCRIPT, diff --git a/sievelib/tools.py b/sievelib/tools.py index 4a7bd18..250289d 100644 --- a/sievelib/tools.py +++ b/sievelib/tools.py @@ -1,7 +1,7 @@ """Some tools.""" -def to_list(stringlist, unquote=True): +def to_list(stringlist: str, unquote: bool = True) -> list[str]: """Convert a string representing a list to real list.""" stringlist = stringlist[1:-1] return [ From e67b963c40b02577e4e368fd4e84ece3d7daeb0b Mon Sep 17 00:00:00 2001 From: Antoine Nguyen Date: Wed, 3 Apr 2024 17:13:38 +0200 Subject: [PATCH 2/8] Compat with older python versions --- sievelib/commands.py | 36 ++++++++++++++++++------------------ sievelib/managesieve.py | 14 +++++++------- 2 files changed, 25 insertions(+), 25 deletions(-) diff --git a/sievelib/commands.py b/sievelib/commands.py index e062b41..64536f7 100644 --- a/sievelib/commands.py +++ b/sievelib/commands.py @@ -22,7 +22,7 @@ from collections.abc import Iterable, Iterator import sys -from typing import Any, NotRequired, Optional, TypedDict, Union +from typing import Any, NotRequired, List, Optional, TypedDict, Union from . import tools @@ -81,18 +81,18 @@ def __str__(self): class CommandExtraArg(TypedDict): """Type definition for command extra argument.""" - type: Union[str, list[str]] - values: NotRequired[list[str]] - valid_for: NotRequired[list[str]] + type: Union[str, List[str]] + values: NotRequired[List[str]] + valid_for: NotRequired[List[str]] class CommandArg(TypedDict): """Type definition for command argument.""" name: str - type: list[str] + type: List[str] required: NotRequired[bool] - values: NotRequired[list[str]] + values: NotRequired[List[str]] extra_arg: NotRequired[CommandExtraArg] extension: NotRequired[str] extension_values: NotRequired[dict[str, str]] @@ -145,19 +145,19 @@ class Command: """ - args_definition: list[CommandArg] + args_definition: List[CommandArg] _type: str variable_args_nb: bool = False non_deterministic_args: bool = False accept_children: bool = False - must_follow: Optional[list[str]] = None + must_follow: Optional[List[str]] = None extension: Optional[str] = None def __init__(self, parent: Optional["Command"] = None): self.parent = parent self.arguments: dict[str, Any] = {} self.extra_arguments: dict[str, Any] = {} # to store tag arguments - self.children: list[Command] = [] + self.children: List[Command] = [] self.nextargpos = 0 self.required_args = -1 @@ -169,7 +169,7 @@ def __init__(self, parent: Optional["Command"] = None): self.name: str = self.__class__.__name__.replace("Command", "") self.name = self.name.lower() - self.hash_comments: list[bytes] = [] + self.hash_comments: List[bytes] = [] def __repr__(self): return "%s (type: %s)" % (self.name, self._type) @@ -245,7 +245,7 @@ def __print( else: target.write(text + "\n") - def __get_arg_type(self, arg: str) -> Optional[list[str]]: + def __get_arg_type(self, arg: str) -> Optional[List[str]]: """Return the type corresponding to the given name. :param arg: a defined argument name @@ -262,7 +262,7 @@ def complete_cb(self): """ pass - def get_expected_first(self) -> Optional[list[str]]: + def get_expected_first(self) -> Optional[List[str]]: """Return the first expected token for this command""" return None @@ -415,7 +415,7 @@ def __is_valid_value_for_arg( return True return False - def __is_valid_type(self, typ: str, typlist: list[str]) -> bool: + def __is_valid_type(self, typ: str, typlist: List[str]) -> bool: """Check if type is valid based on input type list "string" is special because it can be used for stringlist @@ -566,7 +566,7 @@ class RequireCommand(ControlCommand): {"name": "capabilities", "type": ["string", "stringlist"], "required": True} ] - loaded_extensions: list[str] = [] + loaded_extensions: List[str] = [] def complete_cb(self): if type(self.arguments["capabilities"]) != list: @@ -584,7 +584,7 @@ class IfCommand(ControlCommand): args_definition = [{"name": "test", "type": ["test"], "required": True}] - def get_expected_first(self) -> list[str]: + def get_expected_first(self) -> List[str]: return ["identifier"] @@ -593,7 +593,7 @@ class ElsifCommand(ControlCommand): must_follow = ["if", "elsif"] args_definition = [{"name": "test", "type": ["test"], "required": True}] - def get_expected_first(self) -> list[str]: + def get_expected_first(self) -> List[str]: return ["identifier"] @@ -748,7 +748,7 @@ class AllofCommand(TestCommand): args_definition = [{"name": "tests", "type": ["testlist"], "required": True}] - def get_expected_first(self) -> list[str]: + def get_expected_first(self) -> List[str]: return ["left_parenthesis"] @@ -758,7 +758,7 @@ class AnyofCommand(TestCommand): args_definition = [{"name": "tests", "type": ["testlist"], "required": True}] - def get_expected_first(self) -> list[str]: + def get_expected_first(self) -> List[str]: return ["left_parenthesis"] diff --git a/sievelib/managesieve.py b/sievelib/managesieve.py index af62431..4b1e598 100644 --- a/sievelib/managesieve.py +++ b/sievelib/managesieve.py @@ -12,7 +12,7 @@ import re import socket import ssl -from typing import Any, Optional +from typing import Any, List, Optional from .digest_md5 import DigestMD5 from . import tools @@ -213,7 +213,7 @@ def __read_response(self, nblines: int = -1) -> tuple[bytes, bytes, bytes]: return (code, data, resp) - def __prepare_args(self, args: list[Any]) -> list[bytes]: + def __prepare_args(self, args: List[Any]) -> List[bytes]: """Format command arguments before sending them. Command arguments of type string must be quoted, the only @@ -248,9 +248,9 @@ def __prepare_content(self, content: str) -> bytes: def __send_command( self, name: str, - args: Optional[list[bytes]] = None, + args: Optional[List[bytes]] = None, withcontent: bool = False, - extralines: Optional[list[bytes]] = None, + extralines: Optional[List[bytes]] = None, nblines: int = -1, ) -> tuple[str, str, bytes]: """Send a command to the server. @@ -502,7 +502,7 @@ def get_implementation(self) -> str: """ return self.__capabilities["IMPLEMENTATION"] - def get_sasl_mechanisms(self) -> list[str]: + def get_sasl_mechanisms(self) -> List[str]: """Returns the supported authentication mechanisms. They're read from server capabilities. (see the CAPABILITY @@ -605,7 +605,7 @@ def havespace(self, scriptname: str, scriptsize: int) -> bool: return False @authentication_required - def listscripts(self) -> tuple[str, list[str]]: + def listscripts(self) -> tuple[str, List[str]]: """List available scripts. See MANAGESIEVE specifications, section 2.7 @@ -615,7 +615,7 @@ def listscripts(self) -> tuple[str, list[str]]: code, data, listing = self.__send_command("LISTSCRIPTS", withcontent=True) if code == "NO": return None - ret: list[str] = [] + ret: List[str] = [] active_script: str = None for l in listing.splitlines(): if self.__size_expr.match(l): From 2f5adfb49c530b8f824cfa8347dfddf849405775 Mon Sep 17 00:00:00 2001 From: Antoine Nguyen Date: Wed, 3 Apr 2024 17:22:31 +0200 Subject: [PATCH 3/8] Added typing-extensions dependency --- setup.py | 2 +- sievelib/commands.py | 3 ++- sievelib/factory.py | 3 ++- 3 files changed, 5 insertions(+), 3 deletions(-) diff --git a/setup.py b/setup.py index e7f2971..89f4183 100644 --- a/setup.py +++ b/setup.py @@ -34,7 +34,7 @@ def local_scheme(version): url="https://github.com/tonioo/sievelib", license="MIT", keywords=["sieve", "managesieve", "parser", "client"], - install_requires=[], + install_requires=["typing-extensions"], setup_requires=["setuptools_scm"], use_scm_version={"local_scheme": local_scheme}, classifiers=[ diff --git a/sievelib/commands.py b/sievelib/commands.py index 64536f7..2e2febb 100644 --- a/sievelib/commands.py +++ b/sievelib/commands.py @@ -22,7 +22,8 @@ from collections.abc import Iterable, Iterator import sys -from typing import Any, NotRequired, List, Optional, TypedDict, Union +from typing import Any, List, Optional, TypedDict, Union +from typing_extensions import NotRequired from . import tools diff --git a/sievelib/factory.py b/sievelib/factory.py index 82acb03..ff5d514 100644 --- a/sievelib/factory.py +++ b/sievelib/factory.py @@ -10,7 +10,8 @@ import io import sys -from typing import NotRequired, Optional, TypedDict, Union +from typing import Optional, TypedDict, Union +from typing_extensions import NotRequired from sievelib import commands From 31e67f32122171ba0711ad1d7e71cd6f1f280341 Mon Sep 17 00:00:00 2001 From: Antoine Nguyen Date: Wed, 3 Apr 2024 17:24:49 +0200 Subject: [PATCH 4/8] Updated workflow --- .github/workflows/sievelib.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/.github/workflows/sievelib.yml b/.github/workflows/sievelib.yml index 0e5b0ad..76800a2 100644 --- a/.github/workflows/sievelib.yml +++ b/.github/workflows/sievelib.yml @@ -25,6 +25,7 @@ jobs: - name: Install dependencies run: | pip install codecov pytest pytest-cov + pip install -e . - name: Run tests if: ${{ matrix.python-version != '3.11' }} run: | From 180ad55627ac3d8e9d6bfe6ef1703131d87fb7c5 Mon Sep 17 00:00:00 2001 From: Antoine Nguyen Date: Wed, 3 Apr 2024 17:26:19 +0200 Subject: [PATCH 5/8] Python 3.8 compat --- sievelib/tools.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/sievelib/tools.py b/sievelib/tools.py index 250289d..a139c39 100644 --- a/sievelib/tools.py +++ b/sievelib/tools.py @@ -1,7 +1,9 @@ """Some tools.""" +from typing import List -def to_list(stringlist: str, unquote: bool = True) -> list[str]: + +def to_list(stringlist: str, unquote: bool = True) -> List[str]: """Convert a string representing a list to real list.""" stringlist = stringlist[1:-1] return [ From 01e89ea887f87af265a50161b1fa341f5a0bd466 Mon Sep 17 00:00:00 2001 From: Antoine Nguyen Date: Wed, 3 Apr 2024 17:33:09 +0200 Subject: [PATCH 6/8] Python 3.8 compat --- sievelib/commands.py | 8 ++++---- sievelib/managesieve.py | 8 ++++---- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/sievelib/commands.py b/sievelib/commands.py index 2e2febb..43f7a97 100644 --- a/sievelib/commands.py +++ b/sievelib/commands.py @@ -22,7 +22,7 @@ from collections.abc import Iterable, Iterator import sys -from typing import Any, List, Optional, TypedDict, Union +from typing import Any, Dict, List, Optional, TypedDict, Union from typing_extensions import NotRequired from . import tools @@ -96,7 +96,7 @@ class CommandArg(TypedDict): values: NotRequired[List[str]] extra_arg: NotRequired[CommandExtraArg] extension: NotRequired[str] - extension_values: NotRequired[dict[str, str]] + extension_values: NotRequired[Dict[str, str]] # Statement elements (see RFC, section 8.3) @@ -156,8 +156,8 @@ class Command: def __init__(self, parent: Optional["Command"] = None): self.parent = parent - self.arguments: dict[str, Any] = {} - self.extra_arguments: dict[str, Any] = {} # to store tag arguments + self.arguments: Dict[str, Any] = {} + self.extra_arguments: Dict[str, Any] = {} # to store tag arguments self.children: List[Command] = [] self.nextargpos = 0 diff --git a/sievelib/managesieve.py b/sievelib/managesieve.py index 4b1e598..ba1eb38 100644 --- a/sievelib/managesieve.py +++ b/sievelib/managesieve.py @@ -12,7 +12,7 @@ import re import socket import ssl -from typing import Any, List, Optional +from typing import Any, List, Optional, Tuple from .digest_md5 import DigestMD5 from . import tools @@ -177,7 +177,7 @@ def __read_line(self) -> bytes: raise Response(m.group(1), m.group(2)) return ret - def __read_response(self, nblines: int = -1) -> tuple[bytes, bytes, bytes]: + def __read_response(self, nblines: int = -1) -> Tuple[bytes, bytes, bytes]: """Read a response from the server. In the usual case, we read lines until we find one that looks @@ -252,7 +252,7 @@ def __send_command( withcontent: bool = False, extralines: Optional[List[bytes]] = None, nblines: int = -1, - ) -> tuple[str, str, bytes]: + ) -> Tuple[str, str, bytes]: """Send a command to the server. If args is not empty, we concatenate the given command with @@ -605,7 +605,7 @@ def havespace(self, scriptname: str, scriptsize: int) -> bool: return False @authentication_required - def listscripts(self) -> tuple[str, List[str]]: + def listscripts(self) -> Tuple[str, List[str]]: """List available scripts. See MANAGESIEVE specifications, section 2.7 From 028301a76be260d9497d896d4c17315d801d0626 Mon Sep 17 00:00:00 2001 From: Antoine Nguyen Date: Wed, 3 Apr 2024 17:41:17 +0200 Subject: [PATCH 7/8] Python 3.8 compat for iterators --- sievelib/commands.py | 4 ++-- sievelib/parser.py | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/sievelib/commands.py b/sievelib/commands.py index 43f7a97..5a254da 100644 --- a/sievelib/commands.py +++ b/sievelib/commands.py @@ -20,9 +20,9 @@ """ -from collections.abc import Iterable, Iterator +from collections.abc import Iterable import sys -from typing import Any, Dict, List, Optional, TypedDict, Union +from typing import Any, Dict, Iterator, List, Optional, TypedDict, Union from typing_extensions import NotRequired from . import tools diff --git a/sievelib/parser.py b/sievelib/parser.py index f24b041..c4e6b77 100755 --- a/sievelib/parser.py +++ b/sievelib/parser.py @@ -7,9 +7,9 @@ This implementation is based on RFC 5228 (http://tools.ietf.org/html/rfc5228) """ -from collections.abc import Iterator import re import sys +from typing import Iterator, Tuple from sievelib.commands import get_command_instance, CommandError, RequireCommand @@ -55,7 +55,7 @@ def curcolno(self) -> int: """Return the current column number""" return self.pos - self.text.rfind(b"\n", 0, self.pos) - def scan(self, text: bytes) -> Iterator[tuple[str, bytes]]: + def scan(self, text: bytes) -> Iterator[Tuple[str, bytes]]: """Analyse some data Analyse the passed content. Each time a token is recognized, a From 602f15e29e560bfe6f76e34b369c93e500daebd4 Mon Sep 17 00:00:00 2001 From: Antoine Nguyen Date: Wed, 3 Apr 2024 17:43:49 +0200 Subject: [PATCH 8/8] Python 3.8 list -> List --- sievelib/factory.py | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/sievelib/factory.py b/sievelib/factory.py index ff5d514..72a5fd1 100644 --- a/sievelib/factory.py +++ b/sievelib/factory.py @@ -10,7 +10,7 @@ import io import sys -from typing import Optional, TypedDict, Union +from typing import List, Optional, TypedDict, Union from typing_extensions import NotRequired from sievelib import commands @@ -50,8 +50,8 @@ def __init__( self.name = name self.filter_name_pretext = filter_name_pretext self.filter_desc_pretext = filter_desc_pretext - self.requires: list[str] = [] - self.filters: list[Filter] = [] + self.requires: List[str] = [] + self.filters: List[Filter] = [] def __str__(self): target = io.StringIO() @@ -139,7 +139,7 @@ def __quote_if_necessary(self, value: str) -> str: return value def __build_condition( - self, condition: list[str], parent: commands.Command, tag: Optional[str] = None + self, condition: List[str], parent: commands.Command, tag: Optional[str] = None ) -> commands.Command: """Translate a condition to a valid sievelib Command. @@ -159,8 +159,8 @@ def __build_condition( def __create_filter( self, - conditions: list[tuple], - actions: list[tuple], + conditions: List[tuple], + actions: List[tuple], matchtype: str = "anyof", ) -> commands.Command: """Create a new filter @@ -304,8 +304,8 @@ def filter_exists(self, name: str) -> bool: def addfilter( self, name: str, - conditions: list[tuple], - actions: list[tuple], + conditions: List[tuple], + actions: List[tuple], matchtype: str = "anyof", ): """Add a new filter to this filters set @@ -331,8 +331,8 @@ def updatefilter( self, oldname: str, newname: str, - conditions: list[tuple], - actions: list[tuple], + conditions: List[tuple], + actions: List[tuple], matchtype: str = "anyof", ) -> bool: """Update a specific filter @@ -425,7 +425,7 @@ def get_filter_matchtype(self, name: str) -> Union[str, None]: return node.__class__.__name__.lower().replace("command", "") return None - def get_filter_conditions(self, name: str) -> Union[list[str], None]: + def get_filter_conditions(self, name: str) -> Union[List[str], None]: """Retrieve conditions of the given filter.""" flt = self.getfilter(name) if not flt: @@ -465,7 +465,7 @@ def get_filter_conditions(self, name: str) -> Union[list[str], None]: conditions.append(args) return conditions - def get_filter_actions(self, name: str) -> Union[list[str], None]: + def get_filter_actions(self, name: str) -> Union[List[str], None]: """Retrieve actions of the given filter.""" flt = self.getfilter(name) if not flt: