Skip to content

Commit

Permalink
Extract PBI subscription metadata (#599)
Browse files Browse the repository at this point in the history
* Extract PBI subscription metadata

* Update models version

* Improve test coverage

* Improve test coverage
  • Loading branch information
elic-eon authored Sep 18, 2023
1 parent 7534fdd commit aaa9c2d
Show file tree
Hide file tree
Showing 11 changed files with 311 additions and 10 deletions.
20 changes: 19 additions & 1 deletion metaphor/azure_data_factory/extractor.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
Dataset,
DatasetLogicalID,
DatasetUpstream,
DependencyCondition,
Pipeline,
PipelineLogicalID,
PipelineType,
Expand Down Expand Up @@ -470,6 +471,20 @@ def _get_last_pipeline_run(
return response.value[0]
return None

@staticmethod
def _map_dependency_conditions(conditions: list) -> List[DependencyCondition]:
result: List[DependencyCondition] = []
for condition in conditions:
if isinstance(condition, str):
try:
result.append(DependencyCondition(condition))
except ValueError:
logger.warn(
f"Invalid enum value for DependencyCondition: {condition}"
)
continue
return result

@staticmethod
def _process_activities(
pipeline_entity_id: str,
Expand All @@ -478,12 +493,15 @@ def _process_activities(
factory_data_flows: Dict[str, DfModels.DataFlowResource],
) -> Tuple[List[AzureDataFactoryActivity], List[str], List[str]]:
metaphor_activities, sources, sinks = [], [], []

for activity in activities:
metaphor_activities.append(
AzureDataFactoryActivity(
depends_on=[
ActivityDependency(
dependency_conditions=dependent.dependency_conditions,
dependency_conditions=AzureDataFactoryExtractor._map_dependency_conditions(
dependent.dependency_conditions
),
name=dependent.activity,
)
for dependent in (activity.depends_on or [])
Expand Down
2 changes: 1 addition & 1 deletion metaphor/common/api_request.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ def get_request(
if result.status_code == 200:
# Add JSON response to log.zip
file_name = (
f"{urlparse(url).path[1:].replace('/', u'__')}_{secrets.token_hex(1)}"
f"{urlparse(url).path[1:].replace('/', u'__')}_{secrets.token_hex(4)}"
)
# Avoid file name too long error and truncate prefix to avoid duplicate file name
# 250 is the lowest default maximum charactors file name length limit acrocess major file systems
Expand Down
76 changes: 76 additions & 0 deletions metaphor/power_bi/extractor.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@
)
from metaphor.models.metadata_change_event import PowerBIDatasetTable, PowerBIInfo
from metaphor.models.metadata_change_event import PowerBIMeasure as PbiMeasure
from metaphor.models.metadata_change_event import PowerBISubscription as Subscription
from metaphor.models.metadata_change_event import (
PowerBISubscriptionUser as SubscriptionUser,
)
from metaphor.models.metadata_change_event import PowerBIWorkspace as PbiWorkspace
from metaphor.models.metadata_change_event import (
SourceInfo,
Expand All @@ -45,6 +49,8 @@
PowerBIClient,
PowerBIPage,
PowerBIRefresh,
PowerBISubscription,
PowerBiSubscriptionUser,
PowerBITile,
WorkspaceInfo,
)
Expand Down Expand Up @@ -98,6 +104,8 @@ async def extract(self) -> Collection[ENTITY_TYPES]:
self.map_wi_reports_to_dashboard(workspace, app_map)
self.map_wi_dashboards_to_dashboard(workspace, app_map)

self.extract_subscriptions(workspaces)

self.dedupe_app_version_dashboards()

entities: List[ENTITY_TYPES] = []
Expand Down Expand Up @@ -336,6 +344,74 @@ def dedupe_app_version_dashboards(self):
)
del self._dashboards[dashboard_id]

def extract_subscriptions(self, workspaces: List[WorkspaceInfo]):
users = set(
user
for workspace in workspaces
for user in workspace.users
# Skipping report without datasetId
if user.principalType == "User"
)
subscriptions: Dict[str, PowerBISubscription] = {}

for user in users:
user_id = user.graphId
subscription_user = PowerBiSubscriptionUser(
emailAddress=user.emailAddress, displayName=user.displayName
)
user_subscriptions = self._client.get_user_subscriptions(user_id)

for user_subscription in user_subscriptions:
subscription = subscriptions.setdefault(
user_subscription.id, user_subscription
)
subscription.users.append(subscription_user)

for subscription in subscriptions.values():
dashboard = self._dashboards.get(subscription.artifactId)

if dashboard is None:
logger.warn(
f"Can't found related artifact for subscription: {subscription.id}"
)
continue

power_bi_info = dashboard.dashboard_info.power_bi
if power_bi_info.subscriptions is None:
power_bi_info.subscriptions = []

def safe_parse_date(datetime_str: Optional[str]) -> Optional[datetime]:
# Example date time 9/12/2023 12:00:00 AM
datetime_format = "%m/%d/%Y %I:%M:%S %p"
if not datetime_str:
return None
try:
return datetime.strptime(datetime_str, datetime_format).replace(
tzinfo=timezone.utc
)
except ValueError:
logger.warn(f"Unable to parse time: {datetime_str}")
return None

power_bi_info.subscriptions.append(
Subscription(
artifact_display_name=subscription.artifactDisplayName,
end_date=safe_parse_date(subscription.endDate),
start_date=safe_parse_date(subscription.startDate),
sub_artifact_display_name=subscription.subArtifactDisplayName,
frequency=subscription.frequency,
title=subscription.title,
id=subscription.id,
users=[
SubscriptionUser(
email_address=user.emailAddress,
display_name=user.displayName,
)
for user in subscription.users
],
)
)

@staticmethod
def _make_power_bi_info(
type: PowerBIDashboardType,
Expand Down
55 changes: 55 additions & 0 deletions metaphor/power_bi/power_bi_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,17 @@ class WorkspaceInfoReport(BaseModel):
description: str = ""


class WorkspaceInfoUser(BaseModel):
emailAddress: Optional[str]
groupUserAccessRight: str
displayName: Optional[str]
graphId: str
principalType: str

def __hash__(self):
return hash(self.graphId)


class WorkspaceInfo(BaseModel):
id: str
name: Optional[str]
Expand All @@ -132,6 +143,29 @@ class WorkspaceInfo(BaseModel):
reports: List[WorkspaceInfoReport] = []
datasets: List[WorkspaceInfoDataset] = []
dashboards: List[WorkspaceInfoDashboard] = []
users: List[WorkspaceInfoUser] = []


class PowerBiSubscriptionUser(BaseModel):
emailAddress: str
displayName: str


class PowerBISubscription(BaseModel):
id: str
artifactId: str
title: str
frequency: Optional[str] = None
endDate: Optional[str] = None
startDate: Optional[str] = None
artifactDisplayName: Optional[str] = None
subArtifactDisplayName: Optional[str] = None
users: List[PowerBiSubscriptionUser] = []


class SubscriptionsByUserResponse(BaseModel):
SubscriptionEntities: List[PowerBISubscription]
continuationUri: Optional[str]


class AccessTokenError(Exception):
Expand Down Expand Up @@ -238,6 +272,27 @@ def get_reports(self, group_id: str) -> List[PowerBIReport]:
url, List[PowerBIReport], transform_response=lambda r: r.json()["value"]
)

def get_user_subscriptions(self, user_id: str) -> List[PowerBISubscription]:
# https://learn.microsoft.com/en-us/rest/api/power-bi/admin/users-get-user-subscriptions-as-admin
url = f"{self.API_ENDPOINT}/admin/users/{user_id}/subscriptions"

continuationUri: Optional[str] = url
subscriptions: List[PowerBISubscription] = []
while continuationUri:
try:
response = self._call_get(continuationUri, SubscriptionsByUserResponse)
except EntityNotFoundError:
logger.error(f"Unable to find user {user_id} in workspace.")
break

continuationUri = response.continuationUri
chunk = response.SubscriptionEntities
if len(chunk) == 0:
break
subscriptions += chunk

return subscriptions

def get_pages(self, group_id: str, report_id: str) -> List[PowerBIPage]:
# https://docs.microsoft.com/en-us/rest/api/power-bi/reports/get-pages-in-group
url = f"{self.API_ENDPOINT}/groups/{group_id}/reports/{report_id}/pages"
Expand Down
8 changes: 4 additions & 4 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "metaphor-connectors"
version = "0.12.45"
version = "0.12.46"
license = "Apache-2.0"
description = "A collection of Python-based 'connectors' that extract metadata from various sources to ingest into the Metaphor app."
authors = ["Metaphor <[email protected]>"]
Expand Down Expand Up @@ -29,7 +29,7 @@ google-cloud-logging = { version = "^3.5.0", optional = true }
jsonschema = "^4.18.6"
lkml = { version = "^1.3.1", optional = true }
looker-sdk = { version = "^23.6.0", optional = true }
metaphor-models = "0.26.7"
metaphor-models = "0.26.9"
msal = { version = "^1.20.0", optional = true }
pycarlo = { version = "^0.8.1", optional = true }
pydantic = "^1.10.0"
Expand Down
2 changes: 1 addition & 1 deletion tests/azure_data_factory/test_extractor.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ def mock_list_pipelines(factory_name, resource_group_name):
"dependsOn": [
{
"activity": "Data flow1",
"dependencyConditions": ["Succeeded"],
"dependencyConditions": ["Succeeded", "Foo"],
}
],
"userProperties": [],
Expand Down
29 changes: 29 additions & 0 deletions tests/power_bi/data/user_subscriptions.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
{
"SubscriptionEntities": [
{
"id": "some-uuid",
"title": "Subscription title",
"artifactId": "dashboard-uuid",
"artifactDisplayName": "Report Name",
"subArtifactDisplayName": "Page Name",
"artifactType": "Report",
"isEnabled": true,
"frequency": "Daily",
"startDate": "9/12/2023 12:00:00 AM",
"endDate": "9/12/2024 12:00:00 AM",
"linkToContent": true,
"previewImage": true,
"attachmentFormat": "PNG",
"owner": {
"emailAddress": "[email protected]",
"displayName": "Test account",
"identifier": "[email protected]",
"graphId": "user-uuid",
"principalType": "User"
},
"users": []
}
],
"continuationUri": "https://continuationUri.url/",
"continuationToken": "token"
}
18 changes: 17 additions & 1 deletion tests/power_bi/expected.json
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,23 @@
"workspace": {
"id": "workspace-1",
"name": "Workspace"
}
},
"subscriptions": [
{
"artifactDisplayName": "Dashboard A",
"endDate": "2000-09-06T00:13:52+00:00",
"startDate": "1998-11-30T17:05:52+00:00",
"frequency": "Daily",
"id": "subscription-1",
"title": "First Subscription",
"users": [
{
"displayName": "Metaphor",
"emailAddress": "[email protected]"
}
]
}
]
},
"title": "Dashboard A"
},
Expand Down
Loading

0 comments on commit aaa9c2d

Please sign in to comment.