Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor Unity Catalog to fetch catalog/schema/table metadata from System tables #1022

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
685 changes: 270 additions & 415 deletions metaphor/unity_catalog/extractor.py

Large diffs are not rendered by default.

95 changes: 72 additions & 23 deletions metaphor/unity_catalog/models.py
Original file line number Diff line number Diff line change
@@ -1,29 +1,8 @@
from typing import Dict, List
from datetime import datetime
from typing import Dict, List, Literal, Optional

from databricks.sdk.service.catalog import ColumnInfo
from pydantic import BaseModel

from metaphor.common.fieldpath import build_schema_field
from metaphor.common.logger import get_logger
from metaphor.models.metadata_change_event import SchemaField

logger = get_logger()


def extract_schema_field_from_column_info(column: ColumnInfo) -> SchemaField:
if column.name is None or column.type_name is None:
raise ValueError(f"Invalid column {column.name}, no type_name found")

field = build_schema_field(
column.name, column.type_name.value.lower(), column.comment
)
field.precision = (
float(column.type_precision)
if column.type_precision is not None
else float("nan")
)
return field


class TableLineage(BaseModel):
upstream_tables: List[str] = []
Expand All @@ -36,3 +15,73 @@ class Column(BaseModel):

class ColumnLineage(BaseModel):
upstream_columns: Dict[str, List[Column]] = {}


class Tag(BaseModel):
key: str
value: str


class CatalogInfo(BaseModel):
catalog_name: str
owner: str
comment: Optional[str] = None
tags: List[Tag]


class SchemaInfo(BaseModel):
catalog_name: str
schema_name: str
owner: str
comment: Optional[str] = None
tags: List[Tag]


class ColumnInfo(BaseModel):
column_name: str
data_type: str
data_precision: Optional[int]
is_nullable: bool
comment: Optional[str] = None
tags: List[Tag]


class TableInfo(BaseModel):
catalog_name: str
schema_name: str
table_name: str
type: str
owner: str
comment: Optional[str] = None
created_at: datetime
created_by: str
updated_at: datetime
updated_by: str
view_definition: Optional[str] = None
storage_location: Optional[str] = None
data_source_format: str
tags: List[Tag] = []
columns: List[ColumnInfo] = []


class VolumeInfo(BaseModel):
catalog_name: str
schema_name: str
volume_name: str
volume_type: Literal["MANAGED", "EXTERNAL"]
full_name: str
owner: str
comment: Optional[str] = None
created_at: datetime
created_by: str
updated_at: datetime
updated_by: str
storage_location: str
tags: List[Tag]


class VolumeFileInfo(BaseModel):
last_updated: datetime
name: str
path: str
size: float
10 changes: 9 additions & 1 deletion metaphor/unity_catalog/profile/extractor.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from metaphor.common.entity_id import normalize_full_dataset_name
from metaphor.common.event_util import ENTITY_TYPES
from metaphor.common.fieldpath import build_field_statistics
from metaphor.common.filter import DatasetFilter
from metaphor.common.logger import get_logger
from metaphor.common.utils import safe_float
from metaphor.models.crawler_run_metadata import Platform
Expand All @@ -26,7 +27,6 @@
DatasetLogicalID,
DatasetStatistics,
)
from metaphor.unity_catalog.extractor import DEFAULT_FILTER
from metaphor.unity_catalog.profile.config import UnityCatalogProfileRunConfig
from metaphor.unity_catalog.utils import (
create_api,
Expand All @@ -36,6 +36,14 @@

logger = get_logger()

# Filter out "system" database & all "information_schema" schemas
DEFAULT_FILTER: DatasetFilter = DatasetFilter(
excludes={
"system": None,
"*": {"information_schema": None},
}
)

NON_MODIFICATION_OPERATIONS = {
"SET TBLPROPERTIES",
"ADD CONSTRAINT",
Expand Down
Loading
Loading