Skip to content

Commit

Permalink
Update kafka.py
Browse files Browse the repository at this point in the history
  • Loading branch information
acrylJonny committed Jan 15, 2025
1 parent 913f12f commit 929b6b1
Showing 1 changed file with 18 additions and 4 deletions.
22 changes: 18 additions & 4 deletions metadata-ingestion/src/datahub/ingestion/source/kafka/kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -480,15 +480,26 @@ def get_sample_messages(self, topic: str) -> List[Dict[str, Any]]:

return samples

Check warning on line 481 in metadata-ingestion/src/datahub/ingestion/source/kafka/kafka.py

View check run for this annotation

Codecov / codecov/patch

metadata-ingestion/src/datahub/ingestion/source/kafka/kafka.py#L481

Added line #L481 was not covered by tests

def _process_sample_data(self, samples: List[Dict[str, Any]]) -> Dict[str, Any]:
"""Process sample data to extract field information."""
def _process_sample_data(
self,
samples: List[Dict[str, Any]],
schema_metadata: Optional[SchemaMetadataClass] = None,
) -> Dict[str, Any]:
"""Process sample data to extract field information, incorporating schema fields if available."""
all_keys: Set[str] = set()
field_sample_map: Dict[str, List[str]] = {}

Check warning on line 490 in metadata-ingestion/src/datahub/ingestion/source/kafka/kafka.py

View check run for this annotation

Codecov / codecov/patch

metadata-ingestion/src/datahub/ingestion/source/kafka/kafka.py#L489-L490

Added lines #L489 - L490 were not covered by tests

# If we have schema metadata, initialize the field map with schema fields
if schema_metadata is not None:
for schema_field in schema_metadata.fields or []:
field_path = schema_field.fieldPath
all_keys.add(field_path)
field_sample_map[field_path] = []

Check warning on line 497 in metadata-ingestion/src/datahub/ingestion/source/kafka/kafka.py

View check run for this annotation

Codecov / codecov/patch

metadata-ingestion/src/datahub/ingestion/source/kafka/kafka.py#L493-L497

Added lines #L493 - L497 were not covered by tests

# Process samples
for sample in samples:
if isinstance(sample.get("value", None), dict):
all_keys.update(sample["value"].keys())
# Collect sample values for each field
for key, value in sample["value"].items():
if key not in field_sample_map:
field_sample_map[key] = []
Expand Down Expand Up @@ -520,10 +531,12 @@ def create_samples_wu(
self,
entity_urn: str,
topic: str,
schema_metadata: Optional[SchemaMetadataClass] = None,
) -> Iterable[MetadataWorkUnit]:
"""Create samples work unit incorporating both schema fields and sample values."""
samples = self.get_sample_messages(topic)
if samples:
processed_data = self._process_sample_data(samples)
processed_data = self._process_sample_data(samples, schema_metadata)
profile_data = self._create_profile_data(

Check warning on line 540 in metadata-ingestion/src/datahub/ingestion/source/kafka/kafka.py

View check run for this annotation

Codecov / codecov/patch

metadata-ingestion/src/datahub/ingestion/source/kafka/kafka.py#L537-L540

Added lines #L537 - L540 were not covered by tests
processed_data["all_keys"],
processed_data["field_sample_map"],
Expand Down Expand Up @@ -713,6 +726,7 @@ def _extract_record(
yield from self.create_samples_wu(

Check warning on line 726 in metadata-ingestion/src/datahub/ingestion/source/kafka/kafka.py

View check run for this annotation

Codecov / codecov/patch

metadata-ingestion/src/datahub/ingestion/source/kafka/kafka.py#L726

Added line #L726 was not covered by tests
entity_urn=dataset_urn,
topic=topic,
schema_metadata=schema_metadata,
)

def build_custom_properties(
Expand Down

0 comments on commit 929b6b1

Please sign in to comment.