Skip to content

Commit

Permalink
[dagster-tableau] Detach spec loading, assets def building from Table…
Browse files Browse the repository at this point in the history
…au resource (#25459)

## Summary

As part of migrating to the APIs mocked out in
https://www.notion.so/dagster/Asset-integration-customization-options-11a18b92e462807c94c2f84675021120
and
https://www.notion.so/dagster/Power-BI-semantic-model-API-options-11918b92e4628006a25af3f58a6b48d9,
detaches the asset spec loading process from the Tableau resource, and
moves building asset definitions explicitly into user code.

```python
resource = TableauCloudWorkspace(
    connected_app_client_id=EnvVar("CONNECTED_APP_CLIENT_ID"),
    connected_app_secret_id=EnvVar("CONNECTED_APP_SECRET_ID"),
    connected_app_secret_value=EnvVar("CONNECTED_APP_SECRET_VALUE"),
    username=EnvVar("USERNAME"),
    site_name=EnvVar("SITE_NAME"),
    pod_name=EnvVar("POD_NAME"),
)

tableau_specs = load_tableau_asset_specs(
    workspace=resource,
)

non_executable_asset_specs = [
    spec for spec in tableau_specs if spec.tags.get("dagster-tableau/asset_type") == "data_source"
]

executable_asset_specs = [
    spec
    for spec in tableau_specs
    if spec.tags.get("dagster-tableau/asset_type") in ["dashboard", "sheet"]
]

defs = Definitions(
    assets=[
        build_tableau_executable_assets_definition(
            resource_key="tableau",
            specs=executable_asset_specs,
            refreshable_workbook_ids=["b75fc023-a7ca-4115-857b-4342028640d0"],
        ),
        *non_executable_asset_specs,
    ],
    resources={"tableau": resource},
)

```


## How I Tested These Changes

New unit test, updated existing unit tests.
  • Loading branch information
maximearmstrong authored Oct 24, 2024
1 parent e813170 commit e897e05
Show file tree
Hide file tree
Showing 8 changed files with 333 additions and 126 deletions.
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
from dagster._core.libraries import DagsterLibraryRegistry

from dagster_tableau.assets import (
build_tableau_executable_assets_definition as build_tableau_executable_assets_definition,
)
from dagster_tableau.resources import (
TableauCloudWorkspace as TableauCloudWorkspace,
TableauServerWorkspace as TableauServerWorkspace,
load_tableau_asset_specs as load_tableau_asset_specs,
)
from dagster_tableau.translator import DagsterTableauTranslator as DagsterTableauTranslator
from dagster_tableau.version import __version__ as __version__
Expand Down
93 changes: 93 additions & 0 deletions python_modules/libraries/dagster-tableau/dagster_tableau/assets.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
from typing import Optional, Sequence, cast

from dagster import (
AssetExecutionContext,
AssetsDefinition,
AssetSpec,
ObserveResult,
Output,
multi_asset,
)

from dagster_tableau.resources import BaseTableauWorkspace


def build_tableau_executable_assets_definition(
    resource_key: str,
    specs: Sequence[AssetSpec],
    refreshable_workbook_ids: Optional[Sequence[str]] = None,
) -> AssetsDefinition:
    """Returns the AssetsDefinition of the executable assets in the Tableau workspace.

    Args:
        resource_key (str): The resource key to use for the Tableau resource.
        specs (Sequence[AssetSpec]): The asset specs of the executable assets in the Tableau workspace.
        refreshable_workbook_ids (Optional[Sequence[str]]): A list of workbook IDs. The workbooks provided must
            have extracts as data sources and be refreshable in Tableau.

            When materializing your Tableau assets, the workbooks provided are refreshed,
            refreshing their sheets and dashboards before pulling their data in Dagster.

            This feature is equivalent to selecting Refreshing Extracts for a workbook in Tableau UI
            and only works for workbooks for which the data sources are extracts.
            See https://help.tableau.com/current/api/rest_api/en-us/REST/rest_api_ref_workbooks_and_views.htm#update_workbook_now
            for documentation.

    Returns:
        AssetsDefinition: The AssetsDefinition of the executable assets in the Tableau workspace.
    """
    # Imported at function scope so this block does not depend on a change to the
    # module-level imports.
    from dagster_tableau.translator import TableauMetadataSet

    @multi_asset(
        name=f"tableau_sync_site_{resource_key}",
        compute_kind="tableau",
        can_subset=False,
        specs=specs,
        required_resource_keys={resource_key},
    )
    def asset_fn(context: AssetExecutionContext):
        tableau = cast(BaseTableauWorkspace, getattr(context.resources, resource_key))
        with tableau.get_client() as client:
            # Refresh the requested workbooks first and remember whatever ID
            # `refresh_and_poll` reports back for each of them.
            refreshed_workbooks = {
                client.refresh_and_poll(refreshable_workbook_id)
                for refreshable_workbook_id in refreshable_workbook_ids or []
            }

            for spec in specs:
                # The translator stores the view and workbook IDs through
                # TableauMetadataSet, i.e. under namespaced metadata keys, so they
                # must be read back through the same set. The un-namespaced keys are
                # kept as a fallback for hand-written specs that use them.
                tableau_metadata = TableauMetadataSet.extract(spec.metadata)
                view_id = tableau_metadata.id or spec.metadata.get("id")
                workbook_id = tableau_metadata.workbook_id or spec.metadata.get("workbook_id")

                data = client.get_view(view_id)
                view_metadata = _tableau_view_metadata(data)

                if workbook_id and workbook_id in refreshed_workbooks:
                    # The view's workbook was refreshed in this run: report a materialization.
                    yield Output(
                        value=None,
                        output_name="__".join(spec.key.path),
                        metadata=view_metadata,
                    )
                else:
                    # Nothing was refreshed for this view: only report an observation.
                    yield ObserveResult(
                        asset_key=spec.key,
                        metadata=view_metadata,
                    )

    return asset_fn


def _tableau_view_metadata(data):
    """Builds the metadata dict attached to the event emitted for a Tableau view."""

    def _format_timestamp(value):
        # Timestamps may be missing on the view; keep None in that case.
        return value.strftime("%Y-%m-%dT%H:%M:%S") if value else None

    return {
        "workbook_id": data.workbook_id,
        "owner_id": data.owner_id,
        "name": data.name,
        "contentUrl": data.content_url,
        "createdAt": _format_timestamp(data.created_at),
        "updatedAt": _format_timestamp(data.updated_at),
    }
161 changes: 69 additions & 92 deletions python_modules/libraries/dagster-tableau/dagster_tableau/resources.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,18 +10,14 @@
import requests
import tableauserverclient as TSC
from dagster import (
AssetsDefinition,
AssetSpec,
ConfigurableResource,
Definitions,
Failure,
ObserveResult,
Output,
_check as check,
external_assets_from_specs,
get_dagster_logger,
multi_asset,
)
from dagster._annotations import experimental
from dagster._annotations import deprecated, experimental
from dagster._core.definitions.definitions_load_context import StateBackedDefinitionsLoader
from dagster._record import record
from dagster._utils.cached_method import cached_method
Expand All @@ -32,6 +28,7 @@
DagsterTableauTranslator,
TableauContentData,
TableauContentType,
TableauTagSet,
TableauWorkspaceData,
)

Expand Down Expand Up @@ -404,6 +401,10 @@ def fetch_tableau_workspace_data(
+ list(data_sources_by_id.values()),
)

@deprecated(
breaking_version="1.9.0",
additional_warn_text="Use dagster_tableau.load_tableau_asset_specs instead",
)
def build_defs(
self,
refreshable_workbook_ids: Optional[Sequence[str]] = None,
Expand All @@ -429,11 +430,60 @@ def build_defs(
Returns:
Definitions: A Definitions object which will build and return the Tableau content.
"""
return TableauWorkspaceDefsLoader(
workspace=self,
from dagster_tableau.assets import build_tableau_executable_assets_definition

resource_key = "tableau"

asset_specs = load_tableau_asset_specs(self, dagster_tableau_translator)

non_executable_asset_specs = [
spec
for spec in asset_specs
if TableauTagSet.extract(spec.tags).asset_type == "data_source"
]

executable_asset_specs = [
spec
for spec in asset_specs
if TableauTagSet.extract(spec.tags).asset_type in ["dashboard", "sheet"]
]

return Definitions(
assets=[
build_tableau_executable_assets_definition(
resource_key=resource_key,
specs=executable_asset_specs,
refreshable_workbook_ids=refreshable_workbook_ids,
),
*non_executable_asset_specs,
],
resources={resource_key: self},
)


def load_tableau_asset_specs(
    workspace: BaseTableauWorkspace,
    dagster_tableau_translator: Type[DagsterTableauTranslator] = DagsterTableauTranslator,
) -> Sequence[AssetSpec]:
    """Returns a list of AssetSpecs representing the Tableau content in the workspace.

    Args:
        workspace (Union[TableauCloudWorkspace, TableauServerWorkspace]): The Tableau workspace to fetch assets from.
        dagster_tableau_translator (Type[DagsterTableauTranslator]): The translator to use
            to convert Tableau content into AssetSpecs. Defaults to DagsterTableauTranslator.

    Returns:
        List[AssetSpec]: The set of assets representing the Tableau content in the workspace.
    """
    # The loader no longer takes `refreshable_workbook_ids`; refreshing is configured
    # on `build_tableau_executable_assets_definition` instead.
    loader = TableauWorkspaceDefsLoader(
        workspace=workspace,
        translator_cls=dagster_tableau_translator,
    )
    # `is_list` also verifies that every loaded asset is an AssetSpec.
    return check.is_list(loader.build_defs().assets, AssetSpec)


@experimental
Expand Down Expand Up @@ -478,7 +528,6 @@ def build_client(self) -> None:
class TableauWorkspaceDefsLoader(StateBackedDefinitionsLoader[Mapping[str, Any]]):
workspace: BaseTableauWorkspace
translator_cls: Type[DagsterTableauTranslator]
refreshable_workbook_ids: Sequence[str]

@property
def defs_key(self) -> str:
Expand All @@ -504,86 +553,14 @@ def defs_from_state(self, state: Sequence[Mapping[str, Any]]) -> Definitions:

translator = self.translator_cls(context=workspace_data)

external_assets = external_assets_from_specs(
[
translator.get_asset_spec(content)
for content in workspace_data.data_sources_by_id.values()
]
)

tableau_assets = self._build_tableau_assets_from_workspace_data(
workspace_data=workspace_data,
translator=translator,
)

return Definitions(assets=external_assets + tableau_assets)
all_external_data = [
*workspace_data.data_sources_by_id.values(),
*workspace_data.sheets_by_id.values(),
*workspace_data.dashboards_by_id.values(),
]

def _build_tableau_assets_from_workspace_data(
self,
workspace_data: TableauWorkspaceData,
translator: DagsterTableauTranslator,
) -> List[AssetsDefinition]:
@multi_asset(
name=f"tableau_sync_site_{self.workspace.site_name.replace('-', '_')}",
compute_kind="tableau",
can_subset=False,
specs=[
translator.get_asset_spec(content)
for content in [
*workspace_data.sheets_by_id.values(),
*workspace_data.dashboards_by_id.values(),
]
],
resource_defs={"tableau": self.workspace.get_resource_definition()},
)
def _assets(tableau: BaseTableauWorkspace):
with tableau.get_client() as client:
refreshed_workbooks = set()
for refreshable_workbook_id in self.refreshable_workbook_ids:
refreshed_workbooks.add(client.refresh_and_poll(refreshable_workbook_id))
for view_id, view_content_data in [
*workspace_data.sheets_by_id.items(),
*workspace_data.dashboards_by_id.items(),
]:
data = client.get_view(view_id)
if view_content_data.content_type == TableauContentType.SHEET:
asset_key = translator.get_sheet_asset_key(view_content_data)
elif view_content_data.content_type == TableauContentType.DASHBOARD:
asset_key = translator.get_dashboard_asset_key(view_content_data)
else:
check.assert_never(view_content_data.content_type)
if view_content_data.properties["workbook"]["luid"] in refreshed_workbooks:
yield Output(
value=None,
output_name="__".join(asset_key.path),
metadata={
"workbook_id": data.workbook_id,
"owner_id": data.owner_id,
"name": data.name,
"contentUrl": data.content_url,
"createdAt": data.created_at.strftime("%Y-%m-%dT%H:%M:%S")
if data.created_at
else None,
"updatedAt": data.updated_at.strftime("%Y-%m-%dT%H:%M:%S")
if data.updated_at
else None,
},
)
else:
yield ObserveResult(
asset_key=asset_key,
metadata={
"workbook_id": data.workbook_id,
"owner_id": data.owner_id,
"name": data.name,
"contentUrl": data.content_url,
"createdAt": data.created_at.strftime("%Y-%m-%dT%H:%M:%S")
if data.created_at
else None,
"updatedAt": data.updated_at.strftime("%Y-%m-%dT%H:%M:%S")
if data.updated_at
else None,
},
)
all_external_asset_specs = [
translator.get_asset_spec(content) for content in all_external_data
]

return [_assets]
return Definitions(assets=all_external_asset_specs)
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
import re
from enum import Enum
from typing import Any, Mapping, Sequence
from typing import Any, Literal, Mapping, Optional, Sequence

from dagster import _check as check
from dagster._core.definitions.asset_key import AssetKey
from dagster._core.definitions.asset_spec import AssetSpec
from dagster._core.definitions.metadata.metadata_set import NamespacedMetadataSet
from dagster._core.definitions.tags.tag_set import NamespacedTagSet
from dagster._record import record

TABLEAU_PREFIX = "tableau/"
Expand Down Expand Up @@ -85,6 +87,23 @@ def from_content_data(
)


class TableauTagSet(NamespacedTagSet):
    """Tags that the Tableau translator attaches to the asset specs it builds.

    The tags live under the "dagster-tableau" namespace, so `asset_type` is exposed
    on a spec as the "dagster-tableau/asset_type" tag.
    """

    # Kind of Tableau content the spec represents; None when the tag is not set.
    asset_type: Optional[Literal["dashboard", "data_source", "sheet"]] = None

    @classmethod
    def namespace(cls) -> str:
        # Prefix shared by every tag key of this set.
        return "dagster-tableau"


class TableauMetadataSet(NamespacedMetadataSet):
    """Metadata that the Tableau translator attaches to the asset specs it builds.

    The entries live under the "dagster-tableau" namespace.
    NOTE(review): keys are presumably exposed as "dagster-tableau/<field>", like the
    tags of TableauTagSet - confirm against NamespacedMetadataSet.
    """

    # Filled by the translator from the content's `properties["luid"]`.
    id: Optional[str] = None
    # Filled from `properties["workbook"]["luid"]` for sheets and dashboards;
    # the translator passes None for data sources.
    workbook_id: Optional[str] = None

    @classmethod
    def namespace(cls) -> str:
        # Prefix shared by every metadata key of this set.
        return "dagster-tableau"


class DagsterTableauTranslator:
"""Translator class which converts raw response data from the Tableau API into AssetSpecs.
Subclass this class to implement custom logic for each type of Tableau content.
Expand Down Expand Up @@ -134,7 +153,12 @@ def get_sheet_spec(self, data: TableauContentData) -> AssetSpec:
return AssetSpec(
key=self.get_sheet_asset_key(data),
deps=data_source_keys if data_source_keys else None,
tags={"dagster/storage_kind": "tableau"},
tags={"dagster/storage_kind": "tableau", **TableauTagSet(asset_type="sheet")},
metadata={
**TableauMetadataSet(
id=data.properties["luid"], workbook_id=data.properties["workbook"]["luid"]
)
},
)

def get_dashboard_asset_key(self, data: TableauContentData) -> AssetKey:
Expand All @@ -160,7 +184,12 @@ def get_dashboard_spec(self, data: TableauContentData) -> AssetSpec:
return AssetSpec(
key=self.get_dashboard_asset_key(data),
deps=sheet_keys if sheet_keys else None,
tags={"dagster/storage_kind": "tableau"},
tags={"dagster/storage_kind": "tableau", **TableauTagSet(asset_type="dashboard")},
metadata={
**TableauMetadataSet(
id=data.properties["luid"], workbook_id=data.properties["workbook"]["luid"]
)
},
)

def get_data_source_asset_key(self, data: TableauContentData) -> AssetKey:
Expand All @@ -169,5 +198,6 @@ def get_data_source_asset_key(self, data: TableauContentData) -> AssetKey:
def get_data_source_spec(self, data: TableauContentData) -> AssetSpec:
    """Builds the AssetSpec for a Tableau data source.

    Args:
        data (TableauContentData): The content data of the data source.

    Returns:
        AssetSpec: A spec keyed by `get_data_source_asset_key`, tagged with the
        "data_source" asset type, and carrying the data source's luid as metadata.
    """
    return AssetSpec(
        key=self.get_data_source_asset_key(data),
        # `tags` is passed once; it combines the storage kind with the Tableau tag set.
        tags={"dagster/storage_kind": "tableau", **TableauTagSet(asset_type="data_source")},
        # No workbook ID is recorded for a data source.
        metadata={**TableauMetadataSet(id=data.properties["luid"], workbook_id=None)},
    )
Loading

0 comments on commit e897e05

Please sign in to comment.