From 29e1dc30d262ff8188d54b82d4db74a747f1a770 Mon Sep 17 00:00:00 2001 From: Mathieu Larose Date: Thu, 8 Aug 2024 12:38:03 -0400 Subject: [PATCH] feat: run metrics improvements - better handling of cgroup limits edge cases - don't push invalid memory_limit - remove low value cfs metrics - push calculated cpu usage rate for future use --- .../_core/execution/run_metrics_thread.py | 38 +++++++---- .../test_run_metrics_thread.py | 67 +++++++++++++++++-- 2 files changed, 86 insertions(+), 19 deletions(-) diff --git a/python_modules/dagster/dagster/_core/execution/run_metrics_thread.py b/python_modules/dagster/dagster/_core/execution/run_metrics_thread.py index de2b29f152f54..739df5fe30a6e 100644 --- a/python_modules/dagster/dagster/_core/execution/run_metrics_thread.py +++ b/python_modules/dagster/dagster/_core/execution/run_metrics_thread.py @@ -10,7 +10,10 @@ from dagster._core.execution.telemetry import RunTelemetryData from dagster._core.instance import DagsterInstance from dagster._core.storage.dagster_run import DagsterRun -from dagster._utils.container import retrieve_containerized_utilization_metrics +from dagster._utils.container import ( + UNCONSTRAINED_CGROUP_MEMORY_LIMIT, + retrieve_containerized_utilization_metrics, +) DEFAULT_RUN_METRICS_POLL_INTERVAL_SECONDS = 30.0 DEFAULT_RUN_METRICS_SHUTDOWN_SECONDS = 15 @@ -63,41 +66,48 @@ def _get_container_metrics( cpu_usage = metrics.get("cpu_usage") cpu_usage_ms = None if cpu_usage is not None: - cpu_usage_ms = cpu_usage * 1000 # convert from seconds to milliseconds + cpu_usage_ms = check.not_none(cpu_usage) * 1000 # convert from seconds to milliseconds cpu_limit_ms = None if cpu_quota_us and cpu_quota_us > 0 and cpu_period_us and cpu_period_us > 0: # Why the 1000 factor is a bit counterintuitive: # quota / period -> fraction of cpu per unit of time # 1000 * quota / period -> ms/sec of cpu - cpu_limit_ms = (1000.0 * cpu_quota_us) / cpu_period_us + cpu_limit_ms = (1000.0 * check.not_none(cpu_quota_us)) / check.not_none(cpu_period_us) cpu_usage_rate_ms = None measurement_timestamp = metrics.get("measurement_timestamp") - if ( - previous_cpu_usage_ms - and cpu_usage_ms - and previous_measurement_timestamp + if (previous_cpu_usage_ms and cpu_usage_ms and cpu_usage_ms >= previous_cpu_usage_ms) and ( + previous_measurement_timestamp and measurement_timestamp + and measurement_timestamp > previous_measurement_timestamp ): cpu_usage_rate_ms = (cpu_usage_ms - previous_cpu_usage_ms) / ( measurement_timestamp - previous_measurement_timestamp ) cpu_percent = None - if cpu_limit_ms and cpu_limit_ms > 0 and cpu_usage_rate_ms and cpu_usage_rate_ms > 0: - cpu_percent = 100.0 * cpu_usage_rate_ms / cpu_limit_ms + if (cpu_limit_ms and cpu_limit_ms > 0) and (cpu_usage_rate_ms and cpu_usage_rate_ms > 0): + cpu_percent = 100.0 * check.not_none(cpu_usage_rate_ms) / check.not_none(cpu_limit_ms) memory_percent = None - memory_limit = metrics.get("memory_limit") + memory_limit = None + cgroup_memory_limit = metrics.get("memory_limit") memory_usage = metrics.get("memory_usage") - if memory_limit and memory_limit > 0 and memory_usage and memory_usage > 0: - memory_percent = 100.0 * memory_usage / memory_limit + if (cgroup_memory_limit and 0 < cgroup_memory_limit < UNCONSTRAINED_CGROUP_MEMORY_LIMIT) and ( + memory_usage and memory_usage >= 0 + ): + memory_limit = cgroup_memory_limit + memory_percent = 100.0 * check.not_none(memory_usage) / check.not_none(memory_limit) return { + # TODO - eventually we should replace and remove cpu_usage_ms and only send cpu_usage_rate_ms + # We need to ensure that the UI and GraphQL queries can handle the change + # and that we have 15 days of runs to ensure continuity. + # Why? the derivative calculated by datadog on cpu_usage_ms is sometimes wonky, resulting + # in a perplexing graph for the end user. "container.cpu_usage_ms": cpu_usage_ms, - "container.cpu_cfs_period_us": cpu_period_us, - "container.cpu_cfs_quota_us": cpu_quota_us, + "container.cpu_usage_rate_ms": cpu_usage_rate_ms, "container.cpu_limit_ms": cpu_limit_ms, "container.cpu_percent": cpu_percent, "container.memory_usage": memory_usage, diff --git a/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_run_metrics_thread.py b/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_run_metrics_thread.py index c0835cc822f98..d346427c032e4 100644 --- a/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_run_metrics_thread.py +++ b/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_run_metrics_thread.py @@ -5,6 +5,7 @@ import dagster._core.execution.run_metrics_thread as run_metrics_thread from dagster import DagsterInstance, DagsterRun from dagster._core.execution.telemetry import RunTelemetryData +from dagster._utils.container import UNCONSTRAINED_CGROUP_MEMORY_LIMIT from pytest import fixture, mark @@ -56,9 +57,8 @@ def test_get_container_metrics(mock_containerized_utilization_metrics): assert metrics assert isinstance(metrics, dict) assert metrics["container.cpu_usage_ms"] == 50000 + assert metrics["container.cpu_usage_rate_ms"] == 80 assert metrics["container.cpu_limit_ms"] == 100 - assert metrics["container.cpu_cfs_period_us"] == 10000 - assert metrics["container.cpu_cfs_quota_us"] == 1000 assert metrics["container.memory_usage"] == 16384 assert metrics["container.memory_limit"] == 65536 assert metrics["container.memory_percent"] == 25 @@ -81,20 +81,58 @@ def test_get_container_metrics_missing_limits(mock_containerized_utilization_met return_value=mock_containerized_utilization_metrics, ): # should not throw - metrics = run_metrics_thread._get_container_metrics() # noqa: SLF001 + metrics = run_metrics_thread._get_container_metrics( # noqa: SLF001 + previous_cpu_usage_ms=49200, previous_measurement_timestamp=1000 + ) assert metrics assert isinstance(metrics, dict) assert metrics["container.cpu_usage_ms"] == 50000 + assert metrics["container.cpu_usage_rate_ms"] == 80 # see previous test for calculation assert metrics["container.cpu_percent"] is None assert metrics["container.cpu_limit_ms"] is None - assert metrics["container.cpu_cfs_period_us"] is None - assert metrics["container.cpu_cfs_quota_us"] is None assert metrics["container.memory_usage"] == 16384 assert metrics["container.memory_limit"] is None assert metrics["container.memory_percent"] is None +@mark.parametrize( + "cpu_cfs_quota_us, cpu_cfs_period_us, cgroup_memory_limit, expected_cpu_limit_ms, expected_memory_limit", + [ + (None, None, None, None, None), + (0, 0, 0, None, None), + (10, -1, -1, None, None), + (-1, 10, UNCONSTRAINED_CGROUP_MEMORY_LIMIT, None, None), + ], +) +def test_get_container_metrics_edge_conditions( + mock_containerized_utilization_metrics, + cpu_cfs_quota_us, + cpu_cfs_period_us, + cgroup_memory_limit, + expected_cpu_limit_ms, + expected_memory_limit, +): + """These limits are not valid if none, negative or zero values are provided and we should ignore them.""" + mock_containerized_utilization_metrics["cpu_cfs_quota_us"] = cpu_cfs_quota_us + mock_containerized_utilization_metrics["cpu_cfs_period_us"] = cpu_cfs_period_us + mock_containerized_utilization_metrics["memory_limit"] = cgroup_memory_limit + + with patch( + "dagster._core.execution.run_metrics_thread.retrieve_containerized_utilization_metrics", + return_value=mock_containerized_utilization_metrics, + ): + # should not throw + metrics = run_metrics_thread._get_container_metrics( # noqa: SLF001 + previous_cpu_usage_ms=49200, previous_measurement_timestamp=1000 + ) + + assert metrics + assert isinstance(metrics, dict) + assert metrics["container.cpu_limit_ms"] == expected_cpu_limit_ms + assert metrics["container.memory_limit"] == expected_memory_limit + + @mark.parametrize( "test_case, platform_name, is_file, readable, link_path, expected", [ @@ -164,6 +202,25 @@ def test_start_run_metrics_thread(dagster_instance, dagster_run, mock_container_ assert "Shutting down metrics capture thread" in caplog.messages[-1] +def test_start_run_metrics_thread_without_run_storage_support( + dagster_instance, dagster_run, mock_container_metrics, caplog +): + logger = logging.getLogger("test_run_metrics") + logger.setLevel(logging.DEBUG) + + with patch.object(dagster_instance.run_storage, "supports_run_telemetry", return_value=False): + thread, shutdown = run_metrics_thread.start_run_metrics_thread( + dagster_instance, + dagster_run, + logger=logger, + polling_interval=2.0, + ) + + assert thread is None + assert shutdown is None + assert "Run telemetry is not supported" in caplog.messages[-1] + + def test_report_run_metrics(dagster_instance: DagsterInstance, dagster_run: DagsterRun): with patch.object(dagster_instance.run_storage, "add_run_telemetry") as mock_add_run_telemetry: metrics = {