From fd8e7245cddb2aa8c6f93f27a515a2c0ca5f0649 Mon Sep 17 00:00:00 2001 From: Autumn Jolitz Date: Sat, 9 Dec 2023 19:18:17 -0800 Subject: [PATCH] [*] refactor build, add invoke interface as my makefile --- .github/workflows/build.yml | 165 ++++ .../{style-check.yml => development.yml} | 62 +- .github/workflows/release.yml | 82 -- .github/workflows/tests.yml | 50 -- .pre-commit-config.yaml | 40 +- CHANGES.rst | 22 +- CURRENT_VERSION.txt | 2 +- README.rst | 30 +- dev-requirements.txt | 3 + tasks.py | 349 ++++++++ tasksupport.py | 836 ++++++++++++++++++ 11 files changed, 1463 insertions(+), 178 deletions(-) create mode 100644 .github/workflows/build.yml rename .github/workflows/{style-check.yml => development.yml} (55%) delete mode 100644 .github/workflows/release.yml delete mode 100644 .github/workflows/tests.yml create mode 100644 tasks.py create mode 100644 tasksupport.py diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml new file mode 100644 index 0000000..af0c0c6 --- /dev/null +++ b/.github/workflows/build.yml @@ -0,0 +1,165 @@ +# This workflow will install Python dependencies, run tests and lint with a single version of Python +# For more information see: https://help.github.com/actions/language-and-framework-guides/using-python-with-github-actions + +name: Tests + +on: + push: + tags: '*' + branches: [ master, main ] + pull_request: + branches: [ master, main ] + +jobs: + verify_style: + runs-on: 'ubuntu-latest' + steps: + - + uses: actions/checkout@v4 + - + name: Set up Python 3 + uses: actions/setup-python@v4 + with: + python-version: '3.x' + architecture: 'x64' + - + name: Install dependencies + run: | + python -m pip install --upgrade pip invoke typing-extensions + invoke setup --devel --tests + python/bin/python -m pip install black==19.3b0 click==8.0.1 + + - + name: Lint with flake8 + run: | + # stop the build if there are Python syntax errors or undefined names + python/bin/python -m flake8 instruct/ --count --select=E9,F63,F7,F82 --show-source --statistics + # exit-zero treats all errors as warnings. The GitHub editor is 127 chars wide + python/bin/python -m flake8 instruct/ --count --exit-zero --max-complexity=10 --max-line-length=127 --statistics + - + name: Check style with black + run: | + python/bin/python -m black --check + + test_matrix: + needs: [verify_style] + strategy: + fail-fast: false + matrix: + experimental: [false] + arch: + - 'x64' + python_version: + - '3.7' + - '3.8' + - '3.9' + - '3.10' + - '3.11' + - '3.12' + - 'pypy3.7' + - 'pypy3.8' + - 'pypy3.9' + os: + - 'ubuntu-latest' + include: + - + python_version: 'pypy3.10' + experimental: true + arch: 'x64' + os: ubuntu-latest + + runs-on: ${{ matrix.os }} + continue-on-error: ${{ matrix.experimental }} + steps: + - + uses: actions/checkout@v4 + - + name: Set up Python ${{ matrix.python_version }} + uses: actions/setup-python@v4 + with: + python-version: ${{ matrix.python_version }} + architecture: ${{ matrix.arch }} + - + name: Install dependencies + run: | + python -m pip install --upgrade pip invoke typing-extensions + invoke setup --tests + - + name: Test with pytest + run: | + python/bin/python -m pytest + + pypi-publish: + needs: [test_matrix, verify_style] + runs-on: 'ubuntu-latest' + environment: + name: pypi + url: https://pypi.org/p/instruct + permissions: + id-token: write + contents: write + steps: + - + uses: actions/checkout@v4 + - + name: Set up Python 3.11 + uses: actions/setup-python@v4 + with: + python-version: '3.11' + architecture: 'x64' + - + name: Set some variables... 
+ id: version + run: | + echo "CURRENT_VERSION=$(grep -vE '^#' CURRENT_VERSION.txt | head -1)" >> "$GITHUB_OUTPUT" + if [ "x${{ github.ref_type == 'tag' && startsWith(github.ref, 'refs/tags/v') }}" = 'xtrue' ]; then + echo "GIT_VERSION=$(echo '${{ github.ref_name }}' | sed 's/^.//')" >> "$GITHUB_OUTPUT" + fi + + - + name: Warn tag version matches the source version + if: ${{ github.ref_type != 'tag' }} + run: | + if [ 'x${{ steps.version.outputs.GIT_VERSION }}' != 'x${{ steps.version.outputs.CURRENT_VERSION }}' ]; then + echo '::warn file=CURRENT_VERSION.txt,line=2,title=Version mismatch::Expected ${{ steps.version.outputs.GIT_VERSION }} but got ${{ steps.version.outputs.CURRENT_VERSION }} instead. + + If you make a tag from this, it *will* error out.' + fi + + - + name: Assert tag version matches the source version + if: ${{ github.ref_type == 'tag' }} + run: | + if [ 'x${{ steps.version.outputs.GIT_VERSION }}' != 'x${{ steps.version.outputs.CURRENT_VERSION }}' ]; then + echo '::error file=CURRENT_VERSION.txt,line=2,title=Version mismatch::Expected ${{ steps.version.outputs.GIT_VERSION }} but got ${{ steps.version.outputs.CURRENT_VERSION }} instead. + Suggest you fix that and delete the tag.' + exit 254 + fi + - + name: Setup + run: | + python -m pip install --upgrade pip invoke typing-extensions + invoke setup --devel --no-project + - + name: Create artifacts + id: artifacts + run: | + python/bin/python -m build + + - + name: Publish package distributions to PyPI + uses: pypa/gh-action-pypi-publish@release/v1 + if: ${{ github.ref_type == 'tag' && startsWith(github.ref, 'refs/tags/v') }} + with: + skip-existing: false + + - + name: Create Release + if: ${{ github.ref_type == 'tag' && startsWith(github.ref, 'refs/tags/v') }} + id: upload-release-asset + uses: softprops/action-gh-release@v1 + with: + body_path: CHANGES.rst + name: Release ${{ steps.version.outputs.CURRENT_VERSION }} + files: + dist/instruct* diff --git a/.github/workflows/style-check.yml b/.github/workflows/development.yml similarity index 55% rename from .github/workflows/style-check.yml rename to .github/workflows/development.yml index a7bed2d..efac624 100644 --- a/.github/workflows/style-check.yml +++ b/.github/workflows/development.yml @@ -1,52 +1,76 @@ # This workflow will install Python dependencies, run tests and lint with a single version of Python # For more information see: https://help.github.com/actions/language-and-framework-guides/using-python-with-github-actions -name: Check style +name: Development on: push: - branches: [ master ] - pull_request: - branches: [ master ] + branches: + - '!main' + - '!master' + - '*' jobs: - build: + test-matrix: strategy: fail-fast: false matrix: arch: - 'x64' python_version: - - '3.x' - os: + - '3.7' + - '3.8' + - '3.9' + - '3.10' + - '3.11' + - '3.12' + os: - 'ubuntu-latest' runs-on: ${{ matrix.os }} steps: - - + - uses: actions/checkout@v4 - - + - name: Set up Python ${{ matrix.python_version }} uses: actions/setup-python@v4 with: python-version: ${{ matrix.python_version }} architecture: ${{ matrix.arch }} - - + - name: Install dependencies run: | - python -m pip install --upgrade pip - python -m pip install black==19.3b0 click==8.0.1 - python -m pip install .[devel,test] - - + python -m pip install --upgrade pip invoke typing-extensions + invoke setup --tests + - + name: Test with pytest + run: | + python -m pytest + + verify_style: + needs: [test-matrix] + runs-on: 'ubuntu-latest' + steps: + - + uses: actions/checkout@v4 + - + name: Set up Python 3 + uses: 
actions/setup-python@v4 + with: + python-version: '3.x' + architecture: 'x64' + - + name: Install dependencies + run: | + python -m pip install --upgrade pip invoke typing-extensions + invoke setup --no-project --devel --tests + - name: Lint with flake8 run: | # stop the build if there are Python syntax errors or undefined names python -m flake8 instruct/ --count --select=E9,F63,F7,F82 --show-source --statistics # exit-zero treats all errors as warnings. The GitHub editor is 127 chars wide python -m flake8 instruct/ --count --exit-zero --max-complexity=10 --max-line-length=127 --statistics - - - name: Check style + - + name: Check style with black run: | python -m black --check - - - diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml deleted file mode 100644 index 40a1bbd..0000000 --- a/.github/workflows/release.yml +++ /dev/null @@ -1,82 +0,0 @@ -# This workflow will install Python dependencies, run tests and lint with a single version of Python -# For more information see: https://help.github.com/actions/language-and-framework-guides/using-python-with-github-actions - -name: Release - -on: - push: - branches: [ master ] - -jobs: - build: - strategy: - fail-fast: false - matrix: - arch: - - 'x64' - python_version: - - '3.7' - - '3.8' - - '3.9' - - '3.10' - - '3.11' - - '3.12' - os: - - 'ubuntu-latest' - runs-on: ${{ matrix.os }} - steps: - - - uses: actions/checkout@v4 - - - name: Set up Python ${{ matrix.python_version }} - uses: actions/setup-python@v4 - with: - python-version: ${{ matrix.python_version }} - architecture: ${{ matrix.arch }} - - - name: Install dependencies - run: | - python -m pip install --upgrade pip - mkdir wheels - python -m pip wheel -w wheels -r setup-requirements.txt -r dev-requirements.txt -r test-requirements.txt - python -m pip install --no-index --find-links=wheels/ build - python -m build - python -m pip install --find-links=dist/ --find-links=wheels/ --no-index instruct[devel,test] - - - name: Test with pytest - run: | - python -m pytest - pypi-publish: - needs: [build] - runs-on: 'ubuntu-latest' - environment: - name: pypi - url: https://pypi.org/p/instruct - permissions: - id-token: write - steps: - - - uses: actions/checkout@v4 - - - name: Set up Python 3.11 - uses: actions/setup-python@v4 - with: - python-version: '3.11' - architecture: 'x64' - - - name: Install dependencies - run: | - python -m pip install --upgrade pip - python -m pip install -r dev-requirements.txt - - - name: Create artifacts - run: | - python -m build - - - name: Publish package distributions to PyPI - uses: pypa/gh-action-pypi-publish@release/v1 - with: - skip-existing: true - - - diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml deleted file mode 100644 index 3d0433c..0000000 --- a/.github/workflows/tests.yml +++ /dev/null @@ -1,50 +0,0 @@ -# This workflow will install Python dependencies, run tests and lint with a single version of Python -# For more information see: https://help.github.com/actions/language-and-framework-guides/using-python-with-github-actions - -name: Tests - -on: - push: - branches: [ master ] - pull_request: - branches: [ master ] - -jobs: - build: - strategy: - fail-fast: false - matrix: - arch: - - 'x64' - python_version: - - '3.7' - - '3.8' - - '3.9' - - '3.10' - - '3.11' - - '3.12' - os: - - 'ubuntu-latest' - runs-on: ${{ matrix.os }} - steps: - - - uses: actions/checkout@v4 - - - name: Set up Python ${{ matrix.python_version }} - uses: actions/setup-python@v4 - with: - python-version: ${{ matrix.python_version 
}} - architecture: ${{ matrix.arch }} - - - name: Install dependencies - run: | - python -m pip install --upgrade pip - mkdir wheels - python -m pip wheel -w wheels -r setup-requirements.txt -r dev-requirements.txt -r test-requirements.txt - python -m pip install --no-index --find-links=wheels/ build - python -m build - python -m pip install --find-links=dist/ --find-links=wheels/ --no-index instruct[devel,test] - - - name: Test with pytest - run: | - python -m pytest diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 67894cd..42a4c95 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -1,9 +1,35 @@ repos: -- repo: https://github.com/python/black - rev: '19.3b0' + - + repo: https://github.com/pre-commit/pre-commit-hooks + rev: v4.5.0 hooks: - - id: black - language_version: /usr/local/bin/python3 - args: - - '--config' - - '.black.toml' + - + id: check-ast + - + id: check-case-conflict + - + id: check-executables-have-shebangs + - + id: check-merge-conflict + - + id: check-yaml + - + id: end-of-file-fixer + - + id: check-shebang-scripts-are-executable + - + id: detect-private-key + - + id: trailing-whitespace + args: + - '--markdown-linebreak-ext=rst' + - + repo: https://github.com/python/black + rev: '23.11.0' + hooks: + - + id: black + language_version: python3 + args: + - '--config' + - '.black.toml' diff --git a/CHANGES.rst b/CHANGES.rst index 20776f7..edd0d73 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -1,3 +1,24 @@ +Version 0.7.4 +---------------- + +Release 2023-12-09 + +- Supported more Python versions and implementations: + - Tier 1 support: CPython 3.8, 3.9, 3.10, 3.11, 3.12 + - Tier 2 support: PyPy 3.7, 3.8, 3.9 + - Tier 3 support: PyPy 3.10 +- Switched to a tag based release model +- Add infrastructure for running tests + +Version 0.7.3 +----------------- + +Release 2023-07-18 + +- add notes on use of ``Range`` and friends +- Export ``RangeFlags`` from ``__init__`` +- Unlock ``typing-extensions`` range + Version 0.7.2 ----------------- @@ -36,4 +57,3 @@ Release 2022-05-12 - Upgrade to Jinja2 for the 3.x series! - Upgrade typing-extensions to 4.2.0 - Mark support as Python 3.7+ - diff --git a/CURRENT_VERSION.txt b/CURRENT_VERSION.txt index 62417b5..92d5cf1 100644 --- a/CURRENT_VERSION.txt +++ b/CURRENT_VERSION.txt @@ -1,2 +1,2 @@ # bump the below version on release -0.7.3.post1 \ No newline at end of file +0.7.4 diff --git a/README.rst b/README.rst index 8e50197..aba4860 100644 --- a/README.rst +++ b/README.rst @@ -1,12 +1,9 @@ Instruct ========== -|Release| -|Style| +|Build| |PyPI| - - A compact, fast object system that can serve as the basis for a DAO model. To that end, instruct uses ``__slots__`` to prevent new attribute addition, properties to control types, event listeners and historical changes, and a Jinja2-driven codegen to keep a pure-Python implementation as fast and as light as possible. @@ -125,14 +122,14 @@ And have it work like this? 
print(tuple(org.list_changes())) # Returns # ( - # LoggedDelta(timestamp=1652412832.7408261, key='name', delta=Delta(state='default', old=Undefined, new='', index=0)), - # LoggedDelta(timestamp=1652412832.7408261, key='id', delta=Delta(state='default', old=Undefined, new=-1, index=0)), - # LoggedDelta(timestamp=1652412832.7408261, key='members', delta=Delta(state='default', old=Undefined, new=[], index=0)), - # LoggedDelta(timestamp=1652412832.7408261, key='created_date', delta=Delta(state='default', old=Undefined, new=datetime.datetime(2022, 5, 13, 3, 33, 52, 740650), index=0)), - # LoggedDelta(timestamp=1652412832.740923, key='id', delta=Delta(state='initialized', old=-1, new=123, index=4)), - # LoggedDelta(timestamp=1652412832.741002, key='members', delta=Delta(state='initialized', old=[], new=[<__main__.Member._Member object at 0x104364640>], index=5)), - # LoggedDelta(timestamp=1652412832.741009, key='name', delta=Delta(state='initialized', old='', new='An Org', index=6)), - # LoggedDelta(timestamp=1652412832.741021, key='name', delta=Delta(state='update', old='An Org', new='New Name', index=7)), + # LoggedDelta(timestamp=1652412832.7408261, key='name', delta=Delta(state='default', old=Undefined, new='', index=0)), + # LoggedDelta(timestamp=1652412832.7408261, key='id', delta=Delta(state='default', old=Undefined, new=-1, index=0)), + # LoggedDelta(timestamp=1652412832.7408261, key='members', delta=Delta(state='default', old=Undefined, new=[], index=0)), + # LoggedDelta(timestamp=1652412832.7408261, key='created_date', delta=Delta(state='default', old=Undefined, new=datetime.datetime(2022, 5, 13, 3, 33, 52, 740650), index=0)), + # LoggedDelta(timestamp=1652412832.740923, key='id', delta=Delta(state='initialized', old=-1, new=123, index=4)), + # LoggedDelta(timestamp=1652412832.741002, key='members', delta=Delta(state='initialized', old=[], new=[<__main__.Member._Member object at 0x104364640>], index=5)), + # LoggedDelta(timestamp=1652412832.741009, key='name', delta=Delta(state='initialized', old='', new='An Org', index=6)), + # LoggedDelta(timestamp=1652412832.741021, key='name', delta=Delta(state='update', old='An Org', new='New Name', index=7)), # LoggedDelta(timestamp=1652412832.741031, key='created_date', delta=Delta(state='update', old=datetime.datetime(2022, 5, 13, 3, 33, 52, 740650), new=datetime.datetime(2018, 10, 23, 0, 0), index=8)) # ) @@ -259,7 +256,7 @@ Example: File "/Users/autumn/software/instruct/instruct/__init__.py", line 2094, in _handle_init_errors ) from errors[0] instruct.exceptions.ClassCreationFailed: ('Unable to construct Planet, encountered 1 error', RangeError('Unable to fit 1188.30742 into [2439.766, 142800)', 1188.30742, (Range(2439.766, 142800, flags=CLOSED_OPEN, type_restrictions=()),))) - >>> + >>> Comparison to Pydantic @@ -432,8 +429,5 @@ After additions of those. Safety is expensive. .. |PyPI| image:: https://img.shields.io/pypi/v/instruct.svg :target: https://pypi.python.org/pypi/instruct -.. |Release| image:: https://github.com/autumnjolitz/instruct/actions/workflows/release.yml/badge.svg - :target: https://github.com/autumnjolitz/instruct/actions/workflows/release.yml - -.. |Style| image:: https://github.com/autumnjolitz/instruct/actions/workflows/style-check.yml/badge.svg - :target: https://github.com/autumnjolitz/instruct/actions/workflows/style-check.yml +.. 
|Build| image:: https://github.com/autumnjolitz/instruct/actions/workflows/build.yml/badge.svg + :target: https://github.com/autumnjolitz/instruct/actions/workflows/build.yml diff --git a/dev-requirements.txt b/dev-requirements.txt index 2d3a821..ec8973e 100644 --- a/dev-requirements.txt +++ b/dev-requirements.txt @@ -1,3 +1,6 @@ twine build black +invoke +# bump-my-version +# git-changelog diff --git a/tasks.py b/tasks.py new file mode 100644 index 0000000..f55efc3 --- /dev/null +++ b/tasks.py @@ -0,0 +1,349 @@ +import json +import os +import shutil +import types +import base64 +import builtins +import sys +from contextlib import suppress, contextmanager +from pathlib import Path +from typing import Type, Union, Dict, Tuple, Iterable, TypeVar + +try: + from typing import Literal +except ImportError: + from typing_extensions import Literal + + +from invoke.context import Context +from tasksupport import task, first, InvertedMapping, trim, truncate + +_ = types.SimpleNamespace() +this = sys.modules[__name__] + +DEFAULT_FORMAT = "lines" + + +def perror(*args, file=None, **kwargs): + if file is None: + file = sys.stderr + return print(*args, file=file, **kwargs) + + +@contextmanager +def create_environment(*, copy_os_environ: bool = False, **kwargs: str) -> Dict[str, str]: + """ + Returns some common values for Docker builds + """ + environment = { + **(os.environ if copy_os_environ else {}), + "NO_COLOR": "1", + "COMPOSE_DOCKER_CLI_BUILD": "1", + "BUILDX_EXPERIMENTAL": "1", + "BUILDX_GIT_LABELS": "full", + "BUILDKIT_PROGRESS": "plain", + "DOCKER_BUILDKIT": "1", + "COMPOSE_PROJECT_NAME": _.project_name(silent=True), + **kwargs, + } + return environment + + +@contextmanager +def cd(path: Union[str, Path]): + if not isinstance(path, Path): + path = Path(path) + new_cwd = path.resolve() + prior_cwd = Path(os.getcwd()).resolve() + try: + os.chdir(new_cwd) + yield prior_cwd + finally: + os.chdir(prior_cwd) + + +@task +def branch_name(context: Context) -> str: + with suppress(KeyError): + return os.environ["GITHUB_REF_NAME"] + here = this._.project_root(Path, silent=True) + if (here / ".git").is_dir(): + with suppress(FileNotFoundError): + return context.run(f"git -C {here!s} branch --show-current", hide="both").stdout.strip() + with open(here / ".git" / "HEAD") as fh: + for line in fh: + if line.startswith("ref:"): + _, line = (x.strip() for x in line.split(":", 1)) + if line.startswith("refs/heads/"): + return line.removeprefix("refs/heads/") + raise ValueError("Unable to determine branch name!") + + +T = TypeVar("T") + + +def window(iterable: Iterable[T]) -> Iterable[Tuple[T, T]]: + g = iter(iterable) + item = next(g) + for next_item in g: + yield item, next_item + next_item = item + + +@task +def setup_metadata() -> Dict[str, str]: + with open(_.project_root(Path, silent=True) / "setup.cfg") as fh: + in_multiline = False + in_metadata = False + values = None + key = None + mapping = {} + for line in fh: + if line.startswith("[metadata]"): + in_metadata = True + continue + if line.startswith("[") and in_metadata: + in_metadata = False + if in_metadata: + if "=" in line and in_multiline: + in_multiline = False + mapping[key] = values + values = None + if in_multiline: + values.append(line.strip()) + continue + key, value = [x.strip() for x in line.split("=", 1)] + if not value: + in_multiline = True + values = [] + continue + mapping[key] = value.strip() + return mapping + + +@task +def project_name(context: Context) -> str: + return _.setup_metadata(silent=True)["name"] + + +@task +def 
b64encode(value: str, silent: bool = True) -> str: + return base64.urlsafe_b64encode(value.encode()).decode().strip() + + +@task +def b64decode(value: str, silent: bool = True) -> str: + remainder = len(value) % 8 + if remainder: + value += "=" * remainder + return base64.urlsafe_b64decode(value).decode().strip() + + +@task +def build(context: Context) -> Tuple[Path, ...]: + python_bin = _.python_path(str) + context.run(f"{python_bin} -m build") + return tuple(Path("dist").iterdir()) + + +# @task +# def get_tags_from(context: Context, image_name: str) -> Iterable[str]: +# """ +# Given an image url, return the repo tags +# """ +# try: +# result = context.run(f"docker inspect {image_name}", hide="both") +# except UnexpectedExit as e: +# if "Error: No such object:" in e.result.stderr: +# context.run(f"docker pull {image_name}", env=compose_environ()) +# result = context.run(f"docker inspect {image_name}", hide="both") +# else: +# raise +# image = json.loads(result.stdout) +# results = [] +# for match in image: +# results.extend(match["RepoTags"]) +# return results + + +@task +def project_root( + type: Union[Type[str], Type[Path], Literal["str", "Path"]] = "str" +) -> Union[str, Path]: + """ + Get the absolute path of the project root assuming tasks.py is in the repo root. + """ + if isinstance(type, builtins.type): + type = type.__name__ + assert type in ("str", "Path"), f"{type} may be str or Path" + root = Path(__file__).resolve().parent + if type == "str": + return str(root) + return root + + +@task +def python_path( + type_name: Literal["str", "Path", str, Path] = "str", + *, + skip_venv: bool = False, +) -> Union[str, Path]: + """ + Return the best python to use + """ + if isinstance(type_name, type): + type_name = type_name.__name__ + assert type_name in ("Path", "str") + root = Path(__file__).resolve().parent + python = root / "python" / "bin" / "python" + if not python.exists(): + with suppress(KeyError): + python = Path(os.environ["VIRTUAL_ENV"]) / "bin" / "python" + if skip_venv or not python.exists(): + failed_pythons = [] + for version in ("3.12", "3.11", "3.10", "3.9", "3.8", "3.7", "3"): + candidate = shutil.which(f"python{version}") + if candidate is None: + continue + with suppress(FileNotFoundError): + python = Path( + candidate, + path=":".join( + x for x in os.environ["PATH"].split(":") if Path(x) != python.parent + ), + ).resolve(True) + break + failed_pythons.append(candidate or f"python{version}") + else: + raise FileNotFoundError( + "Unable to find a single python3 binary! Tried {}".format(", ".join(failed_pythons)) + ) + if type_name == "str": + return str(python) + return python + + +@task +def setup( + context: Context, + python_bin: Union[str, None] = None, + tests: bool = False, + devel: bool = False, + project: bool = True, + swap_venv_stage: str = None, +) -> Path: + """ + Create the venv for this project. + + This task can destroy the project's venv and recreate it from the same process id. + + swap_venv_stage: This is the internals of how a venv can replace itself while depending only + on the utilities within it (i.e. invoke). 
+ """ + root = _.project_root(Path) + venv = root / "python" + if python_bin is None: + python_bin = _.python_path(str) + + requirements = "" + if project: + requirements = "-r setup-requirements.txt" + if devel: + requirements = f"{requirements} -r dev-requirements.txt" + else: + requirements = f"{requirements} invoke" + if sys.version_info[:2] < (3, 11): # 3.11 has get_overloads + requirements = f"{requirements} typing-extensions" + if tests: + requirements = f"{requirements} -r test-requirements.txt" + if project: + requirements = f"{requirements} -e ." + if any((devel, tests)): + extra_addons = ",".join( + [arg for arg, enabled in (("devel", devel), ("test", tests)) if enabled] + ) + requirements = f"{requirements}[{extra_addons}]" + + if swap_venv_stage == "1-copy-new-venv": + perror(f"Removing old venv at {venv}") + shutil.rmtree(root / "python") + context.run(f"{venv!s}_/bin/python -m venv --copies {venv!s}") + if requirements: + context.run(f"{venv!s}/bin/python -m pip install {requirements}") + os.execve( + f"{venv!s}/bin/python", + ("python", "-m", "invoke", "setup", "--swap-venv-stage", "2-remove-tmp-venv"), + os.environ, + ) + assert False, "unreachable!" + if swap_venv_stage == "2-remove-tmp-venv": + tmp_venv = root / "python_" + perror(f"Removing temp venv {tmp_venv}") + shutil.rmtree(tmp_venv) + original_argv = [] + try: + original_argv = json.loads(os.environ["_INSTRUCT_INVOKE_TASK_ORIG_ARGS"]) + except ValueError: + perror("Unable to decode original _INSTRUCT_INVOKE_TASK_ORIG_ARGS!", file=sys.stderr) + while original_argv and original_argv[0] == "--": + del original_argv[0] + perror("Attempting to restore argv after setup which is", original_argv) + if not original_argv: + return + os.execve(f"{venv!s}/bin/python", ("python", "-m", "invoke", *original_argv), os.environ) + assert False, "unreachable!" + + current_python = Path(sys.executable) + with suppress(FileNotFoundError): + shutil.rmtree(f"{venv!s}_") + if venv.exists() and str(current_python).startswith(str(venv)): + # ARJ: Complex path: replacing a running environment. + # Time for the os.execve hat dance! 
+ # make the subenvironment + perror(f"installing tmp venv at {venv!s}_") + context.run(f"{python_bin} -m venv {venv!s}_", hide="both") + with Path(root / "dev-requirements.txt").open("rb") as fh: + for line in fh: + line_st = line.strip() + while b"#" in line_st: + line_st = line[: line_st.rindex(b"#")].strip() + if not line_st: + continue + if line.startswith(b"invoke"): + break + else: + line = b"invoke" + perror(f"installing tmp venv invoke") + context.run(f"{venv!s}_/bin/python -m pip install {line.decode()}", hide="both") + + args = [] + skip_if_args = 0 + task_executed = True + for arg in sys.argv: + if task_executed and arg == "setup": + skip_if_args += 2 + task_executed = False + continue + if arg == "--" or not arg.startswith("-"): + skip_if_args = 0 + if arg == "--": + continue + elif skip_if_args: + skip_if_args -= 1 + continue + if task_executed is False: + args.append(arg) + os.environ["_INSTRUCT_INVOKE_TASK_ORIG_ARGS"] = json.dumps(args) + os.execve( + f"{venv!s}_/bin/python", + ("python", "-m", "invoke", "setup", "--swap-venv-stage", "1-copy-new-venv"), + os.environ, + ) + assert False, "unreachable" + # Happy path: + with suppress(FileNotFoundError): + shutil.rmtree(root / "python") + context.run(f"{python_bin} -m venv {venv!s}") + if requirements: + context.run(f"{venv!s}/bin/python -m pip install {requirements}") + return venv diff --git a/tasksupport.py b/tasksupport.py new file mode 100644 index 0000000..06baad4 --- /dev/null +++ b/tasksupport.py @@ -0,0 +1,836 @@ +import inspect +import json +import sys +import os +import functools +import types +import dataclasses +import shutil +import pprint +import typing +import importlib +import builtins +import importlib.util +from pathlib import Path +from typing import ( + Any, + Tuple, + Optional, + Dict, + TypeVar, + Iterable, + ValuesView, + KeysView, + Generic, + Mapping, + Union, + Type, + overload, + List, + NamedTuple, +) +from invoke import task as _task +from invoke.context import Context +from collections import ChainMap +from contextlib import suppress +from collections.abc import MutableSet as AbstractSet + +try: + import typing_extensions +except ImportError: + has_typing_extensions = False +else: + has_typing_extensions = True + +try: + from typing import Literal +except ImportError: + from typing_extensions import Literal +try: + from typing import get_origin, get_args, get_overloads +except ImportError: + from typing_extensions import get_overloads, get_args, get_origin + +T = TypeVar("T") +U = TypeVar("U") +DEBUG_CODEGEN = "DEBUG_CODEGEN" in os.environ and os.environ["DEBUG_CODEGEN"].lower().startswith( + ("1", "yes", "y", "on", "t") +) +DEFAULT_FORMAT: Literal["lines", "json", "python"] = "lines" + + +class InvertedMapping(Generic[T, U], AbstractSet): + _mapping: Mapping[T, U] + _keys_view: Optional[KeysView] + __slots__ = ( + "_mapping", + "_keys_view", + "_values_view", + "_iter_values_func", + "_contains_func", + "_getitem_func", + ) + + # Start by filling-out the abstract methods + def __init__(self, mapping, **kwargs): + self._mapping = mapping + self._iter_values_func = self._iter_via_iter + self._contains_func = self._mapping_contains + self._getitem_func = self._get_key_from_value_mapping + + with suppress(AttributeError): + keys = mapping.keys() + if isinstance(keys, KeysView): + self._keys_view = keys + self._iter_values_func = self._iter_view_values_view + with suppress(AttributeError): + values = mapping.values() + if isinstance(values, ValuesView): + self._values_view = values + 
self._contains_func = self._view_contains + self._getitem_func = self._get_key_from_value_view + + def __getitem__(self, key: Union[U, slice]) -> T: + if isinstance(key, slice): + if key.start and key.stop and key.start != key.stop: + emit = False + keys = [] + for mapkey in self._mapping: + if key.start == mapkey: + emit = True + if key.stop == mapkey: + if not emit: + return () + emit = False + if emit: + keys.append(key) + return tuple(self[key] for key in keys) + if key.start: + return self._getitem_func(key.start, all=True) + if not any((key.start, key.stop, key.step)): + return type(self)(self._mapping.copy()) + raise ValueError + return self._getitem_func(key) + + def add(self, value: U): + self._mapping[value] = value + + def discard(self, value: U) -> None: + keys = self.getall(value) + for key in keys: + del self._mapping[key] + + def getall(self, value: U) -> Tuple[T, ...]: + return self._get_key_from_value_mapping(value, all=True) + + def get(self, value: U, default: Optional[T] = None) -> Optional[T]: + keys = self.getall(value) + with suppress(ValueError): + key, *_ = keys + return key + return default + + def _get_key_from_value_mapping(self, value: U, all: bool = False) -> Union[T, Tuple[T, ...]]: + keys = [] + for map_key, map_value in self._mapping.items(): + if map_value == value: + keys.append(map_key) + if keys: + if all: + return tuple(keys) + return keys[0] + if all: + return () + raise KeyError(value) + + def _get_key_from_value(self, value: U, all: bool = False) -> T: + if value not in self._values_view: + raise KeyError(value) + return self._get_key_from_value_mapping(value, all=all) + + def __len__(self): + return len(self._mapping) + + def __iter__(self): + with suppress(AttributeError): + yield from self._iter_values_func() + + def __contains__(self, value): + with suppress(AttributeError): + return self._contains_func(value) + return False + + def _iter_view_values_view(self): + yield from self._values_view + + def _iter_via_iter(self) -> Iterable[T]: + for key in self._mapping: + yield self._mapping[key] + + # Modify __contains__ and get() to work like dict + # does when __missing__ is present. + def _view_contains(self, value: Any) -> bool: + return value in self._values_view + + def _mapping_contains(self, value: U) -> bool: + for member in self: + if member == value: + return True + return False + + def __str__(self): + keys = repr(tuple(self))[1:-1] + return "{%s}" % keys + + # Now, add the methods in dicts but not in MutableMapping + def __repr__(self): + return f"{type(self).__name__}({self._mapping!r})" + + +def is_context_param( + param: inspect.Parameter, context_param_names: Tuple[str, ...] = ("c", "ctx", "context") +) -> Optional[Literal["name", "type", "name_and_type"]]: + value = None + if param.name in context_param_names: + value = "name" + if param.annotation: + if param.annotation is Context: + if value: + value = f"{value}_and_type" + else: + value = "type" + elif get_origin(param.annotation) is typing.Union: + if Context in get_args(param.annotation): + if value: + value = f"{value}_and_type" + else: + value = "type" + return value + + +if "slots" in inspect.signature(dataclasses.dataclass).parameters: + thunk = dataclasses.dataclass(frozen=True, order=True, slots=True) +else: + thunk = dataclasses.dataclass(frozen=True, order=True) + + +@thunk +class FoundType: + in_namespace: bool = dataclasses.field(hash=True, compare=True) + namespace_path: Tuple[str, ...] = dataclasses.field(hash=True, compare=True) + namespace_values: Tuple[Any, ...] 
= dataclasses.field(hash=False, compare=False) + + @property + def key(self): + return self.namespace_path[0] + + @property + def value(self): + return self.namespace_values[0] + + +def is_literal(item) -> bool: + with suppress(AttributeError): + return (item.__module__, item.__name__) in ( + ("typing", "Literal"), + ("typing_extensions", "Literal"), + ) + return False + + +def is_type_container(item): + origin = get_origin(item) + if origin is None: + return False + return True + + +def find_this(name="tasks") -> types.ModuleType: + with suppress(KeyError): + return sys.modules[name] + importlib.import_module(name) + return sys.modules[name] + + +def get_types_from( + annotation, + in_namespace: Optional[Dict[str, Any]] = None, +) -> Iterable[FoundType]: + if in_namespace is None: + in_namespace = vars(find_this()) + if annotation is inspect.Signature.empty: + annotation = Any + if isinstance(annotation, str): + ns = {} + exec(f"annotation = {annotation!s}", vars(find_this()), ns) + annotation = ns["annotation"] + + if is_literal(annotation): + return + if annotation in (Any, Ellipsis): + return + type_name = None + with suppress(AttributeError): + type_name = annotation.__qualname__ + origin = get_origin(annotation) + args = get_args(annotation) + if origin is Literal: + yield FoundType( + "Literal" in in_namespace, + [ + "Literal", + ], + [Literal], + ) + for arg in args: + if not isinstance(arg, type): + arg = type(arg) + if arg.__name__: + yield FoundType(arg.__name__ in in_namespace, [arg.__name__], [arg]) + return + if origin is not None and args is not None: + for module in types, typing, builtins: + for value in vars(module).values(): + if value is origin: + for arg in args: + yield from get_types_from(arg, in_namespace) + return + else: + if isinstance(origin, type): + yield FoundType(origin.__name__ in in_namespace, [origin.__name__], [origin]) + for arg in args: + yield from get_types_from(arg) + return + raise NotImplementedError(f"Unsupported origin type {origin!r} {annotation}") + assert not args and not origin + if annotation is None: + yield FoundType("None" in in_namespace, ["None"], [None]) + return + assert isinstance( + annotation, type + ), f"not a type - {annotation!r} {type(annotation)} {annotation.__module__}" + if type_name.split(".")[0] in vars(builtins): + return + if f"{annotation.__module__}.{annotation.__name__}" != annotation.__qualname__: + type_name = f"{annotation.__module__}.{annotation.__name__}" + path = [] + target = types.SimpleNamespace(**in_namespace) + path_values = [] + for step in type_name.split("."): + path.append(step) + try: + target = getattr(target, step) + except AttributeError as e: + try: + target = getattr(find_this(), path[0]) + except AttributeError: + try: + # print('trying', path, type_name) + target = importlib.import_module(".".join(path)) + except ImportError: + raise e from None + path_values.append(target) + + yield FoundType(path[0] in in_namespace, path, path_values) + + +def reify_annotations_in( + namespace: Dict[str, Any], signature: inspect.Signature +) -> inspect.Signature: + for index, param in enumerate(signature.parameters): + param = signature.parameters[param] + for result in get_types_from(param.annotation, namespace): + if result.in_namespace: + continue + namespace[result.key] = result.value + # print('setting', result.key, 'to', result.value) + for result in get_types_from(signature.return_annotation): + if result.in_namespace: + continue + namespace[result.key] = result.value + return signature + + +def 
sanitize_return(func, ns): + NOT_SET = object() + sig = inspect.signature(func) + if sig.return_annotation is inspect.Signature.empty: + returns = NOT_SET + for overload_func in get_overloads(func): + overload_signature = reify_annotations_in(ns, inspect.signature(overload_func)) + # print(overload_signature) + if returns is NOT_SET: + returns = overload_signature.return_annotation + continue + returns |= overload_signature.return_annotation + if returns is not NOT_SET: + sig = sig.replace(return_annotation=returns) + else: + sig = sig.replace(return_annotation=Any) + return sig + + +def safe_annotation_string_from(annotation): + if str(annotation).startswith(" Optional[Any]: + try: + value = kwargs[keyname] + except KeyError: + if keyname in signature.parameters: + for index, value in enumerate(tuple(signature.parameters)): + value = signature.parameters[value] + if value.name == keyname: + with suppress(IndexError): + value = args[index] + if delete_if_not_in_signature and wrapper_signature: + if keyname not in wrapper_signature.parameters: + del args[index] + return value + else: + if delete_if_not_in_signature and wrapper_signature: + if keyname not in wrapper_signature.parameters: + del kwargs[keyname] + return value + return None + + +def raw_param_body_from(function: inspect.Signature): + sig_funccall = [] + for param_name in function.parameters: + param = function.parameters[param_name] + if param.kind in ( + inspect.Parameter.POSITIONAL_ONLY, + inspect.Parameter.POSITIONAL_OR_KEYWORD, + ): + sig_funccall.append(f"{param.name}") + elif param.kind is inspect.Parameter.KEYWORD_ONLY: + sig_funccall.append(f"{param.name}={param.name}") + elif param.kind is inspect.Parameter.VAR_KEYWORD: + sig_funccall.append(f"**{param.name}") + elif param.kind is inspect.Parameter.VAR_POSITIONAL: + sig_funccall.append(f"*{param.name}") + return ", ".join(sig_funccall) + + +T = TypeVar("T") + + +def first(iterable: Iterable[T]) -> T: + for item in iterable: + return item + + +def indentation_length(s: str) -> int: + length = 0 + if "\n" in s: + for line in s.splitlines(True)[1:]: + for char in line: + if char == " ": + length += 1 + continue + break + return length + for char in s: + if char == " ": + length += 1 + continue + break + return length + + +INTERNAL_WRAPPER = """ +def %(name)s%(args)s: + _priv_format = %(format_kwarg)s + if _priv_format not in (None, 'json', 'python', 'lines'): + raise ValueError("Argument %(format_kwarg)s must be either None or one of 'json', 'python', 'lines'") + # print("Called from %(name)s%(args)s and proxied to %(name)s(%(sig_funccall)s)") + # if "%(name)s" == "b64encode": + # print("ARJ!", type(value)) + result = _._original_%(name)s(%(sig_funccall)s) + if silent: + return result + + if _priv_format is None: + _priv_format = 'lines' + if _priv_format == "json": + kwargs = {} + if sys.stdout.isatty(): + kwargs = {"indent": 4, "sort_keys": True} + try: + print(json.dumps(result, **kwargs)) + except ValueError: + print('Unable to render as json!', file=sys.stderr) + _priv_format = "json" + else: + return result + if _priv_format == "python": + print(pprint.pformat(result)) + return result + if _priv_format == 'lines': + if isinstance(result, Mapping): + for key in result: + value = result[key] + print(f"{key}:\t{value}") + return result + elif isinstance(result, Iterable) and not isinstance(result, (str, bytes)): + for item in result: + print(item) + return result + if result is not None: + print(result) + return result + return result + + """ + 
+PUBLIC_WRAPPER_FOR_INVOKE = """ +def %(name)s%(args)s: + _priv_format = %(format_kwarg)s + if _priv_format not in (None, 'json', 'python', 'lines'): + raise ValueError("Argument %(format_kwarg)s must be either None or one of 'json', 'python', 'lines'") + result = this._._original_%(name)s(%(sig_funccall)s) + if silent: + return result + + if _priv_format is None: + try: + _priv_format = this.DEFAULT_FORMAT + except AttributeError: + _priv_format = _tasksupport.DEFAULT_FORMAT + print(f'WARNING: {this.__name__}.DEFAULT_FORMAT not defined (see {this.__file__!r}). Defaulting to \"lines\"', file=sys.stderr) + this.DEFAULT_FORMAT = _priv_format + if _priv_format == "json": + kwargs = {} + if sys.stdout.isatty(): + kwargs = {"indent": 4, "sort_keys": True} + try: + print(json.dumps(result, **kwargs)) + except ValueError: + print('Unable to render as json!', file=sys.stderr) + _priv_format = "json" + else: + return result + if _priv_format == "python": + print(pprint.pformat(result)) + return result + if _priv_format == 'lines': + if isinstance(result, Mapping): + for key in result: + value = result[key] + print(f"{key}:\t{value}") + return result + elif isinstance(result, Iterable) and not isinstance(result, (str, bytes)): + for item in result: + print(item) + return result + if result is not None: + print(result) + return result + return result + +""" + + +def task(callable_=None, **kwargs): + def wrapper(func): + # print("Called from", inspect.stack()[1].frame.f_globals["__name__"]) + # Make a read only copy + stack = inspect.stack()[1:] + task_frame = stack[0].frame + while stack and task_frame.f_globals["__name__"] == __name__: + del stack[0] + task_frame = stack[0].frame + assert task_frame.f_globals["__name__"] != __name__ + assert task_frame.f_globals["__name__"] == "tasks" + if "Mapping": + task_frame.f_globals["Mapping"] = Mapping + support_module_name = Path(task_frame.f_globals["__file__"]).stem + filename = f"_support_cache/{support_module_name}_{func.__name__}.py" + with suppress(FileNotFoundError): + os.remove(filename) + if DEBUG_CODEGEN: + os.makedirs("_support_cache", exist_ok=True) + elif os.path.exists("_support_cache"): + shutil.rmtree("_support_cache") + this = sys.modules[task_frame.f_globals["__name__"]] + if "this" not in task_frame.f_globals: + task_frame.f_globals = this + task_frame.f_globals[f"_{__name__}"] = find_this(__name__) + assert this.__name__ == "tasks" + globalns = { + "_origin_globals_ref": task_frame.f_globals, + "__name__": task_frame.f_globals["__name__"], + # '__file__': task_frame.f_globals["__file__"], + "__builtins__": builtins, + "__file__": filename, + "ModuleType": types.ModuleType, + "Any": Any, + "Optional": Optional, + "List": List, + "Tuple": Tuple, + "NamedTuple": NamedTuple, + "typing": typing, + "NoneType": type(None), + } + if has_typing_extensions: + globalns["typing_extensions"] = typing_extensions + # populate global ns with a chain map: + truly_local_modifications = {} + localns = ChainMap(truly_local_modifications, task_frame.f_locals, task_frame.f_globals) + module = importlib.util.module_from_spec( + importlib.util.spec_from_file_location(f"tasksupport.support.{func.__name__}", filename) + ) + localns.maps.append(globalns) + code = """ +this: Optional[ModuleType] = None + +def __getattr__(name: str): + ''' + Look up in original global ns. Effective ChainMap of namespaces. 
+ ''' + print('a') + return _origin_globals_ref[name] + +""" + if DEBUG_CODEGEN: + with open(filename, "a+") as fh: + fh.write(code) + fh.seek(0) + code = fh.read() + exec(compile(code, filename, "exec"), globalns, localns) + globalns.update(truly_local_modifications) + truly_local_modifications.clear() + blank = "" + sig = sanitize_return(func, module.__dict__) + inner_function_call = sig + is_contextable = False + + if sig.parameters: + for param in sig.parameters: + if is_context_param(sig.parameters[param]): + is_contextable = True + break + if not is_contextable: + for index, param in enumerate(sig.parameters): + param = sig.parameters[param] + if not index: + continue + if is_context_param(param) in ("type", "name_and_type"): + # okay, the context is definitely out of order + raise NotImplementedError( + "TODO: Implement generating an inner_function_call with rearranged values" + ) + prefix_params = [] + if not is_contextable: + prefix_params = [ + inspect.Parameter("context", inspect.Parameter.POSITIONAL_ONLY, annotation=Context) + ] + + additional_params = [] + if "silent" not in inner_function_call.parameters: + silent = inspect.Parameter( + "silent", inspect.Parameter.KEYWORD_ONLY, annotation=bool, default=False + ) + additional_params.append(silent) + format_key = "format" + if format_key in inner_function_call.parameters: + format_key = "format_" + try: + task_module_format_default = find_this().DEFAULT_FORMAT + except AttributeError: + task_module_format_default = DEFAULT_FORMAT + format_ = inspect.Parameter( + format_key, + inspect.Parameter.KEYWORD_ONLY, + annotation=Optional[Literal["json", "python", "lines"]], + default=None, + ) + if format_key not in inner_function_call.parameters: + additional_params.append(format_) + kwargs.setdefault("help", {}) + kwargs["help"][format_key] = ( + 'may be one of "json", "python", or "lines" ' + f"(defaults to {task_module_format_default!r})." 
+ ) + + # Load into the local namespace any missing annotations necessary to run with when + # we recreate the argument signature: + new_signature = reify_annotations_in( + localns, + sig.replace( + parameters=( + *prefix_params, + *sig.parameters.values(), + *additional_params, + ) + ), + ) + if "silent" in new_signature.parameters: + kwargs.setdefault("help", {}) + silent_default = new_signature.parameters["silent"].default + kwargs["help"][ + "silent" + ] = f"Set to reduce console output (defaults to {silent_default!r})" + del silent_default + + # Merge into the proxy module any missing deps + module.__dict__.update(truly_local_modifications) + # Merge in the new globals + module.__dict__.update(globalns) + + def wrap_func(func): + internal_wrapper_signature = reify_annotations_in( + localns, + sig.replace( + parameters=( + *sig.parameters.values(), + *additional_params, + ) + ), + ) + + ns = ChainMap({}, localns, vars(module)) + signature = inspect.signature(func) + code = INTERNAL_WRAPPER % dict( + name=func.__name__, + args=str(internal_wrapper_signature), + sig_funccall=raw_param_body_from(signature), + format_kwarg=format_key, + ) + + if DEBUG_CODEGEN: + with open(filename, "a+") as fh: + fh.write(code) + fh.seek(0) + code = fh.read() + + exec(compile(code, filename, "exec"), task_frame.f_globals, ns) + new_func = ns.maps[0][func.__name__] + setattr(this._, f"_original_{func.__name__}", func) + return new_func + + public_signature = str(new_signature).replace("/, *,", "") + if sys.version_info[:2] < (3, 9): + if "/," in public_signature: + public_signature = public_signature.replace("/,", "") + + code = PUBLIC_WRAPPER_FOR_INVOKE % dict( + name=func.__name__, + args=public_signature, + sig_funccall=raw_param_body_from(inner_function_call), + format_kwarg=format_key, + ) + # print(code) + if DEBUG_CODEGEN: + with open(filename, "a+") as fh: + fh.write(code) + fh.seek(0) + code = fh.read() + exec(compile(code, filename, "exec"), task_frame.f_globals, localns) + setattr(this._, func.__name__, wrap_func(func)) + public_func = localns[func.__name__] + indent = " " * indentation_length(func.__doc__ or blank) + if ":returns:" not in (func.__doc__ or blank): + func.__doc__ = f"{func.__doc__ or blank}\n{indent}:returns: {safe_annotation_string_from(new_signature.return_annotation)}" + public_func.__doc__ = func.__doc__ + if kwargs: + return _task(**kwargs)(public_func) + return _task(public_func) + + if callable_ is not None: + with suppress(AttributeError): + name = callable_.__name__ + wrapper.__name__ = f"wrapper_for_{name}" + return wrapper(callable_) + wrapper.__name__ = f"wrapper_for_unnamed_caller" + return wrapper + + +@overload +def trim(s: str, *, left: Union[str, Tuple[str, ...]]) -> str: + ... + + +@overload +def trim(s: str, *, right: Union[str, Tuple[str, ...]]) -> str: + ... + + +@overload +def trim(s: str, both: Union[str, Tuple[str, ...]]) -> str: + ... 
+ + +def trim(s: str, *args, left=None, right=None) -> str: + if not any((args, left, right)): + raise TypeError + if sum(1 for x in (args, left, right) if x) > 1: + raise TypeError + if len(args) == 1: + (both,) = args + return trim_both(s, both) + if left is not None: + return ltrim(s, left) + if right is not None: + return rtrim(s, right) + raise TypeError + + +def ltrim(s: str, left: Union[str, Tuple[str, ...]]) -> str: + if not isinstance(left, (str, tuple)): + raise TypeError(f"left must be a str or Tuple[str, ...]") + if isinstance(left, str): + left = tuple(left) + if s.startswith(left): + for index, char in enumerate(s): + if char in left: + continue + s = s[index:] + break + else: + return "" + return s + + +def rtrim(s: str, right: Union[str, Tuple[str, ...]]) -> str: + if not isinstance(right, (str, tuple)): + raise TypeError(f"right must be a str or Tuple[str, ...]") + if isinstance(right, str): + right = tuple(right) + if s.endswith(right): + for index in range(len(s) - 1, -1, -1): + char = s[index] + if char in right: + continue + s = s[: index + 1] + break + else: + return "" + return s + + +def trim_both(s: str, both: Union[str, Tuple[str, ...]]) -> str: + if not isinstance(both, (str, tuple)): + raise TypeError(f"both must be a str or Tuple[str, ...]") + if isinstance(both, str): + both = tuple(both) + return rtrim(ltrim(s, both), both) + + +def truncate(s: str, limit: int, *, trailer: str = "…") -> str: + if len(s) <= limit: + return s + if trailer: + return s[: limit - 1] + trailer + return s[:limit]
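A local bootstrap sketch, assembled from the commands the new build.yml/development.yml workflows run and the tasks defined in tasks.py above; the python/bin path assumes the virtualenv that `invoke setup` creates at the repository root:

    python -m pip install --upgrade pip invoke typing-extensions
    invoke setup --devel --tests       # creates the ./python venv and installs the project with the devel/test extras
    python/bin/python -m pytest        # run the test suite from the new venv
    invoke build                       # runs "python -m build" and reports the artifacts left in dist/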