Skip to content

Commit

Permalink
Merge pull request #3014 from open-formulieren/feature/2927-celery-pr…
Browse files Browse the repository at this point in the history
…obes

[#2927] enable readiness + liveness check for celery worker
  • Loading branch information
sergei-maertens authored May 1, 2023
2 parents 8951673 + 45fee0a commit c819217
Show file tree
Hide file tree
Showing 8 changed files with 188 additions and 3 deletions.
3 changes: 2 additions & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,8 @@ COPY ./bin/celery_worker.sh /celery_worker.sh
COPY ./bin/celery_beat.sh /celery_beat.sh
COPY ./bin/celery_flower.sh /celery_flower.sh
COPY ./bin/dump_configuration.sh /dump_configuration.sh
RUN mkdir /app/bin /app/log /app/media /app/private_media /app/certifi_ca_bundle
RUN mkdir /app/bin /app/log /app/media /app/private_media /app/certifi_ca_bundle /app/tmp
COPY ./bin/check_celery_worker_liveness.py ./bin/check_celery_worker_liveness.py

# prevent writing to the container layer, which would degrade performance.
# This also serves as a hint for the intended volumes.
Expand Down
8 changes: 7 additions & 1 deletion bin/celery_worker.sh
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,14 @@ CONCURRENCY=${CELERY_WORKER_CONCURRENCY:-1}
QUEUE=${1:-${CELERY_WORKER_QUEUE:=celery}}
WORKER_NAME=${2:-${CELERY_WORKER_NAME:="${QUEUE}"@%n}}

_binary=$(which celery)

if [[ "$ENABLE_COVERAGE" ]]; then
_binary="coverage run $_binary"
fi

echo "Starting celery worker $WORKER_NAME with queue $QUEUE"
exec celery --workdir src --app openforms.celery worker \
exec $_binary --workdir src --app openforms.celery worker \
-Q $QUEUE \
-n $WORKER_NAME \
-l $LOGLEVEL \
Expand Down
62 changes: 62 additions & 0 deletions bin/check_celery_worker_liveness.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
#!/usr/bin/env python
#
# Check the health of a Celery worker.
#
# The worker process writes and periodically touches a number of files that indicate it
# is available and still healthy. If the worker becomes unhealthy for any reason, the
# timestamp of when the heartbeat file was last touched will not update and the delta
# becomes too big, allowing (container) orchestration to terminate and restart the
# worker process.
#
# Example usage with Kubernetes, as a liveness probe:
#
# .. code-block:: yaml
#
# livenessProbe:
# exec:
# command:
# - python
# - /app/bin/check_celery_worker_liveness.py
# initialDelaySeconds: 10
# periodSeconds: 30 # must be smaller than `MAX_WORKER_LIVENESS_DELTA`
#
# Reference: https://kubernetes.io/docs/tasks/configure-pod-container/configure-liveness-readiness-startup-probes/#define-a-liveness-command
#
# Supported environment variables:
#
# * ``MAX_WORKER_LIVENESS_DELTA``: maximum delta between heartbeats before reporting
# failure, in seconds. Defaults to 60 (one minute).


import os
import sys
import time
from pathlib import Path

HEARTBEAT_FILE = Path(__file__).parent.parent / "tmp" / "celery_worker_heartbeat"
READINESS_FILE = Path(__file__).parent.parent / "tmp" / "celery_worker_ready"
MAX_WORKER_LIVENESS_DELTA = int(os.getenv("MAX_WORKER_LIVENESS_DELTA", 60)) # seconds


# check if worker is ready
if not READINESS_FILE.is_file():
print("Celery worker not ready.")
sys.exit(1)

# check if worker is live
if not HEARTBEAT_FILE.is_file():
print("Celery worker heartbeat not found.")
sys.exit(1)

# check if worker heartbeat satisfies constraint
stats = HEARTBEAT_FILE.stat()
worker_timestamp = stats.st_mtime
current_timestamp = time.time()
time_diff = current_timestamp - worker_timestamp

if time_diff > MAX_WORKER_LIVENESS_DELTA:
print("Celery worker heartbeat: interval exceeds constraint (60s).")
sys.exit(1)

print("Celery worker heartbeat found: OK.")
sys.exit(0)
6 changes: 6 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,12 @@ services:
image: openformulieren/open-forms:${TAG:-latest}
environment: *web_env
command: /celery_worker.sh
healthcheck:
test: ["CMD", "python", "/app/bin/check_celery_worker_liveness.py"]
interval: 30s
timeout: 5s
retries: 3
start_period: 10s
depends_on:
- db
- redis
Expand Down
1 change: 1 addition & 0 deletions setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ known_first_party=openforms
sections=FUTURE,STDLIB,DJANGO,THIRDPARTY,FIRSTPARTY,LOCALFOLDER

[coverage:run]
parallel = True
branch = True
source = src
omit =
Expand Down
46 changes: 45 additions & 1 deletion src/openforms/celery.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
from pathlib import Path

from django.conf import settings

from celery import Celery
from celery import Celery, bootsteps
from celery.signals import worker_ready, worker_shutdown

from .setup import setup_env

Expand All @@ -17,3 +20,44 @@
}

app.autodiscover_tasks()

HEARTBEAT_FILE = Path(settings.BASE_DIR) / "tmp" / "celery_worker_heartbeat"
READINESS_FILE = Path(settings.BASE_DIR) / "tmp" / "celery_worker_ready"


#
# Utilities for checking the health of celery workers
#
class LivenessProbe(bootsteps.StartStopStep):
requires = {"celery.worker.components:Timer"}

def __init__(self, worker, **kwargs):
self.requests = []
self.tref = None

def start(self, worker):
self.tref = worker.timer.call_repeatedly(
10.0,
self.update_heartbeat_file,
(worker,),
priority=10,
)

def stop(self, worker):
HEARTBEAT_FILE.unlink(missing_ok=True)

def update_heartbeat_file(self, worker):
HEARTBEAT_FILE.touch()


@worker_ready.connect
def worker_ready(**_):
READINESS_FILE.touch()


@worker_shutdown.connect
def worker_shutdown(**_):
READINESS_FILE.unlink(missing_ok=True)


app.steps["worker"].add(LivenessProbe)
63 changes: 63 additions & 0 deletions src/openforms/tests/test_celery_health_check.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
import os
import subprocess
import time
from pathlib import Path

from django.conf import settings
from django.test import TestCase

from openforms.celery import READINESS_FILE, app

START_WORKER_SCRIPT = Path(settings.BASE_DIR) / "bin" / "celery_worker.sh"


class CeleryTest(TestCase):
def setUp(self):
def shutdown_celery():
app.control.shutdown()
if READINESS_FILE.is_file():
READINESS_FILE.unlink(missing_ok=True)

self.addCleanup(shutdown_celery)

def test_celery_worker_health_check(self):
"""Assert that READINESS_FILE exists after worker has started but not before and not after
the shutdown
"""
assert (
not READINESS_FILE.is_file()
), "Celery worker not started but READINESS_FILE found"

# start Celery worker
process = subprocess.Popen(
[START_WORKER_SCRIPT],
cwd=settings.BASE_DIR,
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
env={**os.environ, "ENABLE_COVERAGE": os.environ.get("COVERAGE_RUN", "")},
)

# wait for READINESS_FILE to be created, break out as soon as possible
start = time.time()
while (time.time() - start) <= 60:
if READINESS_FILE.is_file():
break
# wait a bit longer...
time.sleep(1)
else:
self.fail("READINESS_FILE was not created within 60 seconds")

# stop the worker process
process.terminate() # sends SIGTERM, (warm) shutting down the worker.
process.wait(timeout=60) # wait for process to terminate

# now assert that the READINESS FILE was deleted as part of the shutdown
# procedure
start = time.time()
while (time.time() - start) <= 60:
if not READINESS_FILE.is_file():
break
# wait a bit longer...
time.sleep(1)
else:
self.fail("READINESS_FILE was not cleaned up within 60 seconds")
2 changes: 2 additions & 0 deletions tmp/.gitkeep
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
# This folder contains temporary files generated by the project,
# e.g. files for testing the readiness/liveness of celery workers

0 comments on commit c819217

Please sign in to comment.