feat: run metrics improvements #23510

Merged 1 commit on Oct 8, 2024
@@ -10,7 +10,10 @@
from dagster._core.execution.telemetry import RunTelemetryData
from dagster._core.instance import DagsterInstance
from dagster._core.storage.dagster_run import DagsterRun
from dagster._utils.container import retrieve_containerized_utilization_metrics
from dagster._utils.container import (
UNCONSTRAINED_CGROUP_MEMORY_LIMIT,
retrieve_containerized_utilization_metrics,
)

DEFAULT_RUN_METRICS_POLL_INTERVAL_SECONDS = 30.0
DEFAULT_RUN_METRICS_SHUTDOWN_SECONDS = 15
@@ -63,41 +66,48 @@ def _get_container_metrics(
cpu_usage = metrics.get("cpu_usage")
cpu_usage_ms = None
if cpu_usage is not None:
cpu_usage_ms = cpu_usage * 1000 # convert from seconds to milliseconds
cpu_usage_ms = check.not_none(cpu_usage) * 1000 # convert from seconds to milliseconds

cpu_limit_ms = None
if cpu_quota_us and cpu_quota_us > 0 and cpu_period_us and cpu_period_us > 0:
# Why the 1000 factor is a bit counterintuitive:
# quota / period -> fraction of cpu per unit of time
# 1000 * quota / period -> ms/sec of cpu
cpu_limit_ms = (1000.0 * cpu_quota_us) / cpu_period_us
cpu_limit_ms = (1000.0 * check.not_none(cpu_quota_us)) / check.not_none(cpu_period_us)

cpu_usage_rate_ms = None
measurement_timestamp = metrics.get("measurement_timestamp")
if (
previous_cpu_usage_ms
and cpu_usage_ms
and previous_measurement_timestamp
if (previous_cpu_usage_ms and cpu_usage_ms and cpu_usage_ms >= previous_cpu_usage_ms) and (
previous_measurement_timestamp
and measurement_timestamp
and measurement_timestamp > previous_measurement_timestamp
):
cpu_usage_rate_ms = (cpu_usage_ms - previous_cpu_usage_ms) / (
measurement_timestamp - previous_measurement_timestamp
)

cpu_percent = None
if cpu_limit_ms and cpu_limit_ms > 0 and cpu_usage_rate_ms and cpu_usage_rate_ms > 0:
cpu_percent = 100.0 * cpu_usage_rate_ms / cpu_limit_ms
if (cpu_limit_ms and cpu_limit_ms > 0) and (cpu_usage_rate_ms and cpu_usage_rate_ms > 0):
cpu_percent = 100.0 * check.not_none(cpu_usage_rate_ms) / check.not_none(cpu_limit_ms)

memory_percent = None
memory_limit = metrics.get("memory_limit")
memory_limit = None
cgroup_memory_limit = metrics.get("memory_limit")
memory_usage = metrics.get("memory_usage")
if memory_limit and memory_limit > 0 and memory_usage and memory_usage > 0:
memory_percent = 100.0 * memory_usage / memory_limit
if (cgroup_memory_limit and 0 < cgroup_memory_limit < UNCONSTRAINED_CGROUP_MEMORY_LIMIT) and (
memory_usage and memory_usage >= 0
):
memory_limit = cgroup_memory_limit
memory_percent = 100.0 * check.not_none(memory_usage) / check.not_none(memory_limit)

return {
# TODO - eventually we should replace and remove cpu_usage_ms and only send cpu_usage_rate_ms
# We need to ensure that the UI and GraphQL queries can handle the change
# and that we have 15 days of runs to ensure continuity.
# Why? The derivative calculated by Datadog on cpu_usage_ms is sometimes wonky, resulting
# in a perplexing graph for the end user.
"container.cpu_usage_ms": cpu_usage_ms,
"container.cpu_cfs_period_us": cpu_period_us,
"container.cpu_cfs_quota_us": cpu_quota_us,
"container.cpu_usage_rate_ms": cpu_usage_rate_ms,
"container.cpu_limit_ms": cpu_limit_ms,
"container.cpu_percent": cpu_percent,
"container.memory_usage": memory_usage,
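For reference, a minimal worked example of the metric arithmetic this hunk introduces, using the values from the test fixtures further down (quota 1000 µs, period 10000 µs, CPU usage going from 49200 ms to 50000 ms, memory usage 16384 against a 65536 limit). The ten-second measurement gap is an assumption chosen because it reproduces the 80 ms/s rate the tests assert; this is a standalone sketch, not the module code itself.

# Sketch of the math in _get_container_metrics, with the test fixtures' numbers.
cpu_cfs_quota_us = 1000    # CPU time allowed per CFS period, in microseconds
cpu_cfs_period_us = 10000  # length of one CFS period, in microseconds

# quota / period is the fraction of one CPU the container may use;
# multiplying by 1000 expresses it as milliseconds of CPU per second of wall clock.
cpu_limit_ms = 1000.0 * cpu_cfs_quota_us / cpu_cfs_period_us  # 100.0 ms/s

previous_cpu_usage_ms, previous_timestamp = 49200, 1000.0  # from the tests below
cpu_usage_ms, measurement_timestamp = 50000, 1010.0        # assumed ten-second gap
cpu_usage_rate_ms = (cpu_usage_ms - previous_cpu_usage_ms) / (
    measurement_timestamp - previous_timestamp
)  # 80.0 ms of CPU per second of wall clock

cpu_percent = 100.0 * cpu_usage_rate_ms / cpu_limit_ms  # 80.0

# Memory: the cgroup value only counts as a real limit when it is positive and
# strictly below the "unconstrained" sentinel; otherwise limit and percent stay None.
memory_usage, cgroup_memory_limit = 16384, 65536
memory_percent = 100.0 * memory_usage / cgroup_memory_limit  # 25.0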
@@ -5,6 +5,7 @@
import dagster._core.execution.run_metrics_thread as run_metrics_thread
from dagster import DagsterInstance, DagsterRun
from dagster._core.execution.telemetry import RunTelemetryData
from dagster._utils.container import UNCONSTRAINED_CGROUP_MEMORY_LIMIT
from pytest import fixture, mark


@@ -56,9 +57,8 @@ def test_get_container_metrics(mock_containerized_utilization_metrics):
assert metrics
assert isinstance(metrics, dict)
assert metrics["container.cpu_usage_ms"] == 50000
assert metrics["container.cpu_usage_rate_ms"] == 80
assert metrics["container.cpu_limit_ms"] == 100
assert metrics["container.cpu_cfs_period_us"] == 10000
assert metrics["container.cpu_cfs_quota_us"] == 1000
assert metrics["container.memory_usage"] == 16384
assert metrics["container.memory_limit"] == 65536
assert metrics["container.memory_percent"] == 25
@@ -81,20 +81,58 @@ def test_get_container_metrics_missing_limits(mock_containerized_utilization_met
return_value=mock_containerized_utilization_metrics,
):
# should not throw
metrics = run_metrics_thread._get_container_metrics() # noqa: SLF001
metrics = run_metrics_thread._get_container_metrics( # noqa: SLF001
previous_cpu_usage_ms=49200, previous_measurement_timestamp=1000
)

assert metrics
assert isinstance(metrics, dict)
assert metrics["container.cpu_usage_ms"] == 50000
assert metrics["container.cpu_usage_rate_ms"] == 80 # see previous test for calculation
assert metrics["container.cpu_percent"] is None
assert metrics["container.cpu_limit_ms"] is None
assert metrics["container.cpu_cfs_period_us"] is None
assert metrics["container.cpu_cfs_quota_us"] is None
assert metrics["container.memory_usage"] == 16384
assert metrics["container.memory_limit"] is None
assert metrics["container.memory_percent"] is None


@mark.parametrize(
"cpu_cfs_quota_us, cpu_cfs_period_us, cgroup_memory_limit, expected_cpu_limit_ms, expected_memory_limit",
[
(None, None, None, None, None),
(0, 0, 0, None, None),
(10, -1, -1, None, None),
(-1, 10, UNCONSTRAINED_CGROUP_MEMORY_LIMIT, None, None),
],
)
def test_get_container_metrics_edge_conditions(
mock_containerized_utilization_metrics,
cpu_cfs_quota_us,
cpu_cfs_period_us,
cgroup_memory_limit,
expected_cpu_limit_ms,
expected_memory_limit,
):
"""These limits are not valid if none, negative or zero values are provided and we should ignore them."""
mock_containerized_utilization_metrics["cpu_cfs_quota_us"] = cpu_cfs_quota_us
mock_containerized_utilization_metrics["cpu_cfs_period_us"] = cpu_cfs_period_us
mock_containerized_utilization_metrics["memory_limit"] = cgroup_memory_limit

with patch(
"dagster._core.execution.run_metrics_thread.retrieve_containerized_utilization_metrics",
return_value=mock_containerized_utilization_metrics,
):
# should not throw
metrics = run_metrics_thread._get_container_metrics( # noqa: SLF001
previous_cpu_usage_ms=49200, previous_measurement_timestamp=1000
)

assert metrics
assert isinstance(metrics, dict)
assert metrics["container.cpu_limit_ms"] == expected_cpu_limit_ms
assert metrics["container.memory_limit"] == expected_memory_limit


@mark.parametrize(
"test_case, platform_name, is_file, readable, link_path, expected",
[
@@ -164,6 +202,25 @@ def test_start_run_metrics_thread(dagster_instance, dagster_run, mock_container_
assert "Shutting down metrics capture thread" in caplog.messages[-1]


def test_start_run_metrics_thread_without_run_storage_support(
dagster_instance, dagster_run, mock_container_metrics, caplog
):
logger = logging.getLogger("test_run_metrics")
logger.setLevel(logging.DEBUG)

with patch.object(dagster_instance.run_storage, "supports_run_telemetry", return_value=False):
thread, shutdown = run_metrics_thread.start_run_metrics_thread(
dagster_instance,
dagster_run,
logger=logger,
polling_interval=2.0,
)

assert thread is None
assert shutdown is None
assert "Run telemetry is not supported" in caplog.messages[-1]


def test_report_run_metrics(dagster_instance: DagsterInstance, dagster_run: DagsterRun):
with patch.object(dagster_instance.run_storage, "add_run_telemetry") as mock_add_run_telemetry:
metrics = {