Skip to content

Commit

Permalink
use the run metrics tag to conditionally start the metric thread
Browse files Browse the repository at this point in the history
  • Loading branch information
mlarose committed Sep 17, 2024
1 parent 23482c6 commit 62b8c1d
Showing 1 changed file with 28 additions and 18 deletions.
46 changes: 28 additions & 18 deletions python_modules/dagster/dagster/_cli/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,10 @@ def send_to_buffer(event):
sys.exit(return_code)


def _should_start_metrics_thread(dagster_run: DagsterRun) -> bool:
return get_boolean_tag_value(dagster_run.tags.get("dagster/run_metrics"))


def _enable_python_runtime_metrics(dagster_run: DagsterRun) -> bool:
return get_boolean_tag_value(dagster_run.tags.get("dagster/python_runtime_metrics"))

Expand Down Expand Up @@ -132,15 +136,18 @@ def _execute_run_command_body(
f"Run with id '{run_id}' does not include an origin.",
)

logger = logging.getLogger("run_metrics")
polling_interval = _metrics_polling_interval(dagster_run, logger=logger)
metrics_thread, metrics_thread_shutdown_event = start_run_metrics_thread(
instance,
dagster_run,
python_metrics_enabled=_enable_python_runtime_metrics(dagster_run),
polling_interval=polling_interval,
logger=logger,
)
if _should_start_metrics_thread(dagster_run):
logger = logging.getLogger("run_metrics")
polling_interval = _metrics_polling_interval(dagster_run, logger=logger)
metrics_thread, metrics_thread_shutdown_event = start_run_metrics_thread(
instance,
dagster_run,
python_metrics_enabled=_enable_python_runtime_metrics(dagster_run),
polling_interval=polling_interval,
logger=logger,
)
else:
metrics_thread, metrics_thread_shutdown_event = None, None

recon_job = recon_job_from_origin(cast(JobPythonOrigin, dagster_run.job_code_origin))

Expand Down Expand Up @@ -261,15 +268,18 @@ def _resume_run_command_body(
f"Run with id '{run_id}' does not include an origin.",
)

logger = logging.getLogger("run_metrics")
polling_interval = _metrics_polling_interval(dagster_run, logger=logger)
metrics_thread, metrics_thread_shutdown_event = start_run_metrics_thread(
instance,
dagster_run,
python_metrics_enabled=_enable_python_runtime_metrics(dagster_run),
polling_interval=polling_interval,
logger=logger,
)
if _should_start_metrics_thread(dagster_run):
logger = logging.getLogger("run_metrics")
polling_interval = _metrics_polling_interval(dagster_run, logger=logger)
metrics_thread, metrics_thread_shutdown_event = start_run_metrics_thread(
instance,
dagster_run,
python_metrics_enabled=_enable_python_runtime_metrics(dagster_run),
polling_interval=polling_interval,
logger=logger,
)
else:
metrics_thread, metrics_thread_shutdown_event = None, None

recon_job = recon_job_from_origin(cast(JobPythonOrigin, dagster_run.job_code_origin))

Expand Down

0 comments on commit 62b8c1d

Please sign in to comment.