feat(ingest/kafka): Flag for optional schemas ingestion #12077

Merged 10 commits on Dec 11, 2024
3 changes: 2 additions & 1 deletion docs/how/updating-datahub.md
@@ -31,14 +31,15 @@ This file documents any backwards-incompatible changes in DataHub and assists pe
urn:li:dataset:(urn:li:dataPlatform:powerbi,[<PlatformInstance>.].<WorkspaceName>.<SemanticModelName>.<TableName>,<ENV>)
```

-The config `include_workspace_name_in_dataset_urn` is default to `false` for backward compatiblity, However, we recommend enabling this flag after performing the necessary cleanup.
+The config `include_workspace_name_in_dataset_urn` is default to `false` for backward compatibility, However, we recommend enabling this flag after performing the necessary cleanup.
If stateful ingestion is enabled, running ingestion with the latest CLI version will handle the cleanup automatically. Otherwise, we recommend soft deleting all powerbi data via the DataHub CLI:
`datahub delete --platform powerbi --soft` and then re-ingest with the latest CLI version, ensuring the `include_workspace_name_in_dataset_urn` configuration is set to true.

- #11701: The Fivetran `sources_to_database` field is deprecated in favor of setting directly within `sources_to_platform_instance.<key>.database`.
- #11742: For PowerBi ingestion, `use_powerbi_email` is now enabled by default when extracting ownership information.
- #12056: The DataHub Airflow plugin no longer supports Airflow 2.1 and Airflow 2.2.
- #12056: The DataHub Airflow plugin now defaults to the v2 plugin implementation.
+- #12077: The `Kafka` source no longer ingests schemas as separate entities by default. Set `ingest_schemas_as_entities` to `true` to ingest them.
- OpenAPI Update: PIT Keep Alive parameter added to scroll. NOTE: This parameter requires the `pointInTimeCreationEnabled` feature flag to be enabled and the `elasticSearch.implementation` configuration to be `elasticsearch`. This feature is not supported for OpenSearch at this time and the parameter will not be respected without both of these set.
- OpenAPI Update 2: Previously there was an incorrectly marked parameter named `sort` on the generic list entities endpoint for v3. This parameter is deprecated and only supports a single string value while the documentation indicates it supports a list of strings. This documentation error has been fixed and the correct field, `sortCriteria`, is now documented which supports a list of strings.

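As a rough illustration of the #12077 entry, opting back in to schema entities amounts to one extra key in the Kafka source config. The sketch below only builds the config as a plain Python dict and does not call DataHub; the bootstrap address mirrors the value used in this PR's unit tests, and the variable name is hypothetical.

```python
# Hypothetical Kafka source config, expressed as a plain dict for illustration.
# "localhost:9092" is a placeholder taken from the unit tests, not a real cluster.
kafka_source_config = {
    "connection": {"bootstrap": "localhost:9092"},
    # Defaults to False after this change; True restores schema entities.
    "ingest_schemas_as_entities": True,
}
```

Leaving the key out keeps the new default, so only topics are ingested.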
29 changes: 18 additions & 11 deletions metadata-ingestion/src/datahub/ingestion/source/kafka/kafka.py
@@ -141,6 +141,10 @@ class KafkaSourceConfig(
         default=False,
         description="Disables the utilization of the TopicRecordNameStrategy for Schema Registry subjects. For more information, visit: https://docs.confluent.io/platform/current/schema-registry/serdes-develop/index.html#handling-differences-between-preregistered-and-client-derived-schemas:~:text=io.confluent.kafka.serializers.subject.TopicRecordNameStrategy",
     )
+    ingest_schemas_as_entities: bool = pydantic.Field(
+        default=False,
+        description="Enables ingesting schemas, additionally to the topics, as separate entities",
+    )


 def get_kafka_consumer(
@@ -343,17 +347,20 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
             else:
                 self.report.report_dropped(topic)

-        # Get all subjects from schema registry and ingest them as SCHEMA DatasetSubTypes
-        for subject in self.schema_registry_client.get_subjects():
-            try:
-                yield from self._extract_record(
-                    subject, True, topic_detail=None, extra_topic_config=None
-                )
-            except Exception as e:
-                logger.warning(f"Failed to extract subject {subject}", exc_info=True)
-                self.report.report_warning(
-                    "subject", f"Exception while extracting topic {subject}: {e}"
-                )
+        if self.source_config.ingest_schemas_as_entities:
+            # Get all subjects from schema registry and ingest them as SCHEMA DatasetSubTypes
+            for subject in self.schema_registry_client.get_subjects():
+                try:
+                    yield from self._extract_record(
+                        subject, True, topic_detail=None, extra_topic_config=None
+                    )
+                except Exception as e:
+                    logger.warning(
+                        f"Failed to extract subject {subject}", exc_info=True
+                    )
+                    self.report.report_warning(
+                        "subject", f"Exception while extracting topic {subject}: {e}"
+                    )

     def _extract_record(
         self,
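The effect of the new guard in `get_workunits_internal` can be sketched in isolation. This is a simplified model, not the DataHub implementation: `SketchConfig` and `sketch_workunits` are hypothetical stand-ins, work units are reduced to strings, and the topic and subject names are made up to match the shape of the unit-test fixtures.

```python
from dataclasses import dataclass
from typing import Iterator, List


@dataclass
class SketchConfig:
    """Hypothetical stand-in for KafkaSourceConfig; only the new flag is modeled."""

    ingest_schemas_as_entities: bool = False


def sketch_workunits(
    topics: List[str], subjects: List[str], config: SketchConfig
) -> Iterator[str]:
    # Topics are always emitted, as before the change.
    for topic in topics:
        yield f"topic:{topic}"
    # Schema registry subjects are emitted only when the flag is set.
    if config.ingest_schemas_as_entities:
        for subject in subjects:
            yield f"schema:{subject}"


# Illustrative names only (assumed, not taken from a real cluster).
topics = ["topic1", "topic2"]
subjects = ["topic1-key", "topic1-value", "topic2-key", "topic2-value"]

default_units = list(sketch_workunits(topics, subjects, SketchConfig()))
opt_in_units = list(
    sketch_workunits(topics, subjects, SketchConfig(ingest_schemas_as_entities=True))
)
```

With the default config this model emits only the topic entries; with the flag enabled it also emits one entry per subject.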
5 changes: 3 additions & 2 deletions metadata-ingestion/tests/unit/test_kafka_source.py
@@ -330,6 +330,7 @@ def mock_get_latest_version(subject_name: str) -> Optional[RegisteredSchema]:
             "topic2-key": "test.acryl.Topic2Key",
             "topic2-value": "test.acryl.Topic2Value",
         },
+        "ingest_schemas_as_entities": True,
     }
     ctx = PipelineContext(run_id="test")
     kafka_source = KafkaSource.create(source_config, ctx)
@@ -478,8 +479,7 @@ def mock_get_latest_version(subject_name: str) -> Optional[RegisteredSchema]:
     kafka_source = KafkaSource.create(source_config, ctx)

     workunits = list(kafka_source.get_workunits())
-
-    assert len(workunits) == 6
+    assert len(workunits) == 2
     if ignore_warnings_on_schema_type:
         assert not kafka_source.report.warnings
     else:
@@ -622,6 +622,7 @@ def mock_get_latest_version(subject_name: str) -> Optional[RegisteredSchema]:
     kafka_source = KafkaSource.create(
         {
             "connection": {"bootstrap": "localhost:9092"},
+            "ingest_schemas_as_entities": True,
             "meta_mapping": {
                 "owner": {
                     "match": "^@(.*)",