Skip to content

Commit

Permalink
feat(airflow): add a render_templates config parameter (datahub-pro…
Browse files Browse the repository at this point in the history
  • Loading branch information
hsheth2 authored Oct 10, 2024
1 parent 51f5e17 commit 0414443
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 8 deletions.
1 change: 1 addition & 0 deletions docs/lineage/airflow.md
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ conn_id = datahub_rest_default # or datahub_kafka_default
| capture_tags_info | true | If true, the tags field of the DAG will be captured as DataHub tags. |
| capture_executions | true | If true, we'll capture task runs in DataHub in addition to DAG definitions. |
| materialize_iolets | true | Create or un-soft-delete all entities referenced in lineage. |
| render_templates | true | If true, jinja-templated fields will be automatically rendered to improve the accuracy of SQL statement extraction. |
| datajob_url_link | taskinstance | If taskinstance, the datajob url will be taskinstance link on airflow. It can also be grid. |
| |
| graceful_exceptions | true | If set to true, most runtime errors in the lineage backend will be suppressed and will not cause the overall task to fail. Note that configuration issues will still throw exceptions. |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,19 +43,24 @@ class DatahubLineageConfig(ConfigModel):

capture_executions: bool = False

datajob_url_link: DatajobUrl = DatajobUrl.TASKINSTANCE

# Note that this field is only respected by the lineage backend.
# The Airflow plugin v2 behaves as if it were set to True.
graceful_exceptions: bool = True

# The remaining config fields are only relevant for the v2 plugin.
enable_extractors: bool = True

# If true, ti.render_templates() will be called in the listener.
# Makes extraction of jinja-templated fields more accurate.
render_templates: bool = True

log_level: Optional[str] = None
debug_emitter: bool = False

disable_openlineage_plugin: bool = True

# Note that this field is only respected by the lineage backend.
# The Airflow plugin behaves as if it were set to True.
graceful_exceptions: bool = True

datajob_url_link: DatajobUrl = DatajobUrl.TASKINSTANCE

def make_emitter_hook(self) -> "DatahubGenericHook":
# This is necessary to avoid issues with circular imports.
from datahub_airflow_plugin.hooks.datahub import DatahubGenericHook
Expand Down Expand Up @@ -84,6 +89,7 @@ def get_lineage_config() -> DatahubLineageConfig:
disable_openlineage_plugin = conf.get(
"datahub", "disable_openlineage_plugin", fallback=True
)
render_templates = conf.get("datahub", "render_templates", fallback=True)
datajob_url_link = conf.get(
"datahub", "datajob_url_link", fallback=DatajobUrl.TASKINSTANCE.value
)
Expand All @@ -102,4 +108,5 @@ def get_lineage_config() -> DatahubLineageConfig:
debug_emitter=debug_emitter,
disable_openlineage_plugin=disable_openlineage_plugin,
datajob_url_link=datajob_url_link,
render_templates=render_templates,
)
Original file line number Diff line number Diff line change
Expand Up @@ -386,7 +386,8 @@ def on_task_instance_running(
f"DataHub listener got notification about task instance start for {task_instance.task_id}"
)

task_instance = _render_templates(task_instance)
if self.config.render_templates:
task_instance = _render_templates(task_instance)

# The type ignore is to placate mypy on Airflow 2.1.x.
dagrun: "DagRun" = task_instance.dag_run # type: ignore[attr-defined]
Expand Down Expand Up @@ -478,7 +479,8 @@ def on_task_instance_finish(
) -> None:
dagrun: "DagRun" = task_instance.dag_run # type: ignore[attr-defined]

task_instance = _render_templates(task_instance)
if self.config.render_templates:
task_instance = _render_templates(task_instance)

# We must prefer the task attribute, in case modifications to the task's inlets/outlets
# were made by the execute() method.
Expand Down

0 comments on commit 0414443

Please sign in to comment.