Extract created information (#764)
* Extract table creation time

* Update packages

* Bump version

* Bump version
elic-eon authored Jan 25, 2024
1 parent 55268eb commit 3990b6c
Showing 11 changed files with 114 additions and 37 deletions.
4 changes: 4 additions & 0 deletions metaphor/bigquery/extractor.py
@@ -50,6 +50,7 @@
     QueryLog,
     SchemaField,
     SchemaType,
+    SourceInfo,
     SQLSchema,
     TypeEnum,
 )
@@ -199,6 +200,9 @@ def _parse_table(project_id, bq_table: bigquery.table.Table) -> Dataset:
         structure=DatasetStructure(
             database=project_id, schema=bq_table.dataset_id, table=bq_table.table_id
         ),
+        source_info=SourceInfo(
+            created_at_source=bq_table.created, last_updated=bq_table.modified
+        ),
     )

     # See https://googleapis.dev/python/bigquery/latest/generated/google.cloud.bigquery.table.Table.html#google.cloud.bigquery.table.Table
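Both timestamps come straight from the BigQuery client library: `google.cloud.bigquery.table.Table` exposes `created` and `modified` as timezone-aware datetimes populated by the API. A minimal sketch of where the values originate (the project and table names are illustrative, not from this repo):

```python
# Minimal sketch with placeholder names; Table.created and Table.modified
# are timezone-aware datetimes returned by the BigQuery API.
from google.cloud import bigquery

client = bigquery.Client(project="my-project")            # placeholder project
table = client.get_table("my-project.my_dataset.orders")  # placeholder table

print(table.created)   # when the table was created, e.g. 2024-01-25 03:14:15+00:00
print(table.modified)  # when the table was last modified
```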
19 changes: 16 additions & 3 deletions metaphor/snowflake/extractor.py
@@ -174,7 +174,7 @@ def _fetch_shared_databases(cursor: SnowflakeCursor) -> List[str]:
         return [db[3].lower() for db in cursor if db[1] == "INBOUND"]

     FETCH_TABLE_QUERY = """
-    SELECT table_catalog, table_schema, table_name, table_type, COMMENT, row_count, bytes
+    SELECT table_catalog, table_schema, table_name, table_type, COMMENT, row_count, bytes, created
     FROM information_schema.tables
     WHERE table_schema != 'INFORMATION_SCHEMA'
     ORDER BY table_schema, table_name
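For context, `created` in Snowflake's INFORMATION_SCHEMA.TABLES is a TIMESTAMP_LTZ, which the Python connector yields as a timezone-aware datetime, so the value can be passed along without manual conversion. A rough standalone sketch of the updated query (credentials are placeholders):

```python
# Standalone sketch with placeholder credentials; only the trailing
# `created` column is new relative to the previous query.
import snowflake.connector

conn = snowflake.connector.connect(
    account="my_account",    # placeholder
    user="my_user",          # placeholder
    password="my_password",  # placeholder
    database="MY_DB",        # placeholder
)
cursor = conn.cursor()
cursor.execute(
    """
    SELECT table_catalog, table_schema, table_name, table_type,
           COMMENT, row_count, bytes, created
    FROM information_schema.tables
    WHERE table_schema != 'INFORMATION_SCHEMA'
    ORDER BY table_schema, table_name
    """
)
for *_, created in cursor:
    print(created)  # timezone-aware datetime, e.g. 2024-01-25 10:00:00-08:00
```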
@@ -199,6 +199,7 @@ def _fetch_tables(
                 comment,
                 row_count,
                 table_bytes,
+                created,
             ) in cursor:
                 normalized_name = dataset_normalized_name(database, schema, name)
                 if not self._filter.include_table(database, schema, name):
@@ -213,7 +214,14 @@ def _fetch_tables(
                     continue

                 self._datasets[normalized_name] = self._init_dataset(
-                    database, schema, name, table_type, comment, row_count, table_bytes
+                    database,
+                    schema,
+                    name,
+                    table_type,
+                    comment,
+                    row_count,
+                    table_bytes,
+                    created,
                 )
                 tables[normalized_name] = DatasetInfo(database, schema, name, table_type)

@@ -564,6 +572,7 @@ def _fetch_streams(self, cursor: SnowflakeCursor, database: str, schema: str):
         cursor.execute(f"SHOW STREAMS IN {schema}")
         for entry in cursor:
             (
+                create_on,
                 stream_name,
                 comment,
                 source_name,
@@ -572,6 +581,7 @@ def _fetch_streams(self, cursor: SnowflakeCursor, database: str, schema: str):
                 stream_type_str,
                 stale_after,
             ) = (
+                entry[0],
                 entry[1],
                 entry[5],
                 entry[6],
@@ -598,6 +608,7 @@ def _fetch_streams(self, cursor: SnowflakeCursor, database: str, schema: str):
                 comment=comment,
                 row_count=row_count,
                 table_bytes=None,  # Not applicable to streams
+                created=create_on,
             )

 def _to_dataset_eid(x: str) -> str:
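The positional indexes in this hunk follow Snowflake's documented SHOW STREAMS output order, where `created_on` is the first column — which is why the new `create_on` binding reads `entry[0]`. For reference, a sketch of the assumed mapping for the columns the extractor touches:

```python
# Assumed mapping from SHOW STREAMS output positions to the names bound
# above; only the columns the extractor reads are listed.
SHOW_STREAMS_COLUMNS = {
    0: "created_on",  # -> create_on, now forwarded as created=
    1: "name",        # -> stream_name
    5: "comment",     # -> comment
    6: "table_name",  # -> source_name
}
```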
@@ -784,6 +795,7 @@ def _init_dataset(
         comment: str,
         row_count: Optional[int],
         table_bytes: Optional[float],
+        created: Optional[datetime] = None,
     ) -> Dataset:
         normalized_name = dataset_normalized_name(database, schema, table)
         dataset = Dataset()
@@ -793,7 +805,8 @@
         )

         dataset.source_info = SourceInfo(
-            main_url=SnowflakeExtractor.build_table_url(self._account, normalized_name)
+            main_url=SnowflakeExtractor.build_table_url(self._account, normalized_name),
+            created_at_source=created,
         )

         sql_schema = SQLSchema()
12 changes: 7 additions & 5 deletions metaphor/unity_catalog/extractor.py
@@ -1,4 +1,3 @@
-import datetime
 import json
 import logging
 import re
@@ -52,6 +51,7 @@
     build_query_log_filter_by,
     create_api,
     create_connection,
+    from_timestamp_ms,
     list_column_lineage,
     list_table_lineage,
 )
@@ -226,7 +226,11 @@ def _init_dataset(self, table_info: TableInfo) -> Dataset:
         )

         main_url = self._get_source_url(database, schema_name, table_name)
-        dataset.source_info = SourceInfo(main_url=main_url)
+        dataset.source_info = SourceInfo(
+            main_url=main_url,
+            created_at_source=from_timestamp_ms(table_info.created_at),
+            last_updated=from_timestamp_ms(table_info.updated_at),
+        )

         dataset.unity_catalog = UnityCatalog(
             table_type=UnityCatalogTableType[table_info.table_type.value],
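Here `table_info` is a databricks-sdk `TableInfo`, whose `created_at` and `updated_at` fields are epoch-millisecond integers rather than datetimes — hence the shared `from_timestamp_ms` helper added in utils.py below. A small sketch of the raw values (workspace URL, token, and table name are placeholders):

```python
# Sketch with placeholder workspace/credentials showing the raw
# millisecond timestamps that from_timestamp_ms converts.
from databricks.sdk import WorkspaceClient

client = WorkspaceClient(host="https://example.cloud.databricks.com", token="...")
info = client.tables.get("main.default.orders")  # full_name: catalog.schema.table
print(info.created_at, info.updated_at)          # epoch ms, e.g. 1706140800000
```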
@@ -334,9 +338,7 @@ def _get_query_logs(self) -> QueryLogs:
         ):
             start_time = None
             if query_info.query_start_time_ms is not None:
-                start_time = datetime.datetime.fromtimestamp(
-                    query_info.query_start_time_ms / 1000, tz=datetime.timezone.utc
-                )
+                start_time = from_timestamp_ms(query_info.query_start_time_ms)

             user_id, email = user_id_or_email(query_info.user_name)
4 changes: 4 additions & 0 deletions metaphor/unity_catalog/utils.py
@@ -121,3 +121,7 @@ def create_connection(

 def create_api(host: str, token: str) -> WorkspaceClient:
     return WorkspaceClient(host=host, token=token)
+
+
+def from_timestamp_ms(timestamp: int) -> datetime.datetime:
+    return datetime.datetime.fromtimestamp(timestamp / 1000, tz=datetime.timezone.utc)
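A quick usage check for the new helper; the argument is epoch milliseconds, matching Unity Catalog's `created_at`/`updated_at` and `query_start_time_ms` fields:

```python
# 1706140800000 ms == 2024-01-25 00:00:00 UTC
from metaphor.unity_catalog.utils import from_timestamp_ms

print(from_timestamp_ms(1706140800000))  # 2024-01-25 00:00:00+00:00
```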