From aaa9c2da289e07058326137cd5c706998f2a9113 Mon Sep 17 00:00:00 2001 From: Scott Ssuyi Huang Date: Tue, 19 Sep 2023 00:15:08 +0800 Subject: [PATCH] Extract PBI subscription metadata (#599) * Extract PBI subscription metadata * Update models version * Improve test coverage * Improve test coverage --- metaphor/azure_data_factory/extractor.py | 20 +++++- metaphor/common/api_request.py | 2 +- metaphor/power_bi/extractor.py | 76 +++++++++++++++++++++ metaphor/power_bi/power_bi_client.py | 55 +++++++++++++++ poetry.lock | 8 +-- pyproject.toml | 4 +- tests/azure_data_factory/test_extractor.py | 2 +- tests/power_bi/data/user_subscriptions.json | 29 ++++++++ tests/power_bi/expected.json | 18 ++++- tests/power_bi/test_extractor.py | 43 ++++++++++++ tests/power_bi/test_power_bi_client.py | 64 +++++++++++++++++ 11 files changed, 311 insertions(+), 10 deletions(-) create mode 100644 tests/power_bi/data/user_subscriptions.json create mode 100644 tests/power_bi/test_power_bi_client.py diff --git a/metaphor/azure_data_factory/extractor.py b/metaphor/azure_data_factory/extractor.py index 4716c650..3f66a9e5 100644 --- a/metaphor/azure_data_factory/extractor.py +++ b/metaphor/azure_data_factory/extractor.py @@ -28,6 +28,7 @@ Dataset, DatasetLogicalID, DatasetUpstream, + DependencyCondition, Pipeline, PipelineLogicalID, PipelineType, @@ -470,6 +471,20 @@ def _get_last_pipeline_run( return response.value[0] return None + @staticmethod + def _map_dependency_conditions(conditions: list) -> List[DependencyCondition]: + result: List[DependencyCondition] = [] + for condition in conditions: + if isinstance(condition, str): + try: + result.append(DependencyCondition(condition)) + except ValueError: + logger.warn( + f"Invalid enum value for DependencyCondition: {condition}" + ) + continue + return result + @staticmethod def _process_activities( pipeline_entity_id: str, @@ -478,12 +493,15 @@ def _process_activities( factory_data_flows: Dict[str, DfModels.DataFlowResource], ) -> Tuple[List[AzureDataFactoryActivity], List[str], List[str]]: metaphor_activities, sources, sinks = [], [], [] + for activity in activities: metaphor_activities.append( AzureDataFactoryActivity( depends_on=[ ActivityDependency( - dependency_conditions=dependent.dependency_conditions, + dependency_conditions=AzureDataFactoryExtractor._map_dependency_conditions( + dependent.dependency_conditions + ), name=dependent.activity, ) for dependent in (activity.depends_on or []) diff --git a/metaphor/common/api_request.py b/metaphor/common/api_request.py index 13c7385d..f0576f5a 100644 --- a/metaphor/common/api_request.py +++ b/metaphor/common/api_request.py @@ -33,7 +33,7 @@ def get_request( if result.status_code == 200: # Add JSON response to log.zip file_name = ( - f"{urlparse(url).path[1:].replace('/', u'__')}_{secrets.token_hex(1)}" + f"{urlparse(url).path[1:].replace('/', u'__')}_{secrets.token_hex(4)}" ) # Avoid file name too long error and truncate prefix to avoid duplicate file name # 250 is the lowest default maximum charactors file name length limit acrocess major file systems diff --git a/metaphor/power_bi/extractor.py b/metaphor/power_bi/extractor.py index 434582c3..37e768e5 100644 --- a/metaphor/power_bi/extractor.py +++ b/metaphor/power_bi/extractor.py @@ -32,6 +32,10 @@ ) from metaphor.models.metadata_change_event import PowerBIDatasetTable, PowerBIInfo from metaphor.models.metadata_change_event import PowerBIMeasure as PbiMeasure +from metaphor.models.metadata_change_event import PowerBISubscription as Subscription +from metaphor.models.metadata_change_event import ( + PowerBISubscriptionUser as SubscriptionUser, +) from metaphor.models.metadata_change_event import PowerBIWorkspace as PbiWorkspace from metaphor.models.metadata_change_event import ( SourceInfo, @@ -45,6 +49,8 @@ PowerBIClient, PowerBIPage, PowerBIRefresh, + PowerBISubscription, + PowerBiSubscriptionUser, PowerBITile, WorkspaceInfo, ) @@ -98,6 +104,8 @@ async def extract(self) -> Collection[ENTITY_TYPES]: self.map_wi_reports_to_dashboard(workspace, app_map) self.map_wi_dashboards_to_dashboard(workspace, app_map) + self.extract_subscriptions(workspaces) + self.dedupe_app_version_dashboards() entities: List[ENTITY_TYPES] = [] @@ -336,6 +344,74 @@ def dedupe_app_version_dashboards(self): ) del self._dashboards[dashboard_id] + def extract_subscriptions(self, workspaces: List[WorkspaceInfo]): + users = set( + user + for workspace in workspaces + for user in workspace.users + # Skipping report without datasetId + if user.principalType == "User" + ) + subscriptions: Dict[str, PowerBISubscription] = {} + + for user in users: + user_id = user.graphId + subscription_user = PowerBiSubscriptionUser( + emailAddress=user.emailAddress, displayName=user.displayName + ) + user_subscriptions = self._client.get_user_subscriptions(user_id) + + for user_subscription in user_subscriptions: + subscription = subscriptions.setdefault( + user_subscription.id, user_subscription + ) + subscription.users.append(subscription_user) + + for subscription in subscriptions.values(): + dashboard = self._dashboards.get(subscription.artifactId) + + if dashboard is None: + logger.warn( + f"Can't found related artifact for subscription: {subscription.id}" + ) + continue + + power_bi_info = dashboard.dashboard_info.power_bi + if power_bi_info.subscriptions is None: + power_bi_info.subscriptions = [] + + def safe_parse_date(datetime_str: Optional[str]) -> Optional[datetime]: + # Example date time 9/12/2023 12:00:00 AM + datetime_format = "%m/%d/%Y %I:%M:%S %p" + if not datetime_str: + return None + try: + return datetime.strptime(datetime_str, datetime_format).replace( + tzinfo=timezone.utc + ) + except ValueError: + logger.warn(f"Unable to parse time: {datetime_str}") + return None + + power_bi_info.subscriptions.append( + Subscription( + artifact_display_name=subscription.artifactDisplayName, + end_date=safe_parse_date(subscription.endDate), + start_date=safe_parse_date(subscription.startDate), + sub_artifact_display_name=subscription.subArtifactDisplayName, + frequency=subscription.frequency, + title=subscription.title, + id=subscription.id, + users=[ + SubscriptionUser( + email_address=user.emailAddress, + display_name=user.displayName, + ) + for user in subscription.users + ], + ) + ) + @staticmethod def _make_power_bi_info( type: PowerBIDashboardType, diff --git a/metaphor/power_bi/power_bi_client.py b/metaphor/power_bi/power_bi_client.py index e25a869f..d7fbb434 100644 --- a/metaphor/power_bi/power_bi_client.py +++ b/metaphor/power_bi/power_bi_client.py @@ -124,6 +124,17 @@ class WorkspaceInfoReport(BaseModel): description: str = "" +class WorkspaceInfoUser(BaseModel): + emailAddress: Optional[str] + groupUserAccessRight: str + displayName: Optional[str] + graphId: str + principalType: str + + def __hash__(self): + return hash(self.graphId) + + class WorkspaceInfo(BaseModel): id: str name: Optional[str] @@ -132,6 +143,29 @@ class WorkspaceInfo(BaseModel): reports: List[WorkspaceInfoReport] = [] datasets: List[WorkspaceInfoDataset] = [] dashboards: List[WorkspaceInfoDashboard] = [] + users: List[WorkspaceInfoUser] = [] + + +class PowerBiSubscriptionUser(BaseModel): + emailAddress: str + displayName: str + + +class PowerBISubscription(BaseModel): + id: str + artifactId: str + title: str + frequency: Optional[str] = None + endDate: Optional[str] = None + startDate: Optional[str] = None + artifactDisplayName: Optional[str] = None + subArtifactDisplayName: Optional[str] = None + users: List[PowerBiSubscriptionUser] = [] + + +class SubscriptionsByUserResponse(BaseModel): + SubscriptionEntities: List[PowerBISubscription] + continuationUri: Optional[str] class AccessTokenError(Exception): @@ -238,6 +272,27 @@ def get_reports(self, group_id: str) -> List[PowerBIReport]: url, List[PowerBIReport], transform_response=lambda r: r.json()["value"] ) + def get_user_subscriptions(self, user_id: str) -> List[PowerBISubscription]: + # https://learn.microsoft.com/en-us/rest/api/power-bi/admin/users-get-user-subscriptions-as-admin + url = f"{self.API_ENDPOINT}/admin/users/{user_id}/subscriptions" + + continuationUri: Optional[str] = url + subscriptions: List[PowerBISubscription] = [] + while continuationUri: + try: + response = self._call_get(continuationUri, SubscriptionsByUserResponse) + except EntityNotFoundError: + logger.error(f"Unable to find user {user_id} in workspace.") + break + + continuationUri = response.continuationUri + chunk = response.SubscriptionEntities + if len(chunk) == 0: + break + subscriptions += chunk + + return subscriptions + def get_pages(self, group_id: str, report_id: str) -> List[PowerBIPage]: # https://docs.microsoft.com/en-us/rest/api/power-bi/reports/get-pages-in-group url = f"{self.API_ENDPOINT}/groups/{group_id}/reports/{report_id}/pages" diff --git a/poetry.lock b/poetry.lock index f4f95151..8887f100 100644 --- a/poetry.lock +++ b/poetry.lock @@ -3034,13 +3034,13 @@ files = [ [[package]] name = "metaphor-models" -version = "0.26.7" +version = "0.26.9" description = "" optional = false python-versions = ">=3.8,<4.0" files = [ - {file = "metaphor_models-0.26.7-py3-none-any.whl", hash = "sha256:2ae428a118dd56dcfde5499eb7ee15f332fea4328347a09850ad65eb8de83878"}, - {file = "metaphor_models-0.26.7.tar.gz", hash = "sha256:c07f78fea7d410ed7c587cf4a39f0c1e8247195dcc5e5f90ba140e291e7ad258"}, + {file = "metaphor_models-0.26.9-py3-none-any.whl", hash = "sha256:0c5376dff06eff15aea886848b7e590a721a2839e1231f83a2951cb691496dea"}, + {file = "metaphor_models-0.26.9.tar.gz", hash = "sha256:9113b331dc4a56f39ddb7ea2d9704f46b07c63cd79ae76f9796f82024592c75f"}, ] [[package]] @@ -5404,4 +5404,4 @@ unity-catalog = ["databricks-cli"] [metadata] lock-version = "2.0" python-versions = ">=3.8.1,<4.0" -content-hash = "902b043a745cde73b379355888921192aedaf851a2a03b8c6e5e9f7c6ca3c494" +content-hash = "40030235c74b57d909dd619acd23c8e1bec2734ab3b8198e7579f43343d8b366" diff --git a/pyproject.toml b/pyproject.toml index 17e90733..1189d143 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "metaphor-connectors" -version = "0.12.45" +version = "0.12.46" license = "Apache-2.0" description = "A collection of Python-based 'connectors' that extract metadata from various sources to ingest into the Metaphor app." authors = ["Metaphor "] @@ -29,7 +29,7 @@ google-cloud-logging = { version = "^3.5.0", optional = true } jsonschema = "^4.18.6" lkml = { version = "^1.3.1", optional = true } looker-sdk = { version = "^23.6.0", optional = true } -metaphor-models = "0.26.7" +metaphor-models = "0.26.9" msal = { version = "^1.20.0", optional = true } pycarlo = { version = "^0.8.1", optional = true } pydantic = "^1.10.0" diff --git a/tests/azure_data_factory/test_extractor.py b/tests/azure_data_factory/test_extractor.py index fc051d59..c27e9840 100644 --- a/tests/azure_data_factory/test_extractor.py +++ b/tests/azure_data_factory/test_extractor.py @@ -177,7 +177,7 @@ def mock_list_pipelines(factory_name, resource_group_name): "dependsOn": [ { "activity": "Data flow1", - "dependencyConditions": ["Succeeded"], + "dependencyConditions": ["Succeeded", "Foo"], } ], "userProperties": [], diff --git a/tests/power_bi/data/user_subscriptions.json b/tests/power_bi/data/user_subscriptions.json new file mode 100644 index 00000000..e7a6baa2 --- /dev/null +++ b/tests/power_bi/data/user_subscriptions.json @@ -0,0 +1,29 @@ +{ + "SubscriptionEntities": [ + { + "id": "some-uuid", + "title": "Subscription title", + "artifactId": "dashboard-uuid", + "artifactDisplayName": "Report Name", + "subArtifactDisplayName": "Page Name", + "artifactType": "Report", + "isEnabled": true, + "frequency": "Daily", + "startDate": "9/12/2023 12:00:00 AM", + "endDate": "9/12/2024 12:00:00 AM", + "linkToContent": true, + "previewImage": true, + "attachmentFormat": "PNG", + "owner": { + "emailAddress": "test@testdomain.abc", + "displayName": "Test account", + "identifier": "test@testdomain.abc", + "graphId": "user-uuid", + "principalType": "User" + }, + "users": [] + } + ], + "continuationUri": "https://continuationUri.url/", + "continuationToken": "token" +} diff --git a/tests/power_bi/expected.json b/tests/power_bi/expected.json index a95ca662..3b24e816 100644 --- a/tests/power_bi/expected.json +++ b/tests/power_bi/expected.json @@ -306,7 +306,23 @@ "workspace": { "id": "workspace-1", "name": "Workspace" - } + }, + "subscriptions": [ + { + "artifactDisplayName": "Dashboard A", + "endDate": "2000-09-06T00:13:52+00:00", + "startDate": "1998-11-30T17:05:52+00:00", + "frequency": "Daily", + "id": "subscription-1", + "title": "First Subscription", + "users": [ + { + "displayName": "Metaphor", + "emailAddress": "powerbi@metaphor.io" + } + ] + } + ] }, "title": "Dashboard A" }, diff --git a/tests/power_bi/test_extractor.py b/tests/power_bi/test_extractor.py index aae802c2..656e6687 100644 --- a/tests/power_bi/test_extractor.py +++ b/tests/power_bi/test_extractor.py @@ -14,6 +14,7 @@ PowerBIPage, PowerBIRefresh, PowerBIReport, + PowerBISubscription, PowerBITable, PowerBITableColumn, PowerBITableMeasure, @@ -22,6 +23,7 @@ WorkspaceInfoDashboard, WorkspaceInfoDataset, WorkspaceInfoReport, + WorkspaceInfoUser, ) from tests.test_utils import load_json @@ -216,6 +218,8 @@ async def test_extractor(mock_client: MagicMock, test_root_dir: str): ], ) ], + upstreamDataflows=None, + upstreamDatasets=None, ), WorkspaceInfoDataset( configuredBy="bob@foo.com", @@ -250,6 +254,8 @@ async def test_extractor(mock_client: MagicMock, test_root_dir: str): ], ), ], + upstreamDataflows=None, + upstreamDatasets=None, ), ], dashboards=[ @@ -264,6 +270,22 @@ async def test_extractor(mock_client: MagicMock, test_root_dir: str): ), WorkspaceInfoDashboard(displayName="Dashboard B", id=dashboard2_id), ], + users=[ + WorkspaceInfoUser( + emailAddress="powerbi@metaphor.io", + groupUserAccessRight="Viewer", + displayName="Metaphor", + graphId="user-id", + principalType="User", + ), + WorkspaceInfoUser( + emailAddress=None, + groupUserAccessRight="Viewer", + displayName="Group", + graphId="group-id", + principalType="Group", + ), + ], ) ] ) @@ -289,6 +311,26 @@ def fake_get_refreshes(workspace_id: str, dataset_id: str) -> List[PowerBIRefres def fake_get_apps() -> List[PowerBIApp]: return [app1, app2] + def fake_get_user_subscriptions(user_id: str) -> List[PowerBISubscription]: + return [ + PowerBISubscription( + id="subscription-1", + artifactId=dashboard1.id, + title="First Subscription", + frequency="Daily", + endDate="9/6/2000 12:13:52 AM", + startDate="11/30/1998 5:05:52 PM", + artifactDisplayName=dashboard1.displayName, + subArtifactDisplayName=None, + users=[], + ), + PowerBISubscription( + id="subscription-2", + artifactId="some-random-id", + title="", + ), + ] + mock_instance.get_datasets.side_effect = fake_get_datasets mock_instance.get_reports.side_effect = fake_get_reports mock_instance.get_dashboards.side_effect = fake_get_dashboards @@ -296,6 +338,7 @@ def fake_get_apps() -> List[PowerBIApp]: mock_instance.get_pages.side_effect = fake_get_pages mock_instance.get_refreshes.side_effect = fake_get_refreshes mock_instance.get_apps.side_effect = fake_get_apps + mock_instance.get_user_subscriptions = fake_get_user_subscriptions mock_client.return_value = mock_instance diff --git a/tests/power_bi/test_power_bi_client.py b/tests/power_bi/test_power_bi_client.py new file mode 100644 index 00000000..59bbad29 --- /dev/null +++ b/tests/power_bi/test_power_bi_client.py @@ -0,0 +1,64 @@ +from unittest.mock import MagicMock, patch + +import pytest + +from metaphor.common.base_config import OutputConfig +from metaphor.power_bi.config import PowerBIRunConfig +from metaphor.power_bi.power_bi_client import PowerBIClient, PowerBISubscription +from tests.test_utils import load_json + + +class MockResponse: + def __init__(self, json_data, status_code=200): + self.json_data = json_data + self.status_code = status_code + + def json(self): + return self.json_data + + def raise_for_status(self): + return + + +@patch("requests.get") +@patch("msal.ConfidentialClientApplication") +@pytest.mark.asyncio +async def test_extractor( + mock_msal_app: MagicMock, mock_get_method: MagicMock, test_root_dir: str +): + mock_msal_app = MagicMock() + mock_msal_app.acquire_token_silent = MagicMock( + return_value={"access_token": "token"} + ) + + mock_get_method.side_effect = [ + MockResponse( + load_json(f"{test_root_dir}/power_bi/data/user_subscriptions.json") + ), + MockResponse({"SubscriptionEntities": []}), + ] + client = PowerBIClient( + PowerBIRunConfig( + tenant_id="tenant-id", + client_id="client-id", + secret="secret", + output=OutputConfig(), + ) + ) + + subscriptions = client.get_user_subscriptions("user_id") + assert len(subscriptions) == 1 + assert ( + subscriptions[0].dict() + == PowerBISubscription( + id="some-uuid", + artifactId="dashboard-uuid", + title="Subscription title", + frequency="Daily", + endDate="9/12/2024 12:00:00 AM", + startDate="9/12/2023 12:00:00 AM", + artifactDisplayName="Report Name", + subArtifactDisplayName="Page Name", + users=[], + ).dict() + )