diff --git a/.env.template b/.env.template index 6a99c48..98a5794 100644 --- a/.env.template +++ b/.env.template @@ -14,6 +14,6 @@ ## KECHAIN_SCOPE_ID - the UUID of the project / scope. ## -KECHAIN_URL=https://pim3-test.ke-chain.com +KECHAIN_URL=https://....ke-chain.com KECHAIN_TOKEN=... token from your user account ... -KECHAIN_SCOPE_ID=12345678-1234-5678-123456-7812345678 +KECHAIN_SCOPE_ID=... scope id ... diff --git a/.github/workflows/python_publish.yml b/.github/workflows/python_publish.yml index 8c722ed..430247c 100644 --- a/.github/workflows/python_publish.yml +++ b/.github/workflows/python_publish.yml @@ -12,7 +12,7 @@ jobs: - name: Set up Python uses: actions/setup-python@v1 with: - python-version: '3.7' + python-version: '3.12' - name: Install dependencies run: | python -m pip install --upgrade pip diff --git a/.github/workflows/python_testing.yml b/.github/workflows/python_testing.yml index 962e0c9..6e98742 100644 --- a/.github/workflows/python_testing.yml +++ b/.github/workflows/python_testing.yml @@ -7,10 +7,10 @@ jobs: runs-on: ubuntu-latest strategy: - max-parallel: 7 + max-parallel: 6 fail-fast: False matrix: - python-version: [2.7, 3.5, 3.6, 3.7, 3.8-dev, pypy2, pypy3] + python-version: ["3.7", "3.8", "3.9", "3.10", "3.11", "3.12"] steps: - uses: actions/checkout@v1 @@ -36,7 +36,7 @@ jobs: pytest --cov=kecpkg tests - name: Upload coverage to coveralls.io - if: matrix.python-version == 3.6 + if: matrix.python-version == 3.12 env: COVERALLS_REPO_TOKEN: ${{ secrets.COVERALLS_REPO_TOKEN }} run: | @@ -44,7 +44,7 @@ jobs: coveralls - name: Check docs and distribution - if: matrix.python-version == 3.6 + if: matrix.python-version == 3.12 run: | pip install flake8 pydocstyle check-manifest readme_renderer[md] twine>=2.0.0 diff --git a/.idea/git_toolbox_blame.xml b/.idea/git_toolbox_blame.xml new file mode 100644 index 0000000..7dc1249 --- /dev/null +++ b/.idea/git_toolbox_blame.xml @@ -0,0 +1,6 @@ + + + + + \ No newline at end of file diff --git a/.idea/git_toolbox_prj.xml b/.idea/git_toolbox_prj.xml new file mode 100644 index 0000000..02b915b --- /dev/null +++ b/.idea/git_toolbox_prj.xml @@ -0,0 +1,15 @@ + + + + + + + \ No newline at end of file diff --git a/.idea/inspectionProfiles/Project_Default.xml b/.idea/inspectionProfiles/Project_Default.xml index cbd2af5..7d88f00 100644 --- a/.idea/inspectionProfiles/Project_Default.xml +++ b/.idea/inspectionProfiles/Project_Default.xml @@ -4,13 +4,17 @@ diff --git a/.idea/kecpkg-tools.iml b/.idea/kecpkg-tools.iml index acc7b7e..b73d0ed 100644 --- a/.idea/kecpkg-tools.iml +++ b/.idea/kecpkg-tools.iml @@ -4,7 +4,7 @@ - + diff --git a/.idea/misc.xml b/.idea/misc.xml index 93db024..6a05be0 100644 --- a/.idea/misc.xml +++ b/.idea/misc.xml @@ -1,6 +1,9 @@ - + + + diff --git a/.travis.yml-OFF b/.travis.yml-OFF deleted file mode 100644 index b331123..0000000 --- a/.travis.yml-OFF +++ /dev/null @@ -1,19 +0,0 @@ -sudo: false -language: python - -python: - - "2.7" - - "3.5" - - "3.6" - - "3.7-dev" - - "pypy" - - "pypy3" - -install: - pip install tox-travis - -script: tox - -notifications: - email: false - diff --git a/CHANGELOG.md b/CHANGELOG.md index be0de52..1e4d52b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,8 @@ # Changelog +## 1.1.0 (3JUL24) + * :shield: Maintenance release. Deprecation of python 2.7 and all python version upto and included 3.6 as these versions are out of support. + ## 1.0.4 (26NOV19) * Maintenance release. * changed CI setup to use github actions. No end-user facing changes. #10 diff --git a/MANIFEST.in b/MANIFEST.in index 9e73c53..d04bb90 100644 --- a/MANIFEST.in +++ b/MANIFEST.in @@ -12,6 +12,7 @@ graft tests graft kecpkg prune .idea prune .env* +exclude .env.template prune *.depr exclude build_release.sh recursive-include kecpkg *.template @@ -20,5 +21,3 @@ global-exclude *.pyc exclude *-OFF exclude *.yml prune .github - - diff --git a/kecpkg/__init__.py b/kecpkg/__init__.py index 8a81504..6849410 100644 --- a/kecpkg/__init__.py +++ b/kecpkg/__init__.py @@ -1 +1 @@ -__version__ = '1.0.4' +__version__ = "1.1.0" diff --git a/kecpkg/__main__.py b/kecpkg/__main__.py index ca90324..13123c4 100644 --- a/kecpkg/__main__.py +++ b/kecpkg/__main__.py @@ -1,3 +1,4 @@ import sys from kecpkg.cli import kecpkg + sys.exit(kecpkg()) diff --git a/kecpkg/commands/build.py b/kecpkg/commands/build.py index 03ad5d9..e16221c 100644 --- a/kecpkg/commands/build.py +++ b/kecpkg/commands/build.py @@ -9,76 +9,139 @@ from kecpkg.commands.sign import verify_signature, verify_artifacts_hashes from kecpkg.commands.utils import CONTEXT_SETTINGS from kecpkg.gpg import hash_of_file, get_gpg, tabulate_keys -from kecpkg.settings import load_settings, SETTINGS_FILENAME, ARTIFACTS_SIG_FILENAME, ARTIFACTS_FILENAME -from kecpkg.utils import ensure_dir_exists, remove_path, get_package_dir, get_artifacts_on_disk, render_package_info, \ - create_file, echo_success, echo_failure, echo_info - - -@click.command(context_settings=CONTEXT_SETTINGS, - short_help="Build the package and create a kecpkg file") -@click.argument('package', required=False) -@click.option('--settings', '--config', '-s', 'settings_filename', - help="path to the setting file (default `{}`".format(SETTINGS_FILENAME), - type=click.Path(exists=True), default=SETTINGS_FILENAME) -@click.option('--clean', '--clear', '--prune', 'clean_first', is_flag=True, - help='Remove build artifacts before building') -@click.option('--update/--no-update', 'update_package_info', is_flag=True, default=True, - help="Update the `package-info.json` for the KE-crunch execution to point to correct entrypoint based on " - "settings. This is okay to leave ON. Use `--no-update` if you have a custom `package-info.json`.") -@click.option('--sign/--no-sign', 'do_sign', is_flag=True, default=False, - help="Sign the contents of the package with a cryptographic key from the keyring. Defaults to not sign.") -@click.option('--keyid', '--key-id', '-k', 'sign_keyid', - help="ID of the cryptographic key to do the sign the contents of the built package. If not provided it " - "will use the default key from the KECPKG keystore. Use in combination with `--sign`") -@click.option('--passphrase', '-p', 'sign_passphrase', hide_input=True, - help="Passphrase of the cryptographic key to sing the contents of the built package. " - "Use in combination with `--sign` and `--keyid`") -@click.option('-v', '--verbose', help="Be more verbose", is_flag=True) +from kecpkg.settings import ( + load_settings, + SETTINGS_FILENAME, + ARTIFACTS_SIG_FILENAME, + ARTIFACTS_FILENAME, +) +from kecpkg.utils import ( + ensure_dir_exists, + remove_path, + get_package_dir, + get_artifacts_on_disk, + render_package_info, + create_file, + echo_success, + echo_failure, + echo_info, +) + + +@click.command( + context_settings=CONTEXT_SETTINGS, + short_help="Build the package and create a kecpkg file", +) +@click.argument("package", required=False) +@click.option( + "--settings", + "--config", + "-s", + "settings_filename", + help=f"path to the setting file (default `{SETTINGS_FILENAME}`", + type=click.Path(exists=True), + default=SETTINGS_FILENAME, +) +@click.option( + "--clean", + "--clear", + "--prune", + "clean_first", + is_flag=True, + help="Remove build artifacts before building", +) +@click.option( + "--update/--no-update", + "update_package_info", + is_flag=True, + default=True, + help="Update the `package-info.json` for the KE-crunch execution to point to correct " + "entrypoint based on settings. This is okay to leave ON. Use `--no-update` if you " + "have a custom `package-info.json`.", +) +@click.option( + "--sign/--no-sign", + "do_sign", + is_flag=True, + default=False, + help="Sign the contents of the package with a cryptographic key from the keyring. Defaults " + "to not sign.", +) +@click.option( + "--keyid", + "--key-id", + "-k", + "sign_keyid", + help="ID of the cryptographic key to do the sign the contents of the built package. If not" + "provided it will use the default key from the KECPKG keystore. Use in combination " + "with `--sign`", +) +@click.option( + "--passphrase", + "-p", + "sign_passphrase", + hide_input=True, + help="Passphrase of the cryptographic key to sing the contents of the built package. " + "Use in combination with `--sign` and `--keyid`", +) +@click.option("-v", "--verbose", help="Be more verbose", is_flag=True) def build(package=None, **options): """Build the package and create a kecpkg file.""" - echo_info('Locating package ``'.format(package)) + echo_info(f"Locating package `{package}`") package_dir = get_package_dir(package_name=package) package_name = os.path.basename(package_dir) - echo_info('Package `{}` has been selected'.format(package_name)) - settings = load_settings(package_dir=package_dir, settings_filename=options.get('settings_filename')) + echo_info(f"Package `{package_name}` has been selected") + settings = load_settings( + package_dir=package_dir, settings_filename=options.get("settings_filename") + ) # ensure build directory is there - build_dir = settings.get('build_dir', 'dist') + build_dir = settings.get("build_dir", "dist") build_path = os.path.join(package_dir, build_dir) - if options.get('update_package_info'): + if options.get("update_package_info"): render_package_info(settings, package_dir=package_dir, backup=True) - if options.get('clean_first'): + if options.get("clean_first"): remove_path(build_path) ensure_dir_exists(build_path) # do package building - build_package(package_dir, build_path, settings, options=options, verbose=options.get('verbose')) + build_package( + package_dir, + build_path, + settings, + options=options, + verbose=options.get("verbose"), + ) - echo_success('Complete') + echo_success("Complete") def build_package(package_dir, build_path, settings, options=None, verbose=False): """Perform the actual building of the kecpkg zip.""" - additional_exclude_paths = settings.get('exclude_paths') - - artifacts = get_artifacts_on_disk(package_dir, verbose=verbose, - additional_exclude_paths=additional_exclude_paths) # type: set - dist_filename = '{}-{}-py{}.kecpkg'.format(settings.get('package_name'), settings.get('version'), - settings.get('python_version')) - echo_info('Creating package name `{}`'.format(dist_filename)) + additional_exclude_paths = settings.get("exclude_paths") + + artifacts = get_artifacts_on_disk( + package_dir, verbose=verbose, additional_exclude_paths=additional_exclude_paths + ) # type: set + dist_filename = "{}-{}-py{}.kecpkg".format( + settings.get("package_name"), + settings.get("version"), + settings.get("python_version"), + ) + echo_info(f"Creating package name `{dist_filename}`") if verbose: echo_info("Creating 'ARTIFACTS' file with list of contents and their hashes") generate_artifact_hashes(package_dir, artifacts, settings, verbose=verbose) - artifacts.add(settings.get('artifacts_filename', 'ARTIFACTS')) + artifacts.add(settings.get("artifacts_filename", "ARTIFACTS")) - if options.get('do_sign'): + if options.get("do_sign"): sign_package(package_dir, settings, options=options, verbose=verbose) - artifacts.add(settings.get('artifacts_filename', 'ARTIFACTS') + '.SIG') + artifacts.add(settings.get("artifacts_filename", "ARTIFACTS") + ".SIG") - with ZipFile(os.path.join(build_path, dist_filename), 'w') as dist_zip: + with ZipFile(os.path.join(build_path, dist_filename), "w") as dist_zip: for artifact in artifacts: dist_zip.write(os.path.join(package_dir, artifact), arcname=artifact) @@ -96,8 +159,8 @@ def generate_artifact_hashes(package_dir, artifacts, settings, verbose=False): :param verbose: be verbose (or not) :return: None """ - artifacts_fn = settings.get('artifacts_filename', 'ARTIFACTS') - algorithm = settings.get('hash_algorithm', 'sha256') + artifacts_fn = settings.get("artifacts_filename", "ARTIFACTS") + algorithm = settings.get("hash_algorithm", "sha256") if algorithm not in hashlib.algorithms_guaranteed: raise @@ -107,19 +170,24 @@ def generate_artifact_hashes(package_dir, artifacts, settings, verbose=False): artifacts_content = [] for af in artifacts: - # we do not need to create a hash from the ARTIFACTS and ARTIFACTS.SIG file if they are present in the list - if af not in [artifacts_fn, artifacts_fn + '.SIG']: + # we do not need to create a hash from the ARTIFACTS and ARTIFACTS.SIG file if they + # are present in the list + if af not in [artifacts_fn, artifacts_fn + ".SIG"]: af_fp = os.path.join(package_dir, af) - artifacts_content.append('{},{}={},{}\n'.format( - af, - algorithm, - hash_of_file(af_fp, algorithm=algorithm), - os.stat(af_fp).st_size - )) - - create_file(os.path.join(package_dir, artifacts_fn), - content=artifacts_content, - overwrite=True) + artifacts_content.append( + "{},{}={},{}\n".format( + af, + algorithm, + hash_of_file(af_fp, algorithm=algorithm), + os.stat(af_fp).st_size, + ) + ) + + create_file( + os.path.join(package_dir, artifacts_fn), + content=artifacts_content, + overwrite=True, + ) def sign_package(package_dir, settings, options=None, verbose=False): @@ -134,33 +202,41 @@ def sign_package(package_dir, settings, options=None, verbose=False): """ gpg = get_gpg() - if options.get('sign_keyid') is None: + if options.get("sign_keyid") is None: tabulate_keys(gpg, explain=True) - options['sign_keyid'] = click.prompt("Provide Key (Name, Comment, Email, Fingerprint) to sign package with", - default=settings.get('email')) - if options.get('sign_passphrase') is None: - options['sign_passphrase'] = click.prompt("Provide Passphrase", hide_input=True) - - echo_info('Signing package contents') - - with open(os.path.join(package_dir, settings.get('artifacts_filename', ARTIFACTS_FILENAME)), 'rb') as fd: - results = gpg.sign_file(fd, - keyid=options.get('sign_keyid'), - passphrase=options.get('sign_passphrase'), - detach=True, - output=settings.get('artifacts_sig_filename', ARTIFACTS_SIG_FILENAME) - ) + options["sign_keyid"] = click.prompt( + "Provide Key (Name, Comment, Email, Fingerprint) to sign package with", + default=settings.get("email"), + ) + if options.get("sign_passphrase") is None: + options["sign_passphrase"] = click.prompt("Provide Passphrase", hide_input=True) + + echo_info("Signing package contents") + + with open( + os.path.join( + package_dir, settings.get("artifacts_filename", ARTIFACTS_FILENAME) + ), + "rb", + ) as fd: + results = gpg.sign_file( + fd, + keyid=options.get("sign_keyid"), + passphrase=options.get("sign_passphrase"), + detach=True, + output=settings.get("artifacts_sig_filename", ARTIFACTS_SIG_FILENAME), + ) pprint(results.__dict__) if results and results.status is not None: - echo_info("Signed package contents: {}".format(results.status)) + echo_info(f"Signed package contents: {results.status}") else: failure_text = results.stderr.split("\n")[-2] - echo_failure("Could not sign the package contents: '{}'".format(failure_text)) + echo_failure(f"Could not sign the package contents: '{failure_text}'") sys.exit(1) if verbose: - echo_success('Successfully signed the package contents.') + echo_success("Successfully signed the package contents.") verify_signature(package_dir, ARTIFACTS_FILENAME, ARTIFACTS_SIG_FILENAME) verify_artifacts_hashes(package_dir, ARTIFACTS_FILENAME) diff --git a/kecpkg/commands/config.py b/kecpkg/commands/config.py index 075d0e6..4f0731e 100644 --- a/kecpkg/commands/config.py +++ b/kecpkg/commands/config.py @@ -4,22 +4,46 @@ from tabulate import tabulate from kecpkg.commands.utils import CONTEXT_SETTINGS -from kecpkg.settings import load_settings, copy_default_settings, save_settings, SETTINGS_FILENAME +from kecpkg.settings import ( + load_settings, + copy_default_settings, + save_settings, + SETTINGS_FILENAME, +) from kecpkg.utils import get_package_dir, copy_path, echo_success, echo_info -@click.command(context_settings=CONTEXT_SETTINGS, - short_help="Finds and updated the configuration of the kecpkg") -@click.argument('package', required=False) -@click.option('--settings', '--config', '-s', 'settings_filename', - help="path to the setting file (default `{}`".format(SETTINGS_FILENAME), - type=click.Path(exists=True), default=SETTINGS_FILENAME) -@click.option('--init', is_flag=True, help="will init a settingsfile if not found") -@click.option('--interactive', '-i', is_flag=True, help="interactive mode; guide me through the settings") -@click.option('--get', '-g', 'get_key', help="Key to get and display", required=False) -@click.option('--set', '-s', 'set_key', nargs=2, help="Key to set . Value is set as string.", - required=False) -@click.option('--verbose', '-v', is_flag=True, help="be more verbose (print settings)") +@click.command( + context_settings=CONTEXT_SETTINGS, + short_help="Finds and updated the configuration of the kecpkg", +) +@click.argument("package", required=False) +@click.option( + "--settings", + "--config", + "-s", + "settings_filename", + help=f"path to the setting file (default `{SETTINGS_FILENAME}`", + type=click.Path(exists=True), + default=SETTINGS_FILENAME, +) +@click.option("--init", is_flag=True, help="will init a settingsfile if not found") +@click.option( + "--interactive", + "-i", + is_flag=True, + help="interactive mode; guide me through the settings", +) +@click.option("--get", "-g", "get_key", help="Key to get and display", required=False) +@click.option( + "--set", + "-s", + "set_key", + nargs=2, + help="Key to set . Value is set as string.", + required=False, +) +@click.option("--verbose", "-v", is_flag=True, help="be more verbose (print settings)") def config(package, **options): """Manage the configuration (or settings) of the package. @@ -48,60 +72,97 @@ def config(package, **options): (or recently uploaded) last_upload: date and time of the last upload """ - echo_info('Locating package ``'.format(package)) + echo_info(f"Locating package `{package}`") package_dir = get_package_dir(package_name=package) package_name = os.path.basename(package_dir) - echo_info('Package `{}` has been selected'.format(package_name)) + echo_info(f"Package `{package_name}` has been selected") - if options.get('init'): - if os.path.exists(os.path.join(package_dir, options.get('settings_filename'))) and \ - click.confirm('Are you sure you want to overwrite the current settingsfile ' - '(old settings will be a backup)?'): - copy_path(os.path.join(package_dir, options.get('settings_filename')), - os.path.join(package_dir, "{}-backup".format(options.get('settings_filename')))) - echo_info('Creating new settingsfile') + if options.get("init"): + if os.path.exists( + os.path.join(package_dir, options.get("settings_filename")) + ) and click.confirm( + "Are you sure you want to overwrite the current settingsfile " + "(old settings will be a backup)?" + ): + copy_path( + os.path.join(package_dir, options.get("settings_filename")), + os.path.join( + package_dir, "{}-backup".format(options.get("settings_filename")) + ), + ) + echo_info("Creating new settingsfile") settings = copy_default_settings() - settings['package_name'] = package_name - save_settings(settings, package_dir=package_dir, settings_filename=options.get('settings_filename')) + settings["package_name"] = package_name + save_settings( + settings, + package_dir=package_dir, + settings_filename=options.get("settings_filename"), + ) settings = load_settings(package_dir=package_dir) - if options.get('interactive'): - settings['version'] = click.prompt('Version', default=settings.get('version', '0.0.1')) - settings['description'] = click.prompt('Description', default=settings.get('description', '')) - settings['name'] = click.prompt('Author', default=settings.get('name', os.environ.get('USER', ''))) - settings['email'] = click.prompt('Author\'s email', default=settings.get('email', '')) - settings['python_version'] = click.prompt('Python version (choose from: {})'. - format(settings.get('pyversions')), default='3.5') - settings['exclude_paths'] = click.prompt("Exclude additional paths from kecpkg (eg. 'data, input')", - default=settings.get('exclude_paths', ''), - value_proc=process_additional_exclude_paths) - save_settings(settings, package_dir=package_dir, settings_filename=options.get('settings_filename')) + if options.get("interactive"): + settings["version"] = click.prompt( + "Version", default=settings.get("version", "0.0.1") + ) + settings["description"] = click.prompt( + "Description", default=settings.get("description", "") + ) + settings["name"] = click.prompt( + "Author", default=settings.get("name", os.environ.get("USER", "")) + ) + settings["email"] = click.prompt( + "Author's email", default=settings.get("email", "") + ) + settings["python_version"] = click.prompt( + "Python version (choose from: {})".format(settings.get("pyversions")), + default="3.5", + ) + settings["exclude_paths"] = click.prompt( + "Exclude additional paths from kecpkg (eg. 'data, input')", + default=settings.get("exclude_paths", ""), + value_proc=process_additional_exclude_paths, + ) + save_settings( + settings, + package_dir=package_dir, + settings_filename=options.get("settings_filename"), + ) - if options.get('set_key'): - k, v = options.get('set_key') - if options.get('verbose'): - echo_info("Set the key '{}' to value '{}'".format(k, v)) + if options.get("set_key"): + k, v = options.get("set_key") + if options.get("verbose"): + echo_info(f"Set the key '{k}' to value '{v}'") settings[k] = v - save_settings(settings, package_dir=package_dir, settings_filename=options.get('settings_filename')) + save_settings( + settings, + package_dir=package_dir, + settings_filename=options.get("settings_filename"), + ) - if options.get('get_key'): - echo_info(tabulate([(options.get('get_key'), settings.get(options.get('get_key')))], - headers=("key", "value"))) + if options.get("get_key"): + echo_info( + tabulate( + [(options.get("get_key"), settings.get(options.get("get_key")))], + headers=("key", "value"), + ) + ) return - if options.get('verbose'): + if options.get("verbose"): echo_info(tabulate(settings.items(), headers=("key", "value"))) - if not options.get('interactive'): - echo_success('Settings file identified and correct') + if not options.get("interactive"): + echo_success("Settings file identified and correct") def process_additional_exclude_paths(raw_value): """Process additional list of exclude paths and return a list.""" - assert isinstance(raw_value, str), "The value should be a string, got: {}".format(type(raw_value)) + assert isinstance( + raw_value, str + ), f"The value should be a string, got: {type(raw_value)}" pathlist = [] - raw_pathlist = raw_value.split(',') + raw_pathlist = raw_value.split(",") for raw_path in raw_pathlist: pathlist.append(raw_path.strip()) return pathlist diff --git a/kecpkg/commands/new.py b/kecpkg/commands/new.py index 2ab9af8..abe16a4 100644 --- a/kecpkg/commands/new.py +++ b/kecpkg/commands/new.py @@ -6,22 +6,43 @@ from kecpkg.commands.config import process_additional_exclude_paths from kecpkg.commands.utils import CONTEXT_SETTINGS from kecpkg.create import create_package, create_venv, pip_install_venv -from kecpkg.settings import load_settings, copy_default_settings, save_settings, SETTINGS_FILENAME +from kecpkg.settings import ( + load_settings, + copy_default_settings, + save_settings, + SETTINGS_FILENAME, +) from kecpkg.utils import normalise_name, echo_success, echo_failure, echo_info -@click.command(short_help="Create a new kecpkg SIM script package", - context_settings=CONTEXT_SETTINGS) -@click.argument('package', required=False) -@click.option('--settings', '--config', '-s', 'settings_filename', - help="path to the setting file (default `{}`".format(SETTINGS_FILENAME), - type=click.Path(), default=SETTINGS_FILENAME) -@click.option('--venv', help="name of the virtual python environment to create") -@click.option('--script', help="name of the script inside the package that contains the entrypoint") -@click.option('--global-packages', is_flag=True, - help='Gives created virtual envs access to the global site-packages.') -@click.option('--no-venv', help="suppress the creation of the virtual environment", is_flag=True) -@click.option('-v', '--verbose', help="Be more verbose", is_flag=True) +@click.command( + short_help="Create a new kecpkg SIM script package", + context_settings=CONTEXT_SETTINGS, +) +@click.argument("package", required=False) +@click.option( + "--settings", + "--config", + "-s", + "settings_filename", + help=f"path to the setting file (default `{SETTINGS_FILENAME}`", + type=click.Path(), + default=SETTINGS_FILENAME, +) +@click.option("--venv", help="name of the virtual python environment to create") +@click.option( + "--script", + help="name of the script inside the package that contains the entrypoint", +) +@click.option( + "--global-packages", + is_flag=True, + help="Gives created virtual envs access to the global site-packages.", +) +@click.option( + "--no-venv", help="suppress the creation of the virtual environment", is_flag=True +) +@click.option("-v", "--verbose", help="Be more verbose", is_flag=True) def new(package=None, **options): """ Create a new package directory structure. @@ -36,7 +57,9 @@ def new(package=None, **options): +-- .gitignore +-- .kecpkg-settings.json """ - settings = load_settings(lazy=True, settings_filename=options.get('settings_filename')) + settings = load_settings( + lazy=True, settings_filename=options.get("settings_filename") + ) if not settings: settings = copy_default_settings() package_root_dir = os.getcwd() @@ -46,43 +69,65 @@ def new(package=None, **options): package_name = normalise_name(package_name) # save to settings - settings['package_name'] = package_name + settings["package_name"] = package_name package_dir = os.path.join(package_root_dir, package_name) if os.path.exists(package_dir): - echo_failure("Directory '{}' already exists.".format(package_dir)) + echo_failure(f"Directory '{package_dir}' already exists.") sys.exit(1) if not package: - settings['version'] = click.prompt('Version', default=settings.get('version', '0.0.1')) - settings['description'] = click.prompt('Description', default='') - settings['name'] = click.prompt('Author', default=settings.get('name', os.environ.get('USER', ''))) - settings['email'] = click.prompt('Author\'s email', default=settings.get('email', '')) - settings['python_version'] = click.prompt('Python version (choose from: {})'.format(settings.get('pyversions')), - default='3.5') - settings['exclude_paths'] = click.prompt("Exclude additional paths from kecpkg (eg. 'data, input')", - default=settings.get('exclude_paths', ''), - value_proc=process_additional_exclude_paths) - if options.get('script'): - script_base = normalise_name(options.get('script').replace('.py', '')) - echo_info('Setting the script to `{}`'.format(script_base)) - settings['entrypoint_script'] = script_base + settings["version"] = click.prompt( + "Version", default=settings.get("version", "0.0.1") + ) + settings["description"] = click.prompt("Description", default="") + settings["name"] = click.prompt( + "Author", default=settings.get("name", os.environ.get("USER", "")) + ) + settings["email"] = click.prompt( + "Author's email", default=settings.get("email", "") + ) + settings["python_version"] = click.prompt( + "Python version (choose from: {})".format(settings.get("pyversions")), + default="3.5", + ) + settings["exclude_paths"] = click.prompt( + "Exclude additional paths from kecpkg (eg. 'data, input')", + default=settings.get("exclude_paths", ""), + value_proc=process_additional_exclude_paths, + ) + if options.get("script"): + script_base = normalise_name(options.get("script").replace(".py", "")) + echo_info(f"Setting the script to `{script_base}`") + settings["entrypoint_script"] = script_base - if options.get('venv'): - settings['venv_dir'] = normalise_name(options.get('venv')) + if options.get("venv"): + settings["venv_dir"] = normalise_name(options.get("venv")) echo_info("Creating package structure") create_package(package_dir, settings=settings) - if not options.get('no_venv'): + if not options.get("no_venv"): echo_info("Creating virtual environment") - create_venv(package_dir, settings, pypath=None, use_global=options.get('global_packages'), - verbose=options.get('verbose')) - pip_install_venv(package_dir, settings, verbose=options.get('verbose')) + create_venv( + package_dir, + settings, + pypath=None, + use_global=options.get("global_packages"), + verbose=options.get("verbose"), + ) + pip_install_venv(package_dir, settings, verbose=options.get("verbose")) else: - settings['venv_dir'] = None + settings["venv_dir"] = None # save the settings (in the package_dir) - save_settings(settings, package_dir=package_dir, settings_filename=options.get('settings_filename')) + save_settings( + settings, + package_dir=package_dir, + settings_filename=options.get("settings_filename"), + ) - echo_success('Package `{package_name}` created in `{package_dir}`'.format(package_name=package_name, - package_dir=package_dir)) + echo_success( + "Package `{package_name}` created in `{package_dir}`".format( + package_name=package_name, package_dir=package_dir + ) + ) diff --git a/kecpkg/commands/prune.py b/kecpkg/commands/prune.py index 1160e01..65ca94d 100644 --- a/kecpkg/commands/prune.py +++ b/kecpkg/commands/prune.py @@ -4,30 +4,40 @@ from kecpkg.commands.utils import CONTEXT_SETTINGS from kecpkg.settings import load_settings -from kecpkg.utils import get_package_name, get_package_dir, remove_path, echo_failure, echo_warning +from kecpkg.utils import ( + get_package_name, + get_package_dir, + remove_path, + echo_failure, + echo_warning, +) -@click.command(context_settings=CONTEXT_SETTINGS, - short_help="Removes a project's build artifacts") -@click.argument('package', required=False) -@click.option('--force', '-f', is_flag=True, help="Forcefully removes the project build artifacts") +@click.command( + context_settings=CONTEXT_SETTINGS, short_help="Removes a project's build artifacts" +) +@click.argument("package", required=False) +@click.option( + "--force", "-f", is_flag=True, help="Forcefully removes the project build artifacts" +) def prune(package, **options): """Remove a project's build artifacts.""" - package_name = package or get_package_name() or click.prompt('Provide package name') + package_name = package or get_package_name() or click.prompt("Provide package name") package_dir = get_package_dir(package_name) settings = load_settings(package_name) # ensure build directory is there - build_dir = settings.get('build_dir', 'dist') + build_dir = settings.get("build_dir", "dist") build_path = os.path.join(package_dir, build_dir) if os.path.exists(build_path): - if options.get('force') or click.confirm( - "Do you want to prune build artifacts for package '{}'?".format(package_name)): + if options.get("force") or click.confirm( + f"Do you want to prune build artifacts for package '{package_name}'?" + ): remove_path(build_path) if os.path.exists(build_path): - echo_failure('Something went wrong pruning pacakage `{}`'.format(package_name)) + echo_failure(f"Something went wrong pruning pacakage `{package_name}`") else: - echo_warning('Package `{}` will not be pruned'.format(package_name)) + echo_warning(f"Package `{package_name}` will not be pruned") else: - echo_failure('Package `{}` does not exist'.format(package_name)) + echo_failure(f"Package `{package_name}` does not exist") diff --git a/kecpkg/commands/purge.py b/kecpkg/commands/purge.py index 714eefb..8c941a8 100644 --- a/kecpkg/commands/purge.py +++ b/kecpkg/commands/purge.py @@ -3,13 +3,21 @@ import click from kecpkg.commands.utils import CONTEXT_SETTINGS -from kecpkg.utils import remove_path, get_package_dir, echo_success, echo_failure, echo_warning +from kecpkg.utils import ( + remove_path, + get_package_dir, + echo_success, + echo_failure, + echo_warning, +) -@click.command(context_settings=CONTEXT_SETTINGS, - short_help="Purge and delete a project (No reverse)") -@click.argument('package', required=False) -@click.option('--force', '-f', is_flag=True, help="Force purge (no confirmation)") +@click.command( + context_settings=CONTEXT_SETTINGS, + short_help="Purge and delete a project (No reverse)", +) +@click.argument("package", required=False) +@click.option("--force", "-f", is_flag=True, help="Force purge (no confirmation)") def purge(package, **options): """ Purge and clean a package directory structure. @@ -18,18 +26,21 @@ def purge(package, **options): :param options: :return: """ - package_name = package or click.prompt('Provide package name') + package_name = package or click.prompt("Provide package name") package_dir = get_package_dir(package_name) if os.path.exists(package_dir): - if options.get('force') or click.confirm( - "Do you want to purge and completely remove '{}'?".format(package_name)): + if options.get("force") or click.confirm( + f"Do you want to purge and completely remove '{package_name}'?" + ): remove_path(package_dir) if not os.path.exists(package_dir): - echo_success('Package `{}` is purged and removed from disk'.format(package_name)) + echo_success( + f"Package `{package_name}` is purged and removed from disk" + ) else: - echo_failure('Something went wrong pruning pacakage `{}`'.format(package_name)) + echo_failure(f"Something went wrong pruning pacakage `{package_name}`") else: - echo_warning('Package `{}` will not be purged'.format(package_name)) + echo_warning(f"Package `{package_name}` will not be purged") else: - echo_failure('Package `{}` does not exist'.format(package_name)) + echo_failure(f"Package `{package_name}` does not exist") diff --git a/kecpkg/commands/sign.py b/kecpkg/commands/sign.py index 9e0bbeb..e30fbd5 100644 --- a/kecpkg/commands/sign.py +++ b/kecpkg/commands/sign.py @@ -7,49 +7,115 @@ from kecpkg.commands.utils import CONTEXT_SETTINGS from kecpkg.gpg import get_gpg, list_keys, hash_of_file -from kecpkg.settings import SETTINGS_FILENAME, GNUPG_KECPKG_HOME, load_settings, DEFAULT_SETTINGS, ARTIFACTS_FILENAME, \ - ARTIFACTS_SIG_FILENAME -from kecpkg.utils import remove_path, echo_info, echo_success, echo_failure, get_package_dir, unzip_package - - -@click.command(context_settings=CONTEXT_SETTINGS, - short_help="Perform package signing and key management.") -@click.argument('package', required=False) -@click.option('--settings', '--config', '-s', 'settings_filename', - help="path to the setting file (default `{}`".format(SETTINGS_FILENAME), - type=click.Path(), default=SETTINGS_FILENAME) -@click.option('--keyid', '--key-id', '-k', 'keyid', - help="ID (name, email, KeyID) of the cryptographic key to do the operation with. ") +from kecpkg.settings import ( + SETTINGS_FILENAME, + GNUPG_KECPKG_HOME, + load_settings, + DEFAULT_SETTINGS, + ARTIFACTS_FILENAME, + ARTIFACTS_SIG_FILENAME, +) +from kecpkg.utils import ( + remove_path, + echo_info, + echo_success, + echo_failure, + get_package_dir, + unzip_package, +) + + +@click.command( + context_settings=CONTEXT_SETTINGS, + short_help="Perform package signing and key management.", +) +@click.argument("package", required=False) +@click.option( + "--settings", + "--config", + "-s", + "settings_filename", + help=f"path to the setting file (default `{SETTINGS_FILENAME}`", + type=click.Path(), + default=SETTINGS_FILENAME, +) +@click.option( + "--keyid", + "--key-id", + "-k", + "keyid", + help="ID (name, email, KeyID) of the cryptographic key to do the operation with. ", +) # @click.option('--passphrase', '-p', 'sign_passphrase', hide_input=True, # help="Passphrase of the cryptographic key to sign the contents of the package. " # "Use in combination with `--sign` and `--keyid`") -@click.option('--import-key', '--import', '-i', 'do_import', type=click.Path(exists=True), - help="Import secret keyfile (in .asc) to the KECPKG keyring which will be used for signing. " - "You can export a created key in gpg with `gpg -a --export-secret-key [keyID] > secret_key.asc`.") -@click.option('--delete-key', '-d', 'do_delete_key', - help="Delete key by its fingerprint permanently from the KECPKG keyring. To retrieve the full " - "fingerprint of the key, use the `--list` option and look at the 'fingerprint' section.") -@click.option('--create-key', '-c', 'do_create_key', is_flag=True, - help="Create secret key and add it to the KECPKG keyring.") -@click.option('--export-key', '--export', '-e', 'do_export_key', type=click.Path(), - help="Export public key to filename with `--keyid KeyID` in .ASC format for public distribution.") -@click.option('--clear-keyring', 'do_clear', is_flag=True, default=False, - help="Clear all keys from the KECPKG keyring") -@click.option('--list', '-l', 'do_list', is_flag=True, - help="List all available keys in the KECPKG keyring") -@click.option('--verify-kecpkg', 'do_verify_kecpkg', type=click.Path(exists=True), - help="Verify contents and signature of an existing kecpkg.") -@click.option('--yes', '-y', 'do_yes', is_flag=True, - help="Don't ask questions, just do it.") -@click.option('-v', '--verbose', help="Be more verbose", is_flag=True) +@click.option( + "--import-key", + "--import", + "-i", + "do_import", + type=click.Path(exists=True), + help="Import secret keyfile (in .asc) to the KECPKG keyring which will be used for signing. " + "You can export a created key in gpg with `gpg -a --export-secret-key [keyID] > " + "secret_key.asc`.", +) +@click.option( + "--delete-key", + "-d", + "do_delete_key", + help="Delete key by its fingerprint permanently from the KECPKG keyring. To retrieve the full " + "fingerprint of the key, use the `--list` option and look at the 'fingerprint' section.", +) +@click.option( + "--create-key", + "-c", + "do_create_key", + is_flag=True, + help="Create secret key and add it to the KECPKG keyring.", +) +@click.option( + "--export-key", + "--export", + "-e", + "do_export_key", + type=click.Path(), + help="Export public key to filename with `--keyid KeyID` in .ASC format for " + "public distribution.", +) +@click.option( + "--clear-keyring", + "do_clear", + is_flag=True, + default=False, + help="Clear all keys from the KECPKG keyring", +) +@click.option( + "--list", + "-l", + "do_list", + is_flag=True, + help="List all available keys in the KECPKG keyring", +) +@click.option( + "--verify-kecpkg", + "do_verify_kecpkg", + type=click.Path(exists=True), + help="Verify contents and signature of an existing kecpkg.", +) +@click.option( + "--yes", "-y", "do_yes", is_flag=True, help="Don't ask questions, just do it." +) +@click.option("-v", "--verbose", help="Be more verbose", is_flag=True) def sign(package=None, **options): """Sign the package.""" # noinspection PyShadowingNames def _do_clear(options): echo_info("Clearing all keys from the KECPKG keyring") - if not options.get('do_yes'): - options['do_yes'] = click.confirm("Are you sure you want to clear the KECPKG keyring?", default=False) - if options.get('do_yes'): + if not options.get("do_yes"): + options["do_yes"] = click.confirm( + "Are you sure you want to clear the KECPKG keyring?", default=False + ) + if options.get("do_yes"): remove_path(GNUPG_KECPKG_HOME) echo_success("Completed") sys.exit(0) @@ -63,41 +129,66 @@ def _do_list(gpg, explain=False): result = gpg.list_keys(secret=True) if len(result): from tabulate import tabulate - print(tabulate(list_keys(gpg=gpg), headers=("Name", "Comment", "E-mail", "Expires", "Fingerprint"))) + + print( + tabulate( + list_keys(gpg=gpg), + headers=("Name", "Comment", "E-mail", "Expires", "Fingerprint"), + ) + ) else: if explain: - echo_info("No keys found in KECPKG keyring. Use `--import-key` or `--create-key` to add a " - "secret key to the KECPKG keyring in order to sign KECPKG's.") + echo_info( + "No keys found in KECPKG keyring. Use `--import-key` or " + "`--create-key` to add a " + "secret key to the KECPKG keyring in order to sign KECPKG's." + ) sys.exit(1) # noinspection PyShadowingNames def _do_import(gpg, options): - echo_info("Importing secret key into KECPKG keyring from '{}'".format(options.get('do_import'))) - result = gpg.import_keys(open(os.path.abspath(options.get('do_import')), 'rb').read()) + echo_info( + "Importing secret key into KECPKG keyring from '{}'".format( + options.get("do_import") + ) + ) + result = gpg.import_keys( + open(os.path.abspath(options.get("do_import")), "rb").read() + ) # pprint(result.__dict__) if result and result.sec_imported: echo_success("Succesfully imported secret key into the KECPKG keystore") _do_list(gpg=gpg) sys.exit(0) elif result and result.unchanged: - echo_failure("Did not import the secret key into the KECPKG keystore. The key was already " - "in place and was unchanged") + echo_failure( + "Did not import the secret key into the KECPKG keystore. The key was already " + "in place and was unchanged" + ) _do_list(gpg=gpg) sys.exit(1) - echo_failure("Did not import a secret key into the KECPKG keystore. Is something wrong " - "with the file: '{}'? Are you sure it is a ASCII file containing a " - "private key block?".format(options.get('do_import'))) + echo_failure( + "Did not import a secret key into the KECPKG keystore. Is something wrong " + "with the file: '{}'? Are you sure it is a ASCII file containing a " + "private key block?".format(options.get("do_import")) + ) sys.exit(1) # noinspection PyShadowingNames def _do_delete_key(gpg, options): - echo_info("Deleting private key with ID '{}' from the KECPKG keyring".format(options.get('do_delete_key'))) + echo_info( + "Deleting private key with ID '{}' from the KECPKG keyring".format( + options.get("do_delete_key") + ) + ) # custom call to gpg using --delete-secret-and-public-key - result = gpg.result_map['delete'](gpg) + result = gpg.result_map["delete"](gpg) # noinspection PyProtectedMember - p = gpg._open_subprocess(['--yes', '--delete-secret-and-public-key', options.get('do_delete_key')]) + p = gpg._open_subprocess( + ["--yes", "--delete-secret-and-public-key", options.get("do_delete_key")] + ) # noinspection PyProtectedMember gpg._collect_output(p, result, stdin=p.stdin) @@ -120,54 +211,75 @@ def _do_create_key(gpg, options): settings = DEFAULT_SETTINGS if package_dir is not None: package_name = os.path.basename(package_dir) - echo_info('Package `{}` has been selected'.format(package_name)) - settings = load_settings(package_dir=package_dir, settings_filename=options.get('settings_filename')) - - key_info = {'name_real': click.prompt("Name", default=settings.get('name')), - 'name_comment': click.prompt("Comment", default="KECPKG SIGNING KEY"), - 'name_email': click.prompt("Email", default=settings.get('email')), - 'expire_date': click.prompt("Expiration in months", default=12, - value_proc=lambda i: "{}m".format(i)), 'key_type': 'RSA', - 'key_length': 4096, - 'key_usage': '', - 'subkey_type': 'RSA', - 'subkey_length': 4096, - 'subkey_usage': 'encrypt,sign,auth', - 'passphrase': ''} + echo_info(f"Package `{package_name}` has been selected") + settings = load_settings( + package_dir=package_dir, + settings_filename=options.get("settings_filename"), + ) + + key_info = { + "name_real": click.prompt("Name", default=settings.get("name")), + "name_comment": click.prompt("Comment", default="KECPKG SIGNING KEY"), + "name_email": click.prompt("Email", default=settings.get("email")), + "expire_date": click.prompt( + "Expiration in months", default=12, value_proc=lambda i: f"{i}m" + ), + "key_type": "RSA", + "key_length": 4096, + "key_usage": "", + "subkey_type": "RSA", + "subkey_length": 4096, + "subkey_usage": "encrypt,sign,auth", + "passphrase": "", + } passphrase = click.prompt("Passphrase", hide_input=True) passphrase_confirmed = click.prompt("Confirm passphrase", hide_input=True) if passphrase == passphrase_confirmed: - key_info['passphrase'] = passphrase + key_info["passphrase"] = passphrase else: raise ValueError("The passphrases did not match.") - echo_info("Creating the secret key '{name_real} ({name_comment}) <{name_email}>'".format(**key_info)) - echo_info("Please move around mouse or generate other activity to introduce sufficient entropy. " - "This might take a minute...") + echo_info( + "Creating the secret key '{name_real} ({name_comment}) <{name_email}>'".format( + **key_info + ) + ) + echo_info( + "Please move around mouse or generate other activity to introduce sufficient entropy. " + "This might take a minute..." + ) result = gpg.gen_key(gpg.gen_key_input(**key_info)) pprint(result.__dict__) - if result and result.stderr.find('KEY_CREATED'): + if result and result.stderr.find("KEY_CREATED"): echo_success("The key is succesfully created") _do_list(gpg=gpg) sys.exit(0) - echo_failure("Could not generate the key due to an error: '{}'".format(result.stderr)) + echo_failure(f"Could not generate the key due to an error: '{result.stderr}'") sys.exit(1) # noinspection PyShadowingNames def _do_export_key(gpg, options): """Export public key.""" echo_info("Exporting public key") - if options.get('keyid') is None: + if options.get("keyid") is None: _do_list(gpg=gpg) - options['keyid'] = click.prompt("Provide KeyId (name, comment, email, fingerprint) of the key to export") - result = gpg.export_keys(keyids=[options.get('keyid')], secret=False, armor=True) + options["keyid"] = click.prompt( + "Provide KeyId (name, comment, email, fingerprint) of the key to export" + ) + result = gpg.export_keys( + keyids=[options.get("keyid")], secret=False, armor=True + ) if result is not None: - with open(options.get('do_export_key'), 'w') as fd: + with open(options.get("do_export_key"), "w") as fd: fd.write(result) - echo_success("Sucessfully written public key to '{}'".format(options.get('do_export_key'))) + echo_success( + "Sucessfully written public key to '{}'".format( + options.get("do_export_key") + ) + ) sys.exit(0) echo_failure("Could not export key") @@ -176,14 +288,24 @@ def _do_export_key(gpg, options): # noinspection PyShadowingNames def _do_verify_kecpkg(gpg, options): """Verify the kecpkg.""" - echo_info("Verify the contents of the KECPKG and if the KECPKG is signed with a valid signature.") + echo_info( + "Verify the contents of the KECPKG and if the KECPKG is signed with a valid signature." + ) current_working_directory = os.getcwd() with temp_chdir() as d: - unzip_package(package_path=os.path.join(current_working_directory, options.get('do_verify_kecpkg')), - target_path=d) - verify_signature(d, artifacts_filename=ARTIFACTS_FILENAME, artifacts_sig_filename=ARTIFACTS_SIG_FILENAME) + unzip_package( + package_path=os.path.join( + current_working_directory, options.get("do_verify_kecpkg") + ), + target_path=d, + ) + verify_signature( + d, + artifacts_filename=ARTIFACTS_FILENAME, + artifacts_sig_filename=ARTIFACTS_SIG_FILENAME, + ) verify_artifacts_hashes(d, artifacts_filename=ARTIFACTS_FILENAME) sys.exit(0) @@ -191,19 +313,19 @@ def _do_verify_kecpkg(gpg, options): # Dispatcher to subfunctions # - if options.get('do_clear'): + if options.get("do_clear"): _do_clear(options=options) - elif options.get('do_list'): + elif options.get("do_list"): _do_list(gpg=get_gpg(), explain=True) - elif options.get('do_import'): + elif options.get("do_import"): _do_import(gpg=get_gpg(), options=options) - elif options.get('do_delete_key'): + elif options.get("do_delete_key"): _do_delete_key(gpg=get_gpg(), options=options) - elif options.get('do_create_key'): + elif options.get("do_create_key"): _do_create_key(gpg=get_gpg(), options=options) - elif options.get('do_export_key'): + elif options.get("do_export_key"): _do_export_key(gpg=get_gpg(), options=options) - elif options.get('do_verify_kecpkg'): + elif options.get("do_verify_kecpkg"): _do_verify_kecpkg(gpg=get_gpg(), options=options) else: sys.exit(500) @@ -223,19 +345,22 @@ def verify_signature(package_dir, artifacts_filename, artifacts_sig_filename): artifacts_fp = os.path.join(package_dir, artifacts_filename) artifacts_sig_fp = os.path.join(package_dir, artifacts_sig_filename) if not os.path.exists(artifacts_fp): - echo_failure("Artifacts file does not exist: '{}'".format(artifacts_filename)) + echo_failure(f"Artifacts file does not exist: '{artifacts_filename}'") sys.exit(1) if not os.path.exists(artifacts_sig_fp): - echo_failure("Artifacts signature file does not exist: '{}'. Is the package signed?". - format(artifacts_filename)) + echo_failure( + "Artifacts signature file does not exist: '{}'. Is the package signed?".format( + artifacts_filename + ) + ) sys.exit(1) - with open(artifacts_sig_fp, 'rb') as sig_fd: + with open(artifacts_sig_fp, "rb") as sig_fd: results = gpg.verify_file(sig_fd, data_filename=artifacts_fp) if results.valid: echo_info("Verified the signature and the signature is valid") - echo_info("Signed with: '{}'".format(results.username)) + echo_info(f"Signed with: '{results.username}'") elif not results.valid: echo_failure("Signature of the package is invalid") echo_failure(pprint(results.__dict__)) @@ -252,10 +377,10 @@ def verify_artifacts_hashes(package_dir, artifacts_filename): """ artifacts_fp = os.path.join(package_dir, artifacts_filename) if not os.path.exists(artifacts_fp): - echo_failure("Artifacts file does not exist: '{}'".format(artifacts_filename)) + echo_failure(f"Artifacts file does not exist: '{artifacts_filename}'") sys.exit(1) - with open(artifacts_fp, 'r') as fd: + with open(artifacts_fp) as fd: artifacts = fd.readlines() # process the file contents @@ -264,21 +389,25 @@ def verify_artifacts_hashes(package_dir, artifacts_filename): fails = [] for af in artifacts: # noinspection PyShadowingBuiltins,PyShadowingBuiltins - filename, hash, orig_size = af.split(',') - algorithm, orig_hash = hash.split('=') + filename, hash, orig_size = af.split(",") + algorithm, orig_hash = hash.split("=") fp = os.path.join(package_dir, filename) if os.path.exists(fp): found_hash = hash_of_file(fp, algorithm) found_size = os.stat(fp).st_size if found_hash != orig_hash.strip() or found_size != int(orig_size.strip()): - fails.append("File '{}' is changed in the package.".format(filename)) - fails.append("File '{}' original checksum: '{}', found: '{}'".format(filename, orig_hash, found_hash)) - fails.append("File '{}' original size: {}, found: {}".format(filename, orig_size, found_size)) + fails.append(f"File '{filename}' is changed in the package.") + fails.append( + f"File '{filename}' original checksum: '{orig_hash}', found: '{found_hash}'" + ) + fails.append( + f"File '{filename}' original size: {orig_size}, found: {found_size}" + ) else: - fails.append("File '{}' does not exist".format(filename)) + fails.append(f"File '{filename}' does not exist") if fails: - echo_failure('The package has been changed after building the package.') + echo_failure("The package has been changed after building the package.") for fail in fails: print(fail) sys.exit(1) diff --git a/kecpkg/commands/upload.py b/kecpkg/commands/upload.py index 83bca17..9d84819 100644 --- a/kecpkg/commands/upload.py +++ b/kecpkg/commands/upload.py @@ -6,114 +6,194 @@ from kecpkg.commands.utils import CONTEXT_SETTINGS from kecpkg.settings import load_settings, save_settings, SETTINGS_FILENAME -from kecpkg.utils import get_package_dir, get_package_name, echo_success, echo_failure, echo_info - - -@click.command(context_settings=CONTEXT_SETTINGS, - short_help="Upload package to a KE-chain 2 scope") -@click.argument('package', required=False) -@click.option('--settings', '--config', '-s', 'settings_filename', - help="path to the setting file (default `{}`".format(SETTINGS_FILENAME), - type=click.Path(exists=True), default=SETTINGS_FILENAME) -@click.option('--url', '-U', help="URL of the KE-chain instance (eg. https://.ke-chain.com)") -@click.option('--username', '-u', help="username for KE-chain", default=os.environ.get('USER', '')) -@click.option('--password', '-p', help="password for KE-chain") -@click.option('--token', help="token for KE-chain access") -@click.option('--scope', help="scope name to upload the kecpkg to") -@click.option('--scope-id', 'scope_id', help="UUID of the scope to upload the kecpkg to", type=click.UUID) -@click.option('--service-id', 'service_id', help="(optional) id of the service to reupload", type=click.UUID) -@click.option('--reupload', '--replace', '-r', is_flag=True, default=False, - help="(optional) reupload the kecpkg to an already existing service") -@click.option('--interactive', '-i', is_flag=True, help="interactive mode; guide me through the upload") -@click.option('--kecpkg', help="(optional) path to the kecpkg file to upload", type=click.Path()) -@click.option('--store/--no-store', is_flag=True, default=True, - help="(optional) flag to store provided interactive information to settings (except pass)") -def upload(package=None, url=None, username=None, password=None, token=None, scope=None, scope_id=None, kecpkg=None, - **options): +from kecpkg.utils import ( + get_package_dir, + get_package_name, + echo_success, + echo_failure, + echo_info, +) + + +@click.command( + context_settings=CONTEXT_SETTINGS, short_help="Upload package to a KE-chain 2 scope" +) +@click.argument("package", required=False) +@click.option( + "--settings", + "--config", + "-s", + "settings_filename", + help=f"path to the setting file (default `{SETTINGS_FILENAME}`", + type=click.Path(exists=True), + default=SETTINGS_FILENAME, +) +@click.option( + "--url", + "-U", + help="URL of the KE-chain instance (eg. https://.ke-chain.com)", +) +@click.option( + "--username", "-u", help="username for KE-chain", default=os.environ.get("USER", "") +) +@click.option("--password", "-p", help="password for KE-chain") +@click.option("--token", help="token for KE-chain access") +@click.option("--scope", help="scope name to upload the kecpkg to") +@click.option( + "--scope-id", + "scope_id", + help="UUID of the scope to upload the kecpkg to", + type=click.UUID, +) +@click.option( + "--service-id", + "service_id", + help="(optional) id of the service to reupload", + type=click.UUID, +) +@click.option( + "--reupload", + "--replace", + "-r", + is_flag=True, + default=False, + help="(optional) reupload the kecpkg to an already existing service", +) +@click.option( + "--interactive", + "-i", + is_flag=True, + help="interactive mode; guide me through the upload", +) +@click.option( + "--kecpkg", help="(optional) path to the kecpkg file to upload", type=click.Path() +) +@click.option( + "--store/--no-store", + is_flag=True, + default=True, + help="(optional) flag to store provided interactive information to settings (except pass)", +) +def upload( + package=None, + url=None, + username=None, + password=None, + token=None, + scope=None, + scope_id=None, + kecpkg=None, + **options, +): """ Upload built kecpkg to KE-chain. If no options are provided, the interactive mode is triggered. """ - package_name = package or get_package_name() or click.prompt('Package name') - settings = load_settings(package_dir=get_package_dir(package_name), - settings_filename=options.get('settings_filename')) + package_name = package or get_package_name() or click.prompt("Package name") + settings = load_settings( + package_dir=get_package_dir(package_name), + settings_filename=options.get("settings_filename"), + ) if not url or not ((username and password) or token): - url = click.prompt('Url (incl http(s)://)', default=settings.get('url') or url) - username = click.prompt('Username', default=settings.get('username') or username) - password = click.prompt('Password', hide_input=True) + url = click.prompt("Url (incl http(s)://)", default=settings.get("url") or url) + username = click.prompt( + "Username", default=settings.get("username") or username + ) + password = click.prompt("Password", hide_input=True) # set the interactive world to True for continuation sake - options['interactive'] = True - elif not options.get('interactive'): - url = url or settings.get('url') - username = username or settings.get('username') - token = token or settings.get('token') - scope_id = scope_id or settings.get('scope_id') + options["interactive"] = True + elif not options.get("interactive"): + url = url or settings.get("url") + username = username or settings.get("username") + token = token or settings.get("token") + scope_id = scope_id or settings.get("scope_id") client = Client(url) client.login(username=username, password=password, token=token) # scope finder - if not scope_id and settings.get('scope_id') and \ - click.confirm("Do you wish to use the stored `scope_id` in settings: `{}`".format( - settings.get('scope_id')), default=True): - scope_id = settings.get('scope_id') + if ( + not scope_id + and settings.get("scope_id") + and click.confirm( + "Do you wish to use the stored `scope_id` in settings: `{}`".format( + settings.get("scope_id") + ), + default=True, + ) + ): + scope_id = settings.get("scope_id") if not scope_id: scopes = client.scopes() - scope_matcher = [dict(number=i, scope_id=scope.id, scope=scope.name) for i, scope in - zip(range(1, len(scopes)), scopes)] + scope_matcher = [ + dict(number=i, scope_id=scope.id, scope=scope.name) + for i, scope in zip(range(1, len(scopes)), scopes) + ] # nice UI - echo_info('Choose from following scopes:') + echo_info("Choose from following scopes:") for match_dict in scope_matcher: echo_info("{number} | {scope_id:.8} | {scope}".format(**match_dict)) scope_match = None while not scope_match: - scope_guess = click.prompt('Row number, part of Id or Scope') + scope_guess = click.prompt("Row number, part of Id or Scope") scope_match = validate_scopes(scope_guess, scope_matcher) echo_success("Scope selected: '{scope}' ({scope_id})".format(**scope_match)) - scope_id = scope_match['scope_id'] + scope_id = scope_match["scope_id"] scope_to_upload = get_project(url, username, password, token, scope_id=scope_id) # service reupload - service_id = options.get('service_id') or settings.get('service_id') - if options.get('reupload') and not service_id: - echo_failure('Please provide a service id to reupload to.') - elif service_id and not options.get('reupload') and options.get('interactive'): - if click.confirm("Do you wish to *replace* the previously uploaded service: `{}`".format(service_id), - default=True): + service_id = options.get("service_id") or settings.get("service_id") + if options.get("reupload") and not service_id: + echo_failure("Please provide a service id to reupload to.") + elif service_id and not options.get("reupload") and options.get("interactive"): + if click.confirm( + f"Do you wish to *replace* the previously uploaded service: `{service_id}`", + default=True, + ): service_id = service_id else: service_id = None # store to settings - if options.get('store'): - settings.update(dict( - url=url, - username=username, - scope_id=str(scope_id) - )) + if options.get("store"): + settings.update(dict(url=url, username=username, scope_id=str(scope_id))) if service_id: - settings['service_id'] = str(service_id) - save_settings(settings, settings_filename=options.get('settings_filename')) + settings["service_id"] = str(service_id) + save_settings(settings, settings_filename=options.get("settings_filename")) # do upload - build_path = os.path.join(get_package_dir(package_name), settings.get('build_dir')) + build_path = os.path.join(get_package_dir(package_name), settings.get("build_dir")) if not os.path.exists(build_path): - echo_failure('Cannot find build path, please do `kecpkg build` first') + echo_failure("Cannot find build path, please do `kecpkg build` first") sys.exit(400) - upload_package(scope_to_upload, build_path, kecpkg, service_id=service_id, settings=settings, - store_settings=options.get('store'), settings_filename=options.get('settings_filename')) + upload_package( + scope_to_upload, + build_path, + kecpkg, + service_id=service_id, + settings=settings, + store_settings=options.get("store"), + settings_filename=options.get("settings_filename"), + ) -def upload_package(scope, build_path=None, kecpkg_path=None, service_id=None, settings=None, store_settings=True, - settings_filename=None): +def upload_package( + scope, + build_path=None, + kecpkg_path=None, + service_id=None, + settings=None, + store_settings=True, + settings_filename=None, +): """ Upload the package from build_path to the right scope, create a new KE-chain SIM service. @@ -134,21 +214,21 @@ def upload_package(scope, build_path=None, kecpkg_path=None, service_id=None, se else: built_kecpkgs = os.listdir(build_path) - if not kecpkg_path and len(built_kecpkgs) > 1 and settings.get('version'): - built_kecpkgs = [f for f in built_kecpkgs if settings.get('version') in f] + if not kecpkg_path and len(built_kecpkgs) > 1 and settings.get("version"): + built_kecpkgs = [f for f in built_kecpkgs if settings.get("version") in f] if not kecpkg_path and len(built_kecpkgs) == 1: kecpkg_path = os.path.join(build_path, built_kecpkgs[0]) else: - echo_info('Provide correct filename to upload') - echo_info('\n'.join(os.listdir(build_path))) - kecpkg_filename = click.prompt('Filename') + echo_info("Provide correct filename to upload") + echo_info("\n".join(os.listdir(build_path))) + kecpkg_filename = click.prompt("Filename") kecpkg_path = os.path.join(build_path, kecpkg_filename) if kecpkg_path and os.path.exists(kecpkg_path): # ready to upload - echo_info('Ready to upload `{}`'.format(os.path.basename(kecpkg_path))) + echo_info(f"Ready to upload `{os.path.basename(kecpkg_path)}`") else: - echo_failure('Unable to locate kecpkg to upload') + echo_failure("Unable to locate kecpkg to upload") sys.exit(404) # get meta and prepare 2 stage submission @@ -159,36 +239,37 @@ def upload_package(scope, build_path=None, kecpkg_path=None, service_id=None, se service = scope.service(pk=service_id) service.upload(kecpkg_path) service.edit( - name=settings.get('package_name'), - description=settings.get('description', ''), - script_version=settings.get('version', '') + name=settings.get("package_name"), + description=settings.get("description", ""), + script_version=settings.get("version", ""), ) else: # Create new service in KE-chain service = scope.create_service( - name=settings.get('package_name'), - description=settings.get('description', ''), - version=settings.get('version', ''), - service_type='PYTHON SCRIPT', - environment_version=settings.get('python_version'), - pkg_path=kecpkg_path + name=settings.get("package_name"), + description=settings.get("description", ""), + version=settings.get("version", ""), + service_type="PYTHON SCRIPT", + environment_version=settings.get("python_version"), + pkg_path=kecpkg_path, ) # Wrap up party! - echo_success("kecpkg `{}` successfully uploaded to KE-chain.".format(os.path.basename(kecpkg_path))) + echo_success( + f"kecpkg `{os.path.basename(kecpkg_path)}` successfully uploaded to KE-chain." + ) # noinspection PyProtectedMember success_url = "{api_root}/#scopes/{scope_id}/scripts/{service_id}".format( - api_root=scope._client.api_root, - scope_id=scope.id, - service_id=service.id + api_root=scope._client.api_root, scope_id=scope.id, service_id=service.id ) - echo_success("To view the newly created service, go to: `{}`".format(success_url)) + echo_success(f"To view the newly created service, go to: `{success_url}`") # update settings if store_settings: - settings['service_id'] = str(service.id) + settings["service_id"] = str(service.id) from datetime import datetime - settings['last_upload'] = str(datetime.now().isoformat()) + + settings["last_upload"] = str(datetime.now().isoformat()) save_settings(settings, settings_filename=settings_filename) @@ -197,15 +278,18 @@ def validate_scopes(scope_guess, scope_matcher): scope_matches = [] for scope_match in scope_matcher: # order is important as '1' can also be in UUID and Name, so we use exclusive if statements - if scope_guess == str(scope_match['number']): + if scope_guess == str(scope_match["number"]): scope_matches.append(scope_match) - elif len(scope_guess) >= 2 and scope_guess.lower() in scope_match['scope_id'].lower(): + elif ( + len(scope_guess) >= 2 + and scope_guess.lower() in scope_match["scope_id"].lower() + ): scope_matches.append(scope_match) - elif scope_guess.lower() in scope_match['scope'].lower(): + elif scope_guess.lower() in scope_match["scope"].lower(): scope_matches.append(scope_match) # only return when a single scope is matched if len(scope_matches) == 1: return scope_matches[0] - print('Could not match; be more specific') + print("Could not match; be more specific") return None diff --git a/kecpkg/commands/utils.py b/kecpkg/commands/utils.py index 48e89dd..72c06f7 100644 --- a/kecpkg/commands/utils.py +++ b/kecpkg/commands/utils.py @@ -1,7 +1,4 @@ -CONTEXT_SETTINGS = { - 'help_option_names': ['-h', '--help'], - 'max_content_width': 110 -} +CONTEXT_SETTINGS = {"help_option_names": ["-h", "--help"], "max_content_width": 110} UNKNOWN_OPTIONS = { - 'ignore_unknown_options': True, + "ignore_unknown_options": True, }.update(CONTEXT_SETTINGS) diff --git a/kecpkg/create.py b/kecpkg/create.py index f728967..06459b6 100644 --- a/kecpkg/create.py +++ b/kecpkg/create.py @@ -1,5 +1,3 @@ -from __future__ import print_function - import os import subprocess import sys @@ -7,8 +5,15 @@ import six from kecpkg.files.rendering import render_to_file -from kecpkg.utils import (ensure_dir_exists, get_proper_python, NEED_SUBPROCESS_SHELL, venv, - echo_success, echo_failure, echo_info) +from kecpkg.utils import ( + ensure_dir_exists, + get_proper_python, + NEED_SUBPROCESS_SHELL, + venv, + echo_success, + echo_failure, + echo_info, +) def create_package(package_dir, settings): @@ -25,24 +30,38 @@ def create_package(package_dir, settings): :param settings: settings dict """ ensure_dir_exists(package_dir) - render_to_file('README.md', content=settings, target_dir=package_dir) - render_to_file('requirements.txt', content=settings, target_dir=package_dir) - render_to_file('package_info.json', content=dict(requirements_txt='requirements.txt', - entrypoint_script=settings.get('entrypoint_script'), - entrypoint_func=settings.get('entrypoint_func')), - target_dir=package_dir) - render_to_file('.gitignore', content=dict(), target_dir=package_dir) - render_to_file('.env', content=dict(), target_dir=package_dir) + render_to_file("README.md", content=settings, target_dir=package_dir) + render_to_file("requirements.txt", content=settings, target_dir=package_dir) + render_to_file( + "package_info.json", + content=dict( + requirements_txt="requirements.txt", + entrypoint_script=settings.get("entrypoint_script"), + entrypoint_func=settings.get("entrypoint_func"), + ), + target_dir=package_dir, + ) + render_to_file(".gitignore", content=dict(), target_dir=package_dir) + render_to_file(".env", content=dict(), target_dir=package_dir) # runconfigurations - run_configurations_path = os.path.join(package_dir, '.idea', 'runConfigurations') + run_configurations_path = os.path.join(package_dir, ".idea", "runConfigurations") ensure_dir_exists(run_configurations_path) - render_to_file('Upload_the_kecpkg.xml', content=dict(), target_dir=run_configurations_path) - render_to_file('Build_the_kecpkg.xml', content=dict(), target_dir=run_configurations_path) + render_to_file( + "Upload_the_kecpkg.xml", content=dict(), target_dir=run_configurations_path + ) + render_to_file( + "Build_the_kecpkg.xml", content=dict(), target_dir=run_configurations_path + ) - script_filename = '{}.py'.format(settings.get('entrypoint_script')) + script_filename = "{}.py".format(settings.get("entrypoint_script")) - render_to_file(script_filename, content=settings, template='script.py.template', target_dir=package_dir) + render_to_file( + script_filename, + content=settings, + template="script.py.template", + target_dir=package_dir, + ) def create_venv(package_dir, settings, pypath=None, use_global=False, verbose=False): @@ -57,21 +76,24 @@ def create_venv(package_dir, settings, pypath=None, use_global=False, verbose=Fa :param package_dir: the full path to the package directory :param settings: the settings dict (including the venv_dir name to create the right venv) - :param pypath: absolute path to the python binary interpreter to create the virtual environment with - :param use_global: Use global sysem site packages when creating virtual environment (default False) + :param pypath: absolute path to the python binary interpreter to create the virtual + environment with + :param use_global: Use global sysem site packages when creating virtual environment + (default False) :param verbose: Use verbosity (default False) """ - venv_dir = os.path.join(package_dir, settings.get('venv_dir')) + venv_dir = os.path.join(package_dir, settings.get("venv_dir")) if not pypath: - from distutils.spawn import find_executable - pypath = find_executable(get_proper_python()) + import shutil + + pypath = shutil.which(get_proper_python()) - command = [sys.executable, '-m', 'virtualenv', venv_dir, '-p', pypath] + command = [sys.executable, "-m", "virtualenv", venv_dir, "-p", pypath] if use_global: # no cov - command.append('--system-site-packages') + command.append("--system-site-packages") if not verbose: # no cov - command.append('-qqq') + command.append("-qqq") if six.PY3: result = subprocess.run(command, shell=NEED_SUBPROCESS_SHELL) return result.returncode @@ -88,32 +110,40 @@ def pip_install_venv(package_dir, settings, verbose=False): :param settings: the settings dict (incluing the venv_dir name) :param verbose: (optional) be more verbose if set to True, defaults to False """ - venv_dir = os.path.join(package_dir, settings.get('venv_dir')) + venv_dir = os.path.join(package_dir, settings.get("venv_dir")) if not os.path.exists(venv_dir): - echo_failure('virtual environment directory `{}` does not exists, nothing to install'.format(venv_dir)) + echo_failure( + f"virtual environment directory `{venv_dir}` does not exists, nothing to install" + ) sys.exit(1) - if not os.path.exists(os.path.join(package_dir, settings.get('requirements_filename'))): - echo_failure('could not find requirements.txt to install, check if `{}` exists or update settings'.format( - settings.get('requirements_filename'))) + if not os.path.exists( + os.path.join(package_dir, settings.get("requirements_filename")) + ): + echo_failure(f"could not find requirements.txt to install, check if " + f"`{settings.get('requirements_filename')}` exists or update settings") sys.exit(1) - install_command = [sys.executable, '-m', 'pip', 'install', '-r', - os.path.join(package_dir, settings.get('requirements_filename'))] + install_command = [ + sys.executable, + "-m", + "pip", + "install", + "-r", + os.path.join(package_dir, settings.get("requirements_filename")), + ] if not verbose: # no cov - install_command.append('-qqq') + install_command.append("-qqq") with venv(venv_dir): - echo_info('Installing requirements from `{}` into the virtual environment `{}`'. - format(settings.get('requirements_filename'), settings.get('venv_dir'))) + echo_info( + "Installing requirements from `{}` into the virtual environment `{}`".format( + settings.get("requirements_filename"), settings.get("venv_dir") + ) + ) result = None - if six.PY3: - result = subprocess.run(install_command, shell=NEED_SUBPROCESS_SHELL) - return result.returncode - elif six.PY2: - result = subprocess.check_output(install_command, shell=NEED_SUBPROCESS_SHELL) - return result and 0 or -1 + result = subprocess.run(install_command, shell=NEED_SUBPROCESS_SHELL) if result: echo_success(str(result)) diff --git a/kecpkg/files/rendering.py b/kecpkg/files/rendering.py index ef2d209..5a3f194 100644 --- a/kecpkg/files/rendering.py +++ b/kecpkg/files/rendering.py @@ -11,7 +11,11 @@ def get_environment(): """Retrieve the jinja 2 environment as this is a singleton.""" global __environment if not __environment: - __environment = Environment(loader=FileSystemLoader(os.path.join(os.path.dirname(__file__), 'templates'))) + __environment = Environment( + loader=FileSystemLoader( + os.path.join(os.path.dirname(__file__), "templates") + ) + ) return __environment @@ -47,7 +51,7 @@ def render_to_file(filename, content, target_dir=None, template=None): :return: None """ # alias for render_to_template - template = template or '{}.template'.format(filename) + template = template or f"{filename}.template" target_dir = target_dir or os.getcwd() filepath = os.path.join(target_dir, filename) diff --git a/kecpkg/files/templates/script.py.template b/kecpkg/files/templates/script.py.template index d9f184a..7640375 100644 --- a/kecpkg/files/templates/script.py.template +++ b/kecpkg/files/templates/script.py.template @@ -6,7 +6,6 @@ This script is part of package `{{ package_name }}` Created by: {{ name }} <{{ email }}> {%- endif -%} """ -from pykechain import get_project import sys __version__ = '{{ version }}' diff --git a/kecpkg/gpg.py b/kecpkg/gpg.py index ad7bf8c..bb57669 100644 --- a/kecpkg/gpg.py +++ b/kecpkg/gpg.py @@ -1,3 +1,5 @@ +from __future__ import annotations + import hashlib import logging import os @@ -5,26 +7,29 @@ import subprocess import sys from datetime import datetime +from typing import Any, Optional -import gnupg import six +from gnupg import GPG from kecpkg.settings import GNUPG_KECPKG_HOME -from kecpkg.utils import ON_LINUX, ON_WINDOWS, ON_MACOS, echo_failure, read_chunks, echo_info, ensure_dir_exists +from kecpkg.utils import ( + ON_LINUX, ON_MACOS, ON_WINDOWS, echo_failure, echo_info, ensure_dir_exists, read_chunks, +) LOGLEVEL = logging.INFO -def hash_of_file(path, algorithm='sha256'): - """Return the hash digest of a file.""" - with open(path, 'rb') as archive: - hash = hashlib.new(algorithm) +def hash_of_file(path, algorithm="sha256"): + """Return the my_hash digest of a file.""" + with open(path, "rb") as archive: + my_hash = hashlib.new(algorithm) for chunk in read_chunks(archive): - hash.update(chunk) - return hash.hexdigest() + my_hash.update(chunk) + return my_hash.hexdigest() -__gpg = None # type: gnupg.GPG or None +__gpg: Optional[GPG] = None def has_gpg(): @@ -41,52 +46,63 @@ def has_gpg(): return True -def get_gpg(): - # type: () -> gnupg.GPG - """Return the GPG objects instantiated with custom KECPKG keyring in custom KECPKG GNUPG home.""" +def get_gpg() -> GPG: + """Return the GPG objects with custom KECPKG keyring in custom KECPKG GNUPG home.""" global __gpg if not __gpg: if six.PY2: - echo_failure('Package signing capability is not available in python 2.7. Please use python 3 or greater.') + echo_failure( + "Package signing capability is not available in python 2.7. " + "Please use python 3 or greater." + ) sys.exit(1) - import gnupg logging.basicConfig(level=LOGLEVEL) - logging.getLogger('gnupg') - gpg_bin = 'gpg' + logging.getLogger("gnupg") + gpg_bin = "gpg" if ON_LINUX: - gpg_bin = subprocess.getoutput('which gpg') + gpg_bin = subprocess.getoutput("which gpg") if ON_WINDOWS: - bin_path_guesses = ["C:\\Program Files (x86)\\GnuPG\\bin\\gpg.exe", - "C:\\Program Files\\GnuPG\\gpg.exe", - "C:\\Program Files (x86)\\GnuPG\\gpg.exe", - "C:\\Program Files\\GnuPG\\bin\\gpg.exe"] + bin_path_guesses = [ + "C:\\Program Files (x86)\\GnuPG\\bin\\gpg.exe", + "C:\\Program Files\\GnuPG\\gpg.exe", + "C:\\Program Files (x86)\\GnuPG\\gpg.exe", + "C:\\Program Files\\GnuPG\\bin\\gpg.exe", + ] gpg_bins = [p for p in bin_path_guesses if os.path.exists(p)] if gpg_bins is not None: gpg_bin = gpg_bins[0] else: gpg_bin = bin_path_guesses[0] elif ON_MACOS: - gpg_bin = '/usr/local/bin/gpg' + gpg_bin = "/usr/local/bin/gpg" if not os.path.exists(gpg_bin): - echo_failure("Unable to detect installed GnuPG executable. Ensure you have it installed. " - "We checked: '{}'".format(gpg_bin)) - echo_failure("- For Linux please install GnuPG using your package manager. In Ubuntu/Debian this can be " - "achieved with `sudo apt install gnupg`.") + echo_failure( + "Unable to detect installed GnuPG executable. Ensure you have it installed. " + "We checked: '{}'".format(gpg_bin) + ) + echo_failure( + "- For Linux please install GnuPG using your package manager. In " + "Ubuntu/Debian this can be " + "achieved with `sudo apt install gnupg`." + ) echo_failure("- For Mac OSX please install GnuPG using `brew install gpg`.") - echo_failure("- For Windows please install GnuPG using the downloads via: https://gnupg.org/download/") + echo_failure( + "- For Windows please install GnuPG using the downloads via: " + "https://gnupg.org/download/" + ) sys.exit(1) if not os.path.exists(GNUPG_KECPKG_HOME): # create the GNUPG_KECPKG_HOME when not exist, otherwise the GPG will fail ensure_dir_exists(GNUPG_KECPKG_HOME) - __gpg = gnupg.GPG(gpgbinary=gpg_bin, gnupghome=GNUPG_KECPKG_HOME) + __gpg = GPG(gpgbinary=gpg_bin, gnupghome=GNUPG_KECPKG_HOME) return __gpg -def list_keys(gpg): +def list_keys(gpg) -> list[list[str | None | Any]]: """ List all keys from the KECPKG keystore and return it as a list of list. @@ -96,19 +112,19 @@ def list_keys(gpg): result = gpg.list_keys(secret=True) key_list = [] for r in result: - uids = parse_key_uids(r.get('uids')) + uids = parse_key_uids(r.get("uids")) row = [ - uids.get('name'), - uids.get('comment'), - uids.get('email'), - str(datetime.fromtimestamp(int(r.get('expires')))), - r.get('fingerprint') + uids.get("name"), + uids.get("comment"), + uids.get("email"), + str(datetime.fromtimestamp(int(r.get("expires")))), + r.get("fingerprint"), ] key_list.append(row) return key_list -def tabulate_keys(gpg, explain=False): +def tabulate_keys(gpg: GPG, explain: Optional[bool] = False) -> None: """ List all keys in a table for printing on the CLI. @@ -116,21 +132,30 @@ def tabulate_keys(gpg, explain=False): If explain = Truem, it will exit with returncode 1 when no keys are present. :param gpg: GPG objects - :param explain: With explain is True, more text is added and will exit(1) when no keys are present. + :param explain: With explain is True, more text is added and will exit(1) when no + keys are present. :return: None. """ result = gpg.list_keys(secret=True) if len(result): from tabulate import tabulate - print(tabulate(list_keys(gpg=gpg), headers=("Name", "Comment", "E-mail", "Expires", "Fingerprint"))) + + print( + tabulate( + list_keys(gpg=gpg), + headers=("Name", "Comment", "E-mail", "Expires", "Fingerprint"), + ) + ) else: - echo_info("No keys found in KECPKG keyring. Use `--import-key` or `--create-key` to add a " - "secret key to the KECPKG keyring in order to sign KECPKG's.") + echo_info( + "No keys found in KECPKG keyring. Use `--import-key` or `--create-key` to add a " + "secret key to the KECPKG keyring in order to sign KECPKG's." + ) if explain: sys.exit(1) -def parse_key_uids(uids): +def parse_key_uids(uids: str) -> dict[str, str]: """ Parse GPG key uids into a dictionary with Name, Comment and email. diff --git a/kecpkg/settings.py b/kecpkg/settings.py index ffba260..ba30d88 100644 --- a/kecpkg/settings.py +++ b/kecpkg/settings.py @@ -10,38 +10,78 @@ from kecpkg.utils import ensure_dir_exists, create_file, get_package_dir, echo_failure -SETTINGS_FILENAME = '.kecpkg_settings.json' +SETTINGS_FILENAME = ".kecpkg_settings.json" SETTINGS_FILE = os.path.join(os.getcwd(), SETTINGS_FILENAME) -ARTIFACTS_FILENAME = 'ARTIFACTS' -ARTIFACTS_SIG_FILENAME = 'ARTIFACTS.SIG' +ARTIFACTS_FILENAME = "ARTIFACTS" +ARTIFACTS_SIG_FILENAME = "ARTIFACTS.SIG" # using the appdirs.user_data_dir to manage user data on various platforms. -GNUPG_KECPKG_HOME = os.path.join(user_data_dir('kecpkg', 'KE-works BV'), '.gnupg') - -DEFAULT_SETTINGS = OrderedDict([ - ('version', '0.0.1'), - ('pyversions', ['2.7', '3.5', '3.6']), - ('python_version', '3.5'), - ('venv_dir', 'venv'), - ('entrypoint_script', 'script'), - ('entrypoint_func', 'main'), - ('build_dir', 'dist'), - ('requirements_filename', 'requirements.txt'), - ('artifacts_filename', ARTIFACTS_FILENAME), - ('artifacts_sig_filename', ARTIFACTS_SIG_FILENAME), - ('hash_algorithm', 'sha256') -]) +GNUPG_KECPKG_HOME = os.path.join(user_data_dir("kecpkg", "KE-works BV"), ".gnupg") + +DEFAULT_SETTINGS = OrderedDict( + [ + ("version", "0.1.0"), + ("pyversions", ["3.9", "3.12"]), + ("python_version", "3.12"), + ("venv_dir", "venv"), + ("entrypoint_script", "script"), + ("entrypoint_func", "main"), + ("build_dir", "dist"), + ("requirements_filename", "requirements.txt"), + ("artifacts_filename", ARTIFACTS_FILENAME), + ("artifacts_sig_filename", ARTIFACTS_SIG_FILENAME), + ("hash_algorithm", "sha256"), + ] +) EXCLUDE_DIRS_IN_BUILD = [ - 'venv', 'dist', '.idea', '.tox', '.cache', '.git', 'venv*', '_venv*', '.env', '__pycache__', 'develop-eggs', - 'downloads', 'eggs', 'lib', 'lib64', 'sdist', 'wheels', '.hypothesis', '.ipynb_checkpoints', '.mypy_cache', - '.vscode' + "venv", + "dist", + ".idea", + ".tox", + ".cache", + ".git", + "venv*", + "_venv*", + ".env", + "__pycache__", + "develop-eggs", + "downloads", + "eggs", + "lib", + "lib64", + "sdist", + "wheels", + ".hypothesis", + ".ipynb_checkpoints", + ".mypy_cache", + ".vscode", ] EXCLUDE_PATHS_IN_BUILD = [ - '.gitignore', '*.pyc', '*.pyo', '*.pyd', '*$py.class', '*.egg-info', '.installed.cfg', '.coveragerc', '*.egg', - 'pip-log.txt', '*.log', 'pip-delete-this-directory.txt', '.coverage*', 'nosetests.xml', 'coverage.xml', '*.cover', - 'env.bak', 'venv.bak', 'pip-selfcheck.json', '*.so', '*-dist', '.*.swp', '*.asc' + ".gitignore", + "*.pyc", + "*.pyo", + "*.pyd", + "*$py.class", + "*.egg-info", + ".installed.cfg", + ".coveragerc", + "*.egg", + "pip-log.txt", + "*.log", + "pip-delete-this-directory.txt", + ".coverage*", + "nosetests.xml", + "coverage.xml", + "*.cover", + "env.bak", + "venv.bak", + "pip-selfcheck.json", + "*.so", + "*-dist", + ".*.swp", + "*.asc", ] EXCLUDE_IN_BUILD = EXCLUDE_DIRS_IN_BUILD + EXCLUDE_PATHS_IN_BUILD @@ -65,10 +105,10 @@ def load_settings(lazy=False, package_dir=None, settings_filename=None): if lazy and not os.path.exists(settings_filepath): return {} elif not os.path.exists(settings_filepath): - echo_failure('Could not find a settingsfile in path: {}'.format(settings_filepath)) + echo_failure(f"Could not find a settingsfile in path: {settings_filepath}") sys.exit(404) else: - with open(settings_filepath, 'r') as f: + with open(settings_filepath) as f: return json.loads(f.read(), object_pairs_hook=OrderedDict) @@ -97,8 +137,8 @@ def save_settings(settings, package_dir=None, settings_filename=None): :param package_dir: (optional) package_dir to save to :return: None """ - if settings.get('package_name') and not package_dir: - package_dir = get_package_dir(settings.get('package_name')) + if settings.get("package_name") and not package_dir: + package_dir = get_package_dir(settings.get("package_name")) settings_filepath = get_settings_filepath(package_dir, settings_filename) ensure_dir_exists(os.path.dirname(settings_filepath)) @@ -120,5 +160,5 @@ def restore_settings(package_dir=None, settings_filename=None): settings = copy_default_settings() if package_dir: package_name = os.path.dirname(package_dir) - settings['package_name'] = package_name + settings["package_name"] = package_name save_settings(settings=settings, settings_filename=settings_filename) diff --git a/kecpkg/utils.py b/kecpkg/utils.py index c94bda4..f8b8ca2 100644 --- a/kecpkg/utils.py +++ b/kecpkg/utils.py @@ -10,7 +10,6 @@ import platform import re import shutil -import six import sys from contextlib import contextmanager @@ -33,8 +32,9 @@ def create_file(filepath, content=None, overwrite=True): Will overwrite file already in place if overwrite flag is set. - If a list is provided each line in the list is written on a new line in the file (`fp.writelines`) - otherwise the string will be written as such and newline characters (`\\\\n`) will be respected. + If a list is provided each line in the list is written on a new line in the file ( + `fp.writelines`) otherwise the string will be written as such and newline characters ( + `\\\\n`) will be respected. :param filepath: full path to a file to create :param content: textual content. @@ -47,14 +47,14 @@ def create_file(filepath, content=None, overwrite=True): # if overwrite is set to True overwrite file, otherwise if file exist, exit. if not os.path.exists(filepath) or (os.path.exists(filepath) and overwrite): - with open(filepath, 'w') as fd: + with open(filepath, "w") as fd: # os.utime(filepath, times=None) if isinstance(content, list): fd.writelines(content) else: fd.write(content) else: - echo_failure("File '{}' already exists.".format(filepath)) + echo_failure(f"File '{filepath}' already exists.") sys.exit(1) @@ -66,11 +66,11 @@ def copy_path(sourcepath, destpath): :param destpath: destination path to copy to """ if os.path.isdir(sourcepath): - if six.PY3: - shutil.copytree(sourcepath, os.path.join(destpath, basepath(sourcepath)), - copy_function=shutil.copy) - else: - shutil.copytree(sourcepath, os.path.join(destpath, basepath(sourcepath))) + shutil.copytree( + sourcepath, + os.path.join(destpath, basepath(sourcepath)), + copy_function=shutil.copy, + ) else: shutil.copy(sourcepath, destpath) @@ -84,10 +84,10 @@ def remove_path(path): """ try: shutil.rmtree(path) - except (IOError, OSError): + except OSError: try: os.remove(path) - except IOError: + except OSError: pass @@ -121,7 +121,7 @@ def get_package_dir(package_name=None, fail=True): def _inner(d): if os.path.exists(os.path.join(d, SETTINGS_FILENAME)): return d - elif os.path.exists(os.path.join(d, 'package_info.json')): + elif os.path.exists(os.path.join(d, "package_info.json")): return d else: return None @@ -132,8 +132,10 @@ def _inner(d): if not package_dir and package_name is not None: package_dir = _inner(package_name) if not package_dir and package_name is not None: - echo_failure('This does not seem to be a package in path `{}` - please check that there is a ' - '`package_info.json` or a `{}`'.format(package_dir, SETTINGS_FILENAME)) + echo_failure( + "This does not seem to be a package in path `{}` - please check that there is a " + "`package_info.json` or a `{}`".format(package_dir, SETTINGS_FILENAME) + ) if fail: sys.exit(1) return package_dir @@ -152,7 +154,9 @@ def get_package_name(): return None -def get_artifacts_on_disk(root_path, additional_exclude_paths=None, default_exclude_paths=None, verbose=False): +def get_artifacts_on_disk( + root_path, additional_exclude_paths=None, default_exclude_paths=None, verbose=False +): # type: (str, list, list, bool) -> set """ Retrieve all artifacts on disk. @@ -167,18 +171,19 @@ def get_artifacts_on_disk(root_path, additional_exclude_paths=None, default_excl :rtype: set """ from kecpkg.settings import EXCLUDE_IN_BUILD + exclude_paths = default_exclude_paths or EXCLUDE_IN_BUILD if verbose: - echo_info("basic excluded paths are: `{}`".format(exclude_paths)) + echo_info(f"basic excluded paths are: `{exclude_paths}`") # get additional exclude paths from the settings file if additional_exclude_paths and isinstance(additional_exclude_paths, list): exclude_paths.extend(additional_exclude_paths) if verbose: - echo_info("additional exclude paths are: `{}`".format(additional_exclude_paths)) + echo_info(f"additional exclude paths are: `{additional_exclude_paths}`") if not os.path.exists(root_path): - echo_failure("The root path: '{}' does not exist".format(root_path)) + echo_failure(f"The root path: '{root_path}' does not exist") sys.exit(1) # getting all attachments @@ -188,22 +193,23 @@ def get_artifacts_on_disk(root_path, additional_exclude_paths=None, default_excl if exclude_path in dirs: dirs.remove(exclude_path) if verbose: - echo_warning("Ignored path `{}`".format(exclude_path)) + echo_warning(f"Ignored path `{exclude_path}`") for filename in filenames: # print([(filename,ptrn,fnmatch.fnmatch(filename, ptrn)) for ptrn in exclude_paths]) if not any([fnmatch.fnmatch(filename, ptrn) for ptrn in exclude_paths]): - full_artifact_subpath = '{}{}{}'.format(root, os.path.sep, filename). \ - replace('{}{}'.format(root_path, os.path.sep), '') + full_artifact_subpath = f"{root}{os.path.sep}{filename}".replace( + f"{root_path}{os.path.sep}", "" + ) artifacts.append(full_artifact_subpath) if verbose: - echo_info('Found `{}`'.format(full_artifact_subpath)) + echo_info(f"Found `{full_artifact_subpath}`") else: if verbose: - echo_warning('Ignored `{}`'.format(filename)) + echo_warning(f"Ignored `{filename}`") if verbose: - echo_info('{}'.format(artifacts)) + echo_info(f"{artifacts}") return set(artifacts) @@ -215,21 +221,26 @@ def render_package_info(settings, package_dir, backup=True): :param backup: (optional) if set to True the original package_info will be backed-up :return: """ - package_info_filename = 'package_info.json' + package_info_filename = "package_info.json" package_info_path = os.path.join(package_dir, package_info_filename) if backup and os.path.exists(package_info_path): - if os.path.exists("{}-dist".format(package_info_path)): - os.remove("{}-dist".format(package_info_path)) - os.rename(package_info_path, "{}-dist".format(package_info_path)) + if os.path.exists(f"{package_info_path}-dist"): + os.remove(f"{package_info_path}-dist") + os.rename(package_info_path, f"{package_info_path}-dist") elif os.path.exists(package_info_path): os.remove(package_info_path) from kecpkg.files.rendering import render_to_file - render_to_file(package_info_filename, - content=dict(requirements_txt=settings.get('requirements_filename', 'requirements.txt'), - entrypoint_script=settings.get('entrypoint_script'), - entrypoint_func=settings.get('entrypoint_func')), - target_dir=package_dir) + + render_to_file( + package_info_filename, + content=dict( + requirements_txt=settings.get("requirements_filename", "requirements.txt"), + entrypoint_script=settings.get("entrypoint_script"), + entrypoint_func=settings.get("entrypoint_func"), + ), + target_dir=package_dir, + ) def unzip_package(package_path, target_path): @@ -245,7 +256,8 @@ def unzip_package(package_path, target_path): :param target_path: target path to unzip the package into """ import zipfile - with zipfile.ZipFile(package_path, 'r') as zip_file: + + with zipfile.ZipFile(package_path, "r") as zip_file: zip_file.extractall(target_path) @@ -253,13 +265,13 @@ def unzip_package(package_path, target_path): # Graceously borrowed From hatch package. __platform = platform.system() -ON_LINUX = os.name == 'posix' or __platform == 'Linux' -ON_MACOS = os.name == 'mac' or __platform == 'Darwin' -ON_WINDOWS = NEED_SUBPROCESS_SHELL = os.name == 'nt' or __platform == 'Windows' +ON_LINUX = os.name == "posix" or __platform == "Linux" +ON_MACOS = os.name == "mac" or __platform == "Darwin" +ON_WINDOWS = NEED_SUBPROCESS_SHELL = os.name == "nt" or __platform == "Windows" VENV_FLAGS = { - '_HATCHING_', - 'VIRTUAL_ENV', - 'CONDA_PREFIX', + "_HATCHING_", + "VIRTUAL_ENV", + "CONDA_PREFIX", } @@ -269,7 +281,7 @@ def venv_ignored(): Graceously borrowed From hatch package. """ - return os.environ.get('_IGNORE_VENV_') == '1' + return os.environ.get("_IGNORE_VENV_") == "1" def venv_active(): @@ -288,12 +300,12 @@ def get_proper_python(): # no cov Graceously borrowed From hatch package. """ if not venv_active(): - default_python = os.environ.get('_DEFAULT_PYTHON_', None) + default_python = os.environ.get("_DEFAULT_PYTHON_", None) if default_python: return default_python elif not ON_WINDOWS: - return 'python3' - return 'python' + return "python3" + return "python" def get_proper_pip(): # no cov @@ -303,12 +315,12 @@ def get_proper_pip(): # no cov Graceously borrowed From hatch package. """ if not venv_active(): - default_pip = os.environ.get('_DEFAULT_PIP_', None) + default_pip = os.environ.get("_DEFAULT_PIP_", None) if default_pip: return default_pip elif not ON_WINDOWS: - return 'pip3' - return 'pip' + return "pip3" + return "pip" def locate_exe_dir(d, check=True): @@ -317,9 +329,11 @@ def locate_exe_dir(d, check=True): Graceously borrowed From hatch package. """ - exe_dir = os.path.join(d, 'Scripts') if ON_WINDOWS else os.path.join(d, 'bin') + exe_dir = os.path.join(d, "Scripts") if ON_WINDOWS else os.path.join(d, "bin") if check and not os.path.isdir(exe_dir): - raise OSError('Unable to locate python virtual environment executables directory.') + raise OSError( + "Unable to locate python virtual environment executables directory." + ) return exe_dir @@ -375,13 +389,13 @@ def venv(venv_path, evars=None): venv_exe_dir = locate_exe_dir(venv_path) evars = evars or {} - evars['_HATCHING_'] = '1' - evars['VIRTUAL_ENV'] = venv_path - evars['PATH'] = '{}{}{}'.format( - venv_exe_dir, os.pathsep, os.environ.get('PATH', '') + evars["_HATCHING_"] = "1" + evars["VIRTUAL_ENV"] = venv_path + evars["PATH"] = "{}{}{}".format( + venv_exe_dir, os.pathsep, os.environ.get("PATH", "") ) - with env_vars(evars, ignore={'__PYVENV_LAUNCHER__'}): + with env_vars(evars, ignore={"__PYVENV_LAUNCHER__"}): yield venv_exe_dir @@ -392,7 +406,7 @@ def echo_success(text, nl=True): :param text: string to write :param nl: add newline """ - click.secho(text, fg='cyan', bold=True, nl=nl) + click.secho(text, fg="cyan", bold=True, nl=nl) def echo_failure(text, nl=True): @@ -402,7 +416,7 @@ def echo_failure(text, nl=True): :param text: string to write :param nl: add newline """ - click.secho(text, fg='red', bold=True, nl=nl) + click.secho(text, fg="red", bold=True, nl=nl) def echo_warning(text, nl=True): @@ -412,7 +426,7 @@ def echo_warning(text, nl=True): :param text: string to write :param nl: add newline """ - click.secho(text, fg='yellow', bold=True, nl=nl) + click.secho(text, fg="yellow", bold=True, nl=nl) def echo_waiting(text, nl=True): @@ -422,7 +436,7 @@ def echo_waiting(text, nl=True): :param text: string to write :param nl: add newline """ - click.secho(text, fg='magenta', bold=True, nl=nl) + click.secho(text, fg="magenta", bold=True, nl=nl) def echo_info(text, nl=True): diff --git a/pyproject.toml.depr b/pyproject.toml.depr deleted file mode 100644 index c96a0ed..0000000 --- a/pyproject.toml.depr +++ /dev/null @@ -1,19 +0,0 @@ -[metadata] -name = 'kecpkg-tools' -version = '0.0.1' -description = 'packaging tools to assist in the creation of new kecpkg KE-chain executable SIM packages.' -author = 'Jochem Berends' -author_email = 'jochem.berends@ke-works.com' -license = 'Apache-2.0' -url = 'https://github.com/_/kecpkg-tools' - -[requires] -python_version = ['2.7', '3.5', '3.6', 'pypy', 'pypy3'] -requires = ['click', 'atomicwrites', 'jinja2', 'hatch', 'pykechain>=1.13', 'python-gnupg'] -testing_requires = ['coverage', 'pytest', 'toml', 'flake8', ] - -[build-system] -requires = ['setuptools', 'wheel'] - -[tool.hatch.commands] -prerelease = 'hatch build' diff --git a/setup.py b/setup.py index 565fba8..8213a84 100755 --- a/setup.py +++ b/setup.py @@ -3,82 +3,69 @@ import os from setuptools import find_packages, setup -PACKAGE_NAME = 'kecpkg' +PACKAGE_NAME = "kecpkg" HERE = os.path.abspath(os.path.dirname(__file__)) -with open(os.path.join(HERE, 'README.md'), encoding='utf-8') as f: +with open(os.path.join(HERE, "README.md"), encoding="utf-8") as f: long_description = f.read() ABOUT = {} -with open(os.path.join(PACKAGE_NAME, '__init__.py'), 'r') as f: +with open(os.path.join(PACKAGE_NAME, "__init__.py"), "r") as f: exec(f.read(), ABOUT) setup( - name='kecpkg-tools', - version=ABOUT.get('__version__'), - description='', + name="kecpkg-tools", + version=ABOUT.get("__version__"), + description="", long_description=long_description, - long_description_content_type='text/markdown', - author='Jochem Berends', - author_email='jochem.berends@ke-works.com', - maintainer='Jochem Berends', - maintainer_email='jochem.berends@ke-works.com', - url='https://github.com/jberends/kecpkg-tools', - license='Apache-2.0', - + long_description_content_type="text/markdown", + author="Jochem Berends", + author_email="jochem.berends@ke-works.com", + maintainer="Jochem Berends", + maintainer_email="jochem.berends@ke-works.com", + url="https://github.com/jberends/kecpkg-tools", + license="Apache-2.0", keywords=( - 'python', - 'package tools', - 'pykechain', - 'KE-chain', - 'Services Integration Module', - 'SIM', - 'KECPKG', - 'GPG' + "python", + "package tools", + "pykechain", + "KE-chain", + "Services Integration Module", + "SIM", + "KECPKG", + "GPG", ), - classifiers=( - 'Development Status :: 5 - Production/Stable', - 'Intended Audience :: Developers', - 'License :: OSI Approved :: Apache Software License', - 'Natural Language :: English', - 'Operating System :: OS Independent', - 'Programming Language :: Python :: 2.7', - 'Programming Language :: Python :: 3.5', - 'Programming Language :: Python :: 3.6', - 'Programming Language :: Python :: 3.7', - 'Programming Language :: Python :: Implementation :: CPython', - 'Programming Language :: Python :: Implementation :: PyPy', + "Development Status :: 5 - Production/Stable", + "Intended Audience :: Developers", + "License :: OSI Approved :: Apache Software License", + "Natural Language :: English", + "Operating System :: OS Independent", + "Programming Language :: Python :: 3.7", + "Programming Language :: Python :: 3.8", + "Programming Language :: Python :: 3.9", + "Programming Language :: Python :: 3.10", + "Programming Language :: Python :: 3.11", + "Programming Language :: Python :: 3.12", + "Programming Language :: Python :: Implementation :: CPython", + "Programming Language :: Python :: Implementation :: PyPy", ), - install_requires=( - 'click', - 'atomicwrites', - 'jinja2', - 'pykechain>=3.0', - 'appdirs', - 'tabulate', - 'python-gnupg' - ), - - tests_require=( - 'coverage', - 'pytest', - 'flake8', - 'virtualenv' + "click", + "atomicwrites", + "jinja2", + "pykechain>=3.0", + "appdirs", + "tabulate", + "python-gnupg", ), - - packages=find_packages(exclude=['tests']), - + tests_require=("coverage", "pytest", "flake8", "virtualenv"), + packages=find_packages(exclude=["tests"]), # to include the templates in the bdist wheel, we need to add package_data here package_data={ - 'kecpkg': ['files/templates/*.template', - 'files/templates/.*.template'] + "kecpkg": ["files/templates/*.template", "files/templates/.*.template"] }, - entry_points={ - 'console_scripts': ( - 'kecpkg = kecpkg.cli:kecpkg', - ), - } + "console_scripts": ("kecpkg = kecpkg.cli:kecpkg",), + }, ) diff --git a/snapcraft.yaml b/snapcraft.yaml deleted file mode 100644 index 749430f..0000000 --- a/snapcraft.yaml +++ /dev/null @@ -1,34 +0,0 @@ -name: kecpkg -version: git -version-script: git describe --tags -summary: A set of tools to easily create and manage KE-chain packages. -description: | - kecpkg provide a set of tools to easily create and manage KE-chain packages. - These are executable python scripts aimed for execution on the KE-chain SIM platform. - A valid user license of KE-chain is needed. See https://ke-chain.com for more information. -license: Apache-2.0 - -grade: devel # must be 'stable' to release into candidate/stable channels -confinement: devmode # use 'strict' once you have the right plugs and slots -base: core18 - -parts: - kecpkg: - plugin: python - python-version: python3 - source: . - stage-packages: - - python-six - - gpg - - gpg-agent -apps: - kecpkg: - command: bin/kecpkg - environment: - LC_ALL: C.UTF-8 - LANG: C.UTF-8 - plugs: - - gpg-keys - - gpg-public-keys - - network-bind - diff --git a/tests/commands/test_build.py b/tests/commands/test_build.py index 65352fe..3ac40a2 100644 --- a/tests/commands/test_build.py +++ b/tests/commands/test_build.py @@ -4,82 +4,91 @@ from click.testing import CliRunner from kecpkg.cli import kecpkg -from kecpkg.settings import load_settings, save_settings, copy_default_settings -from kecpkg.utils import get_package_dir, ensure_dir_exists -from tests.utils import temp_chdir, BaseTestCase, touch_file +from kecpkg.settings import copy_default_settings, save_settings +from kecpkg.utils import ensure_dir_exists, get_package_dir +from tests.utils import BaseTestCase, temp_chdir, touch_file class TestCommandPurge(BaseTestCase): def test_build_non_interactive(self): - pkgname = 'new_pkg' + pkgname = "new_pkg" with temp_chdir() as d: runner = CliRunner() - result = runner.invoke(kecpkg, ['new', pkgname, '--no-venv']) + result = runner.invoke(kecpkg, ["new", pkgname, "--no-venv"]) package_dir = get_package_dir(pkgname) self.assertTrue(os.path.exists(package_dir)) os.chdir(package_dir) - result = runner.invoke(kecpkg, ['build', pkgname]) - self.assertEqual(result.exit_code, 0, "Results of the run were: \n---\n{}\n---".format(result.output)) - self.assertExists(os.path.join(package_dir, 'dist')) - + result = runner.invoke(kecpkg, ["build", pkgname]) + self.assertEqual( + result.exit_code, + 0, + "Results of the run were: \n---\n{}\n---".format(result.output), + ) + self.assertExists(os.path.join(package_dir, "dist")) # check if dist is filled - package_dir_contents = os.listdir(os.path.join(package_dir, 'dist')) + package_dir_contents = os.listdir(os.path.join(package_dir, "dist")) self.assertTrue(len(package_dir_contents), 1) def test_build_with_prune(self): - pkgname = 'new_pkg' + pkgname = "new_pkg" with temp_chdir() as d: runner = CliRunner() - result = runner.invoke(kecpkg, ['new', pkgname, '--no-venv']) + result = runner.invoke(kecpkg, ["new", pkgname, "--no-venv"]) package_dir = get_package_dir(pkgname) self.assertTrue(os.path.exists(package_dir)) os.chdir(package_dir) - result = runner.invoke(kecpkg, ['build', pkgname]) - self.assertEqual(result.exit_code, 0, "Results of the run were: \n---\n{}\n---".format(result.output)) - self.assertExists(os.path.join(package_dir, 'dist')) + result = runner.invoke(kecpkg, ["build", pkgname]) + self.assertEqual( + result.exit_code, + 0, + "Results of the run were: \n---\n{}\n---".format(result.output), + ) + self.assertExists(os.path.join(package_dir, "dist")) # check if dist is filled - package_dir_contents = os.listdir(os.path.join(package_dir, 'dist')) + package_dir_contents = os.listdir(os.path.join(package_dir, "dist")) self.assertTrue(len(package_dir_contents), 1) # restart the build, with prune and check if dist still has 1 - result = runner.invoke(kecpkg, ['build', pkgname, '--prune']) - self.assertEqual(result.exit_code, 0, "Results of the run were: \n---\n{}\n---".format(result.output)) - self.assertExists(os.path.join(package_dir, 'dist')) + result = runner.invoke(kecpkg, ["build", pkgname, "--prune"]) + self.assertEqual( + result.exit_code, + 0, + "Results of the run were: \n---\n{}\n---".format(result.output), + ) + self.assertExists(os.path.join(package_dir, "dist")) # check if dist is filled - package_dir_contents = os.listdir(os.path.join(package_dir, 'dist')) + package_dir_contents = os.listdir(os.path.join(package_dir, "dist")) self.assertTrue(len(package_dir_contents), 1) - - def test_build_with_extra_ignores(self): - pkgname = 'new_pkg' + pkgname = "new_pkg" with temp_chdir() as d: runner = CliRunner() - result = runner.invoke(kecpkg, ['new', pkgname, '--no-venv']) + result = runner.invoke(kecpkg, ["new", pkgname, "--no-venv"]) package_dir = get_package_dir(pkgname) self.assertTrue(os.path.exists(package_dir)) - ensure_dir_exists(os.path.join(package_dir, 'data')) + ensure_dir_exists(os.path.join(package_dir, "data")) # add additional files (to exclude for building later) - touch_file(os.path.join(package_dir, 'data','somefile.txt')) - touch_file(os.path.join(package_dir, 'local_extra_file.someext.txt')) + touch_file(os.path.join(package_dir, "data", "somefile.txt")) + touch_file(os.path.join(package_dir, "local_extra_file.someext.txt")) os.chdir(package_dir) # check if those files exists package_dir_contents = os.listdir(os.path.join(package_dir)) - self.assertTrue('local_extra_file.someext.txt' in package_dir_contents) - self.assertTrue('data' in package_dir_contents) + self.assertTrue("local_extra_file.someext.txt" in package_dir_contents) + self.assertTrue("data" in package_dir_contents) # set exclude_paths in settings settings = copy_default_settings() @@ -87,41 +96,53 @@ def test_build_with_extra_ignores(self): save_settings(settings, package_dir=package_dir) # run the builder - result = runner.invoke(kecpkg, ['build', pkgname, '--verbose']) - self.assertEqual(result.exit_code, 0, "Results of the run were: \n---\n{}\n---".format(result.output)) - self.assertExists(os.path.join(package_dir, 'dist')) + result = runner.invoke(kecpkg, ["build", pkgname, "--verbose"]) + self.assertEqual( + result.exit_code, + 0, + "Results of the run were: \n---\n{}\n---".format(result.output), + ) + self.assertExists(os.path.join(package_dir, "dist")) # check the zip such that the extra files are not packaged - dist_list = os.listdir(os.path.join(package_dir, 'dist')) - zipfile = ZipFile(os.path.join(package_dir, 'dist', dist_list[0]), 'r') + dist_list = os.listdir(os.path.join(package_dir, "dist")) + zipfile = ZipFile(os.path.join(package_dir, "dist", dist_list[0]), "r") contents = zipfile.namelist() - self.assertFalse('local_extra_file.someext.txt' in contents) - self.assertFalse('data' in contents) + self.assertFalse("local_extra_file.someext.txt" in contents) + self.assertFalse("data" in contents) def test_build_with_alternate_config(self): - pkgname = 'new_pkg' - alt_settings = 'alt-settings.json' + pkgname = "new_pkg" + alt_settings = "alt-settings.json" with temp_chdir() as d: runner = CliRunner() - result = runner.invoke(kecpkg, ['new', pkgname, '--no-venv']) + result = runner.invoke(kecpkg, ["new", pkgname, "--no-venv"]) package_dir = get_package_dir(pkgname) self.assertTrue(os.path.exists(package_dir)) # set alternative settings path settings = copy_default_settings() settings["package_name"] = pkgname - save_settings(settings, package_dir=package_dir, settings_filename=alt_settings) + save_settings( + settings, package_dir=package_dir, settings_filename=alt_settings + ) os.chdir(package_dir) - result = runner.invoke(kecpkg, ['build', pkgname, '--config', alt_settings]) - self.assertEqual(result.exit_code, 0, "Results of the run were: \n---\n{}\n---".format(result.output)) - self.assertExists(os.path.join(package_dir, 'dist')) + result = runner.invoke(kecpkg, ["build", pkgname, "--config", alt_settings]) + self.assertEqual( + result.exit_code, + 0, + "Results of the run were: \n---\n{}\n---".format(result.output), + ) + self.assertExists(os.path.join(package_dir, "dist")) - dist_dir_contents = os.listdir(os.path.join(package_dir, 'dist')) + dist_dir_contents = os.listdir(os.path.join(package_dir, "dist")) self.assertTrue(len(dist_dir_contents), 1) - self.assertTrue(pkgname in dist_dir_contents[0], - "the name of the pkg `{}` should be in the name of " - "the built kecpkg `{}`".format(pkgname, dist_dir_contents[0])) + self.assertTrue( + pkgname in dist_dir_contents[0], + "the name of the pkg `{}` should be in the name of " + "the built kecpkg `{}`".format(pkgname, dist_dir_contents[0]), + ) diff --git a/tests/commands/test_new.py b/tests/commands/test_new.py index be48882..86b29f0 100644 --- a/tests/commands/test_new.py +++ b/tests/commands/test_new.py @@ -8,65 +8,93 @@ class TestCommandNew(BaseTestCase): def test_new_non_interactive_no_venv(self): - pkgname = 'new_pkg' + pkgname = "new_pkg" with temp_chdir() as d: runner = CliRunner() - result = runner.invoke(kecpkg, ['new', pkgname, '--no-venv']) + result = runner.invoke(kecpkg, ["new", pkgname, "--no-venv"]) self.assertEqual(result.exit_code, 0) self.assertTrue(os.path.exists(os.path.join(d, pkgname))) - self.assertTrue(os.path.exists(os.path.join(d, pkgname, 'script.py'))) - self.assertTrue(os.path.exists(os.path.join(d, pkgname, 'package_info.json'))) - self.assertTrue(os.path.exists(os.path.join(d, pkgname, 'README.md'))) - self.assertTrue(os.path.exists(os.path.join(d, pkgname, 'requirements.txt'))) - self.assertTrue(os.path.exists(os.path.join(d, pkgname, '.gitignore'))) - self.assertTrue(os.path.exists(os.path.join(d, pkgname, '.kecpkg_settings.json'))) - self.assertTrue(os.path.exists(os.path.join(d, pkgname, '.idea'))) - self.assertTrue(os.path.exists(os.path.join(d, pkgname, '.env'))) - self.assertTrue(os.path.exists(os.path.join(d, pkgname, '.idea', 'runConfigurations'))) + self.assertTrue(os.path.exists(os.path.join(d, pkgname, "script.py"))) self.assertTrue( - os.path.exists(os.path.join(d, pkgname, '.idea', 'runConfigurations', 'Build_the_kecpkg.xml'))) + os.path.exists(os.path.join(d, pkgname, "package_info.json")) + ) + self.assertTrue(os.path.exists(os.path.join(d, pkgname, "README.md"))) self.assertTrue( - os.path.exists(os.path.join(d, pkgname, '.idea', 'runConfigurations', 'Upload_the_kecpkg.xml'))) - - self.assertFalse(os.path.exists(os.path.join(d, pkgname, 'venv'))) + os.path.exists(os.path.join(d, pkgname, "requirements.txt")) + ) + self.assertTrue(os.path.exists(os.path.join(d, pkgname, ".gitignore"))) + self.assertTrue( + os.path.exists(os.path.join(d, pkgname, ".kecpkg_settings.json")) + ) + self.assertTrue(os.path.exists(os.path.join(d, pkgname, ".idea"))) + self.assertTrue(os.path.exists(os.path.join(d, pkgname, ".env"))) + self.assertTrue( + os.path.exists(os.path.join(d, pkgname, ".idea", "runConfigurations")) + ) + self.assertTrue( + os.path.exists( + os.path.join( + d, pkgname, ".idea", "runConfigurations", "Build_the_kecpkg.xml" + ) + ) + ) + self.assertTrue( + os.path.exists( + os.path.join( + d, + pkgname, + ".idea", + "runConfigurations", + "Upload_the_kecpkg.xml", + ) + ) + ) + + self.assertFalse(os.path.exists(os.path.join(d, pkgname, "venv"))) def test_new_non_interactive_with_venv(self): - pkgname = 'new_pkg' + pkgname = "new_pkg" with temp_chdir() as d: runner = CliRunner() - result = runner.invoke(kecpkg, ['new', pkgname]) + result = runner.invoke(kecpkg, ["new", pkgname]) e = result.exception self.assertEqual(result.exit_code, 0) - self.assertTrue(os.path.exists(os.path.join(d, pkgname, 'venv'))) - self.assertTrue(os.path.exists(os.path.join(d, pkgname, 'venv', 'bin', 'activate'))) + self.assertTrue(os.path.exists(os.path.join(d, pkgname, "venv"))) + self.assertTrue( + os.path.exists(os.path.join(d, pkgname, "venv", "bin", "activate")) + ) def test_new_non_iteractive_with_alternate_script(self): - pkgname = 'new_pkg' + pkgname = "new_pkg" with temp_chdir() as d: runner = CliRunner() - result = runner.invoke(kecpkg, ['new', pkgname, '--script', 'foobar', '--no-venv']) + result = runner.invoke( + kecpkg, ["new", pkgname, "--script", "foobar", "--no-venv"] + ) self.assertEqual(result.exit_code, 0) - self.assertTrue(os.path.exists(os.path.join(d, pkgname, 'foobar.py'))) + self.assertTrue(os.path.exists(os.path.join(d, pkgname, "foobar.py"))) def test_new_non_interactive_with_alternate_venv_name(self): - pkgname = 'new_pkg' - venv_name = '_v' + pkgname = "new_pkg" + venv_name = "_v" with temp_chdir() as d: runner = CliRunner() - result = runner.invoke(kecpkg, ['new', pkgname, '--venv', venv_name]) + result = runner.invoke(kecpkg, ["new", pkgname, "--venv", venv_name]) self.assertEqual(result.exit_code, 0) self.assertTrue(os.path.exists(os.path.join(d, pkgname, venv_name))) - self.assertTrue(os.path.exists(os.path.join(d, pkgname, venv_name, 'bin', 'activate'))) + self.assertTrue( + os.path.exists(os.path.join(d, pkgname, venv_name, "bin", "activate")) + ) diff --git a/tests/commands/test_purge.py b/tests/commands/test_purge.py index b9317ae..bcfbfcd 100644 --- a/tests/commands/test_purge.py +++ b/tests/commands/test_purge.py @@ -9,15 +9,13 @@ class TestCommandPurge(BaseTestCase): def test_purge_non_interactive(self): - pkgname = 'new_pkg' + pkgname = "new_pkg" with temp_chdir() as d: runner = CliRunner() - result = runner.invoke(kecpkg, ['new', pkgname, '--no-venv']) + result = runner.invoke(kecpkg, ["new", pkgname, "--no-venv"]) package_dir = get_package_dir(pkgname) self.assertTrue(os.path.exists(package_dir)) - result = runner.invoke(kecpkg, ['purge', pkgname, '--force']) + result = runner.invoke(kecpkg, ["purge", pkgname, "--force"]) self.assertFalse(os.path.exists(package_dir)) - - diff --git a/tests/commands/test_sign.py b/tests/commands/test_sign.py index d7ad186..d50fae2 100644 --- a/tests/commands/test_sign.py +++ b/tests/commands/test_sign.py @@ -38,77 +38,133 @@ TEST_THIS_IS_NOT_A_SECRET_KEY_FINGERPRINT = "8D092FCC060BCC1E97CEC48987A177AAB2371E68" -@skipIf(six.PY2, reason="Skipping tests for python 2.7, as PGP signing cannot be provided") -@skipIf(not has_gpg(), reason='GPG not found on the system or python version is < 3.') +@skipIf( + six.PY2, reason="Skipping tests for python 2.7, as PGP signing cannot be provided" +) +@skipIf(not has_gpg(), reason="GPG not found on the system or python version is < 3.") class TestCommandSign(BaseTestCase): - def _import_test_key(self): with self.runner.isolated_filesystem() as d: - create_file('TESTKEY.asc', TEST_THIS_IS_NOT_A_SECRET_KEY__NO_REALLY_NOT) - self.runner.invoke(kecpkg, ['sign', '--import-key', 'TESTKEY.asc']) + create_file("TESTKEY.asc", TEST_THIS_IS_NOT_A_SECRET_KEY__NO_REALLY_NOT) + self.runner.invoke(kecpkg, ["sign", "--import-key", "TESTKEY.asc"]) def tearDown(self): super(TestCommandSign, self).tearDown() - self.runner.invoke(kecpkg, ['sign', '--delete-key', TEST_THIS_IS_NOT_A_SECRET_KEY_FINGERPRINT]) + self.runner.invoke( + kecpkg, ["sign", "--delete-key", TEST_THIS_IS_NOT_A_SECRET_KEY_FINGERPRINT] + ) def test_sign_list_keys(self): self._import_test_key() - result = self.runner.invoke(kecpkg, ['sign', '--list']) - self.assertIn(result.exit_code, [0, 1], "Results of the run were: \n---\n{}\n---".format(result.output)) + result = self.runner.invoke(kecpkg, ["sign", "--list"]) + self.assertIn( + result.exit_code, + [0, 1], + "Results of the run were: \n---\n{}\n---".format(result.output), + ) def test_import_key(self): with temp_chdir() as d: - create_file('TESTKEY.asc', TEST_THIS_IS_NOT_A_SECRET_KEY__NO_REALLY_NOT) - result = self.runner.invoke(kecpkg, ['sign', '--import-key', 'TESTKEY.asc']) - self.assertEqual(result.exit_code, 0, "Results of the run were: \n---\n{}\n---".format(result.output)) + create_file("TESTKEY.asc", TEST_THIS_IS_NOT_A_SECRET_KEY__NO_REALLY_NOT) + result = self.runner.invoke(kecpkg, ["sign", "--import-key", "TESTKEY.asc"]) + self.assertEqual( + result.exit_code, + 0, + "Results of the run were: \n---\n{}\n---".format(result.output), + ) # teardown - result = self.runner.invoke(kecpkg, ['sign', '--delete-key', TEST_THIS_IS_NOT_A_SECRET_KEY_FINGERPRINT]) - self.assertEqual(result.exit_code, 0, "Results of the run were: \n---\n{}\n---".format(result.output)) + result = self.runner.invoke( + kecpkg, + ["sign", "--delete-key", TEST_THIS_IS_NOT_A_SECRET_KEY_FINGERPRINT], + ) + self.assertEqual( + result.exit_code, + 0, + "Results of the run were: \n---\n{}\n---".format(result.output), + ) def test_delete_key(self): self._import_test_key() - result = self.runner.invoke(kecpkg, ['sign', '--delete-key', TEST_THIS_IS_NOT_A_SECRET_KEY_FINGERPRINT]) - self.assertEqual(result.exit_code, 0, "Results of the run were: \n---\n{}\n---".format(result.output)) + result = self.runner.invoke( + kecpkg, ["sign", "--delete-key", TEST_THIS_IS_NOT_A_SECRET_KEY_FINGERPRINT] + ) + self.assertEqual( + result.exit_code, + 0, + "Results of the run were: \n---\n{}\n---".format(result.output), + ) def test_delete_key_wrong_fingerprint(self): self._import_test_key() - result = self.runner.invoke(kecpkg, ['sign', '--delete-key', 'THISISAWRONGFINGERPRINT']) - self.assertEqual(result.exit_code, 1, "Results of the run were: \n---\n{}\n---".format(result.output)) + result = self.runner.invoke( + kecpkg, ["sign", "--delete-key", "THISISAWRONGFINGERPRINT"] + ) + self.assertEqual( + result.exit_code, + 1, + "Results of the run were: \n---\n{}\n---".format(result.output), + ) def test_create_key(self): - result = self.runner.invoke(kecpkg, ['sign', '--create-key'], - input="Testing\n" - "KECPKG TESTING CREATE KEY\n" - "no-reply@ke-works.com\n" - "1\n" - "pass\n" - "pass\n") - self.assertEqual(result.exit_code, 0, "Results of the run were: \n---\n{}\n---".format(result.output)) + result = self.runner.invoke( + kecpkg, + ["sign", "--create-key"], + input="Testing\n" + "KECPKG TESTING CREATE KEY\n" + "no-reply@ke-works.com\n" + "1\n" + "pass\n" + "pass\n", + ) + self.assertEqual( + result.exit_code, + 0, + "Results of the run were: \n---\n{}\n---".format(result.output), + ) keys = list_keys(get_gpg()) last_key = keys[-1] fingerprint = last_key[-1] - result = self.runner.invoke(kecpkg, ['sign', '--delete-key', fingerprint]) - self.assertEqual(result.exit_code, 0, "Results of the run were: \n---\n{}\n---".format(result.output)) + result = self.runner.invoke(kecpkg, ["sign", "--delete-key", fingerprint]) + self.assertEqual( + result.exit_code, + 0, + "Results of the run were: \n---\n{}\n---".format(result.output), + ) def test_export_key(self): self._import_test_key() with self.runner.isolated_filesystem() as d: - result = self.runner.invoke(kecpkg, ['sign', - '--export-key', 'out.asc', - '--keyid', TEST_THIS_IS_NOT_A_SECRET_KEY_FINGERPRINT]) - self.assertEqual(result.exit_code, 0, "Results of the run were: \n---\n{}\n---".format(result.output)) - self.assertExists('out.asc') - - -@skipIf(not has_gpg(), reason='GPG not found on the system or python version is < 3.') + result = self.runner.invoke( + kecpkg, + [ + "sign", + "--export-key", + "out.asc", + "--keyid", + TEST_THIS_IS_NOT_A_SECRET_KEY_FINGERPRINT, + ], + ) + self.assertEqual( + result.exit_code, + 0, + "Results of the run were: \n---\n{}\n---".format(result.output), + ) + self.assertExists("out.asc") + + +@skipIf(not has_gpg(), reason="GPG not found on the system or python version is < 3.") @skipIf(six.PY3, reason="These tests are for python 2 only.") class TestCommandSign27(BaseTestCase): def test_sign_capability_unavailable(self): - result = self.runner.invoke(kecpkg, ['sign', '--list']) - self.assertEqual(result.exit_code, 1, "Results of the run were: \n---\n{}\n---".format(result.output)) + result = self.runner.invoke(kecpkg, ["sign", "--list"]) + self.assertEqual( + result.exit_code, + 1, + "Results of the run were: \n---\n{}\n---".format(result.output), + ) diff --git a/tests/commands/test_upload.py b/tests/commands/test_upload.py index 6dba6bb..22e51f8 100644 --- a/tests/commands/test_upload.py +++ b/tests/commands/test_upload.py @@ -10,50 +10,67 @@ @skipIf(running_on_ci(), reason="Test do not work on a CI environment") -@skipIf("os.getenv('KECHAIN_URL') is None", - reason="Skipping test as the KECHAIN_URL is not available as environment variable. Cannot upload kecpkg to " - "test this functionality. Provice a `.env` file locally to enable these tests.") +@skipIf( + "os.getenv('KECHAIN_URL') is None", + reason="Skipping test as the KECHAIN_URL is not available as environment variable. " + "Cannot upload kecpkg to " + "test this functionality. Provice a `.env` file locally to enable these tests.", +) class TestCommandUpload(BaseTestCase): def test_upload_non_interactive(self): - pkgname = 'new_pkg' + pkgname = "new_pkg" env = Env.read_envfile() - self.assertTrue(os.environ.get('KECHAIN_URL'), - "KECHAIN_URL is not set in environment, cannot perform this test") + self.assertTrue( + os.environ.get("KECHAIN_URL"), + "KECHAIN_URL is not set in environment, cannot perform this test", + ) with temp_chdir() as d: runner = CliRunner() - result = runner.invoke(kecpkg, ['new', pkgname, '--no-venv']) + result = runner.invoke(kecpkg, ["new", pkgname, "--no-venv"]) package_dir = get_package_dir(pkgname) self.assertTrue(os.path.exists(package_dir)) os.chdir(package_dir) - result = runner.invoke(kecpkg, ['build', pkgname]) + result = runner.invoke(kecpkg, ["build", pkgname]) self.assertEqual(result.exit_code, 0) - self.assertExists(os.path.join(package_dir, 'dist')) - pkgpath = os.path.join(package_dir, 'dist', '{}-0.0.1-py3.5.kecpkg'.format(pkgname)) + self.assertExists(os.path.join(package_dir, "dist")) + pkgpath = os.path.join( + package_dir, "dist", "{}-0.0.1-py3.5.kecpkg".format(pkgname) + ) self.assertExists(pkgpath) - result = runner.invoke(kecpkg, [ - 'upload', pkgname, - '--url', os.environ.get('KECHAIN_URL'), - '--token', os.environ.get('KECHAIN_TOKEN'), - '--scope-id', os.environ.get('KECHAIN_SCOPE_ID'), - '--kecpkg', pkgpath, - '--store' # store the service_id in the settings (for teardown) - ]) + result = runner.invoke( + kecpkg, + [ + "upload", + pkgname, + "--url", + os.environ.get("KECHAIN_URL"), + "--token", + os.environ.get("KECHAIN_TOKEN"), + "--scope-id", + os.environ.get("KECHAIN_SCOPE_ID"), + "--kecpkg", + pkgpath, + "--store", # store the service_id in the settings (for teardown) + ], + ) self.assertEqual(result.exit_code, 0) # teardown the just uploaded service from kecpkg.settings import load_settings + settings = load_settings(package_dir=get_package_dir(pkgname)) from pykechain import get_project + scope = get_project( - url=os.environ.get('KECHAIN_URL'), - token=os.environ.get('KECHAIN_TOKEN'), - scope_id=os.environ.get('KECHAIN_SCOPE_ID') + url=os.environ.get("KECHAIN_URL"), + token=os.environ.get("KECHAIN_TOKEN"), + scope_id=os.environ.get("KECHAIN_SCOPE_ID"), ) - service = scope.service(pk=settings.get('service_id')) + service = scope.service(pk=settings.get("service_id")) service.delete() diff --git a/tests/utils.py b/tests/utils.py index 9dd1828..b7ad023 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -3,7 +3,6 @@ from contextlib import contextmanager from unittest import TestCase -import six from click.testing import CliRunner @@ -12,7 +11,7 @@ def setUp(self): self.runner = CliRunner() def assertExists(self, path): - self.assertTrue(os.path.exists(path), "Path `{}` does not exists".format(path)) + self.assertTrue(os.path.exists(path), f"Path `{path}` does not exists") def is_travis(): @@ -23,28 +22,20 @@ def is_travis(): def is_python27(): """Predicate to determine if the runtime version of python is version 2.""" import sys + return sys.version_info <= (2, 7) @contextmanager def temp_chdir(cwd=None): - if six.PY3: - from tempfile import TemporaryDirectory - with TemporaryDirectory(prefix="kecpkg_") as tempwd: - origin = cwd or os.getcwd() - os.chdir(tempwd) - - try: - yield tempwd if os.path.exists(tempwd) else '' - finally: - os.chdir(origin) - else: - from tempfile import mkdtemp - tempwd = mkdtemp(prefix="kecpkg_") + from tempfile import TemporaryDirectory + + with TemporaryDirectory(prefix="kecpkg_") as tempwd: origin = cwd or os.getcwd() os.chdir(tempwd) + try: - yield tempwd if os.path.exists(tempwd) else '' + yield tempwd if os.path.exists(tempwd) else "" finally: os.chdir(origin) @@ -54,7 +45,7 @@ def touch_file(path): :param path: path (filename) """ - with open(path, 'a'): + with open(path, "a"): os.utime(path, None) @@ -65,17 +56,21 @@ def connected_to_internet(): # no cov return True try: # Test availability of DNS first - host = socket.gethostbyname('ke-works.com') + host = socket.gethostbyname("ke-works.com") # Test connection socket.create_connection((host, 80), 2) return True - except: + except OSError: return False def running_on_ci(): # no cov # type: () -> bool """If the system is running on a CI platform.""" - if os.environ.get('CI') or os.environ.get('TRAVIS') or os.environ.get('GITHUB_ACTIONS'): + if ( + os.environ.get("CI") + or os.environ.get("TRAVIS") + or os.environ.get("GITHUB_ACTIONS") + ): return True return False diff --git a/tox.ini b/tox.ini index 9d7b53b..93cca90 100644 --- a/tox.ini +++ b/tox.ini @@ -1,16 +1,17 @@ [tox] envlist = - py27, - py35, - py36, py37, - pypy, + py38, + py39, + py310, + py311, + py312, pypy3, dist_and_docs [travis] python = - 3.6: py36, dist_and_docs + 3.12: py312, dist_and_docs [testenv] passenv = * @@ -31,7 +32,7 @@ commands = [testenv:dist_and_docs] passenv = * -basepython = python3.6 +basepython = python3.12 deps = check-manifest readme_renderer