feat(ingest/dynamoDB): flatten struct fields (datahub-project#9852)
Co-authored-by: Tamas Nemeth <[email protected]>
TonyOuyangGit and treff7es authored Feb 15, 2024
1 parent bbd818a commit ae1806f
Showing 4 changed files with 260 additions and 40 deletions.
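What the change does, in short: the DynamoDB source previously recorded only top-level attribute names while inferring a table schema, so nested `M` (map) attributes surfaced as a single opaque field. After this commit the scanner recurses into maps and emits one schema entry per nested attribute, keyed by the full path tuple and rendered with a `.` delimiter (the new `FIELD_DELIMITER`). A minimal, self-contained sketch of that flattening idea (the sample item and the `flatten` helper below are illustrative, not code from this commit):

```python
from typing import Any, Dict, Tuple

# Hypothetical item in DynamoDB's low-level wire format: every attribute value
# is a single-entry dict whose key is the type descriptor ("S" string,
# "BOOL" boolean, "M" map, ...).
item: Dict[str, Dict[str, Any]] = {
    "zip": {"S": "94105"},
    "services": {
        "M": {
            "wifi": {"S": "free"},
            "parking": {"BOOL": True},
            "hours": {"M": {"open": {"S": "08:00"}, "close": {"S": "18:00"}}},
        }
    },
}


def flatten(
    doc: Dict[str, Dict[str, Any]], parent: Tuple[str, ...] = ()
) -> Dict[Tuple[str, ...], str]:
    """Map each attribute path to its type descriptor, recursing into "M" values."""
    out: Dict[Tuple[str, ...], str] = {}
    for key, wrapped in doc.items():
        data_type = next(iter(wrapped))  # first (only) key, e.g. "S" or "M"
        path = parent + (key,)
        out[path] = data_type
        if data_type == "M":  # nested map: recurse with the extended path
            out.update(flatten(wrapped["M"], path))
    return out


print({".".join(p): t for p, t in flatten(item).items()})
# {'zip': 'S', 'services': 'M', 'services.wifi': 'S', 'services.parking': 'BOOL',
#  'services.hours': 'M', 'services.hours.open': 'S', 'services.hours.close': 'S'}
```

The first diff below is the DynamoDB ingestion source module.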
@@ -1,6 +1,6 @@
 import logging
 from dataclasses import dataclass, field
-from typing import Any, Counter, Dict, Iterable, List, Optional, Type, Union
+from typing import Any, Counter, Dict, Iterable, List, Optional, Tuple, Type, Union

 import boto3
 import pydantic
@@ -61,6 +61,7 @@
 PAGE_SIZE = 100
 MAX_SCHEMA_SIZE = 300
 MAX_PRIMARY_KEYS_SIZE = 100
+FIELD_DELIMITER = "."

 logger: logging.Logger = logging.getLogger(__name__)
@@ -285,13 +286,13 @@ def construct_schema_from_dynamodb(
         dynamodb_client: BaseClient,
         region: str,
         table_name: str,
-    ) -> Dict[str, SchemaDescription]:
+    ) -> Dict[Tuple[str, ...], SchemaDescription]:
         """
         This will use the dynamodb client to scan the given table to retrieve items with pagination,
         and construct the schema of this table by reading the attributes of the retrieved items
         """
         paginator = dynamodb_client.get_paginator("scan")
-        schema: Dict[str, SchemaDescription] = {}
+        schema: Dict[Tuple[str, ...], SchemaDescription] = {}
         """
         https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/dynamodb.html#DynamoDB.Paginator.Scan
         Note that the behavior of the pagination does not align with the documentation according to https://stackoverflow.com/questions/39201093/how-to-use-boto3-pagination
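(Aside on the hunk above: changing the schema dict's key type from `str` to `Tuple[str, ...]` keeps lookups unambiguous while recursing, since a path tuple cannot collide with a top-level attribute whose name happens to contain the delimiter. A tiny illustrative snippet, not code from this commit:)

```python
from typing import Dict, Tuple

# Tuple keys keep nesting unambiguous even if attribute names contain dots.
schema: Dict[Tuple[str, ...], str] = {}
schema[("services", "hours")] = "M"  # nested map field
schema[("services.hours",)] = "S"    # literal dotted top-level attribute name
print(len(schema))  # -> 2: distinct entries, no collision
```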
@@ -323,7 +324,7 @@ def include_table_item_to_schema(
         dynamodb_client: Any,
         region: str,
         table_name: str,
-        schema: Dict[str, SchemaDescription],
+        schema: Dict[Tuple[str, ...], SchemaDescription],
     ) -> None:
         """
         It will look up in the config include_table_item dict to see if "region.table_name" exists as key,
@@ -358,7 +359,9 @@ def include_table_item_to_schema(
         self.construct_schema_from_items(items, schema)

     def construct_schema_from_items(
-        slef, items: List[Dict[str, Dict]], schema: Dict[str, SchemaDescription]
+        self,
+        items: List[Dict[str, Dict]],
+        schema: Dict[Tuple[str, ...], SchemaDescription],
     ) -> None:
         """
         https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/dynamodb.html#DynamoDB.Client.scan
@@ -367,35 +370,58 @@ def construct_schema_from_items(
         we are writing our own construct schema method, take the attribute name as key and SchemaDescription as value
         """
         for document in items:
-            # the key is the attribute name and the value is a dict with only one entry,
-            # whose key is the data type and value is the data
-            for key, value in document.items():
-                if value is not None:
-                    data_type = list(value.keys())[0]
-                    if key not in schema:
-                        schema[key] = {
-                            "types": Counter(data_type),
-                            "count": 1,
-                            # It seems we don't have collapsed field name so we are using attribute name here
-                            "delimited_name": key,
-                            "type": data_type,
-                            "nullable": False,
-                        }
-                    else:
-                        # update the type count
-                        schema[key]["types"].update({data_type: 1})
-                        schema[key]["count"] += 1
-                        # if we found an attribute name with different attribute type, we consider this attribute type as "mixed"
-                        field_types = schema[key]["types"]
-                        if len(field_types.keys()) > 1:
-                            schema[key]["type"] = "mixed"
+            self.append_schema(schema, document)
+
+    def append_schema(
+        self,
+        schema: Dict[Tuple[str, ...], SchemaDescription],
+        document: Dict[str, Dict],
+        parent_field_path: Tuple[str, ...] = (),
+    ) -> None:
+        # the key is the attribute name and the value is a dict with only one entry,
+        # whose key is the data type and value is the data and we will recursively expand
+        # map data type to get flattened field
+        for key, value in document.items():
+            if value is not None:
+                data_type = list(value.keys())[0]
+                attribute_value = value[data_type]
+                current_field_path = parent_field_path + (key,)
+                # Handle nested maps by recursive calls
+                if data_type == "M":
+                    logger.debug(
+                        f"expanding nested fields for map, current_field_path: {current_field_path}"
+                    )
+                    self.append_schema(schema, attribute_value, current_field_path)
+
+                if current_field_path not in schema:
+                    schema[current_field_path] = {
+                        "types": Counter({data_type: 1}),
+                        "count": 1,
+                        # It seems we don't have collapsed field name so we are using attribute name here
+                        "delimited_name": FIELD_DELIMITER.join(current_field_path),
+                        "type": data_type,
+                        "nullable": False,
+                    }
+                else:
+                    schema[current_field_path]["types"].update({data_type: 1})
+                    schema[current_field_path]["count"] += 1
+                    # if we found an attribute name with different attribute type, we consider this attribute type as "mixed"
+                    if len(schema[current_field_path]["types"]) > 1:
+                        schema[current_field_path]["type"] = "mixed"
+                schema[current_field_path]["nullable"] |= (
+                    attribute_value is None
+                )  # Mark as nullable if null encountered
+                types = schema[current_field_path]["types"]
+                logger.debug(
+                    f"append schema with field_path: {current_field_path} and type: {types}"
+                )

     def construct_schema_metadata(
         self,
         table_name: str,
         dataset_urn: str,
         dataset_properties: DatasetPropertiesClass,
-        schema: Dict[str, SchemaDescription],
+        schema: Dict[Tuple[str, ...], SchemaDescription],
         primary_key_dict: Dict[str, str],
     ) -> SchemaMetadata:
         """ "
@@ -407,20 +433,23 @@ def construct_schema_metadata(
         canonical_schema: List[SchemaField] = []
         schema_size = len(schema.values())
         table_fields = list(schema.values())
+
         if schema_size > MAX_SCHEMA_SIZE:
             # downsample the schema, using frequency as the sort key
             self.report.report_warning(
                 key=dataset_urn,
                 reason=f"Downsampling the table schema because MAX_SCHEMA_SIZE threshold is {MAX_SCHEMA_SIZE}",
             )

             # Add this information to the custom properties so user can know they are looking at down sampled schema
             dataset_properties.customProperties["schema.downsampled"] = "True"
             dataset_properties.customProperties["schema.totalFields"] = f"{schema_size}"
-        # append each schema field (sort so output is consistent)
+        # append each schema field, schema will be sorted by count descending and delimited_name ascending and sliced to only include MAX_SCHEMA_SIZE items
         for schema_field in sorted(
             table_fields,
-            key=lambda x: x["delimited_name"],
+            key=lambda x: (
+                -x["count"],
+                x["delimited_name"],
+            ),  # Negate `count` for descending order, `delimited_name` stays the same for ascending
         )[0:MAX_SCHEMA_SIZE]:
             field_path = schema_field["delimited_name"]
             native_data_type = self.get_native_type(schema_field["type"], table_name)
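For readers skimming the hunks above, the accumulation step in `append_schema` can be restated in isolation. Incidentally, the diff also fixes a subtle bug: the old `Counter(data_type)` iterates the string and counts its characters (so "BOOL" became {'B': 1, 'O': 2, 'L': 1}), while the new `Counter({data_type: 1})` seeds a single key. A condensed sketch under the same SchemaDescription shape (a plain dict here; `record` is a hypothetical stand-in, not a method from this commit):

```python
from collections import Counter
from typing import Any, Dict, Tuple

# One SchemaDescription-like dict per flattened field path (illustrative shape).
schema: Dict[Tuple[str, ...], Dict[str, Any]] = {}


def record(path: Tuple[str, ...], data_type: str) -> None:
    """Accumulate one observation of `path` carrying `data_type`."""
    if path not in schema:
        schema[path] = {
            "types": Counter({data_type: 1}),
            "count": 1,
            "delimited_name": ".".join(path),
            "type": data_type,
        }
    else:
        schema[path]["types"].update({data_type: 1})
        schema[path]["count"] += 1
        # Two different descriptors for one path means the type is "mixed".
        if len(schema[path]["types"]) > 1:
            schema[path]["type"] = "mixed"


record(("services", "wifi"), "S")
record(("services", "wifi"), "S")
record(("services", "wifi"), "BOOL")  # same path, second type observed
print(schema[("services", "wifi")]["type"])          # -> mixed
print(dict(schema[("services", "wifi")]["types"]))   # -> {'S': 2, 'BOOL': 1}
```

The next diff updates what looks like the integration-test golden file, whose expected fieldPaths pick up the new dotted names.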
@@ -46,6 +46,18 @@
"recursive": false,
"isPartOfKey": false
},
{
"fieldPath": "contactNumbers",
"nullable": true,
"type": {
"type": {
"com.linkedin.schema.ArrayType": {}
}
},
"nativeDataType": "List",
"recursive": false,
"isPartOfKey": false
},
{
"fieldPath": "partitionKey",
"nullable": false,
@@ -59,6 +71,78 @@
"recursive": false,
"isPartOfKey": false
},
{
"fieldPath": "services",
"nullable": true,
"type": {
"type": {
"com.linkedin.schema.RecordType": {}
}
},
"nativeDataType": "Map",
"recursive": false,
"isPartOfKey": false
},
{
"fieldPath": "services.hours",
"nullable": true,
"type": {
"type": {
"com.linkedin.schema.RecordType": {}
}
},
"nativeDataType": "Map",
"recursive": false,
"isPartOfKey": false
},
{
"fieldPath": "services.hours.close",
"nullable": true,
"type": {
"type": {
"com.linkedin.schema.StringType": {}
}
},
"nativeDataType": "String",
"recursive": false,
"isPartOfKey": false
},
{
"fieldPath": "services.hours.open",
"nullable": true,
"type": {
"type": {
"com.linkedin.schema.StringType": {}
}
},
"nativeDataType": "String",
"recursive": false,
"isPartOfKey": false
},
{
"fieldPath": "services.parking",
"nullable": true,
"type": {
"type": {
"com.linkedin.schema.BooleanType": {}
}
},
"nativeDataType": "Boolean",
"recursive": false,
"isPartOfKey": false
},
{
"fieldPath": "services.wifi",
"nullable": true,
"type": {
"type": {
"com.linkedin.schema.StringType": {}
}
},
"nativeDataType": "String",
"recursive": false,
"isPartOfKey": false
},
{
"fieldPath": "zip",
"nullable": true,
@@ -76,7 +160,8 @@
     },
     "systemMetadata": {
         "lastObserved": 1693396800000,
-        "runId": "dynamodb-test"
+        "runId": "dynamodb-test",
+        "lastRunId": "no-run-id-provided"
     }
 },
 {
@@ -95,7 +180,8 @@
     },
     "systemMetadata": {
         "lastObserved": 1693396800000,
-        "runId": "dynamodb-test"
+        "runId": "dynamodb-test",
+        "lastRunId": "no-run-id-provided"
     }
 },
 {
@@ -111,7 +197,8 @@
     },
     "systemMetadata": {
         "lastObserved": 1693396800000,
-        "runId": "dynamodb-test"
+        "runId": "dynamodb-test",
+        "lastRunId": "no-run-id-provided"
     }
 },
 {
@@ -126,7 +213,8 @@
     },
     "systemMetadata": {
         "lastObserved": 1693396800000,
-        "runId": "dynamodb-test"
+        "runId": "dynamodb-test",
+        "lastRunId": "no-run-id-provided"
     }
 }
 ]
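The golden-file additions above are the visible effect of flattening: dotted `fieldPath` entries such as `services.hours.open` now appear alongside their parent `Map` fields. The other observable change is field ordering under downsampling. A quick illustration of the new sort key from `construct_schema_metadata` (the sample counts are made up):

```python
# Fields are ranked by descending observation count, then ascending
# delimited_name, before truncation to MAX_SCHEMA_SIZE.
fields = [
    {"delimited_name": "services.wifi", "count": 3},
    {"delimited_name": "zip", "count": 10},
    {"delimited_name": "contactNumbers", "count": 10},
]
ranked = sorted(fields, key=lambda x: (-x["count"], x["delimited_name"]))
print([f["delimited_name"] for f in ranked])
# ['contactNumbers', 'zip', 'services.wifi']
```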