Skip to content

Commit

Permalink
Filter dbt cloud jobs by environment id (#749)
Browse files Browse the repository at this point in the history
* Filter dbt cloud jobs by environment id

* bump version

* DBT model lineage should parse snapshot as valid entity upstream (#750)
  • Loading branch information
usefulalgorithm authored Jan 10, 2024
1 parent c354d18 commit b8f9dc2
Show file tree
Hide file tree
Showing 11 changed files with 1,196 additions and 794 deletions.
2 changes: 1 addition & 1 deletion metaphor/dbt/artifact_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -857,7 +857,7 @@ def _parse_depends_on(
[
get_virtual_view_id(self._virtual_views[n].logical_id)
for n in depends_on.nodes
if n.startswith("model.")
if n.startswith("model.") or n.startswith("snapshot.")
]
)

Expand Down
10 changes: 10 additions & 0 deletions metaphor/dbt/cloud/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,16 @@ If you're using dbt [Single Tenancy](https://docs.getdbt.com/docs/cloud/about-cl
base_url: https://cloud.<tenant>.getdbt.com
```

#### Environment IDs

```yaml
environment_ids:
- <environment_id_1>
- <environment_id_2>
```

If `environment_ids` is specified, only jobs that run in those environments are collected. If it is not provided, all dbt jobs are collected.

## Testing

Follow the [Installation](../../README.md) instructions to install `metaphor-connectors` in your environment (or virtualenv). Make sure to include either `all` or `dbt` extra.
Expand Down
18 changes: 17 additions & 1 deletion metaphor/dbt/cloud/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,17 @@ class DbtAdminAPIClient:
See https://docs.getdbt.com/dbt-cloud/api-v2 for more details.
"""

def __init__(self, base_url: str, account_id: int, service_token: str):
def __init__(
    self,
    base_url: str,
    account_id: int,
    service_token: str,
    included_env_ids: Optional[Set[int]] = None,
):
    """
    Store the connection settings for the dbt Cloud Administrative API.

    :param base_url: root URL of the dbt Cloud instance; ``/api/v2`` is
        appended to form the admin API base URL.
    :param account_id: dbt Cloud account ID, used to build request paths.
    :param service_token: service token, stored on the instance.
    :param included_env_ids: environment IDs used by ``is_job_included`` to
        filter jobs. ``None`` or an empty set means "no filtering".
    """
    self.admin_api_base_url = f"{base_url}/api/v2"
    self.account_id = account_id
    self.service_token = service_token
    # Always build a fresh set per instance. A mutable default such as
    # `set()` in the signature is evaluated once and would be shared by
    # every client created without this argument.
    self.included_env_ids = set(included_env_ids) if included_env_ids else set()

def _get(self, path: str, params: Optional[Dict] = None):
url = f"{self.admin_api_base_url}/accounts/{self.account_id}/{path}"
Expand Down Expand Up @@ -71,6 +78,15 @@ def get_project_jobs(self, project_id: int) -> List[int]:
jobs |= new_jobs
offset += page_size

def is_job_included(self, job_id: int) -> bool:
    """
    Tell whether a job passes the environment filter.

    Returns True when no environment filter is configured. Otherwise the
    job is fetched from ``jobs/{job_id}`` and the result is True only if
    its ``environment_id`` is one of ``self.included_env_ids``. A response
    without ``data`` or without an ``environment_id`` is treated as
    "not included" rather than raising.
    """
    if not self.included_env_ids:
        # No environment filter configured: every job is included, and no
        # API request is needed.
        return True

    resp = self._get(f"jobs/{job_id}")
    data = (resp or {}).get("data") or {}
    environment_id = data.get("environment_id")
    if environment_id is None:
        # The job's environment is unknown, so it cannot match the filter.
        return False
    return int(environment_id) in self.included_env_ids

def get_last_successful_run(self, job_id: int) -> DbtRun:
"""Get the run ID of the last successful run for a job"""

Expand Down
9 changes: 6 additions & 3 deletions metaphor/dbt/cloud/config.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from dataclasses import field as dataclass_field
from typing import List
from typing import List, Set

from pydantic.dataclasses import dataclass

Expand All @@ -17,10 +17,13 @@ class DbtCloudConfig(BaseConfig):
service_token: str

# dbt cloud job IDs
job_ids: List[int] = dataclass_field(default_factory=list)
job_ids: Set[int] = dataclass_field(default_factory=set)

# dbt cloud project IDs
project_ids: List[int] = dataclass_field(default_factory=list)
project_ids: Set[int] = dataclass_field(default_factory=set)

# dbt cloud environment IDs to include. If specified, only jobs run in the provided environments will be crawled.
environment_ids: Set[int] = dataclass_field(default_factory=set)

# map meta field to ownerships
meta_ownerships: List[MetaOwnership] = dataclass_field(default_factory=list)
Expand Down
7 changes: 6 additions & 1 deletion metaphor/dbt/cloud/extractor.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ def from_config_file(config_file: str) -> "DbtCloudExtractor":
def __init__(self, config: DbtCloudConfig):
super().__init__(config)
self._account_id = config.account_id
self._job_ids = set(config.job_ids)
self._job_ids = config.job_ids
self._project_ids = config.project_ids
self._service_token = config.service_token
self._meta_ownerships = config.meta_ownerships
Expand All @@ -39,6 +39,7 @@ def __init__(self, config: DbtCloudConfig):
base_url=self._base_url,
account_id=self._account_id,
service_token=self._service_token,
included_env_ids=config.environment_ids,
)

async def extract(self) -> Collection[ENTITY_TYPES]:
Expand All @@ -53,6 +54,10 @@ async def extract(self) -> Collection[ENTITY_TYPES]:
return [item for ls in self._entities.values() for item in ls]

async def _extract_last_run(self, job_id: int):
if not self._client.is_job_included(job_id):
logger.info(f"Ignoring job ID: {job_id}")
return

logger.info(f"Fetching metadata for job ID: {job_id}")

run = self._client.get_last_successful_run(job_id)
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "metaphor-connectors"
version = "0.13.102"
version = "0.13.103"
license = "Apache-2.0"
description = "A collection of Python-based 'connectors' that extract metadata from various sources to ingest into the Metaphor app."
authors = ["Metaphor <[email protected]>"]
Expand Down
50 changes: 50 additions & 0 deletions tests/dbt/cloud/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,3 +143,53 @@ def test_get_run_artifact(mock_requests):
},
timeout=600,
)


@patch("metaphor.dbt.cloud.client.requests")
def test_job_is_included(mock_requests):
    """Only jobs whose environment is in ``included_env_ids`` count as included."""
    client = DbtAdminAPIClient(
        base_url="http://base.url",
        account_id=1111,
        service_token="service_token",
        included_env_ids={1, 3},
    )

    # Job ID -> environment ID served by the mocked endpoint.
    environment_by_job = {1: 1, 2: 2, 3: 4}

    def mock_get(url: str, **kwargs):
        # The job ID is the last path segment of the requested URL.
        requested_job = int(url.rsplit("/", 1)[-1])
        if requested_job not in environment_by_job:
            return Response(404, {})
        return Response(
            200,
            {
                "data": {
                    "environment_id": environment_by_job[requested_job],
                }
            },
        )

    mock_requests.get = mock_get

    # Job 1 runs in environment 1 (included); jobs 2 and 3 run in
    # environments 2 and 4, neither of which is in the filter.
    for job_id, should_be_included in ((1, True), (2, False), (3, False)):
        outcome = client.is_job_included(job_id)
        if should_be_included:
            assert outcome
        else:
            assert not outcome
14 changes: 10 additions & 4 deletions tests/dbt/cloud/test_extractor.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,11 @@ async def test_extractor(
)
)
mock_client.get_project_jobs = MagicMock(side_effect=[[8888], [2222]])

def mock_is_job_included(job_id: int) -> bool:
return job_id != 3333

mock_client.is_job_included = mock_is_job_included
mock_client.get_snowflake_account = MagicMock(return_value="snowflake_account")
mock_client.get_run_artifact = MagicMock(return_value="tempfile")

Expand All @@ -39,8 +44,9 @@ async def fake_extract():
config = DbtCloudConfig(
output=OutputConfig(),
account_id=1111,
job_ids=[2222],
project_ids=[6666, 4444],
job_ids={2222, 3333},
project_ids={6666, 4444},
environment_ids={1},
base_url="https://cloud.metaphor.getdbt.com",
service_token="service_token",
)
Expand Down Expand Up @@ -80,8 +86,8 @@ async def fake_extract():
config = DbtCloudConfig(
output=OutputConfig(),
account_id=1111,
job_ids=[2222],
project_ids=[6666, 4444],
job_ids={2222},
project_ids={6666, 4444},
base_url="https://cloud.metaphor.getdbt.com",
service_token="service_token",
)
Expand Down
40 changes: 39 additions & 1 deletion tests/dbt/data/ride_share/expected.json
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
},
{
"dbtModel": {
"compiledSql": "WITH stations AS (\n\n SELECT *\n FROM DEMO_DB.METAPHOR.raw_bike_stations\n\n),\n\nrides AS (\n\n SELECT *\n FROM DEMO_DB.METAPHOR.cleaned_bike_rides\n\n),\n\nstart_stat_join AS (\n\n SELECT rides.*\n , stations.bikes_count as start_station_bikes_count\n , stations.docks_count as start_station_docks_count\n , stations.install_date as start_station_install_date\n FROM rides\n LEFT JOIN stations\n ON rides.start_station_id = stations.id\n)\n\nSELECT \n total_minutes \n , total_bike_hires \n , average_duration \n , month \n , start_peak_travel\n , same_station_flag\n , start_station_id\n , start_station_name\n , start_station_bikes_count \n , start_station_docks_count \n , start_station_install_date \n , end_station_id\n , end_station_name\n , stations.bikes_count as end_station_bikes_count\n , stations.docks_count as end_station_docks_count\n , stations.install_date as end_station_install_date\nFROM start_stat_join\nLEFT JOIN stations\nON start_stat_join.end_station_id = stations.id",
"docsUrl": "http://localhost:8080/#!/model/model.london_bike_analysis.rides_by_month_2017",
"fields": [],
"materialization": {
Expand Down Expand Up @@ -63,6 +64,7 @@
},
{
"dbtModel": {
"compiledSql": "-- Adding extra fields including if the bike was rented during peak time \nSELECT\n SUM(duration_minutes) as total_minutes\n , COUNT(rental_id) as total_bike_hires\n , ROUND(SUM(duration_minutes) / COUNT(rental_id), 2) AS average_duration\n , EXTRACT(month from start_date) as month\n , CASE\n WHEN EXTRACT(HOUR from TO_TIMESTAMP(start_date)) >= 6 AND EXTRACT(HOUR from TO_TIMESTAMP(start_date)) <= 10 THEN 'Morning Peak'\n WHEN EXTRACT(HOUR from TO_TIMESTAMP(start_date)) >= 16 AND EXTRACT(HOUR from TO_TIMESTAMP(start_date)) <= 19 THEN 'Evening Peak'\n ELSE 'Off-Peak'\n END AS start_peak_travel\n , IFF(start_station_id = end_station_id, True, False) as same_station_flag\n , start_station_id\n , start_station_name\n , end_station_id\n , end_station_name\nFROM DEMO_DB.METAPHOR.raw_bike_hires\nGROUP BY 4,5,6,7,8,9,10\nORDER BY total_minutes DESC",
"description": "This table contains a transformed version of the raw_bike_hires table, which includes additional calculated fields such as creating a duration in minutes field. Each ride has been aggregated so any journey that starts and ends at the same station, in the same month and roughly time of day are aggregated together to get the total minutes similar journeys have taken.\n",
"docsUrl": "http://localhost:8080/#!/model/model.london_bike_analysis.cleaned_bike_rides",
"fields": [
Expand Down Expand Up @@ -236,6 +238,7 @@
},
{
"dbtModel": {
"compiledSql": "SELECT \n SUM(total_minutes) AS total_minutes\n , ROUND(SUM(total_minutes) / 60 ,2) AS total_hours\n , SUM(total_bike_hires) AS total_bike_hires\n , ROUND(SUM(total_minutes) / SUM(total_bike_hires), 2) AS average_duration_in_minutes\n , month\n , start_peak_travel\n , same_station_flag\n , start_station_id\n , start_station_name\n , start_station_bikes_count\n , start_station_docks_count\n , start_station_install_date\nFROM DEMO_DB.METAPHOR.rides_by_month_2017\nGROUP BY 5,6,7,8,9,10,11,12\nORDER BY total_bike_hires DESC",
"docsUrl": "http://localhost:8080/#!/model/model.london_bike_analysis.rides_by_month_start_station_2017",
"fields": [],
"materialization": {
Expand Down Expand Up @@ -269,6 +272,7 @@
},
{
"dbtModel": {
"compiledSql": "SELECT \n rental_id\n , duration as duration_seconds\n , duration / 60 as duration_minutes\n , bike_id\n , start_date\n , start_station_id\n , start_station_name\n , end_date\n , end_station_id\n , end_station_name\nFROM DEMO_DB.metaphor.cycle_hire\nWHERE EXTRACT(year from start_date) = 2017",
"description": "This table contains all bike hires in London in 2017. This is the raw dataset so no cleaning or transformation.",
"docsUrl": "http://localhost:8080/#!/model/model.london_bike_analysis.raw_bike_hires",
"fields": [],
Expand Down Expand Up @@ -314,6 +318,7 @@
},
{
"dbtModel": {
"compiledSql": "SELECT \n id\n , name as station_name\n , bikes_count\n , docks_count\n , install_date\n , removal_date\nFROM DEMO_DB.metaphor.cycle_stations\nWHERE install_date < '2017-01-01' and (removal_date < '2018-01-01' or removal_date is null)",
"description": "This table contains all bike stations in the London area. This only includes stations intalled before January 1, 2017 and doesn't include stations that were removed in 2017 (before Jan 1 2018). This is the raw data so no cleaning or transformation.",
"docsUrl": "http://localhost:8080/#!/model/model.london_bike_analysis.raw_bike_stations",
"fields": [
Expand Down Expand Up @@ -356,7 +361,40 @@
},
{
"dbtModel": {
"compiledSql": "\n\n\nselect * from DEMO_DB.metaphor.cycle_hire",
"compiledSql": "-- Adding extra fields including if the bike was rented during peak time \nSELECT\n SUM(duration) as total_seconds\n , COUNT(rental_id) as total_bike_hires\n , ROUND(SUM(duration) / COUNT(rental_id), 2) AS average_duration\n , EXTRACT(month from start_date) as month\n , CASE\n WHEN EXTRACT(HOUR from TO_TIMESTAMP(start_date)) >= 6 AND EXTRACT(HOUR from TO_TIMESTAMP(start_date)) <= 10 THEN 'Morning Peak'\n WHEN EXTRACT(HOUR from TO_TIMESTAMP(start_date)) >= 16 AND EXTRACT(HOUR from TO_TIMESTAMP(start_date)) <= 19 THEN 'Evening Peak'\n ELSE 'Off-Peak'\n END AS start_peak_travel\n , IFF(start_station_id = end_station_id, True, False) as same_station_flag\n , start_station_id\n , start_station_name\n , end_station_id\n , end_station_name\nFROM DEMO_DB.snapshots.cycle_hire_snapshot\nGROUP BY 4,5,6,7,8,9,10\nORDER BY total_seconds DESC",
"docsUrl": "http://localhost:8080/#!/model/model.london_bike_analysis.cleaned_bike_rides_from_snapshot",
"fields": [],
"materialization": {
"targetDataset": "DATASET~9EC8C9186E2155503243FED495387698",
"type": "TABLE"
},
"packageName": "london_bike_analysis",
"rawSql": "-- Adding extra fields including if the bike was rented during peak time \r\nSELECT\r\n SUM(duration) as total_seconds\r\n , COUNT(rental_id) as total_bike_hires\r\n , ROUND(SUM(duration) / COUNT(rental_id), 2) AS average_duration\r\n , EXTRACT(month from start_date) as month\r\n , CASE\r\n WHEN EXTRACT(HOUR from TO_TIMESTAMP(start_date)) >= 6 AND EXTRACT(HOUR from TO_TIMESTAMP(start_date)) <= 10 THEN 'Morning Peak'\r\n WHEN EXTRACT(HOUR from TO_TIMESTAMP(start_date)) >= 16 AND EXTRACT(HOUR from TO_TIMESTAMP(start_date)) <= 19 THEN 'Evening Peak'\r\n ELSE 'Off-Peak'\r\n END AS start_peak_travel\r\n , IFF(start_station_id = end_station_id, True, False) as same_station_flag\r\n , start_station_id\r\n , start_station_name\r\n , end_station_id\r\n , end_station_name\r\nFROM {{ ref('cycle_hire_snapshot') }}\r\nGROUP BY 4,5,6,7,8,9,10\r\nORDER BY total_seconds DESC",
"sourceDatasets": [],
"sourceModels": [
"VIRTUAL_VIEW~FEE8405461EBC519C4D9B3A20C4E251C"
],
"tags": [],
"url": "https://github.com/MetaphorData/dbt/tree/main/ride_share/models/rides/cleaned_bike_rides_from_snapshot.sql"
},
"entityUpstream": {
"sourceEntities": [
"VIRTUAL_VIEW~FEE8405461EBC519C4D9B3A20C4E251C"
]
},
"logicalId": {
"name": "london_bike_analysis.cleaned_bike_rides_from_snapshot",
"type": "DBT_MODEL"
},
"structure": {
"directories": [
"london_bike_analysis"
],
"name": "cleaned_bike_rides_from_snapshot"
}
},
{
"dbtModel": {
"docsUrl": "http://localhost:8080/#!/model/snapshot.london_bike_analysis.cycle_hire_snapshot",
"fields": [],
"materialization": {
Expand Down
Loading

0 comments on commit b8f9dc2

Please sign in to comment.