Skip to content

Commit

Permalink
[dagster-dlift] scaffold testing environment (#25444)
Browse files Browse the repository at this point in the history
## Summary & Motivation
- Scaffold a live testing environment for dlift. 
- Make live tests run on any commit with [dagster-dlift] in the message,
and also on the nightly OSS build.
- Introduce an "instance" for interacting with the dbt cloud rest API.
- Introduce an "instance fake" for spec-ing out fake results from the
rest API to be used in unit tests.

## How I Tested These Changes
- All the testing I added, some tests for the testing utils as well.
  • Loading branch information
dpeng817 authored Oct 31, 2024
1 parent d54c530 commit 14557ad
Show file tree
Hide file tree
Showing 21 changed files with 413 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ class PackageSpec:
queue: Optional[BuildkiteQueue] = None
run_pytest: bool = True
always_run_if: Optional[Callable[[], bool]] = None
skip_if: Optional[Callable[[], str]] = None

def __post_init__(self):
if not self.name:
Expand Down Expand Up @@ -243,13 +244,17 @@ def requirements(self):

@property
def skip_reason(self) -> Optional[str]:
# Memoize so we don't log twice
if self._should_skip is False:
if self._should_skip is None:
return None

# Memoize so we don't log twice
if self.always_run_if and self.always_run_if():
self._should_skip = False
return None
if self.skip_if and self.skip_if():
self._skip_reason = self.skip_if()
self._should_skip = True
return self._skip_reason

if self._skip_reason:
return self._skip_reason
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,16 @@ def build_dagster_oss_nightly_steps() -> List[BuildkiteStep]:
],
pytest_extra_cmds=k8s_extra_cmds,
),
PackageSpec(
"examples/experimental/dagster-dlift/kitchen-sink",
name="dbt-cloud-live-tests",
env_vars=[
"KS_DBT_CLOUD_ACCOUNT_ID",
"KS_DBT_CLOUD_TOKEN",
"KS_DBT_CLOUD_ACCESS_URL",
"KS_DBT_CLOUD_DISCOVERY_API_URL",
],
),
]
)

Expand Down
14 changes: 14 additions & 0 deletions .buildkite/dagster-buildkite/dagster_buildkite/steps/packages.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
has_dagster_airlift_changes,
has_storage_test_fixture_changes,
network_buildkite_container,
skip_if_not_dlift_commit,
)


Expand Down Expand Up @@ -387,6 +388,19 @@ def k8s_extra_cmds(version: str, _) -> List[str]:
"examples/experimental/dagster-dlift",
name=":dbt: dlift",
),
# Runs against live dbt cloud instance, we only want to run on commits and on the
# nightly build
PackageSpec(
"examples/experimental/dagster-dlift/kitchen-sink",
skip_if=skip_if_not_dlift_commit,
name=":dbt: :sink: Dbt Cloud-Lift Kitchen Sink",
env_vars=[
"KS_DBT_CLOUD_ACCOUNT_ID",
"KS_DBT_CLOUD_TOKEN",
"KS_DBT_CLOUD_ACCESS_URL",
"KS_DBT_CLOUD_DISCOVERY_API_URL",
],
),
]


Expand Down
8 changes: 8 additions & 0 deletions .buildkite/dagster-buildkite/dagster_buildkite/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,14 @@ def has_storage_test_fixture_changes():
)


def skip_if_not_dlift_commit():
return (
None
if any("dagster-dlift" in str(path) for path in ChangedFiles.all)
else "Not a dlift commit"
)


def skip_if_no_helm_changes():
if message_contains("NO_SKIP"):
return None
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
from typing import Any, Mapping, Sequence

import requests

from dagster_dlift.gql_queries import VERIFICATION_QUERY

ENVIRONMENTS_SUBPATH = "environments/"


class DbtCloudInstance:
def __init__(
self,
# Can be found on the Account Info page of dbt.
account_id: str,
# Can be either a personal token or a service token.
token: str,
# Can be found on the
access_url: str,
discovery_api_url: str,
):
self.account_id = account_id
self.token = token
self.access_url = access_url
self.discovery_api_url = discovery_api_url

def get_api_v2_url(self) -> str:
return f"{self.access_url}/api/v2/accounts/{self.account_id}"

def get_discovery_api_url(self) -> str:
return f"{self.discovery_api_url}/graphql"

def get_session(self) -> requests.Session:
session = requests.Session()
session.headers.update(
{
"Accept": "application/json",
"Authorization": f"Token {self.token}",
}
)
return session

def make_access_api_request(self, subpath: str) -> Mapping[str, Any]:
session = self.get_session()
return self.ensure_valid_response(session.get(f"{self.get_api_v2_url()}/{subpath}")).json()

def ensure_valid_response(self, response: requests.Response) -> requests.Response:
if response.status_code != 200:
raise Exception(f"Request to DBT Cloud failed: {response.text}")
return response

def make_discovery_api_query(self, query: str, variables: Mapping[str, Any]):
session = self.get_session()
return self.ensure_valid_response(
session.post(
self.get_discovery_api_url(),
json={"query": query, "variables": variables},
)
).json()

def list_environment_ids(self) -> Sequence[int]:
return [
environment["id"]
for environment in self.make_access_api_request(ENVIRONMENTS_SUBPATH)["data"]
]

def verify_connections(self) -> None:
# Verifies connection to both the access and discovery APIs.
for environment_id in self.list_environment_ids():
response = self.make_discovery_api_query(
VERIFICATION_QUERY, {"environmentId": environment_id}
)
try:
if response["data"]["environment"]["__typename"] != "Environment":
raise Exception(
f"Failed to verify connection to environment {environment_id}. Response: {response}"
)
except KeyError:
raise Exception(
f"Failed to verify connection to environment {environment_id}. Response: {response}"
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
VERIFICATION_QUERY = """
query VerificationQuery($environmentId: BigInt!) {
environment(id: $environmentId) {
__typename
}
}
"""
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
from typing import Any, Mapping, NamedTuple

from dagster_dlift.cloud_instance import DbtCloudInstance


class ExpectedDiscoveryApiRequest(NamedTuple):
query: str
variables: Mapping[str, Any]

def __hash__(self) -> int:
return hash((self.query, frozenset(self.variables.items())))


class ExpectedAccessApiRequest(NamedTuple):
subpath: str

def __hash__(self) -> int:
return hash(self.subpath)


class DbtCloudInstanceFake(DbtCloudInstance):
"""A version that allows users to fake API responses for testing purposes."""

def __init__(
self,
access_api_responses: Mapping[ExpectedAccessApiRequest, Any],
discovery_api_responses: Mapping[ExpectedDiscoveryApiRequest, Any],
):
self.access_api_responses = access_api_responses
self.discovery_api_responses = discovery_api_responses

def make_access_api_request(self, subpath: str) -> Mapping[str, Any]:
if ExpectedAccessApiRequest(subpath) not in self.access_api_responses:
raise Exception(
f"ExpectedAccessApiRequest({subpath}) not found in access_api_responses"
)
return self.access_api_responses[ExpectedAccessApiRequest(subpath)]

def make_discovery_api_query(
self, query: str, variables: Mapping[str, Any]
) -> Mapping[str, Any]:
if ExpectedDiscoveryApiRequest(query, variables) not in self.discovery_api_responses:
raise Exception(
f"ExpectedDiscoveryApiRequest({query}, {variables}) not found in discovery_api_responses"
)
return self.discovery_api_responses[ExpectedDiscoveryApiRequest(query, variables)]
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
import os


def get_env_var(var_name: str) -> str:
value = os.getenv(var_name)
if not value:
raise Exception(f"{var_name} is not set")
return value
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
import pytest
from dagster_dlift.cloud_instance import ENVIRONMENTS_SUBPATH
from dagster_dlift.gql_queries import VERIFICATION_QUERY
from dagster_dlift.test.instance_fake import (
DbtCloudInstanceFake,
ExpectedAccessApiRequest,
ExpectedDiscoveryApiRequest,
)


def test_verification() -> None:
"""Test proper error states when we can't properly verify the instance."""
# We get no response back from the discovery api
fake_instance = DbtCloudInstanceFake(
access_api_responses={
ExpectedAccessApiRequest(subpath=ENVIRONMENTS_SUBPATH): {"data": [{"id": 1}]}
},
discovery_api_responses={
ExpectedDiscoveryApiRequest(
query=VERIFICATION_QUERY, variables={"environmentId": 1}
): {}
},
)

with pytest.raises(Exception, match="Failed to verify"):
fake_instance.verify_connections()

# We get a response back from the discovery api, but it's not what we expect
fake_instance = DbtCloudInstanceFake(
access_api_responses={
ExpectedAccessApiRequest(subpath=ENVIRONMENTS_SUBPATH): {"data": [{"id": 1}]}
},
discovery_api_responses={
ExpectedDiscoveryApiRequest(query=VERIFICATION_QUERY, variables={"environmentId": 1}): {
"data": {"environment": {"__typename": "NotEnvironment"}}
}
},
)

with pytest.raises(Exception, match="Failed to verify"):
fake_instance.verify_connections()

# Finally, we get a valid response back from the discovery api
fake_instance = DbtCloudInstanceFake(
access_api_responses={
ExpectedAccessApiRequest(subpath=ENVIRONMENTS_SUBPATH): {"data": [{"id": 1}]}
},
discovery_api_responses={
ExpectedDiscoveryApiRequest(query=VERIFICATION_QUERY, variables={"environmentId": 1}): {
"data": {"environment": {"__typename": "Environment"}}
}
},
)
fake_instance.verify_connections()
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
import pytest
from dagster._core.test_utils import environ
from dagster_dlift.cloud_instance import ENVIRONMENTS_SUBPATH
from dagster_dlift.gql_queries import VERIFICATION_QUERY
from dagster_dlift.test.instance_fake import (
DbtCloudInstanceFake,
ExpectedAccessApiRequest,
ExpectedDiscoveryApiRequest,
)
from dagster_dlift.test.utils import get_env_var


def test_get_env_var() -> None:
"""Test we can get an env var, and good error state for lack of env var."""
with environ({"TEST_ENV_VAR": "test_value"}):
assert get_env_var("TEST_ENV_VAR") == "test_value"

with pytest.raises(Exception, match="TEST_ENV_VAR"):
get_env_var("TEST_ENV_VAR")


def test_cloud_instance_fake() -> None:
"""Test that cloud instance fake behaves properly when inducing queries."""
fake_instance = DbtCloudInstanceFake(
access_api_responses={
ExpectedAccessApiRequest(subpath=ENVIRONMENTS_SUBPATH): {
"data": {"environments": [{"id": 1}]}
}
},
discovery_api_responses={
ExpectedDiscoveryApiRequest(query=VERIFICATION_QUERY, variables={"environmentId": 1}): {
"data": {"environment": {"__typename": "Environment"}}
}
},
)

assert fake_instance.make_access_api_request(ENVIRONMENTS_SUBPATH) == {
"data": {"environments": [{"id": 1}]}
}
assert fake_instance.make_discovery_api_query(VERIFICATION_QUERY, {"environmentId": 1}) == {
"data": {"environment": {"__typename": "Environment"}}
}
with pytest.raises(Exception, match="ExpectedAccessApiRequest"):
fake_instance.make_access_api_request("bad_subpath")
with pytest.raises(Exception, match="ExpectedDiscoveryApiRequest"):
fake_instance.make_discovery_api_query(VERIFICATION_QUERY, {"accountId": "bad"})
2 changes: 2 additions & 0 deletions examples/experimental/dagster-dlift/kitchen-sink/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
.airflow_home
.dagster_home
27 changes: 27 additions & 0 deletions examples/experimental/dagster-dlift/kitchen-sink/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
.PHONY: help

define GET_MAKEFILE_DIR
$(shell dirname $(realpath $(lastword $(MAKEFILE_LIST))) | sed 's:/*$$::')
endef

export MAKEFILE_DIR := $(GET_MAKEFILE_DIR)
export DAGSTER_HOME := $(MAKEFILE_DIR)/.dagster_home
export DAGSTER_URL := http://localhost:3333

help:
@egrep -h '\s##\s' $(MAKEFILE_LIST) | sort | awk 'BEGIN {FS = ":.*?## "}; {printf "\033[36m%-20s\033[0m %s\n", $$1, $$2}'

dev_install:
pip install uv && \
uv pip install -e ../../../dagster-airlift
uv pip install -e .

setup_local_env:
$(MAKE) wipe
mkdir -p $(DAGSTER_HOME)

run_dagster:
dagster dev -m kitchen_sink_dlift.dagster_defs.defs -p 3333

wipe: ## Wipe out all the files created by the Makefile
rm -rf $(DAGSTER_HOME)
22 changes: 22 additions & 0 deletions examples/experimental/dagster-dlift/kitchen-sink/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
## Kitchen Sink

This is designed to be a testbed for testing specific migration scenarios.

First:

```bash
make dev_install
make setup_local_env
```

Then in one shell:

```
make run_airflow
```

Then in another shell:

```
make run_dagster
```
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
pytest_plugins = ["dagster_dlift.test.shared_fixtures"]
Empty file.
Loading

0 comments on commit 14557ad

Please sign in to comment.