Skip to content

Commit

Permalink
refactor: run metric threads startup and shutdown code
Browse files Browse the repository at this point in the history
  • Loading branch information
mlarose committed Oct 8, 2024
1 parent 75414fa commit 0e33604
Show file tree
Hide file tree
Showing 4 changed files with 89 additions and 75 deletions.
89 changes: 46 additions & 43 deletions python_modules/dagster/dagster/_cli/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,14 +138,12 @@ def _execute_run_command_body(
f"Run with id '{run_id}' does not include an origin.",
)

start_metric_thread = _should_start_metrics_thread(dagster_run)
if start_metric_thread:
if _should_start_metrics_thread(dagster_run):
logger = logging.getLogger("run_metrics")
polling_interval = _metrics_polling_interval(dagster_run, logger=logger)
metrics_thread, metrics_thread_shutdown_event = start_run_metrics_thread(
instance,
dagster_run,
container_metrics_enabled=True,
python_metrics_enabled=_enable_python_runtime_metrics(dagster_run),
polling_interval=polling_interval,
logger=logger,
Expand Down Expand Up @@ -178,31 +176,50 @@ def _execute_run_command_body(
# relies on core_execute_run writing failures to the event log before raising
run_worker_failed = True
finally:
if metrics_thread and metrics_thread_shutdown_event:
stopped = stop_run_metrics_thread(metrics_thread, metrics_thread_shutdown_event)
if not stopped:
instance.report_engine_event("Metrics thread did not shutdown properly")

if instance.should_start_background_run_thread:
cancellation_thread_shutdown_event = check.not_none(cancellation_thread_shutdown_event)
cancellation_thread = check.not_none(cancellation_thread)
cancellation_thread_shutdown_event.set()
if cancellation_thread.is_alive():
cancellation_thread.join(timeout=15)
if cancellation_thread.is_alive():
instance.report_engine_event(
"Cancellation thread did not shutdown gracefully",
dagster_run,
)

instance.report_engine_event(
f"Process for run exited (pid: {pid}).",
_shutdown_threads(
instance,
dagster_run,
metrics_thread,
metrics_thread_shutdown_event,
cancellation_thread,
cancellation_thread_shutdown_event,
)

return 1 if (run_worker_failed and set_exit_code_on_failure) else 0


def _shutdown_threads(
instance: DagsterInstance,
dagster_run: DagsterRun,
metrics_thread: Optional[threading.Thread],
metrics_thread_shutdown_event: Optional[threading.Event],
cancellation_thread: Optional[threading.Thread],
cancellation_thread_shutdown_event: Optional[threading.Event],
):
pid = os.getpid()
if metrics_thread and metrics_thread_shutdown_event:
stopped = stop_run_metrics_thread(metrics_thread, metrics_thread_shutdown_event)
if not stopped:
instance.report_engine_event("Metrics thread did not shutdown properly")

if instance.should_start_background_run_thread:
cancellation_thread_shutdown_event = check.not_none(cancellation_thread_shutdown_event)
cancellation_thread = check.not_none(cancellation_thread)
cancellation_thread_shutdown_event.set()
if cancellation_thread.is_alive():
cancellation_thread.join(timeout=15)
if cancellation_thread.is_alive():
instance.report_engine_event(
"Cancellation thread did not shutdown gracefully",
dagster_run,
)

instance.report_engine_event(
f"Process for run exited (pid: {pid}).",
dagster_run,
)


@api_cli.command(
name="resume_run",
help=(
Expand Down Expand Up @@ -253,14 +270,12 @@ def _resume_run_command_body(
f"Run with id '{run_id}' does not include an origin.",
)

start_metric_thread = _should_start_metrics_thread(dagster_run)
if start_metric_thread:
if _should_start_metrics_thread(dagster_run):
logger = logging.getLogger("run_metrics")
polling_interval = _metrics_polling_interval(dagster_run, logger=logger)
metrics_thread, metrics_thread_shutdown_event = start_run_metrics_thread(
instance,
dagster_run,
container_metrics_enabled=True,
python_metrics_enabled=_enable_python_runtime_metrics(dagster_run),
polling_interval=polling_interval,
logger=logger,
Expand Down Expand Up @@ -295,25 +310,13 @@ def _resume_run_command_body(
# relies on core_execute_run writing failures to the event log before raising
run_worker_failed = True
finally:
if metrics_thread and metrics_thread_shutdown_event:
stopped = stop_run_metrics_thread(metrics_thread, metrics_thread_shutdown_event)
if not stopped:
instance.report_engine_event("Metrics thread did not shutdown properly")

if instance.should_start_background_run_thread:
cancellation_thread_shutdown_event = check.not_none(cancellation_thread_shutdown_event)
cancellation_thread = check.not_none(cancellation_thread)
cancellation_thread_shutdown_event.set()
if cancellation_thread.is_alive():
cancellation_thread.join(timeout=15)
if cancellation_thread.is_alive():
instance.report_engine_event(
"Cancellation thread did not shutdown gracefully",
dagster_run,
)
instance.report_engine_event(
f"Process for job exited (pid: {pid}).",
_shutdown_threads(
instance,
dagster_run,
metrics_thread,
metrics_thread_shutdown_event,
cancellation_thread,
cancellation_thread_shutdown_event,
)

return 1 if (run_worker_failed and set_exit_code_on_failure) else 0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@
from dagster._core.storage.dagster_run import DagsterRun
from dagster._utils.container import retrieve_containerized_utilization_metrics

DEFAULT_RUN_METRICS_POLL_INTERVAL_SECONDS = 15.0
DEFAULT_RUN_METRICS_SHUTDOWN_SECONDS = 30
DEFAULT_RUN_METRICS_POLL_INTERVAL_SECONDS = 30.0
DEFAULT_RUN_METRICS_SHUTDOWN_SECONDS = 15


def _get_platform_name() -> str:
Expand Down Expand Up @@ -137,6 +137,8 @@ def _report_run_metrics(

telemetry_data = RunTelemetryData(run_id=dagster_run.run_id, datapoints=datapoints)

# TODO - this should throw an exception or return a control value if the telemetry is not enabled server side
# so that we can catch and stop the thread.
instance._run_storage.add_run_telemetry( # noqa: SLF001
telemetry_data, tags=run_tags
)
Expand Down Expand Up @@ -211,22 +213,26 @@ def _capture_metrics(
def start_run_metrics_thread(
instance: DagsterInstance,
dagster_run: DagsterRun,
container_metrics_enabled: Optional[bool] = True,
python_metrics_enabled: Optional[bool] = False,
logger: Optional[logging.Logger] = None,
polling_interval: float = DEFAULT_RUN_METRICS_POLL_INTERVAL_SECONDS,
) -> Tuple[threading.Thread, threading.Event]:
) -> Tuple[Optional[threading.Thread], Optional[threading.Event]]:
check.inst_param(instance, "instance", DagsterInstance)
check.inst_param(dagster_run, "dagster_run", DagsterRun)
check.opt_inst_param(logger, "logger", logging.Logger)
check.opt_bool_param(container_metrics_enabled, "container_metrics_enabled")
check.opt_bool_param(python_metrics_enabled, "python_metrics_enabled")
check.float_param(polling_interval, "polling_interval")

container_metrics_enabled = container_metrics_enabled and _process_is_containerized()
if not instance.run_storage.supports_run_telemetry():
if logger:
logger.debug("Run telemetry is not supported, skipping run metrics thread")
return None, None

# TODO - ensure at least one metrics source is enabled
assert container_metrics_enabled or python_metrics_enabled, "No metrics enabled"
container_metrics_enabled = _process_is_containerized()
if not container_metrics_enabled and not python_metrics_enabled:
if logger:
logger.debug("No collectable metrics, skipping run metrics thread")
return None, None

if logger:
logger.debug("Starting run metrics thread")
Expand Down
4 changes: 4 additions & 0 deletions python_modules/dagster/dagster/_core/storage/runs/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,10 @@ def add_daemon_heartbeat(self, daemon_heartbeat: DaemonHeartbeat) -> None:
def get_daemon_heartbeats(self) -> Mapping[str, DaemonHeartbeat]:
"""Latest heartbeats of all daemon types."""

def supports_run_telemetry(self) -> bool:
"""Whether the storage supports run telemetry."""
return False

def add_run_telemetry(
self,
run_telemetry: RunTelemetryData,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,32 +135,33 @@ def test_start_run_metrics_thread(dagster_instance, dagster_run, mock_container_
logger = logging.getLogger("test_run_metrics")
logger.setLevel(logging.DEBUG)

with patch(
"dagster._core.execution.run_metrics_thread._get_container_metrics",
return_value=mock_container_metrics,
):
with patch.object(dagster_instance.run_storage, "supports_run_telemetry", return_value=True):
with patch(
"dagster._core.execution.run_metrics_thread._process_is_containerized",
return_value=True,
"dagster._core.execution.run_metrics_thread._get_container_metrics",
return_value=mock_container_metrics,
):
thread, shutdown = run_metrics_thread.start_run_metrics_thread(
dagster_instance,
dagster_run,
logger=logger,
container_metrics_enabled=True,
polling_interval=2.0,
)

time.sleep(0.1)

assert thread.is_alive()

time.sleep(0.1)
shutdown.set()

thread.join()
assert thread.is_alive() is False
assert "Starting run metrics thread" in caplog.messages[0]
with patch(
"dagster._core.execution.run_metrics_thread._process_is_containerized",
return_value=True,
):
thread, shutdown = run_metrics_thread.start_run_metrics_thread(
dagster_instance,
dagster_run,
logger=logger,
polling_interval=2.0,
)

time.sleep(0.1)

assert thread.is_alive()
assert "Starting run metrics thread" in caplog.messages[0]

time.sleep(0.1)
shutdown.set()

thread.join()
assert thread.is_alive() is False
assert "Shutting down metrics capture thread" in caplog.messages[-1]


def test_report_run_metrics(dagster_instance: DagsterInstance, dagster_run: DagsterRun):
Expand Down

0 comments on commit 0e33604

Please sign in to comment.