A small development environment for Airflow. Follow the instructions below to start a local Airflow environment. Once it is running, any changes inside the `dags` or `plugins` directories are automatically reflected (no need to restart!).
- Docker & Docker Compose
At the root of the project run:

```sh
docker compose -f build/docker-compose.airflow.yml up
```
After a while you can access Airflow at http://localhost:8080 with username/password `user`/`bitnami123`.
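Optionally, the stack can be started in the background and torn down when you're done; a minimal sketch using the stock docker compose subcommands (nothing project-specific):

```sh
# start the environment detached
docker compose -f build/docker-compose.airflow.yml up -d

# stop and remove the containers when finished
docker compose -f build/docker-compose.airflow.yml down
```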
Create a secret with the name `airflow`:

```sh
kubectl --kubeconfig=./build/kubeconfig.yaml create secret generic airflow
```

The secret only needs to be created once.
To add values to the secret run:

```sh
kubectl --kubeconfig=./build/kubeconfig.yaml edit secret airflow
```
Specify the values you'd like to add. More details on how to manage secrets are available here.
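For one-off additions you can also patch the secret non-interactively; a minimal sketch, where `MY_API_KEY` is a hypothetical key name:

```sh
# add a hypothetical key to the existing secret;
# stringData lets kubectl handle the base64 encoding
kubectl --kubeconfig=./build/kubeconfig.yaml patch secret airflow \
  -p '{"stringData": {"MY_API_KEY": "example-value"}}'
```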
This is an example DAG that can be used to test the installation. Just copy the code into a Python file under `airflow/dags/mattermost_dags`.
```python
import pendulum

from airflow.decorators import dag, task


@dag(
    schedule=None,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    tags=["example"],
)
def example_kubernetes_dag():
    """
    ### Example Kubernetes operator DAG.

    This is a simple data pipeline example which demonstrates the use of
    the TaskFlow API together with the Kubernetes operator. Uses the k3s
    cluster deployed using docker compose.

    Based on the TaskFlow API examples available
    [here](https://airflow.apache.org/docs/apache-airflow/stable/tutorial_taskflow_api.html).
    """

    @task.kubernetes(
        image="python:3.8-slim-buster",
        name="k8s_test",
        namespace="default",
        in_cluster=False,
        config_file="/opt/kube/airflow-kube.yaml",
    )
    def execute_in_k8s_pod():
        import time

        print("Hello from k8s pod")
        time.sleep(2)

    @task.kubernetes(
        image="python:3.8-slim-buster",
        namespace="default",
        in_cluster=False,
        config_file="/opt/kube/airflow-kube.yaml",
    )
    def print_pattern():
        n = 5
        for i in range(n):
            # inner loop to handle the number of columns,
            # which changes according to the outer loop
            for j in range(i + 1):
                # print stars
                print("* ", end="")
            # end the line after each row
            print("\r")

    execute_in_k8s_pod_instance = execute_in_k8s_pod()
    print_pattern_instance = print_pattern()


example_kubernetes_dag()
```
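Once the DAG shows up in the UI and you trigger it, the task pods should appear on the k3s cluster. One way to watch them, using standard kubectl with the same kubeconfig as above:

```sh
kubectl --kubeconfig=./build/kubeconfig.yaml get pods -n default --watch
```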
When running tasks locally, tasks need to set `in_cluster` to `False` and `config_file` to `/opt/kube/airflow-kube.yaml`. This is not required if Airflow is running inside a Kubernetes cluster.
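For reference, a minimal sketch of what a task could look like in that case; this assumes Airflow itself runs inside the cluster and its service account is allowed to create pods, which is not part of this local setup:

```python
from airflow.decorators import task


# Sketch only: assumes Airflow runs inside the Kubernetes cluster,
# so no kubeconfig file is needed.
@task.kubernetes(
    image="python:3.8-slim-buster",
    namespace="default",
    in_cluster=True,  # use the pod's service-account credentials
)
def execute_in_cluster():
    print("Hello from a pod scheduled with in-cluster config")
```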