diff --git a/CHANGES.rst b/CHANGES.rst index 709c2e7c..22c86763 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -2,6 +2,9 @@ ================== - Update style and linting checking [#103] +- Fix regex issue in internal configobj [#108] +- Remove bundled ``configobj`` package in favor of the ``configobj`` package + bundled into ``astropy``. [#122] 0.5.1 (2023-10-02) ================== @@ -11,7 +14,6 @@ - Move ``strun`` to entrypoints [#101] - Deprecate ``preserve_comments`` fix spec parsing for inline comments with a closing parenthesis [#107] -- Fix regex issue in internal configobj [#108] 0.5.0 (2023-04-19) ================== diff --git a/pyproject.toml b/pyproject.toml index 795cedbd..3a477767 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -73,9 +73,6 @@ addopts = [ "-ra", "--color=yes" ] -norecursedirs = [ - 'src/stpipe/extern', -] testpaths = [ 'tests', ] @@ -96,7 +93,6 @@ src = [ line-length = 88 extend-exclude = [ 'docs', - 'src/stpipe/extern', 'scripts/strun', ] @@ -156,14 +152,10 @@ force-exclude = ''' | \.git | \.pytest_cache | \.tox - | src/stpipe/extern )/ ) ''' -[tool.flynt] -exclude = ["src/stpipe/extern/*"] - [tool.codespell] skip="*.pdf,*.fits,*.asdf,.tox,build,./tags,.git,docs/_build" # ignore-words-list=""" diff --git a/src/stpipe/config_parser.py b/src/stpipe/config_parser.py index 7981896a..aa70d785 100644 --- a/src/stpipe/config_parser.py +++ b/src/stpipe/config_parser.py @@ -10,17 +10,17 @@ from asdf import ValidationError as AsdfValidationError from asdf import open as asdf_open - -from . import utilities -from .config import StepConfig -from .datamodel import AbstractDataModel -from .extern.configobj.configobj import ( +from astropy.extern.configobj.configobj import ( ConfigObj, Section, flatten_errors, get_extra_values, ) -from .extern.configobj.validate import ValidateError, Validator, VdtTypeError +from astropy.extern.configobj.validate import ValidateError, Validator, VdtTypeError + +from . import utilities +from .config import StepConfig +from .datamodel import AbstractDataModel from .utilities import _not_set # Configure logger diff --git a/src/stpipe/extern/__init__.py b/src/stpipe/extern/__init__.py index 76789761..e69de29b 100644 --- a/src/stpipe/extern/__init__.py +++ b/src/stpipe/extern/__init__.py @@ -1,8 +0,0 @@ -""" -This package contains python package(s) that are bundled with jwst but are -external to jwst, and hence are developed in a separate source tree. - -Currently the bundled external dependencies are: - -.. _ConfigObj: https://github.com/DiffSK/configobj -""" diff --git a/src/stpipe/extern/configobj.py b/src/stpipe/extern/configobj.py new file mode 100644 index 00000000..d0ca8701 --- /dev/null +++ b/src/stpipe/extern/configobj.py @@ -0,0 +1,20 @@ +import inspect +import sys +import warnings + +from astropy.extern import configobj + +# Add submodules to system modules so they are available under the same path +for mod in inspect.getmembers(configobj, inspect.ismodule): + # add as a local variable + locals()[mod[0]] = mod[1] + # Add the submodule to the system modules so it can be imported + sys.modules[f"stpipe.extern.configobj.{mod[0]}"] = mod[1] + + +warnings.warn( + "stpipe.extern.configobj is deprecated in favor of astropy.extern.configobj, " + "please use that instead.", + DeprecationWarning, + stacklevel=2, +) diff --git a/src/stpipe/extern/configobj/__init__.py b/src/stpipe/extern/configobj/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/src/stpipe/extern/configobj/configobj.py b/src/stpipe/extern/configobj/configobj.py deleted file mode 100644 index b3e8f004..00000000 --- a/src/stpipe/extern/configobj/configobj.py +++ /dev/null @@ -1,2476 +0,0 @@ -# configobj.py -# A config file reader/writer that supports nested sections in config files. -# Copyright (C) 2005-2014: -# (name) : (email) -# Michael Foord: fuzzyman AT voidspace DOT org DOT uk -# Nicola Larosa: nico AT tekNico DOT net -# Rob Dennis: rdennis AT gmail DOT com -# Eli Courtwright: eli AT courtwright DOT org - -# This software is licensed under the terms of the BSD license. -# http://opensource.org/licenses/BSD-3-Clause - -# ConfigObj 5 - main repository for documentation and issue tracking: -# https://github.com/DiffSK/configobj - -import os -import re -import sys -from collections.abc import Mapping - -from codecs import BOM_UTF8, BOM_UTF16, BOM_UTF16_BE, BOM_UTF16_LE - -# imported lazily to avoid startup performance hit if it isn't used -compiler = None - -# A dictionary mapping BOM to -# the encoding to decode with, and what to set the -# encoding attribute to. -BOMS = { - BOM_UTF8: ('utf_8', None), - BOM_UTF16_BE: ('utf16_be', 'utf_16'), - BOM_UTF16_LE: ('utf16_le', 'utf_16'), - BOM_UTF16: ('utf_16', 'utf_16'), - } -# All legal variants of the BOM codecs. -# TODO: the list of aliases is not meant to be exhaustive, is there a -# better way ? -BOM_LIST = { - 'utf_16': 'utf_16', - 'u16': 'utf_16', - 'utf16': 'utf_16', - 'utf-16': 'utf_16', - 'utf16_be': 'utf16_be', - 'utf_16_be': 'utf16_be', - 'utf-16be': 'utf16_be', - 'utf16_le': 'utf16_le', - 'utf_16_le': 'utf16_le', - 'utf-16le': 'utf16_le', - 'utf_8': 'utf_8', - 'u8': 'utf_8', - 'utf': 'utf_8', - 'utf8': 'utf_8', - 'utf-8': 'utf_8', - } - -# Map of encodings to the BOM to write. -BOM_SET = { - 'utf_8': BOM_UTF8, - 'utf_16': BOM_UTF16, - 'utf16_be': BOM_UTF16_BE, - 'utf16_le': BOM_UTF16_LE, - None: BOM_UTF8 - } - - -def match_utf8(encoding): - return BOM_LIST.get(encoding.lower()) == 'utf_8' - - -# Quote strings used for writing values -squot = "'%s'" -dquot = '"%s"' -noquot = "%s" -wspace_plus = ' \r\n\v\t\'"' -tsquot = '"""%s"""' -tdquot = "'''%s'''" - -# Sentinel for use in getattr calls to replace hasattr -MISSING = object() - -__all__ = ( - 'DEFAULT_INDENT_TYPE', - 'DEFAULT_INTERPOLATION', - 'ConfigObjError', - 'NestingError', - 'ParseError', - 'DuplicateError', - 'ConfigspecError', - 'ConfigObj', - 'SimpleVal', - 'InterpolationError', - 'InterpolationLoopError', - 'MissingInterpolationOption', - 'RepeatSectionError', - 'ReloadError', - 'UnreprError', - 'UnknownType', - 'flatten_errors', - 'get_extra_values' -) - -DEFAULT_INTERPOLATION = 'configparser' -DEFAULT_INDENT_TYPE = ' ' -MAX_INTERPOL_DEPTH = 10 - -OPTION_DEFAULTS = { - 'interpolation': True, - 'raise_errors': False, - 'list_values': True, - 'create_empty': False, - 'file_error': False, - 'configspec': None, - 'stringify': True, - # option may be set to one of ('', ' ', '\t') - 'indent_type': None, - 'encoding': None, - 'default_encoding': None, - 'unrepr': False, - 'write_empty_values': False, -} - -# this could be replaced if six is used for compatibility, or there are no -# more assertions about items being a string - - -def getObj(s): - global compiler - if compiler is None: - import compiler - s = "a=" + s - p = compiler.parse(s) - return p.getChildren()[1].getChildren()[0].getChildren()[1] - - -class UnknownType(Exception): - pass - - -class Builder: - - def build(self, o): - if m is None: - raise UnknownType(o.__class__.__name__) - return m(o) - - def build_List(self, o): - return list(map(self.build, o.getChildren())) - - def build_Const(self, o): - return o.value - - def build_Dict(self, o): - d = {} - i = iter(map(self.build, o.getChildren())) - for el in i: - d[el] = next(i) - return d - - def build_Tuple(self, o): - return tuple(self.build_List(o)) - - def build_Name(self, o): - if o.name == 'None': - return None - if o.name == 'True': - return True - if o.name == 'False': - return False - - # An undefined Name - raise UnknownType('Undefined Name') - - def build_Add(self, o): - real, imag = list(map(self.build_Const, o.getChildren())) - try: - real = float(real) - except TypeError: - raise UnknownType('Add') - if not isinstance(imag, complex) or imag.real != 0.0: - raise UnknownType('Add') - return real+imag - - def build_Getattr(self, o): - parent = self.build(o.expr) - return getattr(parent, o.attrname) - - def build_UnarySub(self, o): - return -self.build_Const(o.getChildren()[0]) - - def build_UnaryAdd(self, o): - return self.build_Const(o.getChildren()[0]) - - -_builder = Builder() - - -def unrepr(s): - if not s: - return s - - # this is supposed to be safe - import ast - return ast.literal_eval(s) - - -class ConfigObjError(SyntaxError): - """ - This is the base class for all errors that ConfigObj raises. - It is a subclass of SyntaxError. - """ - def __init__(self, message='', line_number=None, line=''): - self.line = line - self.line_number = line_number - SyntaxError.__init__(self, message) - - -class NestingError(ConfigObjError): - """ - This error indicates a level of nesting that doesn't match. - """ - - -class ParseError(ConfigObjError): - """ - This error indicates that a line is badly written. - It is neither a valid ``key = value`` line, - nor a valid section marker line. - """ - - -class ReloadError(IOError): - """ - A 'reload' operation failed. - This exception is a subclass of ``IOError``. - """ - def __init__(self): - IOError.__init__(self, 'reload failed, filename is not set.') - - -class DuplicateError(ConfigObjError): - """ - The keyword or section specified already exists. - """ - - -class ConfigspecError(ConfigObjError): - """ - An error occurred whilst parsing a configspec. - """ - - -class InterpolationError(ConfigObjError): - """Base class for the two interpolation errors.""" - - -class InterpolationLoopError(InterpolationError): - """Maximum interpolation depth exceeded in string interpolation.""" - - def __init__(self, option): - InterpolationError.__init__( - self, - 'interpolation loop detected in value "%s".' % option) - - -class RepeatSectionError(ConfigObjError): - """ - This error indicates additional sections in a section with a - ``__many__`` (repeated) section. - """ - - -class MissingInterpolationOption(InterpolationError): - """A value specified for interpolation was missing.""" - def __init__(self, option): - msg = 'missing option "%s" in interpolation.' % option - InterpolationError.__init__(self, msg) - - -class UnreprError(ConfigObjError): - """An error parsing in unrepr mode.""" - - - -class InterpolationEngine: - """ - A helper class to help perform string interpolation. - - This class is an abstract base class; its descendants perform - the actual work. - """ - - # compiled regexp to use in self.interpolate() - _KEYCRE = re.compile(r"%\(([^)]*)\)s") - _cookie = '%' - - def __init__(self, section): - # the Section instance that "owns" this engine - self.section = section - - - def interpolate(self, key, value): - # short-cut - if not self._cookie in value: - return value - - def recursive_interpolate(key, value, section, backtrail): - """The function that does the actual work. - - ``value``: the string we're trying to interpolate. - ``section``: the section in which that string was found - ``backtrail``: a dict to keep track of where we've been, - to detect and prevent infinite recursion loops - - This is similar to a depth-first-search algorithm. - """ - # Have we been here already? - if (key, section.name) in backtrail: - # Yes - infinite loop detected - raise InterpolationLoopError(key) - # Place a marker on our backtrail so we won't come back here again - backtrail[(key, section.name)] = 1 - - # Now start the actual work - match = self._KEYCRE.search(value) - while match: - # The actual parsing of the match is implementation-dependent, - # so delegate to our helper function - k, v, s = self._parse_match(match) - if k is None: - # That's the signal that no further interpolation is needed - replacement = v - else: - # Further interpolation may be needed to obtain final value - replacement = recursive_interpolate(k, v, s, backtrail) - # Replace the matched string with its final value - start, end = match.span() - value = ''.join((value[:start], replacement, value[end:])) - new_search_start = start + len(replacement) - # Pick up the next interpolation key, if any, for next time - # through the while loop - match = self._KEYCRE.search(value, new_search_start) - - # Now safe to come back here again; remove marker from backtrail - del backtrail[(key, section.name)] - - return value - - # Back in interpolate(), all we have to do is kick off the recursive - # function with appropriate starting values - value = recursive_interpolate(key, value, self.section, {}) - return value - - - def _fetch(self, key): - """Helper function to fetch values from owning section. - - Returns a 2-tuple: the value, and the section where it was found. - """ - # switch off interpolation before we try and fetch anything ! - save_interp = self.section.main.interpolation - self.section.main.interpolation = False - - # Start at section that "owns" this InterpolationEngine - current_section = self.section - while True: - # try the current section first - val = current_section.get(key) - if val is not None and not isinstance(val, Section): - break - # try "DEFAULT" next - val = current_section.get('DEFAULT', {}).get(key) - if val is not None and not isinstance(val, Section): - break - # move up to parent and try again - # top-level's parent is itself - if current_section.parent is current_section: - # reached top level, time to give up - break - current_section = current_section.parent - - # restore interpolation to previous value before returning - self.section.main.interpolation = save_interp - if val is None: - raise MissingInterpolationOption(key) - return val, current_section - - - def _parse_match(self, match): - """Implementation-dependent helper function. - - Will be passed a match object corresponding to the interpolation - key we just found (e.g., "%(foo)s" or "$foo"). Should look up that - key in the appropriate config file section (using the ``_fetch()`` - helper function) and return a 3-tuple: (key, value, section) - - ``key`` is the name of the key we're looking for - ``value`` is the value found for that key - ``section`` is a reference to the section where it was found - - ``key`` and ``section`` should be None if no further - interpolation should be performed on the resulting value - (e.g., if we interpolated "$$" and returned "$"). - """ - raise NotImplementedError() - - - -class ConfigParserInterpolation(InterpolationEngine): - """Behaves like ConfigParser.""" - _cookie = '%' - _KEYCRE = re.compile(r"%\(([^)]*)\)s") - - def _parse_match(self, match): - key = match.group(1) - value, section = self._fetch(key) - return key, value, section - - - -class TemplateInterpolation(InterpolationEngine): - """Behaves like string.Template.""" - _cookie = '$' - _delimiter = '$' - _KEYCRE = re.compile(r""" - \$(?: - (?P\$) | # Two $ signs - (?P[_a-z][_a-z0-9]*) | # $name format - {(?P[^}]*)} # ${name} format - ) - """, re.IGNORECASE | re.VERBOSE) - - def _parse_match(self, match): - # Valid name (in or out of braces): fetch value from section - key = match.group('named') or match.group('braced') - if key is not None: - value, section = self._fetch(key) - return key, value, section - # Escaped delimiter (e.g., $$): return single delimiter - if match.group('escaped') is not None: - # Return None for key and section to indicate it's time to stop - return None, self._delimiter, None - # Anything else: ignore completely, just return it unchanged - return None, match.group(), None - - -interpolation_engines = { - 'configparser': ConfigParserInterpolation, - 'template': TemplateInterpolation, -} - - -def __newobj__(cls, *args): - # Hack for pickle - return cls.__new__(cls, *args) - -class Section(dict): - """ - A dictionary-like object that represents a section in a config file. - - It does string interpolation if the 'interpolation' attribute - of the 'main' object is set to True. - - Interpolation is tried first from this object, then from the 'DEFAULT' - section of this object, next from the parent and its 'DEFAULT' section, - and so on until the main object is reached. - - A Section will behave like an ordered dictionary - following the - order of the ``scalars`` and ``sections`` attributes. - You can use this to change the order of members. - - Iteration follows the order: scalars, then sections. - """ - - - def __setstate__(self, state): - dict.update(self, state[0]) - self.__dict__.update(state[1]) - - def __reduce__(self): - state = (dict(self), self.__dict__) - return (__newobj__, (self.__class__,), state) - - - def __init__(self, parent, depth, main, indict=None, name=None): - """ - * parent is the section above - * depth is the depth level of this section - * main is the main ConfigObj - * indict is a dictionary to initialise the section with - """ - if indict is None: - indict = {} - dict.__init__(self) - # used for nesting level *and* interpolation - self.parent = parent - # used for the interpolation attribute - self.main = main - # level of nesting depth of this Section - self.depth = depth - # purely for information - self.name = name - # - self._initialise() - # we do this explicitly so that __setitem__ is used properly - # (rather than just passing to ``dict.__init__``) - for entry, value in indict.items(): - self[entry] = value - - - def _initialise(self): - # the sequence of scalar values in this Section - self.scalars = [] - # the sequence of sections in this Section - self.sections = [] - # for comments :-) - self.comments = {} - self.inline_comments = {} - # the configspec - self.configspec = None - # for defaults - self.defaults = [] - self.default_values = {} - self.extra_values = [] - self._created = False - - - def _interpolate(self, key, value): - try: - # do we already have an interpolation engine? - engine = self._interpolation_engine - except AttributeError: - # not yet: first time running _interpolate(), so pick the engine - name = self.main.interpolation - if name == True: # note that "if name:" would be incorrect here - # backwards-compatibility: interpolation=True means use default - name = DEFAULT_INTERPOLATION - name = name.lower() # so that "Template", "template", etc. all work - class_ = interpolation_engines.get(name, None) - if class_ is None: - # invalid value for self.main.interpolation - self.main.interpolation = False - return value - else: - # save reference to engine so we don't have to do this again - engine = self._interpolation_engine = class_(self) - # let the engine do the actual work - return engine.interpolate(key, value) - - - def __getitem__(self, key): - """Fetch the item and do string interpolation.""" - val = dict.__getitem__(self, key) - if self.main.interpolation: - if isinstance(val, str): - return self._interpolate(key, val) - if isinstance(val, list): - def _check(entry): - if isinstance(entry, str): - return self._interpolate(key, entry) - return entry - new = [_check(entry) for entry in val] - if new != val: - return new - return val - - - def __setitem__(self, key, value, unrepr=False): - """ - Correctly set a value. - - Making dictionary values Section instances. - (We have to special case 'Section' instances - which are also dicts) - - Keys must be strings. - Values need only be strings (or lists of strings) if - ``main.stringify`` is set. - - ``unrepr`` must be set when setting a value to a dictionary, without - creating a new sub-section. - """ - if not isinstance(key, str): - raise ValueError('The key "%s" is not a string.' % key) - - # add the comment - if key not in self.comments: - self.comments[key] = [] - self.inline_comments[key] = '' - # remove the entry from defaults - if key in self.defaults: - self.defaults.remove(key) - # - if isinstance(value, Section): - if key not in self: - self.sections.append(key) - dict.__setitem__(self, key, value) - elif isinstance(value, Mapping) and not unrepr: - # First create the new depth level, - # then create the section - if key not in self: - self.sections.append(key) - new_depth = self.depth + 1 - dict.__setitem__( - self, - key, - Section( - self, - new_depth, - self.main, - indict=value, - name=key)) - else: - if key not in self: - self.scalars.append(key) - if not self.main.stringify: - if isinstance(value, str): - pass - elif isinstance(value, (list, tuple)): - for entry in value: - if not isinstance(entry, str): - raise TypeError('Value is not a string "%s".' % entry) - else: - raise TypeError('Value is not a string "%s".' % value) - dict.__setitem__(self, key, value) - - - def __delitem__(self, key): - """Remove items from the sequence when deleting.""" - dict. __delitem__(self, key) - if key in self.scalars: - self.scalars.remove(key) - else: - self.sections.remove(key) - del self.comments[key] - del self.inline_comments[key] - - - def get(self, key, default=None): - """A version of ``get`` that doesn't bypass string interpolation.""" - try: - return self[key] - except KeyError: - return default - - - def update(self, indict): - """ - A version of update that uses our ``__setitem__``. - """ - for entry in indict: - self[entry] = indict[entry] - - - def pop(self, key, default=MISSING): - """ - 'D.pop(k[,d]) -> v, remove specified key and return the corresponding value. - If key is not found, d is returned if given, otherwise KeyError is raised' - """ - try: - val = self[key] - except KeyError: - if default is MISSING: - raise - val = default - else: - del self[key] - return val - - - def popitem(self): - """Pops the first (key,val)""" - sequence = (self.scalars + self.sections) - if not sequence: - raise KeyError(": 'popitem(): dictionary is empty'") - key = sequence[0] - val = self[key] - del self[key] - return key, val - - - def clear(self): - """ - A version of clear that also affects scalars/sections - Also clears comments and configspec. - - Leaves other attributes alone : - depth/main/parent are not affected - """ - dict.clear(self) - self.scalars = [] - self.sections = [] - self.comments = {} - self.inline_comments = {} - self.configspec = None - self.defaults = [] - self.extra_values = [] - - - def setdefault(self, key, default=None): - """A version of setdefault that sets sequence if appropriate.""" - try: - return self[key] - except KeyError: - self[key] = default - return self[key] - - - def items(self): - """D.items() -> list of D's (key, value) pairs, as 2-tuples""" - return list(zip((self.scalars + self.sections), list(self.values()))) - - - def keys(self): - """D.keys() -> list of D's keys""" - return (self.scalars + self.sections) - - - def values(self): - """D.values() -> list of D's values""" - return [self[key] for key in (self.scalars + self.sections)] - - - def iteritems(self): - """D.iteritems() -> an iterator over the (key, value) items of D""" - return iter(list(self.items())) - - - def iterkeys(self): - """D.iterkeys() -> an iterator over the keys of D""" - return iter(self.scalars + self.sections) - - __iter__ = iterkeys - - - def itervalues(self): - """D.itervalues() -> an iterator over the values of D""" - return iter(list(self.values())) - - - def __repr__(self): - """x.__repr__() <==> repr(x)""" - def _getval(key): - try: - return self[key] - except MissingInterpolationOption: - return dict.__getitem__(self, key) - return '{%s}' % ', '.join([(f'{repr(key)}: {repr(_getval(key))}') - for key in (self.scalars + self.sections)]) - - __str__ = __repr__ - __str__.__doc__ = "x.__str__() <==> str(x)" - - - # Extra methods - not in a normal dictionary - - def dict(self): - """ - Return a deepcopy of self as a dictionary. - - All members that are ``Section`` instances are recursively turned to - ordinary dictionaries - by calling their ``dict`` method. - - >>> n = a.dict() - >>> n == a - 1 - >>> n is a - 0 - """ - newdict = {} - for entry in self: - this_entry = self[entry] - if isinstance(this_entry, Section): - this_entry = this_entry.dict() - elif isinstance(this_entry, list): - # create a copy rather than a reference - this_entry = list(this_entry) - elif isinstance(this_entry, tuple): - # create a copy rather than a reference - this_entry = tuple(this_entry) - newdict[entry] = this_entry - return newdict - - - def merge(self, indict): - """ - A recursive update - useful for merging config files. - - >>> a = '''[section1] - ... option1 = True - ... [[subsection]] - ... more_options = False - ... # end of file'''.splitlines() - >>> b = '''# File is user.ini - ... [section1] - ... option1 = False - ... # end of file'''.splitlines() - >>> c1 = ConfigObj(b) - >>> c2 = ConfigObj(a) - >>> c2.merge(c1) - >>> c2 - ConfigObj({'section1': {'option1': 'False', 'subsection': {'more_options': 'False'}}}) - """ - for key, val in list(indict.items()): - if (key in self and isinstance(self[key], Mapping) and - isinstance(val, Mapping)): - self[key].merge(val) - else: - self[key] = val - - - def rename(self, oldkey, newkey): - """ - Change a keyname to another, without changing position in sequence. - - Implemented so that transformations can be made on keys, - as well as on values. (used by encode and decode) - - Also renames comments. - """ - if oldkey in self.scalars: - the_list = self.scalars - elif oldkey in self.sections: - the_list = self.sections - else: - raise KeyError('Key "%s" not found.' % oldkey) - pos = the_list.index(oldkey) - # - val = self[oldkey] - dict.__delitem__(self, oldkey) - dict.__setitem__(self, newkey, val) - the_list.remove(oldkey) - the_list.insert(pos, newkey) - comm = self.comments[oldkey] - inline_comment = self.inline_comments[oldkey] - del self.comments[oldkey] - del self.inline_comments[oldkey] - self.comments[newkey] = comm - self.inline_comments[newkey] = inline_comment - - - def walk(self, function, raise_errors=True, - call_on_sections=False, **keywargs): - """ - Walk every member and call a function on the keyword and value. - - Return a dictionary of the return values - - If the function raises an exception, raise the error - unless ``raise_errors=False``, in which case set the return value to - ``False``. - - Any unrecognised keyword arguments you pass to walk, will be passed on - to the function you pass in. - - Note: if ``call_on_sections`` is ``True`` then - on encountering a - subsection, *first* the function is called for the *whole* subsection, - and then recurses into it's members. This means your function must be - able to handle strings, dictionaries and lists. This allows you - to change the key of subsections as well as for ordinary members. The - return value when called on the whole subsection has to be discarded. - - See the encode and decode methods for examples, including functions. - - .. admonition:: caution - - You can use ``walk`` to transform the names of members of a section - but you mustn't add or delete members. - - >>> config = '''[XXXXsection] - ... XXXXkey = XXXXvalue'''.splitlines() - >>> cfg = ConfigObj(config) - >>> cfg - ConfigObj({'XXXXsection': {'XXXXkey': 'XXXXvalue'}}) - >>> def transform(section, key): - ... val = section[key] - ... newkey = key.replace('XXXX', 'CLIENT1') - ... section.rename(key, newkey) - ... if isinstance(val, (tuple, list, dict)): - ... pass - ... else: - ... val = val.replace('XXXX', 'CLIENT1') - ... section[newkey] = val - >>> cfg.walk(transform, call_on_sections=True) - {'CLIENT1section': {'CLIENT1key': None}} - >>> cfg - ConfigObj({'CLIENT1section': {'CLIENT1key': 'CLIENT1value'}}) - """ - out = {} - # scalars first - for i in range(len(self.scalars)): - entry = self.scalars[i] - try: - val = function(self, entry, **keywargs) - # bound again in case name has changed - entry = self.scalars[i] - out[entry] = val - except Exception: - if raise_errors: - raise - else: - entry = self.scalars[i] - out[entry] = False - # then sections - for i in range(len(self.sections)): - entry = self.sections[i] - if call_on_sections: - try: - function(self, entry, **keywargs) - except Exception: - if raise_errors: - raise - else: - entry = self.sections[i] - out[entry] = False - # bound again in case name has changed - entry = self.sections[i] - # previous result is discarded - out[entry] = self[entry].walk( - function, - raise_errors=raise_errors, - call_on_sections=call_on_sections, - **keywargs) - return out - - - def as_bool(self, key): - """ - Accepts a key as input. The corresponding value must be a string or - the objects (``True`` or 1) or (``False`` or 0). We allow 0 and 1 to - retain compatibility with Python 2.2. - - If the string is one of ``True``, ``On``, ``Yes``, or ``1`` it returns - ``True``. - - If the string is one of ``False``, ``Off``, ``No``, or ``0`` it returns - ``False``. - - ``as_bool`` is not case sensitive. - - Any other input will raise a ``ValueError``. - - >>> a = ConfigObj() - >>> a['a'] = 'fish' - >>> a.as_bool('a') - Traceback (most recent call last): - ValueError: Value "fish" is neither True nor False - >>> a['b'] = 'True' - >>> a.as_bool('b') - 1 - >>> a['b'] = 'off' - >>> a.as_bool('b') - 0 - """ - val = self[key] - if val == True: - return True - elif val == False: - return False - else: - try: - if not isinstance(val, str): - # TODO: Why do we raise a KeyError here? - raise KeyError() - else: - return self.main._bools[val.lower()] - except KeyError: - raise ValueError('Value "%s" is neither True nor False' % val) - - - def as_int(self, key): - """ - A convenience method which coerces the specified value to an integer. - - If the value is an invalid literal for ``int``, a ``ValueError`` will - be raised. - - >>> a = ConfigObj() - >>> a['a'] = 'fish' - >>> a.as_int('a') - Traceback (most recent call last): - ValueError: invalid literal for int() with base 10: 'fish' - >>> a['b'] = '1' - >>> a.as_int('b') - 1 - >>> a['b'] = '3.2' - >>> a.as_int('b') - Traceback (most recent call last): - ValueError: invalid literal for int() with base 10: '3.2' - """ - return int(self[key]) - - - def as_float(self, key): - """ - A convenience method which coerces the specified value to a float. - - If the value is an invalid literal for ``float``, a ``ValueError`` will - be raised. - - >>> a = ConfigObj() - >>> a['a'] = 'fish' - >>> a.as_float('a') #doctest: +IGNORE_EXCEPTION_DETAIL - Traceback (most recent call last): - ValueError: invalid literal for float(): fish - >>> a['b'] = '1' - >>> a.as_float('b') - 1.0 - >>> a['b'] = '3.2' - >>> a.as_float('b') #doctest: +ELLIPSIS - 3.2... - """ - return float(self[key]) - - - def as_list(self, key): - """ - A convenience method which fetches the specified value, guaranteeing - that it is a list. - - >>> a = ConfigObj() - >>> a['a'] = 1 - >>> a.as_list('a') - [1] - >>> a['a'] = (1,) - >>> a.as_list('a') - [1] - >>> a['a'] = [1] - >>> a.as_list('a') - [1] - """ - result = self[key] - if isinstance(result, (tuple, list)): - return list(result) - return [result] - - - def restore_default(self, key): - """ - Restore (and return) default value for the specified key. - - This method will only work for a ConfigObj that was created - with a configspec and has been validated. - - If there is no default value for this key, ``KeyError`` is raised. - """ - default = self.default_values[key] - dict.__setitem__(self, key, default) - if key not in self.defaults: - self.defaults.append(key) - return default - - - def restore_defaults(self): - """ - Recursively restore default values to all members - that have them. - - This method will only work for a ConfigObj that was created - with a configspec and has been validated. - - It doesn't delete or modify entries without default values. - """ - for key in self.default_values: - self.restore_default(key) - - for section in self.sections: - self[section].restore_defaults() - - -class ConfigObj(Section): - """An object to read, create, and write config files.""" - - _keyword = re.compile(r'''^ # line start - (\s*) # indentation - ( # keyword - (?:".*?")| # double quotes - (?:'.*?')| # single quotes - (?:[^'"=].*?) # no quotes - ) - \s*=\s* # divider - (.*) # value (including list values and comments) - $ # line end - ''', - re.VERBOSE) - - _sectionmarker = re.compile(r'''^ - (\s*) # 1: indentation - ((?:\[\s*)+) # 2: section marker open - ( # 3: section name open - (?:"\s*\S.*?\s*")| # at least one non-space with double quotes - (?:'\s*\S.*?\s*')| # at least one non-space with single quotes - (?:[^'"\s].*?) # at least one non-space unquoted - ) # section name close - ((?:\s*\])+) # 4: section marker close - \s*(\#.*)? # 5: optional comment - $''', - re.VERBOSE) - - # this regexp pulls list values out as a single string - # or single values and comments - # FIXME: this regex adds a '' to the end of comma terminated lists - # workaround in ``_handle_value`` - _valueexp = re.compile(r'''^ - (?: - (?: - ( - (?: - (?: - (?:".*?")| # double quotes - (?:'.*?')| # single quotes - (?:[^'",\#][^,\#]*?) # unquoted - ) - \s*,\s* # comma - )* # match all list items ending in a comma (if any) - ) - ( - (?:".*?")| # double quotes - (?:'.*?')| # single quotes - (?:[^'",\#\s][^,]*?)| # unquoted - (?:(? 1: - msg = "Parsing failed with several errors.\nFirst error %s" % info - error = ConfigObjError(msg) - else: - error = self._errors[0] - # set the errors attribute; it's a list of tuples: - # (error_type, message, line_number) - error.errors = self._errors - # set the config attribute - error.config = self - raise error - # delete private attributes - del self._errors - - if configspec is None: - self.configspec = None - else: - self._handle_configspec(configspec) - - - def _initialise(self, options=None): - if options is None: - options = OPTION_DEFAULTS - - # initialise a few variables - self.filename = None - self._errors = [] - self.raise_errors = options['raise_errors'] - self.interpolation = options['interpolation'] - self.list_values = options['list_values'] - self.create_empty = options['create_empty'] - self.file_error = options['file_error'] - self.stringify = options['stringify'] - self.indent_type = options['indent_type'] - self.encoding = options['encoding'] - self.default_encoding = options['default_encoding'] - self.BOM = False - self.newlines = None - self.write_empty_values = options['write_empty_values'] - self.unrepr = options['unrepr'] - - self.initial_comment = [] - self.final_comment = [] - self.configspec = None - - if self._inspec: - self.list_values = False - - # Clear section attributes as well - Section._initialise(self) - - - def __repr__(self): - def _getval(key): - try: - return self[key] - except MissingInterpolationOption: - return dict.__getitem__(self, key) - return ('{}({{{}}})'.format(self.__class__.__name__, - ', '.join([(f'{repr(key)}: {repr(_getval(key))}') - for key in (self.scalars + self.sections)]))) - - - def _handle_bom(self, infile): - """ - Handle any BOM, and decode if necessary. - - If an encoding is specified, that *must* be used - but the BOM should - still be removed (and the BOM attribute set). - - (If the encoding is wrongly specified, then a BOM for an alternative - encoding won't be discovered or removed.) - - If an encoding is not specified, UTF8 or UTF16 BOM will be detected and - removed. The BOM attribute will be set. UTF16 will be decoded to - unicode. - - NOTE: This method must not be called with an empty ``infile``. - - Specifying the *wrong* encoding is likely to cause a - ``UnicodeDecodeError``. - - ``infile`` must always be returned as a list of lines, but may be - passed in as a single string. - """ - - if ((self.encoding is not None) and - (self.encoding.lower() not in BOM_LIST)): - # No need to check for a BOM - # the encoding specified doesn't have one - # just decode - return self._decode(infile, self.encoding) - - if isinstance(infile, (list, tuple)): - line = infile[0] - else: - line = infile - - if isinstance(line, str): - # it's already decoded and there's no need to do anything - # else, just use the _decode utility method to handle - # listifying appropriately - return self._decode(infile, self.encoding) - - if self.encoding is not None: - # encoding explicitly supplied - # And it could have an associated BOM - # TODO: if encoding is just UTF16 - we ought to check for both - # TODO: big endian and little endian versions. - enc = BOM_LIST[self.encoding.lower()] - if enc == 'utf_16': - # For UTF16 we try big endian and little endian - for BOM, (encoding, final_encoding) in list(BOMS.items()): - if not final_encoding: - # skip UTF8 - continue - if infile.startswith(BOM): - ### BOM discovered - ##self.BOM = True - # Don't need to remove BOM - return self._decode(infile, encoding) - - # If we get this far, will *probably* raise a DecodeError - # As it doesn't appear to start with a BOM - return self._decode(infile, self.encoding) - - # Must be UTF8 - BOM = BOM_SET[enc] - if not line.startswith(BOM): - return self._decode(infile, self.encoding) - - newline = line[len(BOM):] - - # BOM removed - if isinstance(infile, (list, tuple)): - infile[0] = newline - else: - infile = newline - self.BOM = True - return self._decode(infile, self.encoding) - - # No encoding specified - so we need to check for UTF8/UTF16 - for BOM, (encoding, final_encoding) in list(BOMS.items()): - if not isinstance(line, bytes) or not line.startswith(BOM): - # didn't specify a BOM, or it's not a bytestring - continue - else: - # BOM discovered - self.encoding = final_encoding - if not final_encoding: - self.BOM = True - # UTF8 - # remove BOM - newline = line[len(BOM):] - if isinstance(infile, (list, tuple)): - infile[0] = newline - else: - infile = newline - # UTF-8 - if isinstance(infile, str): - return infile.splitlines(True) - elif isinstance(infile, bytes): - return infile.decode('utf-8').splitlines(True) - else: - return self._decode(infile, 'utf-8') - # UTF16 - have to decode - return self._decode(infile, encoding) - - # No BOM discovered and no encoding specified, default to UTF-8 - if isinstance(infile, bytes): - return infile.decode('utf-8').splitlines(True) - else: - return self._decode(infile, 'utf-8') - - - def _a_to_u(self, aString): - """Decode ASCII strings to unicode if a self.encoding is specified.""" - if isinstance(aString, bytes) and self.encoding: - return aString.decode(self.encoding) - else: - return aString - - - def _decode(self, infile, encoding): - """ - Decode infile to unicode. Using the specified encoding. - - if is a string, it also needs converting to a list. - """ - if isinstance(infile, str): - return infile.splitlines(True) - if isinstance(infile, bytes): - # NOTE: Could raise a ``UnicodeDecodeError`` - if encoding: - return infile.decode(encoding).splitlines(True) - else: - return infile.splitlines(True) - - if encoding: - for i, line in enumerate(infile): - if isinstance(line, bytes): - # NOTE: The isinstance test here handles mixed lists of unicode/string - # NOTE: But the decode will break on any non-string values - # NOTE: Or could raise a ``UnicodeDecodeError`` - infile[i] = line.decode(encoding) - return infile - - - def _decode_element(self, line): - """Decode element to unicode if necessary.""" - if isinstance(line, bytes) and self.default_encoding: - return line.decode(self.default_encoding) - else: - return line - - - # TODO: this may need to be modified - def _str(self, value): - """ - Used by ``stringify`` within validate, to turn non-string values - into strings. - """ - if not isinstance(value, str): - # intentionally 'str' because it's just whatever the "normal" - # string type is for the python version we're dealing with - return str(value) - else: - return value - - - def _parse(self, infile): - """Actually parse the config file.""" - temp_list_values = self.list_values - if self.unrepr: - self.list_values = False - - comment_list = [] - done_start = False - this_section = self - maxline = len(infile) - 1 - cur_index = -1 - reset_comment = False - - while cur_index < maxline: - if reset_comment: - comment_list = [] - cur_index += 1 - line = infile[cur_index] - sline = line.strip() - # do we have anything on the line ? - if not sline or sline.startswith('#'): - reset_comment = False - comment_list.append(line) - continue - - if not done_start: - # preserve initial comment - self.initial_comment = comment_list - comment_list = [] - done_start = True - - reset_comment = True - # first we check if it's a section marker - mat = self._sectionmarker.match(line) - if mat is not None: - # is a section line - (indent, sect_open, sect_name, sect_close, comment) = mat.groups() - if indent and (self.indent_type is None): - self.indent_type = indent - cur_depth = sect_open.count('[') - if cur_depth != sect_close.count(']'): - self._handle_error("Cannot compute the section depth", - NestingError, infile, cur_index) - continue - - if cur_depth < this_section.depth: - # the new section is dropping back to a previous level - try: - parent = self._match_depth(this_section, - cur_depth).parent - except SyntaxError: - self._handle_error("Cannot compute nesting level", - NestingError, infile, cur_index) - continue - elif cur_depth == this_section.depth: - # the new section is a sibling of the current section - parent = this_section.parent - elif cur_depth == this_section.depth + 1: - # the new section is a child the current section - parent = this_section - else: - self._handle_error("Section too nested", - NestingError, infile, cur_index) - continue - - sect_name = self._unquote(sect_name) - if sect_name in parent: - self._handle_error('Duplicate section name', - DuplicateError, infile, cur_index) - continue - - # create the new section - this_section = Section( - parent, - cur_depth, - self, - name=sect_name) - parent[sect_name] = this_section - parent.inline_comments[sect_name] = comment - parent.comments[sect_name] = comment_list - continue - # - # it's not a section marker, - # so it should be a valid ``key = value`` line - mat = self._keyword.match(line) - if mat is None: - self._handle_error( - f'Invalid line ({line!r}) (matched as neither section nor keyword)', - ParseError, infile, cur_index) - else: - # is a keyword value - # value will include any inline comment - (indent, key, value) = mat.groups() - if indent and (self.indent_type is None): - self.indent_type = indent - # check for a multiline value - if value[:3] in ['"""', "'''"]: - try: - value, comment, cur_index = self._multiline( - value, infile, cur_index, maxline) - except SyntaxError: - self._handle_error( - 'Parse error in multiline value', - ParseError, infile, cur_index) - continue - else: - if self.unrepr: - comment = '' - try: - value = unrepr(value) - except Exception as e: - if type(e) == UnknownType: - msg = 'Unknown name or type in value' - else: - msg = 'Parse error from unrepr-ing multiline value' - self._handle_error(msg, UnreprError, infile, - cur_index) - continue - else: - if self.unrepr: - comment = '' - try: - value = unrepr(value) - except Exception as e: - if isinstance(e, UnknownType): - msg = 'Unknown name or type in value' - else: - msg = 'Parse error from unrepr-ing value' - self._handle_error(msg, UnreprError, infile, - cur_index) - continue - else: - # extract comment and lists - try: - (value, comment) = self._handle_value(value) - except SyntaxError: - self._handle_error( - 'Parse error in value', - ParseError, infile, cur_index) - continue - # - key = self._unquote(key) - if key in this_section: - self._handle_error( - 'Duplicate keyword name', - DuplicateError, infile, cur_index) - continue - # add the key. - # we set unrepr because if we have got this far we will never - # be creating a new section - this_section.__setitem__(key, value, unrepr=True) - this_section.inline_comments[key] = comment - this_section.comments[key] = comment_list - continue - # - if self.indent_type is None: - # no indentation used, set the type accordingly - self.indent_type = '' - - # preserve the final comment - if not self and not self.initial_comment: - self.initial_comment = comment_list - elif not reset_comment: - self.final_comment = comment_list - self.list_values = temp_list_values - - - def _match_depth(self, sect, depth): - """ - Given a section and a depth level, walk back through the sections - parents to see if the depth level matches a previous section. - - Return a reference to the right section, - or raise a SyntaxError. - """ - while depth < sect.depth: - if sect is sect.parent: - # we've reached the top level already - raise SyntaxError() - sect = sect.parent - if sect.depth == depth: - return sect - # shouldn't get here - raise SyntaxError() - - - def _handle_error(self, text, ErrorClass, infile, cur_index): - """ - Handle an error according to the error settings. - - Either raise the error or store it. - The error will have occurred at ``cur_index`` - """ - line = infile[cur_index] - cur_index += 1 - message = f'{text} at line {cur_index}.' - error = ErrorClass(message, cur_index, line) - if self.raise_errors: - # raise the error - parsing stops here - raise error - # store the error - # reraise when parsing has finished - self._errors.append(error) - - - def _unquote(self, value): - """Return an unquoted version of a value""" - if not value: - # should only happen during parsing of lists - raise SyntaxError - if (value[0] == value[-1]) and (value[0] in ('"', "'")): - value = value[1:-1] - return value - - - def _quote(self, value, multiline=True): - """ - Return a safely quoted version of a value. - - Raise a ConfigObjError if the value cannot be safely quoted. - If multiline is ``True`` (default) then use triple quotes - if necessary. - - * Don't quote values that don't need it. - * Recursively quote members of a list and return a comma joined list. - * Multiline is ``False`` for lists. - * Obey list syntax for empty and single member lists. - - If ``list_values=False`` then the value is only quoted if it contains - a ``\\n`` (is multiline) or '#'. - - If ``write_empty_values`` is set, and the value is an empty string, it - won't be quoted. - """ - if multiline and self.write_empty_values and value == '': - # Only if multiline is set, so that it is used for values not - # keys, and not values that are part of a list - return '' - - if multiline and isinstance(value, (list, tuple)): - if not value: - return ',' - elif len(value) == 1: - return self._quote(value[0], multiline=False) + ',' - return ', '.join([self._quote(val, multiline=False) - for val in value]) - if not isinstance(value, str): - if self.stringify: - # intentionally 'str' because it's just whatever the "normal" - # string type is for the python version we're dealing with - value = str(value) - else: - raise TypeError('Value "%s" is not a string.' % value) - - if not value: - return '""' - - no_lists_no_quotes = not self.list_values and '\n' not in value and '#' not in value - need_triple = multiline and ((("'" in value) and ('"' in value)) or ('\n' in value )) - hash_triple_quote = multiline and not need_triple and ("'" in value) and ('"' in value) and ('#' in value) - check_for_single = (no_lists_no_quotes or not need_triple) and not hash_triple_quote - - if check_for_single: - if not self.list_values: - # we don't quote if ``list_values=False`` - quot = noquot - # for normal values either single or double quotes will do - elif '\n' in value: - # will only happen if multiline is off - e.g. '\n' in key - raise ConfigObjError('Value "%s" cannot be safely quoted.' % value) - elif ((value[0] not in wspace_plus) and - (value[-1] not in wspace_plus) and - (',' not in value)): - quot = noquot - else: - quot = self._get_single_quote(value) - else: - # if value has '\n' or "'" *and* '"', it will need triple quotes - quot = self._get_triple_quote(value) - - if quot == noquot and '#' in value and self.list_values: - quot = self._get_single_quote(value) - - return quot % value - - - def _get_single_quote(self, value): - if ("'" in value) and ('"' in value): - raise ConfigObjError('Value "%s" cannot be safely quoted.' % value) - elif '"' in value: - quot = squot - else: - quot = dquot - return quot - - - def _get_triple_quote(self, value): - if (value.find('"""') != -1) and (value.find("'''") != -1): - raise ConfigObjError('Value "%s" cannot be safely quoted.' % value) - if value.find('"""') == -1: - quot = tdquot - else: - quot = tsquot - return quot - - - def _handle_value(self, value): - """ - Given a value string, unquote, remove comment, - handle lists. (including empty and single member lists) - """ - if self._inspec: - # Parsing a configspec so don't handle comments - return (value, '') - # do we look for lists in values ? - if not self.list_values: - mat = self._nolistvalue.match(value) - if mat is None: - raise SyntaxError() - # NOTE: we don't unquote here - return mat.groups() - # - mat = self._valueexp.match(value) - if mat is None: - # the value is badly constructed, probably badly quoted, - # or an invalid list - raise SyntaxError() - (list_values, single, empty_list, comment) = mat.groups() - if (list_values == '') and (single is None): - # change this if you want to accept empty values - raise SyntaxError() - # NOTE: note there is no error handling from here if the regex - # is wrong: then incorrect values will slip through - if empty_list is not None: - # the single comma - meaning an empty list - return ([], comment) - if single is not None: - # handle empty values - if list_values and not single: - # FIXME: the '' is a workaround because our regex now matches - # '' at the end of a list if it has a trailing comma - single = None - else: - single = single or '""' - single = self._unquote(single) - if list_values == '': - # not a list value - return (single, comment) - the_list = self._listvalueexp.findall(list_values) - the_list = [self._unquote(val) for val in the_list] - if single is not None: - the_list += [single] - return (the_list, comment) - - - def _multiline(self, value, infile, cur_index, maxline): - """Extract the value, where we are in a multiline situation.""" - quot = value[:3] - newvalue = value[3:] - single_line = self._triple_quote[quot][0] - multi_line = self._triple_quote[quot][1] - mat = single_line.match(value) - if mat is not None: - retval = list(mat.groups()) - retval.append(cur_index) - return retval - elif newvalue.find(quot) != -1: - # somehow the triple quote is missing - raise SyntaxError() - # - while cur_index < maxline: - cur_index += 1 - newvalue += '\n' - line = infile[cur_index] - if line.find(quot) == -1: - newvalue += line - else: - # end of multiline, process it - break - else: - # we've got to the end of the config, oops... - raise SyntaxError() - mat = multi_line.match(line) - if mat is None: - # a badly formed line - raise SyntaxError() - (value, comment) = mat.groups() - return (newvalue + value, comment, cur_index) - - - def _handle_configspec(self, configspec): - """Parse the configspec.""" - # FIXME: Should we check that the configspec was created with the - # correct settings ? (i.e. ``list_values=False``) - if not isinstance(configspec, ConfigObj): - try: - configspec = ConfigObj(configspec, - raise_errors=True, - file_error=True, - _inspec=True) - except ConfigObjError as e: - # FIXME: Should these errors have a reference - # to the already parsed ConfigObj ? - raise ConfigspecError('Parsing configspec failed: %s' % e) - except OSError as e: - raise OSError('Reading configspec failed: %s' % e) - - self.configspec = configspec - - - - def _set_configspec(self, section, copy): - """ - Called by validate. Handles setting the configspec on subsections - including sections to be validated by __many__ - """ - configspec = section.configspec - many = configspec.get('__many__') - if isinstance(many, dict): - for entry in section.sections: - if entry not in configspec: - section[entry].configspec = many - - for entry in configspec.sections: - if entry == '__many__': - continue - if entry not in section: - section[entry] = {} - section[entry]._created = True - if copy: - # copy comments - section.comments[entry] = configspec.comments.get(entry, []) - section.inline_comments[entry] = configspec.inline_comments.get(entry, '') - - # Could be a scalar when we expect a section - if isinstance(section[entry], Section): - section[entry].configspec = configspec[entry] - - - def _write_line(self, indent_string, entry, this_entry, comment): - """Write an individual line, for the write method""" - # NOTE: the calls to self._quote here handles non-StringType values. - if not self.unrepr: - val = self._decode_element(self._quote(this_entry)) - else: - val = repr(this_entry) - return '{}{}{}{}{}'.format(indent_string, - self._decode_element(self._quote(entry, multiline=False)), - self._a_to_u(' = '), - val, - self._decode_element(comment)) - - - def _write_marker(self, indent_string, depth, entry, comment): - """Write a section marker line""" - return '{}{}{}{}{}'.format(indent_string, - self._a_to_u('[' * depth), - self._quote(self._decode_element(entry), multiline=False), - self._a_to_u(']' * depth), - self._decode_element(comment)) - - - def _handle_comment(self, comment): - """Deal with a comment.""" - if not comment: - return '' - start = self.indent_type - if not comment.startswith('#'): - start += self._a_to_u(' # ') - return (start + comment) - - - # Public methods - - def write(self, outfile=None, section=None): - """ - Write the current ConfigObj as a file - - tekNico: FIXME: use StringIO instead of real files - - >>> filename = a.filename - >>> a.filename = 'test.ini' - >>> a.write() - >>> a.filename = filename - >>> a == ConfigObj('test.ini', raise_errors=True) - 1 - >>> import os - >>> os.remove('test.ini') - """ - if self.indent_type is None: - # this can be true if initialised from a dictionary - self.indent_type = DEFAULT_INDENT_TYPE - - out = [] - cs = self._a_to_u('#') - csp = self._a_to_u('# ') - if section is None: - int_val = self.interpolation - self.interpolation = False - section = self - for line in self.initial_comment: - line = self._decode_element(line) - stripped_line = line.strip() - if stripped_line and not stripped_line.startswith(cs): - line = csp + line - out.append(line) - - indent_string = self.indent_type * section.depth - for entry in (section.scalars + section.sections): - if entry in section.defaults: - # don't write out default values - continue - for comment_line in section.comments[entry]: - comment_line = self._decode_element(comment_line.lstrip()) - if comment_line and not comment_line.startswith(cs): - comment_line = csp + comment_line - out.append(indent_string + comment_line) - this_entry = section[entry] - comment = self._handle_comment(section.inline_comments[entry]) - - if isinstance(this_entry, Section): - # a section - out.append(self._write_marker( - indent_string, - this_entry.depth, - entry, - comment)) - out.extend(self.write(section=this_entry)) - else: - out.append(self._write_line( - indent_string, - entry, - this_entry, - comment)) - - if section is self: - for line in self.final_comment: - line = self._decode_element(line) - stripped_line = line.strip() - if stripped_line and not stripped_line.startswith(cs): - line = csp + line - out.append(line) - self.interpolation = int_val - - if section is not self: - return out - - if (self.filename is None) and (outfile is None): - # output a list of lines - # might need to encode - # NOTE: This will *screw* UTF16, each line will start with the BOM - if self.encoding: - out = [l.encode(self.encoding) for l in out] - if (self.BOM and ((self.encoding is None) or - (BOM_LIST.get(self.encoding.lower()) == 'utf_8'))): - # Add the UTF8 BOM - if not out: - out.append('') - out[0] = BOM_UTF8 + out[0] - return out - - # Turn the list to a string, joined with correct newlines - newline = self.newlines or os.linesep - if (getattr(outfile, 'mode', None) is not None and outfile.mode == 'w' - and sys.platform == 'win32' and newline == '\r\n'): - # Windows specific hack to avoid writing '\r\r\n' - newline = '\n' - output = self._a_to_u(newline).join(out) - if not output.endswith(newline): - output += newline - - if isinstance(output, bytes): - output_bytes = output - else: - output_bytes = output.encode(self.encoding or - self.default_encoding or - 'ascii') - - if self.BOM and ((self.encoding is None) or match_utf8(self.encoding)): - # Add the UTF8 BOM - output_bytes = BOM_UTF8 + output_bytes - - if outfile is not None: - outfile.write(output_bytes) - else: - with open(self.filename, 'wb') as h: - h.write(output_bytes) - - def validate(self, validator, preserve_errors=False, copy=False, - section=None): - """ - Test the ConfigObj against a configspec. - - It uses the ``validator`` object from *validate.py*. - - To run ``validate`` on the current ConfigObj, call: :: - - test = config.validate(validator) - - (Normally having previously passed in the configspec when the ConfigObj - was created - you can dynamically assign a dictionary of checks to the - ``configspec`` attribute of a section though). - - It returns ``True`` if everything passes, or a dictionary of - pass/fails (True/False). If every member of a subsection passes, it - will just have the value ``True``. (It also returns ``False`` if all - members fail). - - In addition, it converts the values from strings to their native - types if their checks pass (and ``stringify`` is set). - - If ``preserve_errors`` is ``True`` (``False`` is default) then instead - of a marking a fail with a ``False``, it will preserve the actual - exception object. This can contain info about the reason for failure. - For example the ``VdtValueTooSmallError`` indicates that the value - supplied was too small. If a value (or section) is missing it will - still be marked as ``False``. - - You must have the validate module to use ``preserve_errors=True``. - - You can then use the ``flatten_errors`` function to turn your nested - results dictionary into a flattened list of failures - useful for - displaying meaningful error messages. - """ - if section is None: - if self.configspec is None: - raise ValueError('No configspec supplied.') - if preserve_errors: - # We do this once to remove a top level dependency on the validate module - # Which makes importing configobj faster - from .validate import VdtMissingValue - self._vdtMissingValue = VdtMissingValue - - section = self - - if copy: - section.initial_comment = section.configspec.initial_comment - section.final_comment = section.configspec.final_comment - section.encoding = section.configspec.encoding - section.BOM = section.configspec.BOM - section.newlines = section.configspec.newlines - section.indent_type = section.configspec.indent_type - - # - # section.default_values.clear() #?? - configspec = section.configspec - self._set_configspec(section, copy) - - - def validate_entry(entry, spec, val, missing, ret_true, ret_false): - section.default_values.pop(entry, None) - - try: - section.default_values[entry] = validator.get_default_value(configspec[entry]) - except (KeyError, AttributeError, validator.baseErrorClass): - # No default, bad default or validator has no 'get_default_value' - # (e.g. SimpleVal) - pass - - try: - check = validator.check(spec, - val, - missing=missing - ) - except validator.baseErrorClass as e: - if not preserve_errors or isinstance(e, self._vdtMissingValue): - out[entry] = False - else: - # preserve the error - out[entry] = e - ret_false = False - ret_true = False - else: - ret_false = False - out[entry] = True - if self.stringify or missing: - # if we are doing type conversion - # or the value is a supplied default - if not self.stringify: - if isinstance(check, (list, tuple)): - # preserve lists - check = [self._str(item) for item in check] - elif missing and check is None: - # convert the None from a default to a '' - check = '' - else: - check = self._str(check) - if (check != val) or missing: - section[entry] = check - if not copy and missing and entry not in section.defaults: - section.defaults.append(entry) - return ret_true, ret_false - - # - out = {} - ret_true = True - ret_false = True - - unvalidated = [k for k in section.scalars if k not in configspec] - incorrect_sections = [k for k in configspec.sections if k in section.scalars] - incorrect_scalars = [k for k in configspec.scalars if k in section.sections] - - for entry in configspec.scalars: - if entry in ('__many__', '___many___'): - # reserved names - continue - if (not entry in section.scalars) or (entry in section.defaults): - # missing entries - # or entries from defaults - missing = True - val = None - if copy and entry not in section.scalars: - # copy comments - section.comments[entry] = ( - configspec.comments.get(entry, [])) - section.inline_comments[entry] = ( - configspec.inline_comments.get(entry, '')) - # - else: - missing = False - val = section[entry] - - ret_true, ret_false = validate_entry(entry, configspec[entry], val, - missing, ret_true, ret_false) - - many = None - if '__many__' in configspec.scalars: - many = configspec['__many__'] - elif '___many___' in configspec.scalars: - many = configspec['___many___'] - - if many is not None: - for entry in unvalidated: - val = section[entry] - ret_true, ret_false = validate_entry(entry, many, val, False, - ret_true, ret_false) - unvalidated = [] - - for entry in incorrect_scalars: - ret_true = False - if not preserve_errors: - out[entry] = False - else: - ret_false = False - msg = 'Value %r was provided as a section' % entry - out[entry] = validator.baseErrorClass(msg) - for entry in incorrect_sections: - ret_true = False - if not preserve_errors: - out[entry] = False - else: - ret_false = False - msg = 'Section %r was provided as a single value' % entry - out[entry] = validator.baseErrorClass(msg) - - # Missing sections will have been created as empty ones when the - # configspec was read. - for entry in section.sections: - # FIXME: this means DEFAULT is not copied in copy mode - if section is self and entry == 'DEFAULT': - continue - if section[entry].configspec is None: - unvalidated.append(entry) - continue - if copy: - section.comments[entry] = configspec.comments.get(entry, []) - section.inline_comments[entry] = configspec.inline_comments.get(entry, '') - check = self.validate(validator, preserve_errors=preserve_errors, copy=copy, section=section[entry]) - out[entry] = check - if check == False: - ret_true = False - elif check == True: - ret_false = False - else: - ret_true = False - - section.extra_values = unvalidated - if preserve_errors and not section._created: - # If the section wasn't created (i.e. it wasn't missing) - # then we can't return False, we need to preserve errors - ret_false = False - # - if ret_false and preserve_errors and out: - # If we are preserving errors, but all - # the failures are from missing sections / values - # then we can return False. Otherwise there is a - # real failure that we need to preserve. - ret_false = not any(out.values()) - if ret_true: - return True - elif ret_false: - return False - return out - - - def reset(self): - """Clear ConfigObj instance and restore to 'freshly created' state.""" - self.clear() - self._initialise() - # FIXME: Should be done by '_initialise', but ConfigObj constructor (and reload) - # requires an empty dictionary - self.configspec = None - # Just to be sure ;-) - self._original_configspec = None - - - def reload(self): - """ - Reload a ConfigObj from file. - - This method raises a ``ReloadError`` if the ConfigObj doesn't have - a filename attribute pointing to a file. - """ - if not isinstance(self.filename, str): - raise ReloadError() - - filename = self.filename - current_options = {} - for entry in OPTION_DEFAULTS: - if entry == 'configspec': - continue - current_options[entry] = getattr(self, entry) - - configspec = self._original_configspec - current_options['configspec'] = configspec - - self.clear() - self._initialise(current_options) - self._load(filename, configspec) - - - -class SimpleVal: - """ - A simple validator. - Can be used to check that all members expected are present. - - To use it, provide a configspec with all your members in (the value given - will be ignored). Pass an instance of ``SimpleVal`` to the ``validate`` - method of your ``ConfigObj``. ``validate`` will return ``True`` if all - members are present, or a dictionary with True/False meaning - present/missing. (Whole missing sections will be replaced with ``False``) - """ - - def __init__(self): - self.baseErrorClass = ConfigObjError - - def check(self, check, member, missing=False): - """A dummy check method, always returns the value unchanged.""" - if missing: - raise self.baseErrorClass() - return member - - -def flatten_errors(cfg, res, levels=None, results=None): - """ - An example function that will turn a nested dictionary of results - (as returned by ``ConfigObj.validate``) into a flat list. - - ``cfg`` is the ConfigObj instance being checked, ``res`` is the results - dictionary returned by ``validate``. - - (This is a recursive function, so you shouldn't use the ``levels`` or - ``results`` arguments - they are used by the function.) - - Returns a list of keys that failed. Each member of the list is a tuple:: - - ([list of sections...], key, result) - - If ``validate`` was called with ``preserve_errors=False`` (the default) - then ``result`` will always be ``False``. - - *list of sections* is a flattened list of sections that the key was found - in. - - If the section was missing (or a section was expected and a scalar provided - - or vice-versa) then key will be ``None``. - - If the value (or section) was missing then ``result`` will be ``False``. - - If ``validate`` was called with ``preserve_errors=True`` and a value - was present, but failed the check, then ``result`` will be the exception - object returned. You can use this as a string that describes the failure. - - For example *The value "3" is of the wrong type*. - """ - if levels is None: - # first time called - levels = [] - results = [] - if res == True: - return sorted(results) - if res == False or isinstance(res, Exception): - results.append((levels[:], None, res)) - if levels: - levels.pop() - return sorted(results) - for (key, val) in list(res.items()): - if val == True: - continue - if isinstance(cfg.get(key), Mapping): - # Go down one level - levels.append(key) - flatten_errors(cfg[key], val, levels, results) - continue - results.append((levels[:], key, val)) - # - # Go up one level - if levels: - levels.pop() - # - return sorted(results) - - -def get_extra_values(conf, _prepend=()): - """ - Find all the values and sections not in the configspec from a validated - ConfigObj. - - ``get_extra_values`` returns a list of tuples where each tuple represents - either an extra section, or an extra value. - - The tuples contain two values, a tuple representing the section the value - is in and the name of the extra values. For extra values in the top level - section the first member will be an empty tuple. For values in the 'foo' - section the first member will be ``('foo',)``. For members in the 'bar' - subsection of the 'foo' section the first member will be ``('foo', 'bar')``. - - NOTE: If you call ``get_extra_values`` on a ConfigObj instance that hasn't - been validated it will return an empty list. - """ - out = [] - - out.extend([(_prepend, name) for name in conf.extra_values]) - for name in conf.sections: - if name not in conf.extra_values: - out.extend(get_extra_values(conf[name], _prepend + (name,))) - return out - - -"""*A programming language is a medium of expression.* - Paul Graham""" diff --git a/src/stpipe/extern/configobj/validate.py b/src/stpipe/extern/configobj/validate.py deleted file mode 100644 index 6f4bdee9..00000000 --- a/src/stpipe/extern/configobj/validate.py +++ /dev/null @@ -1,1466 +0,0 @@ -# validate.py -# A Validator object -# Copyright (C) 2005-2014: -# (name) : (email) -# Michael Foord: fuzzyman AT voidspace DOT org DOT uk -# Mark Andrews: mark AT la-la DOT com -# Nicola Larosa: nico AT tekNico DOT net -# Rob Dennis: rdennis AT gmail DOT com -# Eli Courtwright: eli AT courtwright DOT org - -# This software is licensed under the terms of the BSD license. -# http://opensource.org/licenses/BSD-3-Clause - -# ConfigObj 5 - main repository for documentation and issue tracking: -# https://github.com/DiffSK/configobj - -""" - The Validator object is used to check that supplied values - conform to a specification. - - The value can be supplied as a string - e.g. from a config file. - In this case the check will also *convert* the value to - the required type. This allows you to add validation - as a transparent layer to access data stored as strings. - The validation checks that the data is correct *and* - converts it to the expected type. - - Some standard checks are provided for basic data types. - Additional checks are easy to write. They can be - provided when the ``Validator`` is instantiated or - added afterwards. - - The standard functions work with the following basic data types : - - * integers - * floats - * booleans - * strings - * ip_addr - - plus lists of these datatypes - - Adding additional checks is done through coding simple functions. - - The full set of standard checks are : - - * 'integer': matches integer values (including negative) - Takes optional 'min' and 'max' arguments : :: - - integer() - integer(3, 9) # any value from 3 to 9 - integer(min=0) # any positive value - integer(max=9) - - * 'float': matches float values - Has the same parameters as the integer check. - - * 'boolean': matches boolean values - ``True`` or ``False`` - Acceptable string values for True are : - true, on, yes, 1 - Acceptable string values for False are : - false, off, no, 0 - - Any other value raises an error. - - * 'ip_addr': matches an Internet Protocol address, v.4, represented - by a dotted-quad string, i.e. '1.2.3.4'. - - * 'string': matches any string. - Takes optional keyword args 'min' and 'max' - to specify min and max lengths of the string. - - * 'list': matches any list. - Takes optional keyword args 'min', and 'max' to specify min and - max sizes of the list. (Always returns a list.) - - * 'tuple': matches any tuple. - Takes optional keyword args 'min', and 'max' to specify min and - max sizes of the tuple. (Always returns a tuple.) - - * 'int_list': Matches a list of integers. - Takes the same arguments as list. - - * 'float_list': Matches a list of floats. - Takes the same arguments as list. - - * 'bool_list': Matches a list of boolean values. - Takes the same arguments as list. - - * 'ip_addr_list': Matches a list of IP addresses. - Takes the same arguments as list. - - * 'string_list': Matches a list of strings. - Takes the same arguments as list. - - * 'mixed_list': Matches a list with different types in - specific positions. List size must match - the number of arguments. - - Each position can be one of : - 'integer', 'float', 'ip_addr', 'string', 'boolean' - - So to specify a list with two strings followed - by two integers, you write the check as : :: - - mixed_list('string', 'string', 'integer', 'integer') - - * 'pass': This check matches everything ! It never fails - and the value is unchanged. - - It is also the default if no check is specified. - - * 'option': This check matches any from a list of options. - You specify this check with : :: - - option('option 1', 'option 2', 'option 3') - - You can supply a default value (returned if no value is supplied) - using the default keyword argument. - - You specify a list argument for default using a list constructor syntax in - the check : :: - - checkname(arg1, arg2, default=list('val 1', 'val 2', 'val 3')) - - A badly formatted set of arguments will raise a ``VdtParamError``. -""" - -__version__ = '1.0.1' - - -__all__ = ( - '__version__', - 'dottedQuadToNum', - 'numToDottedQuad', - 'ValidateError', - 'VdtUnknownCheckError', - 'VdtParamError', - 'VdtTypeError', - 'VdtValueError', - 'VdtValueTooSmallError', - 'VdtValueTooBigError', - 'VdtValueTooShortError', - 'VdtValueTooLongError', - 'VdtMissingValue', - 'Validator', - 'is_integer', - 'is_float', - 'is_boolean', - 'is_list', - 'is_tuple', - 'is_ip_addr', - 'is_string', - 'is_int_list', - 'is_bool_list', - 'is_float_list', - 'is_string_list', - 'is_ip_addr_list', - 'is_mixed_list', - 'is_option', - '__docformat__', -) - -import re -import sys - -#TODO - #21 - six is part of the repo now, but we didn't switch over to it here -# this could be replaced if six is used for compatibility, or there are no -# more assertions about items being a string -string_type = str -# so tests that care about unicode on 2.x can specify unicode, and the same -# tests when run on 3.x won't complain about a undefined name "unicode" -# since all strings are unicode on 3.x we just want to pass it through -# unchanged -unicode = lambda x: x -# in python 3, all ints are equivalent to python 2 longs, and they'll -# never show "L" in the repr -long = int - -_list_arg = re.compile(r''' - (?: - ([a-zA-Z_][a-zA-Z0-9_]*)\s*=\s*list\( - ( - (?: - \s* - (?: - (?:".*?")| # double quotes - (?:'.*?')| # single quotes - (?:[^'",\s\)][^,\)]*?) # unquoted - ) - \s*,\s* - )* - (?: - (?:".*?")| # double quotes - (?:'.*?')| # single quotes - (?:[^'",\s\)][^,\)]*?) # unquoted - )? # last one - ) - \) - ) -''', re.VERBOSE | re.DOTALL) # two groups - -_list_members = re.compile(r''' - ( - (?:".*?")| # double quotes - (?:'.*?')| # single quotes - (?:[^'",\s=][^,=]*?) # unquoted - ) - (?: - (?:\s*,\s*)|(?:\s*$) # comma - ) -''', re.VERBOSE | re.DOTALL) # one group - -_paramstring = r''' - (?: - ( - (?: - [a-zA-Z_][a-zA-Z0-9_]*\s*=\s*list\( - (?: - \s* - (?: - (?:".*?")| # double quotes - (?:'.*?')| # single quotes - (?:[^'",\s\)][^,\)]*?) # unquoted - ) - \s*,\s* - )* - (?: - (?:".*?")| # double quotes - (?:'.*?')| # single quotes - (?:[^'",\s\)][^,\)]*?) # unquoted - )? # last one - \) - )| - (?: - (?:".*?")| # double quotes - (?:'.*?')| # single quotes - (?:[^'",\s=][^,=]*?)| # unquoted - (?: # keyword argument - [a-zA-Z_][a-zA-Z0-9_]*\s*=\s* - (?: - (?:".*?")| # double quotes - (?:'.*?')| # single quotes - (?:[^'",\s=][^,=]*?) # unquoted - ) - ) - ) - ) - (?: - (?:\s*,\s*)|(?:\s*$) # comma - ) - ) - ''' - -_matchstring = '^%s*' % _paramstring - -# Python pre 2.2.1 doesn't have bool -try: - bool -except NameError: - def bool(val): - """Simple boolean equivalent function. """ - if val: - return 1 - else: - return 0 - - -def dottedQuadToNum(ip): - """ - Convert decimal dotted quad string to long integer - - >>> int(dottedQuadToNum('1 ')) - 1 - >>> int(dottedQuadToNum(' 1.2')) - 16777218 - >>> int(dottedQuadToNum(' 1.2.3 ')) - 16908291 - >>> int(dottedQuadToNum('1.2.3.4')) - 16909060 - >>> dottedQuadToNum('255.255.255.255') - 4294967295 - >>> dottedQuadToNum('255.255.255.256') - Traceback (most recent call last): - ValueError: Not a good dotted-quad IP: 255.255.255.256 - """ - - # import here to avoid it when ip_addr values are not used - import socket, struct - - try: - return struct.unpack('!L', - socket.inet_aton(ip.strip()))[0] - except OSError: - raise ValueError('Not a good dotted-quad IP: %s' % ip) - return - - -def numToDottedQuad(num): - """ - Convert int or long int to dotted quad string - - >>> numToDottedQuad(long(-1)) - Traceback (most recent call last): - ValueError: Not a good numeric IP: -1 - >>> numToDottedQuad(long(1)) - '0.0.0.1' - >>> numToDottedQuad(long(16777218)) - '1.0.0.2' - >>> numToDottedQuad(long(16908291)) - '1.2.0.3' - >>> numToDottedQuad(long(16909060)) - '1.2.3.4' - >>> numToDottedQuad(long(4294967295)) - '255.255.255.255' - >>> numToDottedQuad(long(4294967296)) - Traceback (most recent call last): - ValueError: Not a good numeric IP: 4294967296 - >>> numToDottedQuad(-1) - Traceback (most recent call last): - ValueError: Not a good numeric IP: -1 - >>> numToDottedQuad(1) - '0.0.0.1' - >>> numToDottedQuad(16777218) - '1.0.0.2' - >>> numToDottedQuad(16908291) - '1.2.0.3' - >>> numToDottedQuad(16909060) - '1.2.3.4' - >>> numToDottedQuad(4294967295) - '255.255.255.255' - >>> numToDottedQuad(4294967296) - Traceback (most recent call last): - ValueError: Not a good numeric IP: 4294967296 - - """ - - # import here to avoid it when ip_addr values are not used - import socket, struct - - # no need to intercept here, 4294967295L is fine - if num > long(4294967295) or num < 0: - raise ValueError('Not a good numeric IP: %s' % num) - try: - return socket.inet_ntoa( - struct.pack('!L', long(num))) - except (OSError, struct.error, OverflowError): - raise ValueError('Not a good numeric IP: %s' % num) - - -class ValidateError(Exception): - """ - This error indicates that the check failed. - It can be the base class for more specific errors. - - Any check function that fails ought to raise this error. - (or a subclass) - - >>> raise ValidateError - Traceback (most recent call last): - ValidateError - """ - - -class VdtMissingValue(ValidateError): - """No value was supplied to a check that needed one.""" - - -class VdtUnknownCheckError(ValidateError): - """An unknown check function was requested""" - - def __init__(self, value): - """ - >>> raise VdtUnknownCheckError('yoda') - Traceback (most recent call last): - VdtUnknownCheckError: the check "yoda" is unknown. - """ - ValidateError.__init__(self, f'the check "{value}" is unknown.') - - -class VdtParamError(SyntaxError): - """An incorrect parameter was passed""" - - def __init__(self, name, value): - """ - >>> raise VdtParamError('yoda', 'jedi') - Traceback (most recent call last): - VdtParamError: passed an incorrect value "jedi" for parameter "yoda". - """ - SyntaxError.__init__(self, f'passed an incorrect value "{value}" for parameter "{name}".') - - -class VdtTypeError(ValidateError): - """The value supplied was of the wrong type""" - - def __init__(self, value): - """ - >>> raise VdtTypeError('jedi') - Traceback (most recent call last): - VdtTypeError: the value "jedi" is of the wrong type. - """ - ValidateError.__init__(self, f'the value "{value}" is of the wrong type.') - - -class VdtValueError(ValidateError): - """The value supplied was of the correct type, but was not an allowed value.""" - - def __init__(self, value): - """ - >>> raise VdtValueError('jedi') - Traceback (most recent call last): - VdtValueError: the value "jedi" is unacceptable. - """ - ValidateError.__init__(self, f'the value "{value}" is unacceptable.') - - -class VdtValueTooSmallError(VdtValueError): - """The value supplied was of the correct type, but was too small.""" - - def __init__(self, value): - """ - >>> raise VdtValueTooSmallError('0') - Traceback (most recent call last): - VdtValueTooSmallError: the value "0" is too small. - """ - ValidateError.__init__(self, f'the value "{value}" is too small.') - - -class VdtValueTooBigError(VdtValueError): - """The value supplied was of the correct type, but was too big.""" - - def __init__(self, value): - """ - >>> raise VdtValueTooBigError('1') - Traceback (most recent call last): - VdtValueTooBigError: the value "1" is too big. - """ - ValidateError.__init__(self, f'the value "{value}" is too big.') - - -class VdtValueTooShortError(VdtValueError): - """The value supplied was of the correct type, but was too short.""" - - def __init__(self, value): - """ - >>> raise VdtValueTooShortError('jed') - Traceback (most recent call last): - VdtValueTooShortError: the value "jed" is too short. - """ - ValidateError.__init__( - self, - f'the value "{value}" is too short.') - - -class VdtValueTooLongError(VdtValueError): - """The value supplied was of the correct type, but was too long.""" - - def __init__(self, value): - """ - >>> raise VdtValueTooLongError('jedie') - Traceback (most recent call last): - VdtValueTooLongError: the value "jedie" is too long. - """ - ValidateError.__init__(self, f'the value "{value}" is too long.') - - -class Validator: - """ - Validator is an object that allows you to register a set of 'checks'. - These checks take input and test that it conforms to the check. - - This can also involve converting the value from a string into - the correct datatype. - - The ``check`` method takes an input string which configures which - check is to be used and applies that check to a supplied value. - - An example input string would be: - 'int_range(param1, param2)' - - You would then provide something like: - - >>> def int_range_check(value, min, max): - ... # turn min and max from strings to integers - ... min = int(min) - ... max = int(max) - ... # check that value is of the correct type. - ... # possible valid inputs are integers or strings - ... # that represent integers - ... if not isinstance(value, (int, long, string_type)): - ... raise VdtTypeError(value) - ... elif isinstance(value, string_type): - ... # if we are given a string - ... # attempt to convert to an integer - ... try: - ... value = int(value) - ... except ValueError: - ... raise VdtValueError(value) - ... # check the value is between our constraints - ... if not min <= value: - ... raise VdtValueTooSmallError(value) - ... if not value <= max: - ... raise VdtValueTooBigError(value) - ... return value - - >>> fdict = {'int_range': int_range_check} - >>> vtr1 = Validator(fdict) - >>> vtr1.check('int_range(20, 40)', '30') - 30 - >>> vtr1.check('int_range(20, 40)', '60') - Traceback (most recent call last): - VdtValueTooBigError: the value "60" is too big. - - New functions can be added with : :: - - >>> vtr2 = Validator() - >>> vtr2.functions['int_range'] = int_range_check - - Or by passing in a dictionary of functions when Validator - is instantiated. - - Your functions *can* use keyword arguments, - but the first argument should always be 'value'. - - If the function doesn't take additional arguments, - the parentheses are optional in the check. - It can be written with either of : :: - - keyword = function_name - keyword = function_name() - - The first program to utilise Validator() was Michael Foord's - ConfigObj, an alternative to ConfigParser which supports lists and - can validate a config file using a config schema. - For more details on using Validator with ConfigObj see: - https://configobj.readthedocs.org/en/latest/configobj.html - """ - - # this regex does the initial parsing of the checks - _func_re = re.compile(r'([^\(\)]+?)\((.*)\)', re.DOTALL) - - # this regex takes apart keyword arguments - _key_arg = re.compile(r'^([a-zA-Z_][a-zA-Z0-9_]*)\s*=\s*(.*)$', re.DOTALL) - - - # this regex finds keyword=list(....) type values - _list_arg = _list_arg - - # this regex takes individual values out of lists - in one pass - _list_members = _list_members - - # These regexes check a set of arguments for validity - # and then pull the members out - _paramfinder = re.compile(_paramstring, re.VERBOSE | re.DOTALL) - _matchfinder = re.compile(_matchstring, re.VERBOSE | re.DOTALL) - - - def __init__(self, functions=None): - """ - >>> vtri = Validator() - """ - self.functions = { - '': self._pass, - 'integer': is_integer, - 'float': is_float, - 'boolean': is_boolean, - 'ip_addr': is_ip_addr, - 'string': is_string, - 'list': is_list, - 'tuple': is_tuple, - 'int_list': is_int_list, - 'float_list': is_float_list, - 'bool_list': is_bool_list, - 'ip_addr_list': is_ip_addr_list, - 'string_list': is_string_list, - 'mixed_list': is_mixed_list, - 'pass': self._pass, - 'option': is_option, - 'force_list': force_list, - } - if functions is not None: - self.functions.update(functions) - # tekNico: for use by ConfigObj - self.baseErrorClass = ValidateError - self._cache = {} - - - def check(self, check, value, missing=False): - """ - Usage: check(check, value) - - Arguments: - check: string representing check to apply (including arguments) - value: object to be checked - Returns value, converted to correct type if necessary - - If the check fails, raises a ``ValidateError`` subclass. - - >>> vtor.check('yoda', '') - Traceback (most recent call last): - VdtUnknownCheckError: the check "yoda" is unknown. - >>> vtor.check('yoda()', '') - Traceback (most recent call last): - VdtUnknownCheckError: the check "yoda" is unknown. - - >>> vtor.check('string(default="")', '', missing=True) - '' - """ - fun_name, fun_args, fun_kwargs, default = self._parse_with_caching(check) - - if missing: - if default is None: - # no information needed here - to be handled by caller - raise VdtMissingValue() - value = self._handle_none(default) - - if value is None: - return None - - return self._check_value(value, fun_name, fun_args, fun_kwargs) - - - def _handle_none(self, value): - if value == 'None': - return None - elif value in ("'None'", '"None"'): - # Special case a quoted None - value = self._unquote(value) - return value - - - def _parse_with_caching(self, check): - if check in self._cache: - fun_name, fun_args, fun_kwargs, default = self._cache[check] - # We call list and dict below to work with *copies* of the data - # rather than the original (which are mutable of course) - fun_args = list(fun_args) - fun_kwargs = dict(fun_kwargs) - else: - fun_name, fun_args, fun_kwargs, default = self._parse_check(check) - fun_kwargs = {str(key): value for (key, value) in list(fun_kwargs.items())} - self._cache[check] = fun_name, list(fun_args), dict(fun_kwargs), default - return fun_name, fun_args, fun_kwargs, default - - - def _check_value(self, value, fun_name, fun_args, fun_kwargs): - try: - fun = self.functions[fun_name] - except KeyError: - raise VdtUnknownCheckError(fun_name) - else: - return fun(value, *fun_args, **fun_kwargs) - - - def _parse_check(self, check): - fun_match = self._func_re.match(check) - if fun_match: - fun_name = fun_match.group(1) - arg_string = fun_match.group(2) - arg_match = self._matchfinder.match(arg_string) - if arg_match is None: - # Bad syntax - raise VdtParamError('Bad syntax in check "%s".' % check) - fun_args = [] - fun_kwargs = {} - # pull out args of group 2 - for arg in self._paramfinder.findall(arg_string): - # args may need whitespace removing (before removing quotes) - arg = arg.strip() - listmatch = self._list_arg.match(arg) - if listmatch: - key, val = self._list_handle(listmatch) - fun_kwargs[key] = val - continue - keymatch = self._key_arg.match(arg) - if keymatch: - val = keymatch.group(2) - if not val in ("'None'", '"None"'): - # Special case a quoted None - val = self._unquote(val) - fun_kwargs[keymatch.group(1)] = val - continue - - fun_args.append(self._unquote(arg)) - else: - # allows for function names without (args) - return check, (), {}, None - - # Default must be deleted if the value is specified too, - # otherwise the check function will get a spurious "default" keyword arg - default = fun_kwargs.pop('default', None) - return fun_name, fun_args, fun_kwargs, default - - - def _unquote(self, val): - """Unquote a value if necessary.""" - if (len(val) >= 2) and (val[0] in ("'", '"')) and (val[0] == val[-1]): - val = val[1:-1] - return val - - - def _list_handle(self, listmatch): - """Take apart a ``keyword=list('val, 'val')`` type string.""" - out = [] - name = listmatch.group(1) - args = listmatch.group(2) - for arg in self._list_members.findall(args): - out.append(self._unquote(arg)) - return name, out - - - def _pass(self, value): - """ - Dummy check that always passes - - >>> vtor.check('', 0) - 0 - >>> vtor.check('', '0') - '0' - """ - return value - - - def get_default_value(self, check): - """ - Given a check, return the default value for the check - (converted to the right type). - - If the check doesn't specify a default value then a - ``KeyError`` will be raised. - """ - fun_name, fun_args, fun_kwargs, default = self._parse_with_caching(check) - if default is None: - raise KeyError('Check "%s" has no default value.' % check) - value = self._handle_none(default) - if value is None: - return value - return self._check_value(value, fun_name, fun_args, fun_kwargs) - - -def _is_num_param(names, values, to_float=False): - """ - Return numbers from inputs or raise VdtParamError. - - Lets ``None`` pass through. - Pass in keyword argument ``to_float=True`` to - use float for the conversion rather than int. - - >>> _is_num_param(('', ''), (0, 1.0)) - [0, 1] - >>> _is_num_param(('', ''), (0, 1.0), to_float=True) - [0.0, 1.0] - >>> _is_num_param(('a'), ('a')) - Traceback (most recent call last): - VdtParamError: passed an incorrect value "a" for parameter "a". - """ - fun = to_float and float or int - out_params = [] - for (name, val) in zip(names, values): - if val is None: - out_params.append(val) - elif isinstance(val, (int, long, float, string_type)): - try: - out_params.append(fun(val)) - except ValueError as e: - raise VdtParamError(name, val) - else: - raise VdtParamError(name, val) - return out_params - - -# built in checks -# you can override these by setting the appropriate name -# in Validator.functions -# note: if the params are specified wrongly in your input string, -# you will also raise errors. - -def is_integer(value, min=None, max=None): - """ - A check that tests that a given value is an integer (int, or long) - and optionally, between bounds. A negative value is accepted, while - a float will fail. - - If the value is a string, then the conversion is done - if possible. - Otherwise a VdtError is raised. - - >>> vtor.check('integer', '-1') - -1 - >>> vtor.check('integer', '0') - 0 - >>> vtor.check('integer', 9) - 9 - >>> vtor.check('integer', 'a') - Traceback (most recent call last): - VdtTypeError: the value "a" is of the wrong type. - >>> vtor.check('integer', '2.2') - Traceback (most recent call last): - VdtTypeError: the value "2.2" is of the wrong type. - >>> vtor.check('integer(10)', '20') - 20 - >>> vtor.check('integer(max=20)', '15') - 15 - >>> vtor.check('integer(10)', '9') - Traceback (most recent call last): - VdtValueTooSmallError: the value "9" is too small. - >>> vtor.check('integer(10)', 9) - Traceback (most recent call last): - VdtValueTooSmallError: the value "9" is too small. - >>> vtor.check('integer(max=20)', '35') - Traceback (most recent call last): - VdtValueTooBigError: the value "35" is too big. - >>> vtor.check('integer(max=20)', 35) - Traceback (most recent call last): - VdtValueTooBigError: the value "35" is too big. - >>> vtor.check('integer(0, 9)', False) - 0 - """ - (min_val, max_val) = _is_num_param(('min', 'max'), (min, max)) - if not isinstance(value, (int, long, string_type)): - raise VdtTypeError(value) - if isinstance(value, string_type): - # if it's a string - does it represent an integer ? - try: - value = int(value) - except ValueError: - raise VdtTypeError(value) - if (min_val is not None) and (value < min_val): - raise VdtValueTooSmallError(value) - if (max_val is not None) and (value > max_val): - raise VdtValueTooBigError(value) - return value - - -def is_float(value, min=None, max=None): - """ - A check that tests that a given value is a float - (an integer will be accepted), and optionally - that it is between bounds. - - If the value is a string, then the conversion is done - if possible. - Otherwise a VdtError is raised. - - This can accept negative values. - - >>> vtor.check('float', '2') - 2.0 - - From now on we multiply the value to avoid comparing decimals - - >>> vtor.check('float', '-6.8') * 10 - -68.0 - >>> vtor.check('float', '12.2') * 10 - 122.0 - >>> vtor.check('float', 8.4) * 10 - 84.0 - >>> vtor.check('float', 'a') - Traceback (most recent call last): - VdtTypeError: the value "a" is of the wrong type. - >>> vtor.check('float(10.1)', '10.2') * 10 - 102.0 - >>> vtor.check('float(max=20.2)', '15.1') * 10 - 151.0 - >>> vtor.check('float(10.0)', '9.0') - Traceback (most recent call last): - VdtValueTooSmallError: the value "9.0" is too small. - >>> vtor.check('float(max=20.0)', '35.0') - Traceback (most recent call last): - VdtValueTooBigError: the value "35.0" is too big. - """ - (min_val, max_val) = _is_num_param( - ('min', 'max'), (min, max), to_float=True) - if not isinstance(value, (int, long, float, string_type)): - raise VdtTypeError(value) - if not isinstance(value, float): - # if it's a string - does it represent a float ? - try: - value = float(value) - except ValueError: - raise VdtTypeError(value) - if (min_val is not None) and (value < min_val): - raise VdtValueTooSmallError(value) - if (max_val is not None) and (value > max_val): - raise VdtValueTooBigError(value) - return value - - -bool_dict = { - True: True, 'on': True, '1': True, 'true': True, 'yes': True, - False: False, 'off': False, '0': False, 'false': False, 'no': False, -} - - -def is_boolean(value): - """ - Check if the value represents a boolean. - - >>> vtor.check('boolean', 0) - 0 - >>> vtor.check('boolean', False) - 0 - >>> vtor.check('boolean', '0') - 0 - >>> vtor.check('boolean', 'off') - 0 - >>> vtor.check('boolean', 'false') - 0 - >>> vtor.check('boolean', 'no') - 0 - >>> vtor.check('boolean', 'nO') - 0 - >>> vtor.check('boolean', 'NO') - 0 - >>> vtor.check('boolean', 1) - 1 - >>> vtor.check('boolean', True) - 1 - >>> vtor.check('boolean', '1') - 1 - >>> vtor.check('boolean', 'on') - 1 - >>> vtor.check('boolean', 'true') - 1 - >>> vtor.check('boolean', 'yes') - 1 - >>> vtor.check('boolean', 'Yes') - 1 - >>> vtor.check('boolean', 'YES') - 1 - >>> vtor.check('boolean', '') - Traceback (most recent call last): - VdtTypeError: the value "" is of the wrong type. - >>> vtor.check('boolean', 'up') - Traceback (most recent call last): - VdtTypeError: the value "up" is of the wrong type. - - """ - if isinstance(value, string_type): - try: - return bool_dict[value.lower()] - except KeyError: - raise VdtTypeError(value) - # we do an equality test rather than an identity test - # this ensures Python 2.2 compatibility - # and allows 0 and 1 to represent True and False - if value == False: - return False - elif value == True: - return True - else: - raise VdtTypeError(value) - - -def is_ip_addr(value): - """ - Check that the supplied value is an Internet Protocol address, v.4, - represented by a dotted-quad string, i.e. '1.2.3.4'. - - >>> vtor.check('ip_addr', '1 ') - '1' - >>> vtor.check('ip_addr', ' 1.2') - '1.2' - >>> vtor.check('ip_addr', ' 1.2.3 ') - '1.2.3' - >>> vtor.check('ip_addr', '1.2.3.4') - '1.2.3.4' - >>> vtor.check('ip_addr', '0.0.0.0') - '0.0.0.0' - >>> vtor.check('ip_addr', '255.255.255.255') - '255.255.255.255' - >>> vtor.check('ip_addr', '255.255.255.256') - Traceback (most recent call last): - VdtValueError: the value "255.255.255.256" is unacceptable. - >>> vtor.check('ip_addr', '1.2.3.4.5') - Traceback (most recent call last): - VdtValueError: the value "1.2.3.4.5" is unacceptable. - >>> vtor.check('ip_addr', 0) - Traceback (most recent call last): - VdtTypeError: the value "0" is of the wrong type. - """ - if not isinstance(value, string_type): - raise VdtTypeError(value) - value = value.strip() - try: - dottedQuadToNum(value) - except ValueError: - raise VdtValueError(value) - return value - - -def is_list(value, min=None, max=None): - """ - Check that the value is a list of values. - - You can optionally specify the minimum and maximum number of members. - - It does no check on list members. - - >>> vtor.check('list', ()) - [] - >>> vtor.check('list', []) - [] - >>> vtor.check('list', (1, 2)) - [1, 2] - >>> vtor.check('list', [1, 2]) - [1, 2] - >>> vtor.check('list(3)', (1, 2)) - Traceback (most recent call last): - VdtValueTooShortError: the value "(1, 2)" is too short. - >>> vtor.check('list(max=5)', (1, 2, 3, 4, 5, 6)) - Traceback (most recent call last): - VdtValueTooLongError: the value "(1, 2, 3, 4, 5, 6)" is too long. - >>> vtor.check('list(min=3, max=5)', (1, 2, 3, 4)) - [1, 2, 3, 4] - >>> vtor.check('list', 0) - Traceback (most recent call last): - VdtTypeError: the value "0" is of the wrong type. - >>> vtor.check('list', '12') - Traceback (most recent call last): - VdtTypeError: the value "12" is of the wrong type. - """ - (min_len, max_len) = _is_num_param(('min', 'max'), (min, max)) - if isinstance(value, string_type): - raise VdtTypeError(value) - try: - num_members = len(value) - except TypeError: - raise VdtTypeError(value) - if min_len is not None and num_members < min_len: - raise VdtValueTooShortError(value) - if max_len is not None and num_members > max_len: - raise VdtValueTooLongError(value) - return list(value) - - -def is_tuple(value, min=None, max=None): - """ - Check that the value is a tuple of values. - - You can optionally specify the minimum and maximum number of members. - - It does no check on members. - - >>> vtor.check('tuple', ()) - () - >>> vtor.check('tuple', []) - () - >>> vtor.check('tuple', (1, 2)) - (1, 2) - >>> vtor.check('tuple', [1, 2]) - (1, 2) - >>> vtor.check('tuple(3)', (1, 2)) - Traceback (most recent call last): - VdtValueTooShortError: the value "(1, 2)" is too short. - >>> vtor.check('tuple(max=5)', (1, 2, 3, 4, 5, 6)) - Traceback (most recent call last): - VdtValueTooLongError: the value "(1, 2, 3, 4, 5, 6)" is too long. - >>> vtor.check('tuple(min=3, max=5)', (1, 2, 3, 4)) - (1, 2, 3, 4) - >>> vtor.check('tuple', 0) - Traceback (most recent call last): - VdtTypeError: the value "0" is of the wrong type. - >>> vtor.check('tuple', '12') - Traceback (most recent call last): - VdtTypeError: the value "12" is of the wrong type. - """ - return tuple(is_list(value, min, max)) - - -def is_string(value, min=None, max=None): - """ - Check that the supplied value is a string. - - You can optionally specify the minimum and maximum number of members. - - >>> vtor.check('string', '0') - '0' - >>> vtor.check('string', 0) - Traceback (most recent call last): - VdtTypeError: the value "0" is of the wrong type. - >>> vtor.check('string(2)', '12') - '12' - >>> vtor.check('string(2)', '1') - Traceback (most recent call last): - VdtValueTooShortError: the value "1" is too short. - >>> vtor.check('string(min=2, max=3)', '123') - '123' - >>> vtor.check('string(min=2, max=3)', '1234') - Traceback (most recent call last): - VdtValueTooLongError: the value "1234" is too long. - """ - if not isinstance(value, string_type): - raise VdtTypeError(value) - (min_len, max_len) = _is_num_param(('min', 'max'), (min, max)) - try: - num_members = len(value) - except TypeError: - raise VdtTypeError(value) - if min_len is not None and num_members < min_len: - raise VdtValueTooShortError(value) - if max_len is not None and num_members > max_len: - raise VdtValueTooLongError(value) - return value - - -def is_int_list(value, min=None, max=None): - """ - Check that the value is a list of integers. - - You can optionally specify the minimum and maximum number of members. - - Each list member is checked that it is an integer. - - >>> vtor.check('int_list', ()) - [] - >>> vtor.check('int_list', []) - [] - >>> vtor.check('int_list', (1, 2)) - [1, 2] - >>> vtor.check('int_list', [1, 2]) - [1, 2] - >>> vtor.check('int_list', [1, 'a']) - Traceback (most recent call last): - VdtTypeError: the value "a" is of the wrong type. - """ - return [is_integer(mem) for mem in is_list(value, min, max)] - - -def is_bool_list(value, min=None, max=None): - """ - Check that the value is a list of booleans. - - You can optionally specify the minimum and maximum number of members. - - Each list member is checked that it is a boolean. - - >>> vtor.check('bool_list', ()) - [] - >>> vtor.check('bool_list', []) - [] - >>> check_res = vtor.check('bool_list', (True, False)) - >>> check_res == [True, False] - 1 - >>> check_res = vtor.check('bool_list', [True, False]) - >>> check_res == [True, False] - 1 - >>> vtor.check('bool_list', [True, 'a']) - Traceback (most recent call last): - VdtTypeError: the value "a" is of the wrong type. - """ - return [is_boolean(mem) for mem in is_list(value, min, max)] - - -def is_float_list(value, min=None, max=None): - """ - Check that the value is a list of floats. - - You can optionally specify the minimum and maximum number of members. - - Each list member is checked that it is a float. - - >>> vtor.check('float_list', ()) - [] - >>> vtor.check('float_list', []) - [] - >>> vtor.check('float_list', (1, 2.0)) - [1.0, 2.0] - >>> vtor.check('float_list', [1, 2.0]) - [1.0, 2.0] - >>> vtor.check('float_list', [1, 'a']) - Traceback (most recent call last): - VdtTypeError: the value "a" is of the wrong type. - """ - return [is_float(mem) for mem in is_list(value, min, max)] - - -def is_string_list(value, min=None, max=None): - """ - Check that the value is a list of strings. - - You can optionally specify the minimum and maximum number of members. - - Each list member is checked that it is a string. - - >>> vtor.check('string_list', ()) - [] - >>> vtor.check('string_list', []) - [] - >>> vtor.check('string_list', ('a', 'b')) - ['a', 'b'] - >>> vtor.check('string_list', ['a', 1]) - Traceback (most recent call last): - VdtTypeError: the value "1" is of the wrong type. - >>> vtor.check('string_list', 'hello') - Traceback (most recent call last): - VdtTypeError: the value "hello" is of the wrong type. - """ - if isinstance(value, string_type): - raise VdtTypeError(value) - return [is_string(mem) for mem in is_list(value, min, max)] - - -def is_ip_addr_list(value, min=None, max=None): - """ - Check that the value is a list of IP addresses. - - You can optionally specify the minimum and maximum number of members. - - Each list member is checked that it is an IP address. - - >>> vtor.check('ip_addr_list', ()) - [] - >>> vtor.check('ip_addr_list', []) - [] - >>> vtor.check('ip_addr_list', ('1.2.3.4', '5.6.7.8')) - ['1.2.3.4', '5.6.7.8'] - >>> vtor.check('ip_addr_list', ['a']) - Traceback (most recent call last): - VdtValueError: the value "a" is unacceptable. - """ - return [is_ip_addr(mem) for mem in is_list(value, min, max)] - - -def force_list(value, min=None, max=None): - """ - Check that a value is a list, coercing strings into - a list with one member. Useful where users forget the - trailing comma that turns a single value into a list. - - You can optionally specify the minimum and maximum number of members. - A minimum of greater than one will fail if the user only supplies a - string. - - >>> vtor.check('force_list', ()) - [] - >>> vtor.check('force_list', []) - [] - >>> vtor.check('force_list', 'hello') - ['hello'] - """ - if not isinstance(value, (list, tuple)): - value = [value] - return is_list(value, min, max) - - - -fun_dict = { - 'integer': is_integer, - 'float': is_float, - 'ip_addr': is_ip_addr, - 'string': is_string, - 'boolean': is_boolean, -} - - -def is_mixed_list(value, *args): - """ - Check that the value is a list. - Allow specifying the type of each member. - Work on lists of specific lengths. - - You specify each member as a positional argument specifying type - - Each type should be one of the following strings : - 'integer', 'float', 'ip_addr', 'string', 'boolean' - - So you can specify a list of two strings, followed by - two integers as : - - mixed_list('string', 'string', 'integer', 'integer') - - The length of the list must match the number of positional - arguments you supply. - - >>> mix_str = "mixed_list('integer', 'float', 'ip_addr', 'string', 'boolean')" - >>> check_res = vtor.check(mix_str, (1, 2.0, '1.2.3.4', 'a', True)) - >>> check_res == [1, 2.0, '1.2.3.4', 'a', True] - 1 - >>> check_res = vtor.check(mix_str, ('1', '2.0', '1.2.3.4', 'a', 'True')) - >>> check_res == [1, 2.0, '1.2.3.4', 'a', True] - 1 - >>> vtor.check(mix_str, ('b', 2.0, '1.2.3.4', 'a', True)) - Traceback (most recent call last): - VdtTypeError: the value "b" is of the wrong type. - >>> vtor.check(mix_str, (1, 2.0, '1.2.3.4', 'a')) - Traceback (most recent call last): - VdtValueTooShortError: the value "(1, 2.0, '1.2.3.4', 'a')" is too short. - >>> vtor.check(mix_str, (1, 2.0, '1.2.3.4', 'a', 1, 'b')) - Traceback (most recent call last): - VdtValueTooLongError: the value "(1, 2.0, '1.2.3.4', 'a', 1, 'b')" is too long. - >>> vtor.check(mix_str, 0) - Traceback (most recent call last): - VdtTypeError: the value "0" is of the wrong type. - - >>> vtor.check('mixed_list("yoda")', ('a')) - Traceback (most recent call last): - VdtParamError: passed an incorrect value "KeyError('yoda',)" for parameter "'mixed_list'" - """ - try: - length = len(value) - except TypeError: - raise VdtTypeError(value) - if length < len(args): - raise VdtValueTooShortError(value) - elif length > len(args): - raise VdtValueTooLongError(value) - try: - return [fun_dict[arg](val) for arg, val in zip(args, value)] - except KeyError as e: - raise VdtParamError('mixed_list', e) - - -def is_option(value, *options): - """ - This check matches the value to any of a set of options. - - >>> vtor.check('option("yoda", "jedi")', 'yoda') - 'yoda' - >>> vtor.check('option("yoda", "jedi")', 'jed') - Traceback (most recent call last): - VdtValueError: the value "jed" is unacceptable. - >>> vtor.check('option("yoda", "jedi")', 0) - Traceback (most recent call last): - VdtTypeError: the value "0" is of the wrong type. - """ - if not isinstance(value, string_type): - raise VdtTypeError(value) - if not value in options: - raise VdtValueError(value) - return value - -def _test(value, *args, **keywargs): - """ - A function that exists for test purposes. - - >>> checks = [ - ... '3, 6, min=1, max=3, test=list(a, b, c)', - ... '3', - ... '3, 6', - ... '3,', - ... 'min=1, test="a b c"', - ... 'min=5, test="a, b, c"', - ... 'min=1, max=3, test="a, b, c"', - ... 'min=-100, test=-99', - ... 'min=1, max=3', - ... '3, 6, test="36"', - ... '3, 6, test="a, b, c"', - ... '3, max=3, test=list("a", "b", "c")', - ... '''3, max=3, test=list("'a'", 'b', "x=(c)")''', - ... "test='x=fish(3)'", - ... ] - >>> v = Validator({'test': _test}) - >>> for entry in checks: - ... pprint(v.check(('test(%s)' % entry), 3)) - (3, ('3', '6'), {'max': '3', 'min': '1', 'test': ['a', 'b', 'c']}) - (3, ('3',), {}) - (3, ('3', '6'), {}) - (3, ('3',), {}) - (3, (), {'min': '1', 'test': 'a b c'}) - (3, (), {'min': '5', 'test': 'a, b, c'}) - (3, (), {'max': '3', 'min': '1', 'test': 'a, b, c'}) - (3, (), {'min': '-100', 'test': '-99'}) - (3, (), {'max': '3', 'min': '1'}) - (3, ('3', '6'), {'test': '36'}) - (3, ('3', '6'), {'test': 'a, b, c'}) - (3, ('3',), {'max': '3', 'test': ['a', 'b', 'c']}) - (3, ('3',), {'max': '3', 'test': ["'a'", 'b', 'x=(c)']}) - (3, (), {'test': 'x=fish(3)'}) - - >>> v = Validator() - >>> v.check('integer(default=6)', '3') - 3 - >>> v.check('integer(default=6)', None, True) - 6 - >>> v.get_default_value('integer(default=6)') - 6 - >>> v.get_default_value('float(default=6)') - 6.0 - >>> v.get_default_value('pass(default=None)') - >>> v.get_default_value("string(default='None')") - 'None' - >>> v.get_default_value('pass') - Traceback (most recent call last): - KeyError: 'Check "pass" has no default value.' - >>> v.get_default_value('pass(default=list(1, 2, 3, 4))') - ['1', '2', '3', '4'] - - >>> v = Validator() - >>> v.check("pass(default=None)", None, True) - >>> v.check("pass(default='None')", None, True) - 'None' - >>> v.check('pass(default="None")', None, True) - 'None' - >>> v.check('pass(default=list(1, 2, 3, 4))', None, True) - ['1', '2', '3', '4'] - - Bug test for unicode arguments - >>> v = Validator() - >>> v.check(unicode('string(min=4)'), unicode('test')) == unicode('test') - True - - >>> v = Validator() - >>> v.get_default_value(unicode('string(min=4, default="1234")')) == unicode('1234') - True - >>> v.check(unicode('string(min=4, default="1234")'), unicode('test')) == unicode('test') - True - - >>> v = Validator() - >>> default = v.get_default_value('string(default=None)') - >>> default == None - 1 - """ - return (value, args, keywargs) - - -def _test2(): - """ - >>> - >>> v = Validator() - >>> v.get_default_value('string(default="#ff00dd")') - '#ff00dd' - >>> v.get_default_value('integer(default=3) # comment') - 3 - """ - -def _test3(): - r""" - >>> vtor.check('string(default="")', '', missing=True) - '' - >>> vtor.check('string(default="\n")', '', missing=True) - '\n' - >>> print(vtor.check('string(default="\n")', '', missing=True)) - - - >>> vtor.check('string()', '\n') - '\n' - >>> vtor.check('string(default="\n\n\n")', '', missing=True) - '\n\n\n' - >>> vtor.check('string()', 'random \n text goes here\n\n') - 'random \n text goes here\n\n' - >>> vtor.check('string(default=" \nrandom text\ngoes \n here\n\n ")', - ... '', missing=True) - ' \nrandom text\ngoes \n here\n\n ' - >>> vtor.check("string(default='\n\n\n')", '', missing=True) - '\n\n\n' - >>> vtor.check("option('\n','a','b',default='\n')", '', missing=True) - '\n' - >>> vtor.check("string_list()", ['foo', '\n', 'bar']) - ['foo', '\n', 'bar'] - >>> vtor.check("string_list(default=list('\n'))", '', missing=True) - ['\n'] - """ - - -if __name__ == '__main__': - # run the code tests in doctest format - import sys - import doctest - m = sys.modules.get('__main__') - globs = m.__dict__.copy() - globs.update({ - 'vtor': Validator(), - }) - - failures, tests = doctest.testmod( - m, globs=globs, - optionflags=doctest.IGNORE_EXCEPTION_DETAIL | doctest.ELLIPSIS) - assert not failures, f'{failures} failures out of {tests} tests' diff --git a/src/stpipe/log.py b/src/stpipe/log.py index 5d0a9fc5..d1819c01 100644 --- a/src/stpipe/log.py +++ b/src/stpipe/log.py @@ -9,9 +9,10 @@ import threading from contextlib import contextmanager +from astropy.extern.configobj import validate +from astropy.extern.configobj.configobj import ConfigObj + from . import config_parser -from .extern.configobj import validate -from .extern.configobj.configobj import ConfigObj STPIPE_ROOT_LOGGER = "stpipe" DEFAULT_FORMAT = "%(asctime)s - %(name)s - %(levelname)s - %(message)s" diff --git a/src/stpipe/pipeline.py b/src/stpipe/pipeline.py index c84b3078..bd5a25ea 100644 --- a/src/stpipe/pipeline.py +++ b/src/stpipe/pipeline.py @@ -5,8 +5,9 @@ from os.path import dirname, join from typing import ClassVar +from astropy.extern.configobj.configobj import ConfigObj, Section + from . import config_parser, crds_client, log -from .extern.configobj.configobj import ConfigObj, Section from .step import Step, get_disable_crds_steppars from .utilities import _not_set diff --git a/tests/test_config_parser.py b/tests/test_config_parser.py index bef530c0..d1e5cbb5 100644 --- a/tests/test_config_parser.py +++ b/tests/test_config_parser.py @@ -2,9 +2,9 @@ from collections.abc import Mapping import pytest +from astropy.extern.configobj.configobj import ConfigObj, Section from stpipe import config_parser -from stpipe.extern.configobj.configobj import ConfigObj, Section from stpipe.utilities import _not_set