diff --git a/tests/dags/templates/test_job.fast_log.yaml b/tests/dags/templates/test_job.fast_log.yaml
new file mode 100755
index 0000000..518cdf1
--- /dev/null
+++ b/tests/dags/templates/test_job.fast_log.yaml
@@ -0,0 +1,29 @@
+apiVersion: batch/v1
+kind: Job
+metadata: {}
+spec:
+  template:
+    spec:
+      restartPolicy: Never
+      containers:
+        - name: job-executor
+          image: ubuntu
+          command:
+            - bash
+            - -c
+            - |
+              #!/bin/bash
+              echo "Fast print in $SLEEP_INTERVAL seconds"
+              sleep $SLEEP_INTERVAL
+              for i in $(seq 1 $LINE_COUNT); do
+                echo "$i"
+              done
+              sleep $SLEEP_INTERVAL
+              echo "Done"
+          env:
+            - name: LINE_COUNT
+              value: '10000'
+
+            - name: SLEEP_INTERVAL
+              value: '5'
+  backoffLimit: 0
diff --git a/tests/dags/test_fast_log.py b/tests/dags/test_fast_log.py
new file mode 100755
index 0000000..c9947d3
--- /dev/null
+++ b/tests/dags/test_fast_log.py
@@ -0,0 +1,37 @@
+from utils import default_args, name_from_file
+from airflow import DAG
+from airflow_kubernetes_job_operator.kubernetes_job_operator import (
+    KubernetesJobOperator,
+)
+
+dag = DAG(
+    name_from_file(__file__),
+    default_args=default_args,
+    description="Test job operator with fast log output",
+    schedule_interval=None,
+    catchup=False,
+)
+
+namespace = None
+
+envs = {
+    "PASS_ARG": "a test",
+    "JINJA_ENV": "{{ ds }}",
+}
+
+default_delete_policy = "IfSucceeded"
+
+# Job
+KubernetesJobOperator(
+    task_id="test-pod-success",
+    namespace=namespace,
+    body_filepath="./templates/test_job.fast_log.yaml",
+    envs=envs,
+    dag=dag,
+    delete_policy=default_delete_policy,
+)
+
+if __name__ == "__main__":
+    from tests.test_utils import test_dag
+
+    test_dag(dag)