diff --git a/metadata-ingestion/src/datahub/ingestion/source/kafka/kafka.py b/metadata-ingestion/src/datahub/ingestion/source/kafka/kafka.py index 8917e74b6d200..4b92c5034a091 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/kafka/kafka.py +++ b/metadata-ingestion/src/datahub/ingestion/source/kafka/kafka.py @@ -480,15 +480,26 @@ def get_sample_messages(self, topic: str) -> List[Dict[str, Any]]: return samples - def _process_sample_data(self, samples: List[Dict[str, Any]]) -> Dict[str, Any]: - """Process sample data to extract field information.""" + def _process_sample_data( + self, + samples: List[Dict[str, Any]], + schema_metadata: Optional[SchemaMetadataClass] = None, + ) -> Dict[str, Any]: + """Process sample data to extract field information, incorporating schema fields if available.""" all_keys: Set[str] = set() field_sample_map: Dict[str, List[str]] = {} + # If we have schema metadata, initialize the field map with schema fields + if schema_metadata is not None: + for schema_field in schema_metadata.fields or []: + field_path = schema_field.fieldPath + all_keys.add(field_path) + field_sample_map[field_path] = [] + + # Process samples for sample in samples: if isinstance(sample.get("value", None), dict): all_keys.update(sample["value"].keys()) - # Collect sample values for each field for key, value in sample["value"].items(): if key not in field_sample_map: field_sample_map[key] = [] @@ -520,10 +531,12 @@ def create_samples_wu( self, entity_urn: str, topic: str, + schema_metadata: Optional[SchemaMetadataClass] = None, ) -> Iterable[MetadataWorkUnit]: + """Create samples work unit incorporating both schema fields and sample values.""" samples = self.get_sample_messages(topic) if samples: - processed_data = self._process_sample_data(samples) + processed_data = self._process_sample_data(samples, schema_metadata) profile_data = self._create_profile_data( processed_data["all_keys"], processed_data["field_sample_map"], @@ -713,6 +726,7 @@ def _extract_record( yield from self.create_samples_wu( entity_urn=dataset_urn, topic=topic, + schema_metadata=schema_metadata, ) def build_custom_properties(