Skip to content

Commit

Permalink
Use start_date instead of execution_date for ongoing duration metrics (#19278)
Browse files Browse the repository at this point in the history

* Use start_date instead of execution_date for ongoing duration metrics

* Add changelog

* Improve test
  • Loading branch information
Kyle-Neale authored Dec 18, 2024
1 parent bc90221 commit 79783c0
Show file tree
Hide file tree
Showing 7 changed files with 62 additions and 4 deletions.
6 changes: 6 additions & 0 deletions airflow/assets/configuration/spec.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,12 @@ files:
description: The URL used to connect to the Airflow instance (use the Airflow web server REST API endpoint).
value:
type: string
- name: collect_ongoing_duration
required: false
description: Collect ongoing duration metric for DAG task instances.
value:
type: boolean
example: true
- template: instances/http
- template: instances/default
- template: logs
Expand Down
1 change: 1 addition & 0 deletions airflow/changelog.d/19278.added
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Use `start_date` instead of `execution_date` for ongoing duration metrics
8 changes: 4 additions & 4 deletions airflow/datadog_checks/airflow/airflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ def __init__(self, name, init_config, instances):

self._url = self.instance.get('url', '')
self._tags = self.instance.get('tags', [])

self._collect_ongoing_duration = self.instance.get('collect_ongoing_duration', True)
# The Agent only makes one attempt to instantiate each AgentCheck so any errors occurring
# in `__init__` are logged just once, making it difficult to spot. Therefore, we emit
# potential configuration errors as part of the check run phase.
Expand Down Expand Up @@ -51,7 +51,7 @@ def check(self, _):
else:
submit_metrics(resp, tags)
# Only calculate task duration for stable API
if target_url is url_stable:
if target_url is url_stable and self._collect_ongoing_duration:
task_instances = self._get_all_task_instances(url_stable_task_instances, tags)
if task_instances:
self._calculate_task_ongoing_duration(task_instances, tags)
Expand Down Expand Up @@ -118,14 +118,14 @@ def _calculate_task_ongoing_duration(self, tasks, tags):
dag_task_tags = copy(tags)
task_id = task.get('task_id')
dag_id = task.get('dag_id')
execution_date = task.get('execution_date')
start_date = task.get('start_date')

# Add tags for each task
dag_task_tags.append('dag_id:{}'.format(dag_id))
dag_task_tags.append('task_id:{}'.format(task_id))

# Calculate ongoing duration
ongoing_duration = get_timestamp() - datetime.fromisoformat((execution_date)).timestamp()
ongoing_duration = get_timestamp() - datetime.fromisoformat((start_date)).timestamp()
self.gauge('airflow.dag.task.ongoing_duration', ongoing_duration, tags=dag_task_tags)

def _parse_config(self):
Expand Down
4 changes: 4 additions & 0 deletions airflow/datadog_checks/airflow/config_models/defaults.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@ def instance_auth_type():
return 'basic'


def instance_collect_ongoing_duration():
    """Default for the `collect_ongoing_duration` instance option: enabled."""
    return True


def instance_disable_generic_tags():
    """Default for the `disable_generic_tags` instance option: disabled."""
    return False

Expand Down
1 change: 1 addition & 0 deletions airflow/datadog_checks/airflow/config_models/instance.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ class InstanceConfig(BaseModel):
aws_host: Optional[str] = None
aws_region: Optional[str] = None
aws_service: Optional[str] = None
collect_ongoing_duration: Optional[bool] = None
connect_timeout: Optional[float] = None
disable_generic_tags: Optional[bool] = None
empty_default_hostname: Optional[bool] = None
Expand Down
5 changes: 5 additions & 0 deletions airflow/datadog_checks/airflow/data/conf.yaml.example
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,11 @@ instances:
#
- url: <URL>

## @param collect_ongoing_duration - boolean - optional - default: true
## Collect ongoing duration metric for DAG task instances.
#
# collect_ongoing_duration: true

## @param proxy - mapping - optional
## This overrides the `proxy` setting in `init_config`.
##
Expand Down
41 changes: 41 additions & 0 deletions airflow/tests/test_unit.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,3 +118,44 @@ def test_dag_task_ongoing_duration(aggregator, task_instance):
tags=['key:my-tag', 'url:http://localhost:8080', 'dag_id:tutorial', 'task_id:sleep'],
count=1,
)


@pytest.mark.parametrize(
    "collect_ongoing_duration, should_call_method",
    [
        pytest.param(
            True,
            [
                mock.call(
                    'http://localhost:8080/api/v1/dags/~/dagRuns/~/taskInstances?state=running',
                    ['url:http://localhost:8080', 'key:my-tag'],
                )
            ],
            id="collect",
        ),
        pytest.param(
            False,
            [],
            id="don't collect",
        ),
    ],
)
def test_config_collect_ongoing_duration(collect_ongoing_duration, should_call_method):
    """Verify the `collect_ongoing_duration` option gates fetching of running task instances."""
    # Copy the base instance and toggle the option under test.
    instance = dict(common.FULL_CONFIG['instances'][0])
    instance['collect_ongoing_duration'] = collect_ongoing_duration
    check = AirflowCheck('airflow', common.FULL_CONFIG, [instance])

    # Canned healthy response for the health endpoint.
    health_response = mock.MagicMock(status_code=200)
    health_response.json.side_effect = [
        {'metadatabase': {'status': 'healthy'}, 'scheduler': {'status': 'healthy'}},
    ]

    with mock.patch(
        'datadog_checks.airflow.airflow.AirflowCheck._get_version', return_value='2.6.2'
    ), mock.patch('datadog_checks.base.utils.http.requests') as requests_mock, mock.patch(
        'datadog_checks.airflow.airflow.AirflowCheck._get_all_task_instances'
    ) as task_instances_mock:
        requests_mock.get.return_value = health_response
        check.check(None)

    # Task instances must be fetched only when the option is enabled.
    task_instances_mock.assert_has_calls(should_call_method, any_order=False)

0 comments on commit 79783c0

Please sign in to comment.