From ae1806fefa695ea4f4ebccc86b2f89e7c9629761 Mon Sep 17 00:00:00 2001 From: Tony Ouyang Date: Thu, 15 Feb 2024 13:40:03 -0800 Subject: [PATCH] feat(ingest/dynamoDB): flatten struct fields (#9852) Co-authored-by: Tamas Nemeth --- .../ingestion/source/dynamodb/dynamodb.py | 91 ++++++++++++------ ...default_platform_instance_mces_golden.json | 96 ++++++++++++++++++- ...ynamodb_platform_instance_mces_golden.json | 96 ++++++++++++++++++- .../integration/dynamodb/test_dynamodb.py | 17 +++- 4 files changed, 260 insertions(+), 40 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/dynamodb/dynamodb.py b/metadata-ingestion/src/datahub/ingestion/source/dynamodb/dynamodb.py index 972eb60ff5b05d..f917c38f947f41 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/dynamodb/dynamodb.py +++ b/metadata-ingestion/src/datahub/ingestion/source/dynamodb/dynamodb.py @@ -1,6 +1,6 @@ import logging from dataclasses import dataclass, field -from typing import Any, Counter, Dict, Iterable, List, Optional, Type, Union +from typing import Any, Counter, Dict, Iterable, List, Optional, Tuple, Type, Union import boto3 import pydantic @@ -61,6 +61,7 @@ PAGE_SIZE = 100 MAX_SCHEMA_SIZE = 300 MAX_PRIMARY_KEYS_SIZE = 100 +FIELD_DELIMITER = "." logger: logging.Logger = logging.getLogger(__name__) @@ -285,13 +286,13 @@ def construct_schema_from_dynamodb( dynamodb_client: BaseClient, region: str, table_name: str, - ) -> Dict[str, SchemaDescription]: + ) -> Dict[Tuple[str, ...], SchemaDescription]: """ This will use the dynamodb client to scan the given table to retrieve items with pagination, and construct the schema of this table by reading the attributes of the retrieved items """ paginator = dynamodb_client.get_paginator("scan") - schema: Dict[str, SchemaDescription] = {} + schema: Dict[Tuple[str, ...], SchemaDescription] = {} """ https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/dynamodb.html#DynamoDB.Paginator.Scan Note that the behavior of the pagination does not align with the documentation according to https://stackoverflow.com/questions/39201093/how-to-use-boto3-pagination @@ -323,7 +324,7 @@ def include_table_item_to_schema( dynamodb_client: Any, region: str, table_name: str, - schema: Dict[str, SchemaDescription], + schema: Dict[Tuple[str, ...], SchemaDescription], ) -> None: """ It will look up in the config include_table_item dict to see if "region.table_name" exists as key, @@ -358,7 +359,9 @@ def include_table_item_to_schema( self.construct_schema_from_items(items, schema) def construct_schema_from_items( - slef, items: List[Dict[str, Dict]], schema: Dict[str, SchemaDescription] + self, + items: List[Dict[str, Dict]], + schema: Dict[Tuple[str, ...], SchemaDescription], ) -> None: """ https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/dynamodb.html#DynamoDB.Client.scan @@ -367,35 +370,58 @@ def construct_schema_from_items( we are writing our own construct schema method, take the attribute name as key and SchemaDescription as value """ for document in items: - # the key is the attribute name and the value is a dict with only one entry, - # whose key is the data type and value is the data - for key, value in document.items(): - if value is not None: - data_type = list(value.keys())[0] - if key not in schema: - schema[key] = { - "types": Counter(data_type), - "count": 1, - # It seems we don't have collapsed field name so we are using attribute name here - "delimited_name": key, - "type": data_type, - "nullable": False, - } - else: - # update the type count - schema[key]["types"].update({data_type: 1}) - schema[key]["count"] += 1 - # if we found an attribute name with different attribute type, we consider this attribute type as "mixed" - field_types = schema[key]["types"] - if len(field_types.keys()) > 1: - schema[key]["type"] = "mixed" + self.append_schema(schema, document) + + def append_schema( + self, + schema: Dict[Tuple[str, ...], SchemaDescription], + document: Dict[str, Dict], + parent_field_path: Tuple[str, ...] = (), + ) -> None: + # the key is the attribute name and the value is a dict with only one entry, + # whose key is the data type and value is the data and we will recursively expand + # map data type to get flattened field + for key, value in document.items(): + if value is not None: + data_type = list(value.keys())[0] + attribute_value = value[data_type] + current_field_path = parent_field_path + (key,) + # Handle nested maps by recursive calls + if data_type == "M": + logger.debug( + f"expanding nested fields for map, current_field_path: {current_field_path}" + ) + self.append_schema(schema, attribute_value, current_field_path) + + if current_field_path not in schema: + schema[current_field_path] = { + "types": Counter({data_type: 1}), + "count": 1, + # It seems we don't have collapsed field name so we are using attribute name here + "delimited_name": FIELD_DELIMITER.join(current_field_path), + "type": data_type, + "nullable": False, + } + else: + schema[current_field_path]["types"].update({data_type: 1}) + schema[current_field_path]["count"] += 1 + # if we found an attribute name with different attribute type, we consider this attribute type as "mixed" + if len(schema[current_field_path]["types"]) > 1: + schema[current_field_path]["type"] = "mixed" + schema[current_field_path]["nullable"] |= ( + attribute_value is None + ) # Mark as nullable if null encountered + types = schema[current_field_path]["types"] + logger.debug( + f"append schema with field_path: {current_field_path} and type: {types}" + ) def construct_schema_metadata( self, table_name: str, dataset_urn: str, dataset_properties: DatasetPropertiesClass, - schema: Dict[str, SchemaDescription], + schema: Dict[Tuple[str, ...], SchemaDescription], primary_key_dict: Dict[str, str], ) -> SchemaMetadata: """ " @@ -407,20 +433,23 @@ def construct_schema_metadata( canonical_schema: List[SchemaField] = [] schema_size = len(schema.values()) table_fields = list(schema.values()) - if schema_size > MAX_SCHEMA_SIZE: # downsample the schema, using frequency as the sort key self.report.report_warning( key=dataset_urn, reason=f"Downsampling the table schema because MAX_SCHEMA_SIZE threshold is {MAX_SCHEMA_SIZE}", ) + # Add this information to the custom properties so user can know they are looking at down sampled schema dataset_properties.customProperties["schema.downsampled"] = "True" dataset_properties.customProperties["schema.totalFields"] = f"{schema_size}" - # append each schema field (sort so output is consistent) + # append each schema field, schema will be sorted by count descending and delimited_name ascending and sliced to only include MAX_SCHEMA_SIZE items for schema_field in sorted( table_fields, - key=lambda x: x["delimited_name"], + key=lambda x: ( + -x["count"], + x["delimited_name"], + ), # Negate `count` for descending order, `delimited_name` stays the same for ascending )[0:MAX_SCHEMA_SIZE]: field_path = schema_field["delimited_name"] native_data_type = self.get_native_type(schema_field["type"], table_name) diff --git a/metadata-ingestion/tests/integration/dynamodb/dynamodb_default_platform_instance_mces_golden.json b/metadata-ingestion/tests/integration/dynamodb/dynamodb_default_platform_instance_mces_golden.json index f3d6c9809f5d2c..04e0e71c295f8d 100644 --- a/metadata-ingestion/tests/integration/dynamodb/dynamodb_default_platform_instance_mces_golden.json +++ b/metadata-ingestion/tests/integration/dynamodb/dynamodb_default_platform_instance_mces_golden.json @@ -46,6 +46,18 @@ "recursive": false, "isPartOfKey": false }, + { + "fieldPath": "contactNumbers", + "nullable": true, + "type": { + "type": { + "com.linkedin.schema.ArrayType": {} + } + }, + "nativeDataType": "List", + "recursive": false, + "isPartOfKey": false + }, { "fieldPath": "partitionKey", "nullable": false, @@ -59,6 +71,78 @@ "recursive": false, "isPartOfKey": false }, + { + "fieldPath": "services", + "nullable": true, + "type": { + "type": { + "com.linkedin.schema.RecordType": {} + } + }, + "nativeDataType": "Map", + "recursive": false, + "isPartOfKey": false + }, + { + "fieldPath": "services.hours", + "nullable": true, + "type": { + "type": { + "com.linkedin.schema.RecordType": {} + } + }, + "nativeDataType": "Map", + "recursive": false, + "isPartOfKey": false + }, + { + "fieldPath": "services.hours.close", + "nullable": true, + "type": { + "type": { + "com.linkedin.schema.StringType": {} + } + }, + "nativeDataType": "String", + "recursive": false, + "isPartOfKey": false + }, + { + "fieldPath": "services.hours.open", + "nullable": true, + "type": { + "type": { + "com.linkedin.schema.StringType": {} + } + }, + "nativeDataType": "String", + "recursive": false, + "isPartOfKey": false + }, + { + "fieldPath": "services.parking", + "nullable": true, + "type": { + "type": { + "com.linkedin.schema.BooleanType": {} + } + }, + "nativeDataType": "Boolean", + "recursive": false, + "isPartOfKey": false + }, + { + "fieldPath": "services.wifi", + "nullable": true, + "type": { + "type": { + "com.linkedin.schema.StringType": {} + } + }, + "nativeDataType": "String", + "recursive": false, + "isPartOfKey": false + }, { "fieldPath": "zip", "nullable": true, @@ -76,7 +160,8 @@ }, "systemMetadata": { "lastObserved": 1693396800000, - "runId": "dynamodb-test" + "runId": "dynamodb-test", + "lastRunId": "no-run-id-provided" } }, { @@ -95,7 +180,8 @@ }, "systemMetadata": { "lastObserved": 1693396800000, - "runId": "dynamodb-test" + "runId": "dynamodb-test", + "lastRunId": "no-run-id-provided" } }, { @@ -111,7 +197,8 @@ }, "systemMetadata": { "lastObserved": 1693396800000, - "runId": "dynamodb-test" + "runId": "dynamodb-test", + "lastRunId": "no-run-id-provided" } }, { @@ -126,7 +213,8 @@ }, "systemMetadata": { "lastObserved": 1693396800000, - "runId": "dynamodb-test" + "runId": "dynamodb-test", + "lastRunId": "no-run-id-provided" } } ] \ No newline at end of file diff --git a/metadata-ingestion/tests/integration/dynamodb/dynamodb_platform_instance_mces_golden.json b/metadata-ingestion/tests/integration/dynamodb/dynamodb_platform_instance_mces_golden.json index b1176b1fd5786d..2cfc559ed77444 100644 --- a/metadata-ingestion/tests/integration/dynamodb/dynamodb_platform_instance_mces_golden.json +++ b/metadata-ingestion/tests/integration/dynamodb/dynamodb_platform_instance_mces_golden.json @@ -46,6 +46,18 @@ "recursive": false, "isPartOfKey": false }, + { + "fieldPath": "contactNumbers", + "nullable": true, + "type": { + "type": { + "com.linkedin.schema.ArrayType": {} + } + }, + "nativeDataType": "List", + "recursive": false, + "isPartOfKey": false + }, { "fieldPath": "partitionKey", "nullable": false, @@ -59,6 +71,78 @@ "recursive": false, "isPartOfKey": false }, + { + "fieldPath": "services", + "nullable": true, + "type": { + "type": { + "com.linkedin.schema.RecordType": {} + } + }, + "nativeDataType": "Map", + "recursive": false, + "isPartOfKey": false + }, + { + "fieldPath": "services.hours", + "nullable": true, + "type": { + "type": { + "com.linkedin.schema.RecordType": {} + } + }, + "nativeDataType": "Map", + "recursive": false, + "isPartOfKey": false + }, + { + "fieldPath": "services.hours.close", + "nullable": true, + "type": { + "type": { + "com.linkedin.schema.StringType": {} + } + }, + "nativeDataType": "String", + "recursive": false, + "isPartOfKey": false + }, + { + "fieldPath": "services.hours.open", + "nullable": true, + "type": { + "type": { + "com.linkedin.schema.StringType": {} + } + }, + "nativeDataType": "String", + "recursive": false, + "isPartOfKey": false + }, + { + "fieldPath": "services.parking", + "nullable": true, + "type": { + "type": { + "com.linkedin.schema.BooleanType": {} + } + }, + "nativeDataType": "Boolean", + "recursive": false, + "isPartOfKey": false + }, + { + "fieldPath": "services.wifi", + "nullable": true, + "type": { + "type": { + "com.linkedin.schema.StringType": {} + } + }, + "nativeDataType": "String", + "recursive": false, + "isPartOfKey": false + }, { "fieldPath": "zip", "nullable": true, @@ -76,7 +160,8 @@ }, "systemMetadata": { "lastObserved": 1693396800000, - "runId": "dynamodb-test" + "runId": "dynamodb-test", + "lastRunId": "no-run-id-provided" } }, { @@ -95,7 +180,8 @@ }, "systemMetadata": { "lastObserved": 1693396800000, - "runId": "dynamodb-test" + "runId": "dynamodb-test", + "lastRunId": "no-run-id-provided" } }, { @@ -111,7 +197,8 @@ }, "systemMetadata": { "lastObserved": 1693396800000, - "runId": "dynamodb-test" + "runId": "dynamodb-test", + "lastRunId": "no-run-id-provided" } }, { @@ -126,7 +213,8 @@ }, "systemMetadata": { "lastObserved": 1693396800000, - "runId": "dynamodb-test" + "runId": "dynamodb-test", + "lastRunId": "no-run-id-provided" } } ] \ No newline at end of file diff --git a/metadata-ingestion/tests/integration/dynamodb/test_dynamodb.py b/metadata-ingestion/tests/integration/dynamodb/test_dynamodb.py index ef2446ddd8d627..0bca1e8ac66de5 100644 --- a/metadata-ingestion/tests/integration/dynamodb/test_dynamodb.py +++ b/metadata-ingestion/tests/integration/dynamodb/test_dynamodb.py @@ -15,7 +15,7 @@ @freeze_time(FROZEN_TIME) @mock_dynamodb @pytest.mark.integration -def test_dynamodb(pytestconfig, tmp_path, mock_time): +def test_dynamodb(pytestconfig, tmp_path): boto3.setup_default_session() client = boto3.client("dynamodb", region_name="us-west-2") client.create_table( @@ -35,6 +35,21 @@ def test_dynamodb(pytestconfig, tmp_path, mock_time): "city": {"S": "San Francisco"}, "address": {"S": "1st Market st"}, "zip": {"N": "94000"}, + "contactNumbers": { # List type + "L": [ + {"S": "+14150000000"}, + {"S": "+14151111111"}, + ] + }, + "services": { # Map type + "M": { + "parking": {"BOOL": True}, + "wifi": {"S": "Free"}, + "hours": { # Map type inside Map for nested structure + "M": {"open": {"S": "08:00"}, "close": {"S": "22:00"}} + }, + } + }, }, )