Skip to content

Commit

Permalink
Refactor Unity Catalog to fetch catalog/schema/table metadata from System tables
Browse files Browse the repository at this point in the history
  • Loading branch information
mars-lan committed Oct 25, 2024
1 parent d8d7d47 commit bcc86e6
Show file tree
Hide file tree
Showing 20 changed files with 1,748 additions and 466 deletions.
405 changes: 225 additions & 180 deletions metaphor/unity_catalog/extractor.py

Large diffs are not rendered by default.

95 changes: 72 additions & 23 deletions metaphor/unity_catalog/models.py
Original file line number Diff line number Diff line change
@@ -1,29 +1,8 @@
from typing import Dict, List
from datetime import datetime
from typing import Dict, List, Literal, Optional

from databricks.sdk.service.catalog import ColumnInfo
from pydantic import BaseModel

from metaphor.common.fieldpath import build_schema_field
from metaphor.common.logger import get_logger
from metaphor.models.metadata_change_event import SchemaField

logger = get_logger()


def extract_schema_field_from_column_info(column: ColumnInfo) -> SchemaField:
    """Build a ``SchemaField`` from a column description.

    The field is created by ``build_schema_field`` from the column's name,
    the lower-cased ``type_name.value`` and the column comment. Its
    ``precision`` is then set from ``type_precision``.

    Args:
        column: Column description exposing ``name``, ``type_name``,
            ``comment`` and ``type_precision``.

    Returns:
        The populated schema field.

    Raises:
        ValueError: If the column has no name, or has no type name.
    """
    # Report the two failure modes separately so the message names the
    # attribute that is actually missing.
    if column.name is None:
        raise ValueError("Invalid column, no name found")
    if column.type_name is None:
        raise ValueError(f"Invalid column {column.name}, no type_name found")

    field = build_schema_field(
        column.name, column.type_name.value.lower(), column.comment
    )

    # A missing precision is stored as NaN rather than None.
    if column.type_precision is None:
        field.precision = float("nan")
    else:
        field.precision = float(column.type_precision)
    return field


class TableLineage(BaseModel):
upstream_tables: List[str] = []
Expand All @@ -36,3 +15,73 @@ class Column(BaseModel):

class ColumnLineage(BaseModel):
    """Column-level lineage: upstream columns grouped under string keys."""

    # Maps a string key to the list of upstream ``Column`` entries.
    # NOTE(review): presumably keyed by the downstream column name — confirm
    # against the code that populates it.
    upstream_columns: Dict[str, List[Column]] = {}


class CatalogInfo(BaseModel):
    """Metadata record describing a single catalog."""

    # Name of the catalog.
    catalog_name: str
    # Owner of the catalog.
    owner: str
    # Free-form description; None when no comment is available.
    comment: Optional[str] = None


class SchemaInfo(BaseModel):
    """Metadata record describing a single schema within a catalog."""

    # Name of the catalog that contains the schema.
    catalog_name: str
    # Name of the schema itself.
    schema_name: str
    # Owner of the schema.
    owner: str
    # Free-form description; None when no comment is available.
    comment: Optional[str] = None


class ColumnInfo(BaseModel):
    """Metadata record describing a single table column."""

    # Name of the column.
    column_name: str
    # Column data type, kept as a plain string.
    data_type: str
    # Numeric precision, or None.
    # NOTE(review): unlike the other Optional fields in this module, this one
    # declares no default — confirm that requiring it explicitly is intended.
    data_precision: Optional[int]
    # Whether the column accepts NULL values.
    is_nullable: bool
    # Free-form description; None when no comment is available.
    comment: Optional[str] = None


class TableInfo(BaseModel):
    """Metadata record describing a table together with its columns."""

    # --- Identity -----------------------------------------------------------
    catalog_name: str
    schema_name: str
    table_name: str
    # Table type, kept as a plain string.
    type: str

    # --- Ownership and description ------------------------------------------
    owner: str
    # Free-form description; None when no comment is available.
    comment: Optional[str] = None

    # --- Audit trail --------------------------------------------------------
    created_at: datetime
    created_by: str
    updated_at: datetime
    updated_by: str

    # --- Definition and storage ---------------------------------------------
    # Defining SQL text; None when not provided.
    # NOTE(review): presumably only set for views — confirm against the caller.
    view_definition: Optional[str] = None
    # Storage location; None when not provided.
    storage_location: Optional[str] = None
    # Data source format, kept as a plain string.
    data_source_format: str

    # Column descriptions; defaults to an empty list.
    columns: List[ColumnInfo] = []


class VolumeInfo(BaseModel):
    """Metadata record describing a volume."""

    # --- Identity -----------------------------------------------------------
    catalog_name: str
    schema_name: str
    volume_name: str
    # Only the two literal strings "MANAGED" and "EXTERNAL" are accepted.
    volume_type: Literal["MANAGED", "EXTERNAL"]
    # Full name of the volume.
    # NOTE(review): presumably "<catalog>.<schema>.<volume>" — confirm.
    full_name: str

    # --- Ownership and description ------------------------------------------
    owner: str
    # Free-form description; None when no comment is available.
    comment: Optional[str] = None

    # --- Audit trail --------------------------------------------------------
    created_at: datetime
    created_by: str
    updated_at: datetime
    updated_by: str

    # --- Storage ------------------------------------------------------------
    # Storage location of the volume; required, unlike TableInfo's optional one.
    storage_location: str


class VolumeFileInfo(BaseModel):
    """Metadata record describing a single file stored in a volume."""

    # Timestamp of the file's last update.
    last_updated: datetime
    # File name.
    name: str
    # File path.
    path: str
    # File size, stored as a float.
    # NOTE(review): unit is not stated here (presumably bytes) — confirm.
    size: float


class CatalogTag(BaseModel):
    """A key/value tag attached to a catalog."""

    # Tag key.
    key: str
    # Tag value.
    value: str


class SchemaTag(BaseModel):
    """A key/value tag attached to a schema."""

    # Tag key.
    key: str
    # Tag value.
    value: str
Loading

0 comments on commit bcc86e6

Please sign in to comment.