diff --git a/CHANGES b/CHANGES
new file mode 100644
index 0000000..45228fa
--- /dev/null
+++ b/CHANGES
@@ -0,0 +1 @@
+v0.1.0, Thu Feb 20 16:52:38 2020 -- Initial Release.
diff --git a/MANIFEST.in b/MANIFEST.in
new file mode 100644
index 0000000..9561fb1
--- /dev/null
+++ b/MANIFEST.in
@@ -0,0 +1 @@
+include README.rst
diff --git a/README.md b/README.md
index f08412e..8fc5400 100644
--- a/README.md
+++ b/README.md
@@ -1,7 +1,6 @@
 # Airflow KubernetesJobOperator
 
-An airflow job operator that executes a task as a Kubernetes job on a cluster, given
-a job yaml configuration or an image uri.
+An airflow job operator that executes a task as a Kubernetes job on a cluster, given a job yaml configuration or an image uri.
 
 ### Two operators are available:
 
diff --git a/README.rst b/README.rst
new file mode 100644
index 0000000..af27a26
--- /dev/null
+++ b/README.rst
@@ -0,0 +1,6 @@
+airflow_kubernetes_job_operator
+===============================
+
+An airflow job operator that executes a task as a Kubernetes job on a cluster, given a job yaml configuration or an image uri.
+
+Please see README.md for more info.
\ No newline at end of file
diff --git a/src/__init__.py b/airflow_kubernetes_job_operator/__init__.py
similarity index 100%
rename from src/__init__.py
rename to airflow_kubernetes_job_operator/__init__.py
diff --git a/src/job_runner.py b/airflow_kubernetes_job_operator/job_runner.py
similarity index 100%
rename from src/job_runner.py
rename to airflow_kubernetes_job_operator/job_runner.py
diff --git a/src/kubernetes_job_operator.py b/airflow_kubernetes_job_operator/kubernetes_job_operator.py
similarity index 100%
rename from src/kubernetes_job_operator.py
rename to airflow_kubernetes_job_operator/kubernetes_job_operator.py
diff --git a/src/kubernetes_job_operator.py.default.yaml b/airflow_kubernetes_job_operator/kubernetes_job_operator.py.default.yaml
similarity index 100%
rename from src/kubernetes_job_operator.py.default.yaml
rename to airflow_kubernetes_job_operator/kubernetes_job_operator.py.default.yaml
diff --git a/src/kubernetes_legacy_job_operator.py b/airflow_kubernetes_job_operator/kubernetes_legacy_job_operator.py
similarity index 100%
rename from src/kubernetes_legacy_job_operator.py
rename to airflow_kubernetes_job_operator/kubernetes_legacy_job_operator.py
diff --git a/src/watchers/__init__.py b/airflow_kubernetes_job_operator/watchers/__init__.py
similarity index 100%
rename from src/watchers/__init__.py
rename to airflow_kubernetes_job_operator/watchers/__init__.py
diff --git a/src/watchers/event_handler.py b/airflow_kubernetes_job_operator/watchers/event_handler.py
similarity index 100%
rename from src/watchers/event_handler.py
rename to airflow_kubernetes_job_operator/watchers/event_handler.py
diff --git a/src/watchers/threaded_kubernetes_resource_watchers.py b/airflow_kubernetes_job_operator/watchers/threaded_kubernetes_resource_watchers.py
similarity index 100%
rename from src/watchers/threaded_kubernetes_resource_watchers.py
rename to airflow_kubernetes_job_operator/watchers/threaded_kubernetes_resource_watchers.py
diff --git a/src/watchers/threaded_kubernetes_watch.py b/airflow_kubernetes_job_operator/watchers/threaded_kubernetes_watch.py
similarity index 100%
rename from src/watchers/threaded_kubernetes_watch.py
rename to airflow_kubernetes_job_operator/watchers/threaded_kubernetes_watch.py
diff --git a/experimental/core_tester/test_job_runner.py b/experimental/core_tester/test_job_runner.py
index cc135f1..bee05d3 100644
--- a/experimental/core_tester/test_job_runner.py
+++ b/experimental/core_tester/test_job_runner.py
@@ -3,7 +3,7 @@
 import yaml
 from utils import logging, load_raw_formatted_file
 from datetime import datetime
-from src.job_runner import JobRunner
+from airflow_kubernetes_job_operator.job_runner import JobRunner
 
 logging.basicConfig(level="INFO")
 CUR_DIRECTORY = os.path.abspath(os.path.dirname(__file__))
diff --git a/experimental/core_tester/test_threaded_kubernetes_ns_watch.py b/experimental/core_tester/test_threaded_kubernetes_ns_watch.py
index 4298040..5de5c16 100644
--- a/experimental/core_tester/test_threaded_kubernetes_ns_watch.py
+++ b/experimental/core_tester/test_threaded_kubernetes_ns_watch.py
@@ -2,7 +2,7 @@
 import os
 import yaml
 from utils import logging
-from src.watchers.threaded_kubernetes_resource_watchers import (
+from airflow_kubernetes_job_operator.watchers.threaded_kubernetes_resource_watchers import (
     ThreadedKubernetesNamespaceResourcesWatcher,
 )
 
diff --git a/experimental/core_tester/test_threaded_kubernetes_watch.py b/experimental/core_tester/test_threaded_kubernetes_watch.py
index 6bcf435..39dbed0 100644
--- a/experimental/core_tester/test_threaded_kubernetes_watch.py
+++ b/experimental/core_tester/test_threaded_kubernetes_watch.py
@@ -2,7 +2,7 @@
 import os
 import yaml
 from utils import logging
-from src.watchers.threaded_kubernetes_watch import ThreadedKubernetesWatchNamspeace
+from airflow_kubernetes_job_operator.watchers.threaded_kubernetes_watch import ThreadedKubernetesWatchNamspeace
 
 logging.basicConfig(level="INFO")
 CUR_DIRECTORY = os.path.abspath(os.path.dirname(__file__))
diff --git a/experimental/core_tester/test_threaded_kubernetes_watch_pod_log.py b/experimental/core_tester/test_threaded_kubernetes_watch_pod_log.py
index a3ab947..95a2b01 100644
--- a/experimental/core_tester/test_threaded_kubernetes_watch_pod_log.py
+++ b/experimental/core_tester/test_threaded_kubernetes_watch_pod_log.py
@@ -1,7 +1,7 @@
 import kubernetes
 import os
 from .utils import logging
-from src.watchers.threaded_kubernetes_watch import (
+from airflow_kubernetes_job_operator.watchers.threaded_kubernetes_watch import (
     ThreadedKubernetesWatchPodLog,
     ThreadedKubernetesWatchNamspeace,
 )
diff --git a/setup.cfg b/setup.cfg
new file mode 100644
index 0000000..3c6e79c
--- /dev/null
+++ b/setup.cfg
@@ -0,0 +1,2 @@
+[bdist_wheel]
+universal=1
diff --git a/setup.py b/setup.py
new file mode 100644
index 0000000..015f626
--- /dev/null
+++ b/setup.py
@@ -0,0 +1,36 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+"""
+    setup.py
+    ~~~~~~~~
+
+    An airflow job operator that executes a task as a Kubernetes job on a cluster,
+given a job yaml configuration or an image uri.
+
+    :copyright: (c) 2020 by zav.
+    :license: see LICENSE for more details.
+""" + +import codecs +import os +import re +from setuptools import setup + +here = os.path.abspath(os.path.dirname(__file__)) + +setup( + name="airflow_kubernetes_job_operator", + version="0.1.0", + description="An airflow job operator that executes a task as a Kubernetes job on a cluster, given a job yaml configuration or an image uri.", + long_description="Please see readme.md", + classifiers=[], + author="Zav Shotan", + author_email="", + url="https://github.com/LamaAni/KubernetesJobOperator", + packages=["airflow_kubernetes_job_operator"], + platforms="any", + license="LICENSE", + install_requires=["PyYAML>=5.0", "kubernetes>=9.0.0", "urllib3>=1.25.0"], + python_requires=">=3.6", +) diff --git a/src/utils.py b/src/utils.py deleted file mode 100644 index 755da87..0000000 --- a/src/utils.py +++ /dev/null @@ -1,76 +0,0 @@ -import random -import re - - -def get_yaml_path_value(yaml: dict, path_names): - value = yaml - cur_path = [] - for name in path_names: - cur_path.append(name) - path_string = ".".join(map(lambda v: str(v), path_names)) - - if isinstance(value, dict): - assert name in value, "Missing path:" + path_string - elif isinstance(value, list): - assert len(value) > name, "Missing path:" + path_string - else: - raise Exception("Expected path " + path_string + " to be a list or a dictionary") - - value = value[name] - return value - - -def set_yaml_path_value(yaml: dict, path_names: list, value, if_not_exists=False): - name_to_set = path_names[-1] - col = get_yaml_path_value(yaml, path_names[:-1]) - - if isinstance(col, list): - assert isinstance(name_to_set, int), "To set a list value you must have an integer key." - if name_to_set > -1 and len(col) > name_to_set: - if if_not_exists: - return - col[name_to_set] = value - else: - col.append(value) - else: - if if_not_exists and name_to_set in col: - return - col[name_to_set] = value - - -def randomString(stringLength=10): - """Create a random string - - Keyword Arguments: - stringLength {int} -- The length of the string (default: {10}) - - Returns: - string -- A random string - """ - letters = "abcdefghijklmnopqrstvwxyz0123456789" - return "".join(random.choice(letters) for i in range(stringLength)) - - -def to_kubernetes_valid_name(name, max_length=50, start_trim_offset=10): - """Returns a kubernetes valid name, and truncates, after a start - offset, any exccess chars. 
-
-    Arguments:
-        name {[type]} -- [description]
-
-    Keyword Arguments:
-        max_length {int} -- [description] (default: {50})
-        start_trim_offset {int} -- [description] (default: {10})
-
-    Returns:
-        [type] -- [description]
-    """
-    assert start_trim_offset < max_length, "start_trim_offset must be smaller then max_length"
-    name = re.sub(r"[^a-z0-9]", "-", name.lower())
-
-    if len(name) > max_length:
-        first_part = name[0:start_trim_offset] if start_trim_offset > 0 else ""
-        second_part = name[start_trim_offset:]
-        second_part = second_part[-max_length + start_trim_offset + 2 :]
-        name = first_part + "--" + second_part
-    return name
diff --git a/tests/airflow-webserver.pid b/tests/airflow-webserver.pid
deleted file mode 100644
index e4985ee..0000000
--- a/tests/airflow-webserver.pid
+++ /dev/null
@@ -1 +0,0 @@
-9260
diff --git a/tests/dags/test_job_operator.py b/tests/dags/test_job_operator.py
index 6efb3a0..2e4033d 100644
--- a/tests/dags/test_job_operator.py
+++ b/tests/dags/test_job_operator.py
@@ -1,5 +1,5 @@
 from airflow import DAG
-from src.kubernetes_job_operator import KubernetesJobOperator
+from airflow_kubernetes_job_operator.kubernetes_job_operator import KubernetesJobOperator
 from airflow.utils.dates import days_ago
 
 default_args = {"owner": "tester", "start_date": days_ago(2), "retries": 0}
diff --git a/tests/dags/test_legacy_job_operator.py b/tests/dags/test_legacy_job_operator.py
index dde27e1..02a9ff6 100644
--- a/tests/dags/test_legacy_job_operator.py
+++ b/tests/dags/test_legacy_job_operator.py
@@ -1,5 +1,5 @@
 from airflow import DAG
-from src.kubernetes_legacy_job_operator import KubernetesLegacyJobOperator
+from airflow_kubernetes_job_operator.kubernetes_legacy_job_operator import KubernetesLegacyJobOperator
 
 # from airflow.operators.bash_operator import BashOperator
 from airflow.utils.dates import days_ago
diff --git a/tests/unittests.cfg b/tests/unittests.cfg
index 47a09a7..497c483 100644
--- a/tests/unittests.cfg
+++ b/tests/unittests.cfg
@@ -1,25 +1,26 @@
 [core]
 unit_test_mode = True
-dags_folder = /c/Code/repos/KubernetesJobOperator/tests/dags
-plugins_folder = /c/Code/repos/KubernetesJobOperator/tests/plugins
-base_log_folder = /c/Code/repos/KubernetesJobOperator/tests/logs
+dags_folder = /mnt/c/code/zav_public/KubernetesJobOperator/tests/dags
+plugins_folder = /mnt/c/code/zav_public/KubernetesJobOperator/tests/plugins
+base_log_folder = /mnt/c/code/zav_public/KubernetesJobOperator/tests/logs
 logging_level = INFO
 fab_logging_level = WARN
 log_filename_template = {{ ti.dag_id }}/{{ ti.task_id }}/{{ ts }}/{{ try_number }}.log
 log_processor_filename_template = {{ filename }}.log
-dag_processor_manager_log_location = /c/Code/repos/KubernetesJobOperator/tests/logs/dag_processor_manager/dag_processor_manager.log
+dag_processor_manager_log_location = /mnt/c/code/zav_public/KubernetesJobOperator/tests/logs/dag_processor_manager/dag_processor_manager.log
 executor = SequentialExecutor
-sql_alchemy_conn = sqlite:////c/Code/repos/KubernetesJobOperator/tests/unittests.db
+sql_alchemy_conn = sqlite:////mnt/c/code/zav_public/KubernetesJobOperator/tests/unittests.db
 load_examples = True
 donot_pickle = False
 dag_concurrency = 16
 dags_are_paused_at_creation = False
-fernet_key = iGswVuTlT6qH8ybxuPPiof7RzaHFg5PSf6_moHYXyyU=
+fernet_key = huuYOpfjB9zLrQG1Hk2K-HxdwiHDRbXshbAtqrsQSIk=
 enable_xcom_pickling = False
 killed_task_cleanup_time = 5
 secure_mode = False
 hostname_callable = socket:getfqdn
 worker_precheck = False
+default_task_retries = 0
 
 [cli]
 api_client = airflow.api.client.local_client
@@ -94,5 +95,10 @@ host =
 log_id_template = {dag_id}-{task_id}-{execution_date}-{try_number}
 end_of_log_mark = end_of_log
 
+[elasticsearch_configs]
+
+use_ssl = False
+verify_certs = True
+
 [kubernetes]
 dags_volume_claim = default
\ No newline at end of file