diff --git a/metadata-ingestion/src/datahub/ingestion/source/kafka.py b/metadata-ingestion/src/datahub/ingestion/source/kafka.py index 4550a4119e8fb..9f5f119932ee8 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/kafka.py +++ b/metadata-ingestion/src/datahub/ingestion/source/kafka.py @@ -421,6 +421,10 @@ def _extract_record( for tag_association in meta_tags_aspect.tags ] + meta_domain_aspect = meta_aspects.get(Constants.ADD_DOMAIN_OPERATION) + if meta_domain_aspect: + dataset_snapshot.aspects.append(meta_domain_aspect) + if all_tags: dataset_snapshot.aspects.append( mce_builder.make_global_tag_aspect_with_tag_list(all_tags) diff --git a/metadata-ingestion/src/datahub/utilities/mapping.py b/metadata-ingestion/src/datahub/utilities/mapping.py index 3246efde0a18d..71601173897a8 100644 --- a/metadata-ingestion/src/datahub/utilities/mapping.py +++ b/metadata-ingestion/src/datahub/utilities/mapping.py @@ -381,7 +381,9 @@ def get_operation_value( if term.strip() ] elif operation_type == Constants.ADD_DOMAIN_OPERATION: - return mce_builder.make_domain_urn(operation_config[Constants.DOMAIN]) + domain = operation_config[Constants.DOMAIN] + domain = _insert_match_value(domain, _get_best_match(match, "domain")) + return mce_builder.make_domain_urn(domain) return None def sanitize_owner_ids(self, owner_id: str) -> str: