Skip to content

Commit

Permalink
Merge pull request #558 from opensafely-core/tick-metrics
Browse files Browse the repository at this point in the history
feat: track memory usage in tick traces.
  • Loading branch information
bloodearnest authored Jan 19, 2023
2 parents dacc6fe + 9982154 commit 5e3e981
Show file tree
Hide file tree
Showing 6 changed files with 46 additions and 190 deletions.
50 changes: 0 additions & 50 deletions jobrunner/cli/extract_stats.py

This file was deleted.

4 changes: 0 additions & 4 deletions jobrunner/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -184,10 +184,6 @@ def parse_job_resource_weights(config_file):
JOB_RESOURCE_WEIGHTS = parse_job_resource_weights("job-resource-weights.ini")


STATS_DATABASE_FILE = os.environ.get("STATS_DATABASE_FILE")
if STATS_DATABASE_FILE:
STATS_DATABASE_FILE = Path(STATS_DATABASE_FILE)

STATS_POLL_INTERVAL = float(os.environ.get("STATS_POLL_INTERVAL", "10"))
MAINTENANCE_POLL_INTERVAL = float(
os.environ.get("MAINTENANCE_POLL_INTERVAL", "300")
Expand Down
19 changes: 6 additions & 13 deletions jobrunner/lib/docker_stats.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,33 +6,26 @@
DEFAULT_TIMEOUT = 10


def get_volume_and_container_sizes(timeout=DEFAULT_TIMEOUT):
response = subprocess_run(
["docker", "system", "df", "--verbose", "--format", "{{json .}}"],
capture_output=True,
check=True,
timeout=timeout,
)
data = json.loads(response.stdout)
volumes = {row["Name"]: _parse_size(row["Size"]) for row in data["Volumes"]}
containers = {row["Names"]: _parse_size(row["Size"]) for row in data["Containers"]}
return volumes, containers
def get_job_stats(timeout=DEFAULT_TIMEOUT):
# TODO: add volume sizes
return get_container_stats(DEFAULT_TIMEOUT)


def get_container_stats(timeout=DEFAULT_TIMEOUT):
response = subprocess_run(
["docker", "stats", "--no-stream", "--format", "{{json .}}"],
["docker", "stats", "--no-stream", "--no-trunc", "--format", "{{json .}}"],
capture_output=True,
check=True,
timeout=timeout,
)
data = [json.loads(line) for line in response.stdout.splitlines()]
return {
row["Name"]: {
row["Name"].lstrip("os-job-"): {
"cpu_percentage": float(row["CPUPerc"].rstrip("%")),
"memory_used": _parse_size(row["MemUsage"].split()[0]),
}
for row in data
if row["Name"].startswith("os-job-")
}


Expand Down
85 changes: 19 additions & 66 deletions jobrunner/record_stats.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,7 @@
"""
Super crude docker/system stats logger
"""
import datetime
import json
import logging
import sqlite3
import subprocess
import sys
import time
Expand All @@ -13,76 +10,25 @@

from jobrunner import config, models, tracing
from jobrunner.lib import database
from jobrunner.lib.docker_stats import (
get_container_stats,
get_volume_and_container_sizes,
)
from jobrunner.lib.docker_stats import get_job_stats
from jobrunner.lib.log_utils import configure_logging


SCHEMA_SQL = """
CREATE TABLE stats (
timestamp TEXT,
data TEXT
);
"""


log = logging.getLogger(__name__)
tracer = trace.get_tracer("ticks")


def main():
database_file = config.STATS_DATABASE_FILE
if not database_file:
log.info("STATS_DATABASE_FILE not set; not polling for system stats")
return
log.info(f"Logging system stats to: {database_file}")
connection = get_database_connection(database_file)
last_run = None
while True:
before = time.time()
last_run = record_tick_trace(last_run)
log_stats(connection)
time.sleep(config.STATS_POLL_INTERVAL)


def get_database_connection(filename):
filename.parent.mkdir(parents=True, exist_ok=True)
conn = sqlite3.connect(filename)
# Enable autocommit
conn.isolation_level = None
schema_count = list(conn.execute("SELECT COUNT(*) FROM sqlite_master"))[0][0]
if schema_count == 0:
conn.executescript(SCHEMA_SQL)
return conn


def log_stats(connection):
try:
stats = get_all_stats()
# If no containers are running then don't log anything
if not stats["containers"]:
return
timestamp = datetime.datetime.utcnow().isoformat()
connection.execute(
"INSERT INTO stats (timestamp, data) VALUES (?, ?)",
[timestamp, json.dumps(stats)],
)
except subprocess.TimeoutExpired:
log.exception("Getting docker stats timed out")


def get_all_stats():
volume_sizes, container_sizes = get_volume_and_container_sizes()
containers = get_container_stats()
for name, container in containers.items():
container["disk_used"] = container_sizes.get(name)
return {
"containers": containers,
"volumes": volume_sizes,
}


tracer = trace.get_tracer("ticks")
# record_tick_trace might have taken a while, so sleep the remaining interval
# enforce a minimum sleep of 2s to ensure we don't hammer honeycomb or
# the docker api
elapsed = time.time() - before
time.sleep(max(2, config.STATS_POLL_INTERVAL - elapsed))


def record_tick_trace(last_run):
Expand All @@ -96,10 +42,17 @@ def record_tick_trace(last_run):
Note that this will emit number of active jobs + 1 events every call, so we
don't want to call it on too tight a loop.
"""
now = time.time_ns()

if last_run is None:
return now
return time.time_ns()

try:
stats = get_job_stats()
except subprocess.TimeoutExpired:
log.exception("Getting docker stats timed out")

# record time once stats call has completed, as it can take a while
now = time.time_ns()

# every span has the same timings
start_time = last_run
Expand All @@ -112,8 +65,8 @@ def record_tick_trace(last_run):
with tracer.start_as_current_span("TICK", start_time=start_time):
for job in active_jobs:
span = tracer.start_span(job.status_code.name, start_time=start_time)
# TODO add cpu/memory as attributes?
tracing.set_span_metadata(span, job, tick=True)
metrics = stats.get(job.id, {})
tracing.set_span_metadata(span, job, **metrics)
span.end(end_time)

return end_time
Expand Down
42 changes: 5 additions & 37 deletions tests/lib/test_docker_stats.py
Original file line number Diff line number Diff line change
@@ -1,43 +1,11 @@
import time

import pytest

from jobrunner.lib import docker
from jobrunner.lib.docker_stats import (
get_container_stats,
get_volume_and_container_sizes,
)
from jobrunner.lib import docker, docker_stats


@pytest.mark.needs_docker
# This runs fine locally but fails in CI, despite the retry logic added below.
# Don't have time to diagnose this properly and this isn't core functionality
# in any case
@pytest.mark.xfail
def test_get_container_stats(docker_cleanup):
docker.run("test_container1", [docker.MANAGEMENT_CONTAINER_IMAGE, "sh"])
# It can sometimes take a while before the container actually appears :(
for _ in range(10):
containers = get_container_stats()
if "test_container1" not in containers:
time.sleep(1)
else:
break
assert containers["test_container1"] == {"cpu_percentage": 0, "memory_used": 0}


@pytest.mark.needs_docker
@pytest.mark.slow_test
def test_get_volume_and_container_sizes(tmp_path, docker_cleanup):
half_meg_file = tmp_path / "halfmeg"
half_meg_file.write_bytes(b"0" * 500000)
docker.create_volume("test_volume1")
docker.copy_to_volume("test_volume1", half_meg_file, "halfmeg")
docker.run(
"test_container2",
[docker.MANAGEMENT_CONTAINER_IMAGE, "cp", "/workspace/halfmeg", "/halfmeg"],
volume=("test_volume1", "/workspace"),
)
volumes, containers = get_volume_and_container_sizes()
assert volumes["test_volume1"] == 500000
assert containers["test_container2"] == 500000
docker.run("os-job-test", [docker.MANAGEMENT_CONTAINER_IMAGE, "sleep", "10"])
containers = docker_stats.get_container_stats()
assert isinstance(containers["test"]["cpu_percentage"], float)
assert isinstance(containers["test"]["memory_used"], int)
36 changes: 16 additions & 20 deletions tests/test_record_stats.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,27 @@
import logging
import subprocess

from jobrunner import record_stats
from jobrunner.models import State, StatusCode
from tests.conftest import get_trace
from tests.factories import job_factory


def test_record_tick_trace(db, freezer):

def test_record_tick_trace(db, freezer, monkeypatch):
jobs = []
jobs.append(job_factory(status_code=StatusCode.CREATED))
jobs.append(job_factory(status_code=StatusCode.WAITING_ON_DEPENDENCIES))
jobs.append(job_factory(status_code=StatusCode.PREPARING))
jobs.append(job_factory(status_code=StatusCode.EXECUTING))
running_job = job_factory(status_code=StatusCode.EXECUTING)
jobs.append(running_job)
jobs.append(job_factory(status_code=StatusCode.FINALIZING))

metrics = {
running_job.id: {
"cpu_percentage": 50.0,
"memory_used": 1000,
}
}

monkeypatch.setattr(record_stats, "get_job_stats", lambda: metrics)

# this should not be tick'd
job_factory(state=State.SUCCEEDED, status_code=StatusCode.SUCCEEDED)

Expand All @@ -38,21 +44,11 @@ def test_record_tick_trace(db, freezer):
assert span.name == job.status_code.name
assert span.start_time == last_run1
assert span.end_time == last_run2
assert span.attributes["tick"] is True
assert span.attributes["job"] == job.id
assert span.parent.span_id == root.context.span_id

assert "SUCCEEDED" not in [s.name for s in spans]


def test_log_stats(db, caplog, monkeypatch):
def error():
raise subprocess.TimeoutExpired("test me", 10)

caplog.set_level(logging.INFO)
monkeypatch.setattr(record_stats, "get_all_stats", error)
if job is running_job:
assert span.attributes["cpu_percentage"] == 50.0
assert span.attributes["memory_used"] == 1000

record_stats.log_stats(None)

assert caplog.records[-1].msg == "Getting docker stats timed out"
assert "test me" in caplog.records[-1].exc_text
assert "SUCCEEDED" not in [s.name for s in spans]

0 comments on commit 5e3e981

Please sign in to comment.