diff --git a/python_modules/dagster/dagster/_cli/api.py b/python_modules/dagster/dagster/_cli/api.py
index 285e54469b4a4..82be6a3205652 100644
--- a/python_modules/dagster/dagster/_cli/api.py
+++ b/python_modules/dagster/dagster/_cli/api.py
@@ -138,14 +138,12 @@ def _execute_run_command_body(
         f"Run with id '{run_id}' does not include an origin.",
     )
 
-    start_metric_thread = _should_start_metrics_thread(dagster_run)
-    if start_metric_thread:
+    if _should_start_metrics_thread(dagster_run):
         logger = logging.getLogger("run_metrics")
         polling_interval = _metrics_polling_interval(dagster_run, logger=logger)
         metrics_thread, metrics_thread_shutdown_event = start_run_metrics_thread(
             instance,
             dagster_run,
-            container_metrics_enabled=True,
             python_metrics_enabled=_enable_python_runtime_metrics(dagster_run),
             polling_interval=polling_interval,
             logger=logger,
@@ -178,31 +176,51 @@ def _execute_run_command_body(
         # relies on core_execute_run writing failures to the event log before raising
         run_worker_failed = True
     finally:
-        if metrics_thread and metrics_thread_shutdown_event:
-            stopped = stop_run_metrics_thread(metrics_thread, metrics_thread_shutdown_event)
-            if not stopped:
-                instance.report_engine_event("Metrics thread did not shutdown properly")
-
-        if instance.should_start_background_run_thread:
-            cancellation_thread_shutdown_event = check.not_none(cancellation_thread_shutdown_event)
-            cancellation_thread = check.not_none(cancellation_thread)
-            cancellation_thread_shutdown_event.set()
-            if cancellation_thread.is_alive():
-                cancellation_thread.join(timeout=15)
-                if cancellation_thread.is_alive():
-                    instance.report_engine_event(
-                        "Cancellation thread did not shutdown gracefully",
-                        dagster_run,
-                    )
-
-        instance.report_engine_event(
-            f"Process for run exited (pid: {pid}).",
+        _shutdown_threads(
+            instance,
             dagster_run,
+            metrics_thread,
+            metrics_thread_shutdown_event,
+            cancellation_thread,
+            cancellation_thread_shutdown_event,
         )
 
     return 1 if (run_worker_failed and set_exit_code_on_failure) else 0
 
 
+def _shutdown_threads(
+    instance: DagsterInstance,
+    dagster_run: DagsterRun,
+    metrics_thread: Optional[threading.Thread],
+    metrics_thread_shutdown_event: Optional[threading.Event],
+    cancellation_thread: Optional[threading.Thread],
+    cancellation_thread_shutdown_event: Optional[threading.Event],
+) -> None:
+    """Stop the metrics and cancellation threads, then report that the run process exited."""
+    pid = os.getpid()
+    if metrics_thread and metrics_thread_shutdown_event:
+        stopped = stop_run_metrics_thread(metrics_thread, metrics_thread_shutdown_event)
+        if not stopped:
+            instance.report_engine_event("Metrics thread did not shutdown properly", dagster_run)
+
+    if instance.should_start_background_run_thread:
+        cancellation_thread_shutdown_event = check.not_none(cancellation_thread_shutdown_event)
+        cancellation_thread = check.not_none(cancellation_thread)
+        cancellation_thread_shutdown_event.set()
+        if cancellation_thread.is_alive():
+            cancellation_thread.join(timeout=15)
+            if cancellation_thread.is_alive():
+                instance.report_engine_event(
+                    "Cancellation thread did not shutdown gracefully",
+                    dagster_run,
+                )
+
+    instance.report_engine_event(
+        f"Process for run exited (pid: {pid}).",
+        dagster_run,
+    )
+
+
 @api_cli.command(
     name="resume_run",
     help=(
@@ -253,14 +271,12 @@ def _resume_run_command_body(
         f"Run with id '{run_id}' does not include an origin.",
     )
 
-    start_metric_thread = _should_start_metrics_thread(dagster_run)
-    if start_metric_thread:
+    if _should_start_metrics_thread(dagster_run):
         logger = logging.getLogger("run_metrics")
         polling_interval = _metrics_polling_interval(dagster_run, logger=logger)
         metrics_thread, metrics_thread_shutdown_event = start_run_metrics_thread(
             instance,
             dagster_run,
-            container_metrics_enabled=True,
             python_metrics_enabled=_enable_python_runtime_metrics(dagster_run),
             polling_interval=polling_interval,
             logger=logger,
@@ -295,25 +311,13 @@ def _resume_run_command_body(
         # relies on core_execute_run writing failures to the event log before raising
         run_worker_failed = True
     finally:
-        if metrics_thread and metrics_thread_shutdown_event:
-            stopped = stop_run_metrics_thread(metrics_thread, metrics_thread_shutdown_event)
-            if not stopped:
-                instance.report_engine_event("Metrics thread did not shutdown properly")
-
-        if instance.should_start_background_run_thread:
-            cancellation_thread_shutdown_event = check.not_none(cancellation_thread_shutdown_event)
-            cancellation_thread = check.not_none(cancellation_thread)
-            cancellation_thread_shutdown_event.set()
-            if cancellation_thread.is_alive():
-                cancellation_thread.join(timeout=15)
-                if cancellation_thread.is_alive():
-                    instance.report_engine_event(
-                        "Cancellation thread did not shutdown gracefully",
-                        dagster_run,
-                    )
-        instance.report_engine_event(
-            f"Process for job exited (pid: {pid}).",
+        _shutdown_threads(
+            instance,
             dagster_run,
+            metrics_thread,
+            metrics_thread_shutdown_event,
+            cancellation_thread,
+            cancellation_thread_shutdown_event,
         )
 
     return 1 if (run_worker_failed and set_exit_code_on_failure) else 0
diff --git a/python_modules/dagster/dagster/_core/execution/run_metrics_thread.py b/python_modules/dagster/dagster/_core/execution/run_metrics_thread.py
index 3f69eaae9e927..de2b29f152f54 100644
--- a/python_modules/dagster/dagster/_core/execution/run_metrics_thread.py
+++ b/python_modules/dagster/dagster/_core/execution/run_metrics_thread.py
@@ -12,8 +12,8 @@
 from dagster._core.storage.dagster_run import DagsterRun
 from dagster._utils.container import retrieve_containerized_utilization_metrics
 
-DEFAULT_RUN_METRICS_POLL_INTERVAL_SECONDS = 15.0
-DEFAULT_RUN_METRICS_SHUTDOWN_SECONDS = 30
+DEFAULT_RUN_METRICS_POLL_INTERVAL_SECONDS = 30.0
+DEFAULT_RUN_METRICS_SHUTDOWN_SECONDS = 15
 
 
 def _get_platform_name() -> str:
@@ -137,6 +137,8 @@ def _report_run_metrics(
 
     telemetry_data = RunTelemetryData(run_id=dagster_run.run_id, datapoints=datapoints)
 
+    # TODO - this should throw an exception or return a control value if the telemetry is not enabled server side
+    # so that we can catch and stop the thread.
     instance._run_storage.add_run_telemetry(  # noqa: SLF001
         telemetry_data, tags=run_tags
     )
@@ -211,22 +213,26 @@ def _capture_metrics(
 def start_run_metrics_thread(
     instance: DagsterInstance,
     dagster_run: DagsterRun,
-    container_metrics_enabled: Optional[bool] = True,
     python_metrics_enabled: Optional[bool] = False,
     logger: Optional[logging.Logger] = None,
     polling_interval: float = DEFAULT_RUN_METRICS_POLL_INTERVAL_SECONDS,
-) -> Tuple[threading.Thread, threading.Event]:
+) -> Tuple[Optional[threading.Thread], Optional[threading.Event]]:
     check.inst_param(instance, "instance", DagsterInstance)
     check.inst_param(dagster_run, "dagster_run", DagsterRun)
     check.opt_inst_param(logger, "logger", logging.Logger)
-    check.opt_bool_param(container_metrics_enabled, "container_metrics_enabled")
     check.opt_bool_param(python_metrics_enabled, "python_metrics_enabled")
     check.float_param(polling_interval, "polling_interval")
 
-    container_metrics_enabled = container_metrics_enabled and _process_is_containerized()
+    if not instance.run_storage.supports_run_telemetry():
+        if logger:
+            logger.debug("Run telemetry is not supported, skipping run metrics thread")
+        return None, None
 
-    # TODO - ensure at least one metrics source is enabled
-    assert container_metrics_enabled or python_metrics_enabled, "No metrics enabled"
+    container_metrics_enabled = _process_is_containerized()
+    if not container_metrics_enabled and not python_metrics_enabled:
+        if logger:
+            logger.debug("No collectable metrics, skipping run metrics thread")
+        return None, None
 
     if logger:
         logger.debug("Starting run metrics thread")
diff --git a/python_modules/dagster/dagster/_core/storage/runs/base.py b/python_modules/dagster/dagster/_core/storage/runs/base.py
index 7a1b71044e25c..d78cddd94ab68 100644
--- a/python_modules/dagster/dagster/_core/storage/runs/base.py
+++ b/python_modules/dagster/dagster/_core/storage/runs/base.py
@@ -353,6 +353,10 @@ def add_daemon_heartbeat(self, daemon_heartbeat: DaemonHeartbeat) -> None:
     def get_daemon_heartbeats(self) -> Mapping[str, DaemonHeartbeat]:
         """Latest heartbeats of all daemon types."""
 
+    def supports_run_telemetry(self) -> bool:
+        """Whether the storage supports run telemetry."""
+        return False
+
     def add_run_telemetry(
         self,
         run_telemetry: RunTelemetryData,
diff --git a/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_run_metrics_thread.py b/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_run_metrics_thread.py
index ccf276780b38b..c0835cc822f98 100644
--- a/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_run_metrics_thread.py
+++ b/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_run_metrics_thread.py
@@ -135,32 +135,33 @@ def test_start_run_metrics_thread(dagster_instance, dagster_run, mock_container_
     logger = logging.getLogger("test_run_metrics")
     logger.setLevel(logging.DEBUG)
 
-    with patch(
-        "dagster._core.execution.run_metrics_thread._get_container_metrics",
-        return_value=mock_container_metrics,
-    ):
+    with patch.object(dagster_instance.run_storage, "supports_run_telemetry", return_value=True):
         with patch(
-            "dagster._core.execution.run_metrics_thread._process_is_containerized",
-            return_value=True,
+            "dagster._core.execution.run_metrics_thread._get_container_metrics",
+            return_value=mock_container_metrics,
         ):
-            thread, shutdown = run_metrics_thread.start_run_metrics_thread(
-                dagster_instance,
-                dagster_run,
-                logger=logger,
-                container_metrics_enabled=True,
-                polling_interval=2.0,
-            )
-
-            time.sleep(0.1)
-
-            assert thread.is_alive()
-
-            time.sleep(0.1)
-            shutdown.set()
-
-            thread.join()
-            assert thread.is_alive() is False
-            assert "Starting run metrics thread" in caplog.messages[0]
+            with patch(
+                "dagster._core.execution.run_metrics_thread._process_is_containerized",
+                return_value=True,
+            ):
+                thread, shutdown = run_metrics_thread.start_run_metrics_thread(
+                    dagster_instance,
+                    dagster_run,
+                    logger=logger,
+                    polling_interval=2.0,
+                )
+
+                time.sleep(0.1)
+
+                assert thread.is_alive()
+                assert "Starting run metrics thread" in caplog.messages[0]
+
+                time.sleep(0.1)
+                shutdown.set()
+
+                thread.join()
+                assert thread.is_alive() is False
+                assert "Shutting down metrics capture thread" in caplog.messages[-1]
 
 
 def test_report_run_metrics(dagster_instance: DagsterInstance, dagster_run: DagsterRun):