feat(ingestion/snowflake): adds streams as a new dataset with lineage and properties. #12318

Open · wants to merge 11 commits into base: master
2 changes: 1 addition & 1 deletion metadata-ingestion/docs/sources/snowflake/snowflake_pre.md
@@ -50,7 +50,7 @@ The details of each granted privilege can be viewed in [snowflake docs](https://
If the warehouse is already running during ingestion or has auto-resume enabled,
this permission is not required.
- `usage` is required for us to run queries using the warehouse
- `usage` on `database` and `schema` are required because without it tables and views inside them are not accessible. If an admin does the required grants on `table` but misses the grants on `schema` or the `database` in which the table/view exists then we will not be able to get metadata for the table/view.
- `usage` on `database` and `schema` are required because without it tables, views, and streams inside them are not accessible. If an admin does the required grants on `table` but misses the grants on `schema` or the `database` in which the table/view/stream exists then we will not be able to get metadata for the table/view/stream.
- If metadata is required only on some schemas then you can grant the usage privileges only on a particular schema like

```sql
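-- Sketch only: the collapsed hunk contains the doc's actual example; the role and
-- object names below are placeholders.
grant usage on database "<your-database>" to role datahub_role;
grant usage on schema "<your-database>"."<your-schema>" to role datahub_role;
```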
@@ -24,6 +24,7 @@ class DatasetSubTypes(StrEnum):
SAC_LIVE_DATA_MODEL = "Live Data Model"
NEO4J_NODE = "Neo4j Node"
NEO4J_RELATIONSHIP = "Neo4j Relationship"
SNOWFLAKE_STREAM = "Snowflake Stream"

# TODO: Create separate entity...
NOTEBOOK = "Notebook"
@@ -53,6 +53,7 @@ class SnowflakeObjectDomain(StrEnum):
SCHEMA = "schema"
COLUMN = "column"
ICEBERG_TABLE = "iceberg table"
STREAM = "stream"


GENERIC_PERMISSION_ERROR_KEY = "permission-error"
@@ -98,6 +98,11 @@ class SnowflakeFilterConfig(SQLFilterConfig):
)
# table_pattern and view_pattern are inherited from SQLFilterConfig

stream_pattern: AllowDenyPattern = Field(
default=AllowDenyPattern.allow_all(),
description="Regex patterns for streams to filter in ingestion. Note: Defaults to table_pattern if not specified. Specify regex to match the entire view name in database.schema.view format. e.g. to match all views starting with customer in Customer database and public schema, use the regex 'Customer.public.customer.*'",
)

match_fully_qualified_names: bool = Field(
default=False,
description="Whether `schema_pattern` is matched against fully qualified schema name `<catalog>.<schema>`.",
@@ -268,6 +273,16 @@ class SnowflakeV2Config(
description="List of regex patterns for tags to include in ingestion. Only used if `extract_tags` is enabled.",
)

include_streams: bool = Field(
default=True,
description="If enabled, streams will be ingested as separate entities from tables/views.",
)

stream_pattern: AllowDenyPattern = Field(
default=AllowDenyPattern.allow_all(),
description="Regex patterns for streams to filter in ingestion.",
)

structured_property_pattern: AllowDenyPattern = Field(
default=AllowDenyPattern.allow_all(),
description=(
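A quick sketch of how the new `stream_pattern` option behaves, using DataHub's `AllowDenyPattern` (the import path is the one metadata-ingestion uses elsewhere; the database, schema, and stream names are invented):

```python
from datahub.configuration.common import AllowDenyPattern

# Hypothetical filter: only ingest streams in MY_DB.PUBLIC whose names end in _STREAM,
# matched against the fully qualified database.schema.stream name.
stream_pattern = AllowDenyPattern(allow=[r"MY_DB\.PUBLIC\..*_STREAM"])

assert stream_pattern.allowed("MY_DB.PUBLIC.ORDERS_STREAM")
assert not stream_pattern.allowed("MY_DB.PUBLIC.ORDERS")
```

In a recipe this would sit under the Snowflake source config next to `include_streams: true`.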
metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_queries.py
@@ -49,6 +49,7 @@
from datahub.sql_parsing.schema_resolver import SchemaResolver
from datahub.sql_parsing.sql_parsing_aggregator import (
KnownLineageMapping,
ObservedQuery,
PreparsedQuery,
SqlAggregatorReport,
SqlParsingAggregator,
@@ -398,6 +399,36 @@
pass
else:
return None

user = CorpUserUrn(
self.identifiers.get_user_identifier(
res["user_name"], users.get(res["user_name"])
)
)

# Check if any of the accessed objects are streams
has_stream_objects = any(
obj.get("objectDomain") == "Stream" for obj in direct_objects_accessed
)

# If a stream is used, default to query parsing.
if has_stream_objects:
logger.debug("Found matching stream object")
self.aggregator.add_observed_query(
observed=ObservedQuery(
query=res["query_text"],
session_id=res["session_id"],
timestamp=res["query_start_time"].astimezone(timezone.utc),
user=user,
default_db=res["default_db"],
default_schema=res["default_schema"],
query_hash=get_query_fingerprint(
res["query_text"], self.identifiers.platform, fast=True
),
),
)
return None

upstreams = []
column_usage = {}

@@ -460,12 +491,6 @@
)
)

user = CorpUserUrn(
self.identifiers.get_user_identifier(
res["user_name"], users.get(res["user_name"])
)
)

timestamp: datetime = res["query_start_time"]
timestamp = timestamp.astimezone(timezone.utc)

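To make the stream branch above concrete, a small standalone sketch of the check (the `objectDomain` value mirrors the ACCESS_HISTORY convention used in the diff; the object names are made up):

```python
# Made-up ACCESS_HISTORY-style records: one query that touched a stream and a table.
direct_objects_accessed = [
    {"objectName": "MY_DB.PUBLIC.ORDERS_STREAM", "objectDomain": "Stream"},
    {"objectName": "MY_DB.PUBLIC.ORDERS", "objectDomain": "Table"},
]

# Same check as in the diff: if any accessed object is a stream, the raw query text is
# handed to the aggregator as an ObservedQuery (SQL parsing) instead of building a
# PreparsedQuery from the audit log's object-level lineage.
has_stream_objects = any(
    obj.get("objectDomain") == "Stream" for obj in direct_objects_accessed
)
print(has_stream_objects)  # True
```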
@@ -9,6 +9,7 @@
from datahub.utilities.prefix_batch_builder import PrefixGroup

SHOW_VIEWS_MAX_PAGE_SIZE = 10000
SHOW_STREAM_MAX_PAGE_SIZE = 10000


def create_deny_regex_sql_filter(
@@ -36,6 +37,7 @@ class SnowflakeQuery:
SnowflakeObjectDomain.VIEW.capitalize(),
SnowflakeObjectDomain.MATERIALIZED_VIEW.capitalize(),
SnowflakeObjectDomain.ICEBERG_TABLE.capitalize(),
SnowflakeObjectDomain.STREAM.capitalize(),
}

ACCESS_HISTORY_TABLE_VIEW_DOMAINS_FILTER = "({})".format(
@@ -44,7 +46,8 @@
ACCESS_HISTORY_TABLE_DOMAINS_FILTER = (
"("
f"'{SnowflakeObjectDomain.TABLE.capitalize()}',"
f"'{SnowflakeObjectDomain.VIEW.capitalize()}'"
f"'{SnowflakeObjectDomain.VIEW.capitalize()}',"
f"'{SnowflakeObjectDomain.STREAM.capitalize()}',"
")"
)

@@ -952,3 +955,19 @@ def dmf_assertion_results(start_time_millis: int, end_time_millis: int) -> str:
@staticmethod
def get_all_users() -> str:
return """SELECT name as "NAME", email as "EMAIL" FROM SNOWFLAKE.ACCOUNT_USAGE.USERS"""

@staticmethod
def streams_for_database(
db_name: str,
limit: int = SHOW_STREAM_MAX_PAGE_SIZE,
stream_pagination_marker: Optional[str] = None,
) -> str:
# SHOW STREAMS can return a maximum of 10000 rows.
# https://docs.snowflake.com/en/sql-reference/sql/show-streams#usage-notes
assert limit <= SHOW_STREAM_MAX_PAGE_SIZE

# To work around this, we paginate through the results using the FROM clause.
from_clause = (
f"""FROM '{stream_pagination_marker}'""" if stream_pagination_marker else ""
)
return f"""SHOW STREAMS IN DATABASE {db_name} LIMIT {limit} {from_clause};"""
@@ -64,6 +64,7 @@ class SnowflakeReport(SQLSourceReport, BaseTimeWindowReport):
num_table_to_view_edges_scanned: int = 0
num_view_to_table_edges_scanned: int = 0
num_external_table_edges_scanned: int = 0
num_stream_edges_scanned: int = 0
ignore_start_time_lineage: Optional[bool] = None
upstream_lineage_in_report: Optional[bool] = None
upstream_lineage: Dict[str, List[str]] = field(default_factory=dict)
@@ -103,6 +104,7 @@ class SnowflakeV2Report(
schemas_scanned: int = 0
databases_scanned: int = 0
tags_scanned: int = 0
streams_scanned: int = 0

include_usage_stats: bool = False
include_operational_stats: bool = False
@@ -112,6 +114,7 @@
table_lineage_query_secs: float = -1
external_lineage_queries_secs: float = -1
num_tables_with_known_upstreams: int = 0
num_streams_with_known_upstreams: int = 0
num_upstream_lineage_edge_parsing_failed: int = 0
num_secure_views_missing_definition: int = 0

@@ -129,6 +132,8 @@
num_get_tags_for_object_queries: int = 0
num_get_tags_on_columns_for_table_queries: int = 0

num_get_streams_for_schema_queries: int = 0

rows_zero_objects_modified: int = 0

_processed_tags: MutableSet[str] = field(default_factory=set)
@@ -155,6 +160,8 @@ def report_entity_scanned(self, name: str, ent_type: str = "table") -> None:
return
self._scanned_tags.add(name)
self.tags_scanned += 1
elif ent_type == "stream":
self.streams_scanned += 1
else:
raise KeyError(f"Unknown entity {ent_type}.")

metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_schema.py
@@ -118,6 +118,7 @@
comment: Optional[str]
tables: List[str] = field(default_factory=list)
views: List[str] = field(default_factory=list)
streams: List[str] = field(default_factory=list)
tags: Optional[List[SnowflakeTag]] = None


@@ -131,6 +132,29 @@
tags: Optional[List[SnowflakeTag]] = None


@dataclass
class SnowflakeStream:
name: str
created: datetime
owner: str
comment: str
source_type: str
type: str
stale: str
mode: str
invalid_reason: str
owner_role_type: str
database_name: str
schema_name: str
table_name: str
columns: List[SnowflakeColumn] = field(default_factory=list)
stale_after: Optional[datetime] = None
base_tables: Optional[str] = None
tags: Optional[List[SnowflakeTag]] = None
column_tags: Dict[str, List[SnowflakeTag]] = field(default_factory=dict)
last_altered: Optional[datetime] = None


class _SnowflakeTagCache:
def __init__(self) -> None:
# self._database_tags[<database_name>] = list of tags applied to database
@@ -208,6 +232,7 @@
self.get_tables_for_database,
self.get_views_for_database,
self.get_columns_for_schema,
self.get_streams_for_database,
self.get_pk_constraints_for_schema,
self.get_fk_constraints_for_schema,
]
@@ -594,3 +619,63 @@
tags[column_name].append(snowflake_tag)

return tags

@serialized_lru_cache(maxsize=1)
def get_streams_for_database(
self, db_name: str
) -> Dict[str, List[SnowflakeStream]]:
page_limit = SHOW_STREAM_MAX_PAGE_SIZE

streams: Dict[str, List[SnowflakeStream]] = {}

first_iteration = True
stream_pagination_marker: Optional[str] = None
while first_iteration or stream_pagination_marker is not None:
cur = self.connection.query(
SnowflakeQuery.streams_for_database(
db_name,
limit=page_limit,
stream_pagination_marker=stream_pagination_marker,
)
)

first_iteration = False
stream_pagination_marker = None

result_set_size = 0
for stream in cur:
result_set_size += 1

stream_name = stream["name"]
schema_name = stream["schema_name"]
if schema_name not in streams:
streams[schema_name] = []
streams[stream["schema_name"]].append(
SnowflakeStream(
name=stream["name"],
created=stream["created_on"],
owner=stream["owner"],
comment=stream["comment"],
source_type=stream["source_type"],
type=stream["type"],
stale=stream["stale"],
mode=stream["mode"],
database_name=stream["database_name"],
schema_name=stream["schema_name"],
invalid_reason=stream["invalid_reason"],
owner_role_type=stream["owner_role_type"],
stale_after=stream["stale_after"],
table_name=stream["table_name"],
base_tables=stream["base_tables"],
last_altered=stream["created_on"],
)
)

if result_set_size >= page_limit:
# If we hit the limit, we need to send another request to get the next page.
logger.info(
f"Fetching next page of streams for {db_name} - after {stream_name}"
)
stream_pagination_marker = stream_name

return streams
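
To make the return shape concrete, a tiny standalone sketch of the schema-name grouping performed above (the rows are invented stand-ins for SHOW STREAMS output):

```python
from collections import defaultdict

# Invented rows with only the fields relevant to grouping.
rows = [
    {"name": "ORDERS_STREAM", "schema_name": "PUBLIC"},
    {"name": "EVENTS_STREAM", "schema_name": "ANALYTICS"},
]

streams: dict[str, list[str]] = defaultdict(list)
for row in rows:
    streams[row["schema_name"]].append(row["name"])

print(dict(streams))
# {'PUBLIC': ['ORDERS_STREAM'], 'ANALYTICS': ['EVENTS_STREAM']}
```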