Skip to content

Commit

Permalink
feat: run metrics improvements
Browse files Browse the repository at this point in the history
- better handling of cgroup limits edge cases
- don't push invalid memory_limit
- remove low value cfs metrics
- push calculated cpu usage rate for future use
  • Loading branch information
mlarose committed Sep 13, 2024
1 parent 4287757 commit d1b32ad
Show file tree
Hide file tree
Showing 2 changed files with 86 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,10 @@
from dagster._core.execution.telemetry import RunTelemetryData
from dagster._core.instance import DagsterInstance
from dagster._core.storage.dagster_run import DagsterRun
from dagster._utils.container import retrieve_containerized_utilization_metrics
from dagster._utils.container import (
UNCONSTRAINED_CGROUP_MEMORY_LIMIT,
retrieve_containerized_utilization_metrics,
)

DEFAULT_RUN_METRICS_POLL_INTERVAL_SECONDS = 30.0
DEFAULT_RUN_METRICS_SHUTDOWN_SECONDS = 15
Expand Down Expand Up @@ -63,41 +66,48 @@ def _get_container_metrics(
cpu_usage = metrics.get("cpu_usage")
cpu_usage_ms = None
if cpu_usage is not None:
cpu_usage_ms = cpu_usage * 1000 # convert from seconds to milliseconds
cpu_usage_ms = check.not_none(cpu_usage) * 1000 # convert from seconds to milliseconds

cpu_limit_ms = None
if cpu_quota_us and cpu_quota_us > 0 and cpu_period_us and cpu_period_us > 0:
# Why the 1000 factor is a bit counterintuitive:
# quota / period -> fraction of cpu per unit of time
# 1000 * quota / period -> ms/sec of cpu
cpu_limit_ms = (1000.0 * cpu_quota_us) / cpu_period_us
cpu_limit_ms = (1000.0 * check.not_none(cpu_quota_us)) / check.not_none(cpu_period_us)

cpu_usage_rate_ms = None
measurement_timestamp = metrics.get("measurement_timestamp")
if (
previous_cpu_usage_ms
and cpu_usage_ms
and previous_measurement_timestamp
if (previous_cpu_usage_ms and cpu_usage_ms and cpu_usage_ms > previous_cpu_usage_ms) and (
previous_measurement_timestamp
and measurement_timestamp
and measurement_timestamp > previous_measurement_timestamp
):
cpu_usage_rate_ms = (cpu_usage_ms - previous_cpu_usage_ms) / (
measurement_timestamp - previous_measurement_timestamp
)

cpu_percent = None
if cpu_limit_ms and cpu_limit_ms > 0 and cpu_usage_rate_ms and cpu_usage_rate_ms > 0:
cpu_percent = 100.0 * cpu_usage_rate_ms / cpu_limit_ms
if (cpu_limit_ms and cpu_limit_ms > 0) and (cpu_usage_rate_ms and cpu_usage_rate_ms > 0):
cpu_percent = 100.0 * check.not_none(cpu_usage_rate_ms) / check.not_none(cpu_limit_ms)

memory_percent = None
memory_limit = metrics.get("memory_limit")
memory_limit = None
cgroup_memory_limit = metrics.get("memory_limit")
memory_usage = metrics.get("memory_usage")
if memory_limit and memory_limit > 0 and memory_usage and memory_usage > 0:
memory_percent = 100.0 * memory_usage / memory_limit
if (cgroup_memory_limit and 0 < cgroup_memory_limit < UNCONSTRAINED_CGROUP_MEMORY_LIMIT) and (
memory_usage and memory_usage >= 0
):
memory_limit = cgroup_memory_limit
memory_percent = 100.0 * check.not_none(memory_usage) / check.not_none(memory_limit)

return {
# TODO - eventually we should replace and remove cpu_usage_ms and only send cpu_usage_rate_ms
# We need to ensure that the UI and GraphQL queries can handle the change
# and that we have 15 days of runs to ensure continuity.
# Why? the derivative calculated by datadog on cpu_usage_ms is sometimes wonky, resulting
# in a perplexing graph for the end user.
"container.cpu_usage_ms": cpu_usage_ms,
"container.cpu_cfs_period_us": cpu_period_us,
"container.cpu_cfs_quota_us": cpu_quota_us,
"container.cpu_usage_rate_ms": cpu_usage_rate_ms,
"container.cpu_limit_ms": cpu_limit_ms,
"container.cpu_percent": cpu_percent,
"container.memory_usage": memory_usage,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import dagster._core.execution.run_metrics_thread as run_metrics_thread
from dagster import DagsterInstance, DagsterRun
from dagster._core.execution.telemetry import RunTelemetryData
from dagster._utils.container import UNCONSTRAINED_CGROUP_MEMORY_LIMIT
from pytest import fixture, mark


Expand Down Expand Up @@ -56,9 +57,8 @@ def test_get_container_metrics(mock_containerized_utilization_metrics):
assert metrics
assert isinstance(metrics, dict)
assert metrics["container.cpu_usage_ms"] == 50000
assert metrics["container.cpu_usage_rate_ms"] == 80
assert metrics["container.cpu_limit_ms"] == 100
assert metrics["container.cpu_cfs_period_us"] == 10000
assert metrics["container.cpu_cfs_quota_us"] == 1000
assert metrics["container.memory_usage"] == 16384
assert metrics["container.memory_limit"] == 65536
assert metrics["container.memory_percent"] == 25
Expand All @@ -81,20 +81,58 @@ def test_get_container_metrics_missing_limits(mock_containerized_utilization_met
return_value=mock_containerized_utilization_metrics,
):
# should not throw
metrics = run_metrics_thread._get_container_metrics() # noqa: SLF001
metrics = run_metrics_thread._get_container_metrics( # noqa: SLF001
previous_cpu_usage_ms=49200, previous_measurement_timestamp=1000
)

assert metrics
assert isinstance(metrics, dict)
assert metrics["container.cpu_usage_ms"] == 50000
assert metrics["container.cpu_usage_rate_ms"] == 80 # see previous test for calculation
assert metrics["container.cpu_percent"] is None
assert metrics["container.cpu_limit_ms"] is None
assert metrics["container.cpu_cfs_period_us"] is None
assert metrics["container.cpu_cfs_quota_us"] is None
assert metrics["container.memory_usage"] == 16384
assert metrics["container.memory_limit"] is None
assert metrics["container.memory_percent"] is None


@mark.parametrize(
"cpu_cfs_quota_us, cpu_cfs_period_us, cgroup_memory_limit, expected_cpu_limit_ms, expected_memory_limit",
[
(None, None, None, None, None),
(0, 0, 0, None, None),
(10, -1, -1, None, None),
(-1, 10, UNCONSTRAINED_CGROUP_MEMORY_LIMIT, None, None),
],
)
def test_get_container_metrics_edge_conditions(
mock_containerized_utilization_metrics,
cpu_cfs_quota_us,
cpu_cfs_period_us,
cgroup_memory_limit,
expected_cpu_limit_ms,
expected_memory_limit,
):
"""These limits are not valid if none, negative or zero values are provided and we should ignore them."""
mock_containerized_utilization_metrics["cpu_cfs_quota_us"] = cpu_cfs_quota_us
mock_containerized_utilization_metrics["cpu_cfs_period_us"] = cpu_cfs_period_us
mock_containerized_utilization_metrics["memory_limit"] = cgroup_memory_limit

with patch(
"dagster._core.execution.run_metrics_thread.retrieve_containerized_utilization_metrics",
return_value=mock_containerized_utilization_metrics,
):
# should not throw
metrics = run_metrics_thread._get_container_metrics( # noqa: SLF001
previous_cpu_usage_ms=49200, previous_measurement_timestamp=1000
)

assert metrics
assert isinstance(metrics, dict)
assert metrics["container.cpu_limit_ms"] == expected_cpu_limit_ms
assert metrics["container.memory_limit"] == expected_memory_limit


@mark.parametrize(
"test_case, platform_name, is_file, readable, link_path, expected",
[
Expand Down Expand Up @@ -164,6 +202,25 @@ def test_start_run_metrics_thread(dagster_instance, dagster_run, mock_container_
assert "Shutting down metrics capture thread" in caplog.messages[-1]


def test_start_run_metrics_thread_without_run_storage_support(
dagster_instance, dagster_run, mock_container_metrics, caplog
):
logger = logging.getLogger("test_run_metrics")
logger.setLevel(logging.DEBUG)

with patch.object(dagster_instance.run_storage, "supports_run_telemetry", return_value=False):
thread, shutdown = run_metrics_thread.start_run_metrics_thread(
dagster_instance,
dagster_run,
logger=logger,
polling_interval=2.0,
)

assert thread is None
assert shutdown is None
assert "Run telemetry is not supported" in caplog.messages[-1]


def test_report_run_metrics(dagster_instance: DagsterInstance, dagster_run: DagsterRun):
with patch.object(dagster_instance.run_storage, "add_run_telemetry") as mock_add_run_telemetry:
metrics = {
Expand Down

0 comments on commit d1b32ad

Please sign in to comment.