diff --git a/metaphor/common/api_request.py b/metaphor/common/api_request.py index f9189929..3659ce2b 100644 --- a/metaphor/common/api_request.py +++ b/metaphor/common/api_request.py @@ -1,8 +1,8 @@ import json import secrets import tempfile -from typing import Any, Callable, Dict, Type, TypeVar -from urllib.parse import urlparse +from typing import Any, Callable, Dict, Literal, Type, TypeVar +from urllib.parse import urljoin, urlparse import requests from pydantic import TypeAdapter, ValidationError @@ -14,37 +14,41 @@ class ApiError(Exception): - def __init__(self, url: str, status_code: int, error_msg: str) -> None: + def __init__(self, url: str, status_code: int, body: str) -> None: self.status_code = status_code - self.error_msg = error_msg - super().__init__(f"call {url} api failed: {status_code}\n{error_msg}") + self.body = body + super().__init__(f"call {url} api failed: {status_code}\n{body}") -def get_request( +def make_request( url: str, headers: Dict[str, str], type_: Type[T], transform_response: Callable[[requests.Response], Any] = lambda r: r.json(), - timeout: int = 600, # default request timeout 600s + timeout: int = 10, + method: Literal["get", "post"] = "get", **kwargs, ) -> T: """Generic get api request to make third part api call and return with customized data class""" - result = requests.get(url, headers=headers, timeout=timeout, **kwargs) + result = getattr(requests, method)(url, headers=headers, timeout=timeout, **kwargs) if result.status_code == 200: - # Add JSON response to log.zip - file_name = ( - f"{urlparse(url).path[1:].replace('/', u'__')}_{secrets.token_hex(4)}" - ) + + # request signature, example: get_v1__resource_abcd + request_signature = f"{method}_{urlparse(url).path[1:].replace('/', u'__')}" + + # suffix with length 8 chars random string + suffix = f"_{secrets.token_hex(4)}.json" + # Avoid file name too long error and truncate prefix to avoid duplicate file name - # 250 is the lowest default maximum charactors file name 
length limit acrocess major file systems - file_name = ( - file_name[len(file_name) - 245 :] if len(file_name) > 245 else file_name - ) - file_name = f"{file_name}.json" + # 250 is the lowest default maximum characters file name length limit across major file systems + file_name = f"{request_signature[:250 - len(suffix)]}{suffix}" + + # Add JSON response to log.zip out_file = f"{tempfile.mkdtemp()}/{file_name}" with open(out_file, "w") as fp: json.dump(result.json(), fp, indent=2) debug_files.append(out_file) + try: return TypeAdapter(type_).validate_python(transform_response(result)) except ValidationError as error: @@ -54,3 +58,7 @@ def get_request( raise ApiError(url, result.status_code, "cannot parse result") else: raise ApiError(url, result.status_code, result.content.decode()) + + +def make_url(base: str, path: str): + return urljoin(base, path) diff --git a/metaphor/fivetran/extractor.py b/metaphor/fivetran/extractor.py index 55f3ca91..859488ca 100644 --- a/metaphor/fivetran/extractor.py +++ b/metaphor/fivetran/extractor.py @@ -4,7 +4,7 @@ from requests.auth import HTTPBasicAuth -from metaphor.common.api_request import ApiError, get_request +from metaphor.common.api_request import ApiError, make_request from metaphor.common.base_extractor import BaseExtractor from metaphor.common.entity_id import ( dataset_normalized_name, @@ -550,4 +550,4 @@ def _get_all(self, url: str, type_: Type[DataT]) -> List[DataT]: def _call_get(self, url: str, **kwargs): headers = {"Accept": "application/json;version=2"} - return get_request(url=url, headers=headers, auth=self._auth, **kwargs) + return make_request(url=url, headers=headers, auth=self._auth, **kwargs) diff --git a/metaphor/informatica/README.md b/metaphor/informatica/README.md new file mode 100644 index 00000000..20cd36fe --- /dev/null +++ b/metaphor/informatica/README.md @@ -0,0 +1,33 @@ +# Informatica Connector + +This connector extracts technical metadata from Informatica using [Informatica Intelligent Cloud 
Services REST API](https://docs.informatica.com/integration-cloud/b2b-gateway/current-version/rest-api-reference/preface.html). + +## Config File + +Create a YAML config file based on the following template. + +### Required Configurations + +```yaml +base_url: +user: +password: +``` + +### Optional Configurations + +#### Output Destination + +See [Output Config](../common/docs/output.md) for more information. + +## Testing + +Follow the [Installation](../../README.md) instructions to install `metaphor-connectors` in your environment (or virtualenv). + +Run the following command to test the connector locally: + +```shell +metaphor informatica +``` + +Manually verify the output after the command finishes. diff --git a/metaphor/informatica/__init__.py b/metaphor/informatica/__init__.py new file mode 100644 index 00000000..1b3c0c33 --- /dev/null +++ b/metaphor/informatica/__init__.py @@ -0,0 +1,6 @@ +from metaphor.common.cli import cli_main +from metaphor.informatica.extractor import InformaticaExtractor + + +def main(config_file: str): + cli_main(InformaticaExtractor, config_file) diff --git a/metaphor/informatica/config.py b/metaphor/informatica/config.py new file mode 100644 index 00000000..1a6eda37 --- /dev/null +++ b/metaphor/informatica/config.py @@ -0,0 +1,13 @@ +from pydantic.dataclasses import dataclass + +from metaphor.common.base_config import BaseConfig +from metaphor.common.dataclass import ConnectorConfig + + +@dataclass(config=ConnectorConfig) +class InformaticaRunConfig(BaseConfig): + user: str + + password: str + + base_url: str diff --git a/metaphor/informatica/extractor.py b/metaphor/informatica/extractor.py new file mode 100644 index 00000000..8faf914f --- /dev/null +++ b/metaphor/informatica/extractor.py @@ -0,0 +1,334 @@ +from typing import Callable, Collection, Dict, List, Optional +from urllib.parse import urljoin + +from metaphor.common.api_request import ApiError, make_request +from metaphor.common.base_extractor import BaseExtractor +from 
metaphor.common.entity_id import ( + to_dataset_entity_id_from_logical_id, + to_pipeline_entity_id_from_logical_id, +) +from metaphor.common.event_util import ENTITY_TYPES +from metaphor.common.logger import get_logger +from metaphor.common.utils import unique_list +from metaphor.informatica.config import InformaticaRunConfig +from metaphor.informatica.models import ( + AuthResponse, + ConnectionDetail, + MappingDetailResponse, + MappingParameter, + ObjectDetail, + ObjectDetailResponse, + ObjectReferenceResponse, + ReferenceObjectDetail, +) +from metaphor.informatica.utils import init_dataset_logical_id, parse_error +from metaphor.models.crawler_run_metadata import Platform +from metaphor.models.metadata_change_event import ( + AssetStructure, + DataPlatform, + Dataset, + EntityUpstream, + InformaticaMapping, + Pipeline, + PipelineInfo, + PipelineLogicalID, + PipelineMapping, + PipelineType, + SourceInfo, +) + +logger = get_logger() + + +V2_SESSION_HEADER = "icSessionId" +V3_SESSION_HEADER = "INFA-SESSION-ID" +AUTH_ERROR_CODE = "AUTH_01" +PARAMETER_SOURCE_TYPES = {"EXTENDED_SOURCE"} +PARAMETER_TARGET_TYPES = {"TARGET"} +CONNECTOR_PLATFORM_MAP = {"com.infa.adapter.snowflake": DataPlatform.SNOWFLAKE} + + +class InformaticaExtractor(BaseExtractor): + """Informatica metadata extractor""" + + _description = "Informatica metadata crawler" + _platform = Platform.INFORMATICA + + @staticmethod + def from_config_file(config_file: str) -> "InformaticaExtractor": + return InformaticaExtractor(InformaticaRunConfig.from_yaml_file(config_file)) + + def __init__(self, config: InformaticaRunConfig) -> None: + super().__init__(config) + self._base_url = config.base_url + self._user = config.user + self._password = config.password + + self._session_id: str = "" + self._api_base_url: str = "" + + self._pipelines: Dict[str, Pipeline] = {} + self._datasets: Dict[str, Dataset] = {} + + async def extract(self) -> Collection[ENTITY_TYPES]: + logger.info("Fetching metadata from Informatica") + 
+ connections = self.extract_connection_detail() + mappings = self.extract_mapping() + self.extract_table_lineage(connections, mappings) + + entities: List[ENTITY_TYPES] = [] + entities.extend(self._pipelines.values()) + entities.extend(self._datasets.values()) + return entities + + def extract_connection_detail(self) -> Dict[str, ConnectionDetail]: + connection_map: Dict[str, ConnectionDetail] = {} + for connection_id in self._list_connection(): + connection_map[connection_id] = self._get_connection_detail(connection_id) + return connection_map + + def extract_mapping(self) -> Dict[str, MappingDetailResponse]: + # v3 id -> mapping object + v3_mapping_objects = self._get_v3_object(type="MAPPING") + + # v2 id -> connection ref object + v3_connections_ref_type: Dict[str, ReferenceObjectDetail] = {} + + # v2 id -> mapping object + mapping_v3_v2_id_map: Dict[str, ReferenceObjectDetail] = {} + + for mapping_v3_object in v3_mapping_objects.values(): + reference = self._get_object_reference(mapping_v3_object.id, "Uses") + for ref in reference.references: + if ref.documentType == "SAAS_CONNECTION": + v3_connections_ref_type[ref.appContextId] = ref + + for connection_ref in v3_connections_ref_type.values(): + reference = self._get_object_reference(connection_ref.id, "usedBy") + for ref in reference.references: + if ref.documentType == "MAPPING": + mapping_v3_v2_id_map[ref.appContextId] = ref + + mappings: Dict[str, MappingDetailResponse] = {} + + for mapping_ref in mapping_v3_v2_id_map.values(): + mapping_detail = self._get_mapping_detail(mapping_ref.appContextId) + + v3_id = mapping_ref.id + v3_mapping_object = v3_mapping_objects.get(v3_id) + + pipeline = Pipeline( + logical_id=PipelineLogicalID( + name=mapping_ref.id, # we should use v3 id + type=PipelineType.INFORMATICA_MAPPING, + ), + source_info=SourceInfo( + main_url=f"{self._api_base_url}/diUI/products/integrationDesign/main/mapping/{v3_id}", + created_at_source=mapping_detail.createTime, + 
created_by=mapping_detail.createdBy, + last_updated=mapping_detail.updateTime, + updated_by=mapping_detail.updatedBy, + ), + informatica_mapping=InformaticaMapping( + name=mapping_detail.name, + description=mapping_detail.description, + ), + structure=AssetStructure( + name=mapping_detail.name, + directories=( + v3_mapping_object.path.split("/")[:-1] + if v3_mapping_object + else [] + ), + ), + ) + self._pipelines[v3_id] = pipeline + + mappings[v3_id] = mapping_detail + return mappings + + def extract_table_lineage( + self, + connections: Dict[str, ConnectionDetail], + mappings: Dict[str, MappingDetailResponse], + ): + for v3_id, mapping in mappings.items(): + pipeline = self._pipelines.get(v3_id) + assert pipeline is not None + pipeline_entity_id = str( + to_pipeline_entity_id_from_logical_id(pipeline.logical_id) + ) + + sources = [ + p for p in mapping.parameters if p.type in PARAMETER_SOURCE_TYPES + ] + + def trans_source(source: MappingParameter) -> Optional[str]: + connection = connections.get(source.sourceConnectionId or "") + if connection is None or source.extendedObject.object is None: + logger.warning( + f"Invalid source connection id for mapping '{mapping.name}'" + ) + return None + + logical_id = init_dataset_logical_id( + source.extendedObject.object.name, connection + ) + + if logical_id is None: + return None + + return str(to_dataset_entity_id_from_logical_id(logical_id)) + + def is_not_none(value): + return value is not None + + source_entities = unique_list( + filter(is_not_none, map(trans_source, sources)) + ) + + targets = [ + p for p in mapping.parameters if p.type in PARAMETER_TARGET_TYPES + ] + + for target in targets: + connection = connections.get(target.targetConnectionId or "") + + if connection is None or target.targetObject is None: + logger.warning( + f"Invalid target connection id for mapping '{mapping.name}'" + ) + continue + + logical_id = init_dataset_logical_id(target.targetObject, connection) + + if logical_id is None: + 
logger.warning( + f"Failed to construct target object for mapping '{mapping.name}'" + ) + continue + + dataset = Dataset( + logical_id=logical_id, + entity_upstream=EntityUpstream(source_entities=source_entities), + pipeline_info=PipelineInfo( + pipeline_mapping=[ + PipelineMapping( + is_virtual=False, + pipeline_entity_id=pipeline_entity_id, + ) + ] + ), + ) + + self._datasets[logical_id.name] = dataset + + def with_retry(func: Callable): # type: ignore + def retry_once(self, *args, **kwargs): + for _ in range(2): + try: + self._ensure_session_id() + return func(self, *args, **kwargs) + except ApiError as api_error: + response = parse_error(api_error.body) + if ( + api_error.status_code == 401 + and response + and response.get("code") == AUTH_ERROR_CODE + ): + logger.warning("Session is expired") + self._ensure_session_id(new_session=True) + continue + raise api_error + raise RuntimeError("Invalid Informatica login credential") + + return retry_once + + @with_retry + def _list_connection(self) -> List[str]: + connections = make_request( + url=urljoin(self._api_base_url, "saas/api/v2/connection/"), + headers={V2_SESSION_HEADER: self._session_id}, + type_=List[ConnectionDetail], + timeout=10, + ) + return [c.id for c in connections] + + @with_retry + def _get_connection_detail(self, connection_id: str) -> ConnectionDetail: + return make_request( + url=urljoin(self._api_base_url, f"saas/api/v2/connection/{connection_id}"), + headers={V2_SESSION_HEADER: self._session_id}, + type_=ConnectionDetail, + timeout=10, + ) + + @with_retry + def _get_v3_object(self, type: str) -> Dict[str, ObjectDetail]: + v3_objects: Dict[str, ObjectDetail] = {} + + page_size = 20 + while True: + page = make_request( + url=urljoin(self._api_base_url, "saas/public/core/v3/objects"), + headers={V3_SESSION_HEADER: self._session_id}, + params={ + "limit": page_size, + "skip": len(v3_objects), + "q": f"type=='{type}'", + }, + type_=ObjectDetailResponse, + timeout=30, + ) + if len(page.objects) == 
0: + break + for object in page.objects: + v3_objects[object.id] = object + + return v3_objects + + @with_retry + def _get_mapping_detail(self, id: str) -> MappingDetailResponse: + return make_request( + url=urljoin(self._api_base_url, f"saas/api/v2/mapping/{id}"), + headers={V2_SESSION_HEADER: self._session_id}, + type_=MappingDetailResponse, + timeout=10, + ) + + @with_retry + def _get_object_reference( + self, v3_id: str, ref_type: str + ) -> ObjectReferenceResponse: + return make_request( + url=urljoin( + self._api_base_url, f"saas/public/core/v3/objects/{v3_id}/references" + ), + headers={V3_SESSION_HEADER: self._session_id}, + params={ + "refType": ref_type, + }, + type_=ObjectReferenceResponse, + timeout=10, + ) + + def _ensure_session_id(self, new_session=False): + if new_session or not self._session_id: + self._get_session_id() + + def _get_session_id(self): + auth_response = make_request( + url=urljoin(self._base_url, "saas/public/core/v3/login"), + headers={}, + type_=AuthResponse, + timeout=10, + method="post", + json={ + "username": self._user, + "password": self._password, + }, + ) + assert len(auth_response.products) >= 1 + self._api_base_url = auth_response.products[0].baseApiUrl + self._session_id = auth_response.userInfo.sessionId diff --git a/metaphor/informatica/models.py b/metaphor/informatica/models.py new file mode 100644 index 00000000..6cc2441a --- /dev/null +++ b/metaphor/informatica/models.py @@ -0,0 +1,100 @@ +import datetime +from typing import List, Optional + +from pydantic import BaseModel + + +class AuthResponseAuthInfo(BaseModel): + sessionId: str + status: str + + +class AuthResponseProduct(BaseModel): + name: str + baseApiUrl: str + + +class AuthResponse(BaseModel): + """ + doc: https://docs.informatica.com/integration-cloud/b2b-gateway/current-version/rest-api-reference/platform_rest_api_version_3_resources/login_2.html + """ + + userInfo: AuthResponseAuthInfo + products: List[AuthResponseProduct] + + +class 
ConnectorParams(BaseModel): + account: Optional[str] = None + + +class ConnectionDetail(BaseModel): + """ + doc: https://docs.informatica.com/integration-cloud/b2b-gateway/current-version/rest-api-reference/data_integration_rest_api/connection.html + """ + + id: str + connParams: Optional[ConnectorParams] = None + type: str + connectorGuid: str + + +class ObjectDetail(BaseModel): + id: str + tags: List[str] + path: str + + +class ObjectDetailResponse(BaseModel): + """ + doc: https://docs.informatica.com/integration-cloud/b2b-gateway/current-version/rest-api-reference/platform_rest_api_version_3_resources/objects/finding_an_asset.html + """ + + count: int + objects: List[ObjectDetail] + + +class ReferenceObjectDetail(BaseModel): + id: str + appContextId: Optional[str] = None + documentType: str + + +class ObjectReferenceResponse(BaseModel): + """ + doc: https://docs.informatica.com/integration-cloud/b2b-gateway/current-version/rest-api-reference/platform_rest_api_version_3_resources/objects/finding_asset_dependencies.html + """ + + count: int + references: List[ReferenceObjectDetail] + + +class ExtendedObjectDetail(BaseModel): + name: str + + +class ExtendedObject(BaseModel): + object: Optional[ExtendedObjectDetail] = None + singleMode: bool + + +class MappingParameter(BaseModel): + type: str + extendedObject: ExtendedObject + sourceConnectionId: Optional[str] = None + targetConnectionId: Optional[str] = None + targetObject: Optional[str] = None + + +class MappingDetailResponse(BaseModel): + """ + doc: https://docs.informatica.com/integration-cloud/b2b-gateway/current-version/rest-api-reference/data_integration_rest_api/mapping.html + """ + + name: str + description: str + createTime: datetime.datetime + updateTime: datetime.datetime + createdBy: str + updatedBy: str + + parameters: List[MappingParameter] diff --git a/metaphor/informatica/utils.py b/metaphor/informatica/utils.py new file mode 100644 index 00000000..751ddf4f --- /dev/null +++ 
b/metaphor/informatica/utils.py @@ -0,0 +1,48 @@ +import json +from typing import Optional + +from metaphor.common.entity_id import dataset_normalized_name +from metaphor.common.logger import get_logger +from metaphor.informatica.models import ConnectionDetail +from metaphor.models.metadata_change_event import DataPlatform, DatasetLogicalID + +CONNECTOR_PLATFORM_MAP = {"com.infa.adapter.snowflake": DataPlatform.SNOWFLAKE} + +logger = get_logger() + + +def parse_error(content: str): + try: + return json.loads(content) + except json.decoder.JSONDecodeError: + return None + + +def init_dataset_logical_id( + name: str, # INFA format name + connection: ConnectionDetail, +) -> Optional[DatasetLogicalID]: + if connection.type != "TOOLKIT_CCI": + logger.warning(f"Unsupported connection type: {connection.type}") + return None + + account = connection.connParams.account if connection.connParams else None + + parts = name.split("/") + if len(parts) != 3: + logger.warning(f"Invalid object name: {name}") + return None + + [database, schema, table] = parts + + platform = CONNECTOR_PLATFORM_MAP.get(connection.connectorGuid) + + if platform is None: + logger.warning(f"Unsupported connector guid: {connection.connectorGuid}") + return None + + return DatasetLogicalID( + name=dataset_normalized_name(db=database, schema=schema, table=table), + platform=platform, + account=account, + ) diff --git a/metaphor/power_bi/models.py b/metaphor/power_bi/models.py index 72939463..992bcc65 100644 --- a/metaphor/power_bi/models.py +++ b/metaphor/power_bi/models.py @@ -2,7 +2,7 @@ from enum import Enum from typing import Any, List, Optional -from metaphor.common.models import BaseModel +from pydantic import BaseModel class PowerBIApp(BaseModel): diff --git a/metaphor/power_bi/power_bi_client.py b/metaphor/power_bi/power_bi_client.py index 7dea8f7c..efe0b16f 100644 --- a/metaphor/power_bi/power_bi_client.py +++ b/metaphor/power_bi/power_bi_client.py @@ -5,7 +5,7 @@ import requests -from 
metaphor.common.api_request import ApiError, get_request +from metaphor.common.api_request import ApiError, make_request from metaphor.common.logger import get_logger from metaphor.common.utils import start_of_day from metaphor.power_bi.config import PowerBIRunConfig @@ -441,7 +441,7 @@ def _call_get( transform_response: Callable[[requests.Response], Any] = lambda r: r.json(), ) -> T: try: - return get_request( + return make_request( url, self.get_headers(), type_, @@ -449,10 +449,10 @@ def _call_get( ) except ApiError as error: if error.status_code == 401: - raise AuthenticationError(error.error_msg) from None + raise AuthenticationError(error.body) from None elif error.status_code == 404: - raise EntityNotFoundError(error.error_msg) from None + raise EntityNotFoundError(error.body) from None else: raise AssertionError( - f"GET {url} failed: {error.status_code}\n{error.error_msg}" + f"GET {url} failed: {error.status_code}\n{error.body}" ) from None diff --git a/poetry.lock b/poetry.lock index 6b641f83..883b7e0c 100644 --- a/poetry.lock +++ b/poetry.lock @@ -3094,13 +3094,13 @@ files = [ [[package]] name = "metaphor-models" -version = "0.37.0" +version = "0.37.1" description = "" optional = false python-versions = "<4.0,>=3.8" files = [ - {file = "metaphor_models-0.37.0-py3-none-any.whl", hash = "sha256:d1dd1134b2495ac1b21cbae4288935f72a382a5c57179c7e58da281bfb37c425"}, - {file = "metaphor_models-0.37.0.tar.gz", hash = "sha256:7eec3e574dca46d0fbc6172d084d9a602bced018bef524854ab98922b25edae0"}, + {file = "metaphor_models-0.37.1-py3-none-any.whl", hash = "sha256:5ebe69aa966805a1bceaca08855251e4808222c26c4f5f255ea5d1f7715f016c"}, + {file = "metaphor_models-0.37.1.tar.gz", hash = "sha256:54fc21f3857abc7abee3e9f62af13c03a5eded0fa5a982178bd367596009e264"}, ] [[package]] @@ -4850,6 +4850,7 @@ files = [ {file = "PyYAML-6.0.1-cp311-cp311-win_amd64.whl", hash = "sha256:bf07ee2fef7014951eeb99f56f39c9bb4af143d8aa3c21b1677805985307da34"}, {file = 
"PyYAML-6.0.1-cp312-cp312-macosx_10_9_x86_64.whl", hash = "sha256:855fb52b0dc35af121542a76b9a84f8d1cd886ea97c84703eaa6d88e37a2ad28"}, {file = "PyYAML-6.0.1-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:40df9b996c2b73138957fe23a16a4f0ba614f4c0efce1e9406a184b6d07fa3a9"}, + {file = "PyYAML-6.0.1-cp312-cp312-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:a08c6f0fe150303c1c6b71ebcd7213c2858041a7e01975da3a99aed1e7a378ef"}, {file = "PyYAML-6.0.1-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:6c22bec3fbe2524cde73d7ada88f6566758a8f7227bfbf93a408a9d86bcc12a0"}, {file = "PyYAML-6.0.1-cp312-cp312-musllinux_1_1_x86_64.whl", hash = "sha256:8d4e9c88387b0f5c7d5f281e55304de64cf7f9c0021a3525bd3b1c542da3b0e4"}, {file = "PyYAML-6.0.1-cp312-cp312-win32.whl", hash = "sha256:d483d2cdf104e7c9fa60c544d92981f12ad66a457afae824d146093b8c294c54"}, @@ -6407,4 +6408,4 @@ unity-catalog = ["databricks-sdk", "databricks-sql-connector", "sqlglot"] [metadata] lock-version = "2.0" python-versions = ">=3.8.1,<3.12" -content-hash = "63eb1f68ad0dee8ddd549242ba8555173e449ba558fbb7497684cae98981c3c7" +content-hash = "25c1270cfef037e830b9d7bc04036ce6b1ba638c709e6ae79d900c6e99d163ed" diff --git a/pyproject.toml b/pyproject.toml index eb9a0d86..02972716 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "metaphor-connectors" -version = "0.14.57" +version = "0.14.58" license = "Apache-2.0" description = "A collection of Python-based 'connectors' that extract metadata from various sources to ingest into the Metaphor app." 
authors = ["Metaphor "] @@ -41,7 +41,7 @@ llama-index-readers-confluence = { version = "^0.1.4", optional = true } llama-index-readers-notion = { version = "^0.1.6", optional = true } looker-sdk = { version = "^24.2.0", optional = true } lxml = { version = "~=5.0.0", optional = true } -metaphor-models = "0.37.0" +metaphor-models = "0.37.1" more-itertools = { version = "^10.1.0", optional = true } msal = { version = "^1.28.0", optional = true } msgraph-beta-sdk = { version = "~1.4.0", optional = true } diff --git a/tests/common/test_api_request.py b/tests/common/test_api_request.py index b978143a..f634cba1 100644 --- a/tests/common/test_api_request.py +++ b/tests/common/test_api_request.py @@ -3,7 +3,7 @@ from pydantic import BaseModel -from metaphor.common.api_request import ApiError, get_request +from metaphor.common.api_request import ApiError, make_request class DummyResult(BaseModel): @@ -17,7 +17,7 @@ def test_get_request_200(mock_get: MagicMock): mock_response.json.return_value = {"foo": "bar"} mock_get.return_value = mock_response - result = get_request("http://test.com", {}, DummyResult) + result = make_request("http://test.com", {}, DummyResult) assert result.foo == "bar" @@ -28,7 +28,7 @@ def test_get_request_not_200(mock_get: MagicMock): mock_get.return_value = mock_response try: - get_request("http://test.com", {}, Dict) + make_request("http://test.com", {}, Dict) assert False, "ApiError not thrown" except ApiError: assert True diff --git a/tests/informatica/__init__.py b/tests/informatica/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/informatica/config.yml b/tests/informatica/config.yml new file mode 100644 index 00000000..9b095682 --- /dev/null +++ b/tests/informatica/config.yml @@ -0,0 +1,5 @@ +--- +base_url: https://test +user: user +password: password +output: {} diff --git a/tests/informatica/expected.json b/tests/informatica/expected.json new file mode 100644 index 00000000..0b0c3a2f --- /dev/null +++ 
b/tests/informatica/expected.json @@ -0,0 +1,45 @@ +[ + { + "informaticaMapping": { + "description": "desc in Informatica", + "name": "Extract same station bike hires" + }, + "logicalId": { + "name": "71e4gvVQh8lclejElAZ2qZ", + "type": "INFORMATICA_MAPPING" + }, + "sourceInfo": { + "createdAtSource": "2024-07-17T02:25:28+00:00", + "createdBy": "metaphor-dev", + "lastUpdated": "2024-07-25T07:34:07+00:00", + "mainUrl": "https://usw5.dm-us.informaticacloud.com/saas/diUI/products/integrationDesign/main/mapping/71e4gvVQh8lclejElAZ2qZ", + "updatedBy": "metaphor-dev" + }, + "structure": { + "directories": [ + "Default" + ], + "name": "Extract same station bike hires" + } + }, + { + "entityUpstream": { + "sourceEntities": [ + "DATASET~91F0104280AA722838FE72A102FB273A" + ] + }, + "logicalId": { + "account": "snowflake-account", + "name": "acme.informatica.bike_hires_same_station", + "platform": "SNOWFLAKE" + }, + "pipelineInfo": { + "pipelineMapping": [ + { + "isVirtual": false, + "pipelineEntityId": "PIPELINE~BC684854C28C480CD274A4A7736A5FB8" + } + ] + } + } +] diff --git a/tests/informatica/responses/get_saas__api__v2__connection__01CY6H0B000000000002_550571c1.json b/tests/informatica/responses/get_saas__api__v2__connection__01CY6H0B000000000002_550571c1.json new file mode 100644 index 00000000..fff3a7e2 --- /dev/null +++ b/tests/informatica/responses/get_saas__api__v2__connection__01CY6H0B000000000002_550571c1.json @@ -0,0 +1,56 @@ +{ + "@type": "connection", + "id": "01CY6H0B000000000002", + "orgId": "01CY6H", + "name": "Snowflake", + "createTime": "2024-07-15T05:50:45.000Z", + "updateTime": "2024-07-15T06:00:33.000Z", + "createdBy": "metaphor-dev", + "updatedBy": "metaphor-dev", + "agentId": "00800000000002H", + "runtimeEnvironmentId": "025000000000002", + "instanceName": "Snowflake Cloud Data Warehouse V2", + "instanceDisplayName": "Snowflake Data Cloud", + "type": "TOOLKIT_CCI", + "subType": "TOOLKIT_CCI", + "port": 0, + "majorUpdateTime": "2024-07-15T06:00:33.000Z", 
+ "timeout": 60, + "connParams": { + "agentId": "00800000000002H", + "role": "", + "clientID": "", + "oAuthRefreshKey": "", + "agentGroupId": "025000000000002", + "CCM_CONN_PRESENTATION": "", + "orgId": "01CY6H", + "password": "********", + "privateKeyFile": "", + "accessTokenURL": "", + "scope": "", + "additionalparam": "", + "refreshTokenExpireIn": "", + "authentication": "Standard", + "oAuthRefreshText": "", + "accessTokenExpireIn": "", + "oAuthURL": "", + "authorizationCodeParameters": "", + "warehouse": "", + "productInfo": "", + "oAuthRefreshUrl": "", + "clientAuthentication": "Basic Auth Header", + "accessTokenParameters": "", + "tokenType": "", + "user": "", + "account": "snowflake-account" + }, + "internal": false, + "federatedId": "gdm2ALESzgxjhPkzOGvns7", + "retryNetworkError": false, + "supportsCCIMultiGroup": false, + "metadataBrowsable": true, + "supportLabels": true, + "vaultEnabled": false, + "vaultEnabledParams": [], + "connectorGuid": "com.infa.adapter.snowflake" +} diff --git a/tests/informatica/responses/get_saas__api__v2__connection___da2aeee5.json b/tests/informatica/responses/get_saas__api__v2__connection___da2aeee5.json new file mode 100644 index 00000000..094b3393 --- /dev/null +++ b/tests/informatica/responses/get_saas__api__v2__connection___da2aeee5.json @@ -0,0 +1,35 @@ +[ + { + "@type": "connection", + "id": "01CY6H0B000000000002", + "orgId": "01CY6H", + "name": "Snowflake", + "createTime": "2024-07-15T05:50:45.000Z", + "updateTime": "2024-07-15T06:00:33.000Z", + "createdBy": "metaphor-dev", + "updatedBy": "metaphor-dev", + "agentId": "00800000000002H", + "runtimeEnvironmentId": "025000000000002", + "instanceName": "Snowflake Cloud Data Warehouse V2", + "instanceDisplayName": "Snowflake Data Cloud", + "type": "TOOLKIT_CCI", + "subType": "TOOLKIT_CCI", + "port": 0, + "majorUpdateTime": "2024-07-15T06:00:33.000Z", + "timeout": 60, + "connParams": { + "agentId": "00800000000002H", + "agentGroupId": "025000000000002", + "orgId": "01CY6H" + 
}, + "internal": false, + "federatedId": "gdm2ALESzgxjhPkzOGvns7", + "retryNetworkError": false, + "supportsCCIMultiGroup": false, + "metadataBrowsable": true, + "supportLabels": true, + "vaultEnabled": false, + "vaultEnabledParams": [], + "connectorGuid": "com.infa.adapter.snowflake" + } +] diff --git a/tests/informatica/responses/get_saas__api__v2__mapping__01CY6H1700000000000W_e499f963.json b/tests/informatica/responses/get_saas__api__v2__mapping__01CY6H1700000000000W_e499f963.json new file mode 100644 index 00000000..f683f449 --- /dev/null +++ b/tests/informatica/responses/get_saas__api__v2__mapping__01CY6H1700000000000W_e499f963.json @@ -0,0 +1,175 @@ +{ + "@type": "mappingTemplate", + "id": "01CY6H1700000000000W", + "orgId": "01CY6H", + "name": "Extract same station bike hires", + "description": "desc in Informatica", + "createTime": "2024-07-17T02:25:28.000Z", + "updateTime": "2024-07-25T07:34:07.000Z", + "createdBy": "metaphor-dev", + "updatedBy": "metaphor-dev", + "bundleVersion": "0", + "templateId": "stringIdentity:01CY6H0X00000000003E", + "testTaskId": "01CY6H0Z000000000003", + "deployTime": 1721907248000, + "hasParameters": false, + "valid": true, + "fixedConnection": true, + "hasParametersDeployed": false, + "fixedConnectionDeployed": true, + "isSchemaValidationEnabled": false, + "deployedTemplateId": "stringIdentity:01CY6H0X000000000049", + "prunedTemplateId": "stringIdentity:01CY6H0X00000000003S", + "tasks": 1, + "mappingPreviewFileRecordId": "01CY6H0X000000000048", + "deployedMappingPreviewFileRecordId": "01CY6H0X00000000004A", + "documentType": "MAPPING", + "parameters": [ + { + "@type": "mtTaskParameter", + "id": -1, + "name": "$cleaned_bike_ride$", + "type": "EXTENDED_SOURCE", + "label": "cleaned_bike_ride", + "uiProperties": { + "connectionParameterized": "false", + "isSelectDistinct": "false", + "isMsrcFilterParameterized": "false", + "isMsrcSortParameterized": "false", + "objectParameterized": "false", + "visible": "false", + "originalPath": 
"cleaned_bike_ride" + }, + "sourceConnectionId": "01CY6H0B000000000002", + "newFlatFile": false, + "newObject": false, + "showBusinessNames": false, + "naturalOrder": true, + "truncateTarget": false, + "bulkApiDBTarget": false, + "tgtFieldRefs": {}, + "targetRefsV2": {}, + "targetUpdateColumns": [], + "extendedObject": { + "@type": "extendedObject", + "object": { + "@type": "mObject", + "name": "ACME/RIDE_SHARE/CLEANED_BIKE_RIDES", + "label": "CLEANED_BIKE_RIDES", + "metadataUpdated": false, + "relations": [], + "children": [] + }, + "singleMode": true, + "objects": [ + { + "@type": "mObject", + "name": "ACME/RIDE_SHARE/CLEANED_BIKE_RIDES", + "label": "CLEANED_BIKE_RIDES", + "metadataUpdated": false, + "relations": [], + "children": [] + } + ], + "filters": [], + "sortFields": [] + }, + "runtimeAttrs": {}, + "oprRuntimeAttrs": { + "schema": "", + "postsql": "", + "database": "", + "role": "", + "sqloverride": "", + "warehouse": "", + "tablename": "", + "presql": "" + }, + "isRESTModernSource": true, + "isFileList": false, + "handleSpecialChars": false, + "handleDecimalRoundOff": false, + "frsAsset": false, + "dynamicFileName": false, + "excludeDynamicFileNameField": false, + "currentlyProcessedFileName": false, + "retainFieldMetadata": false, + "useExactSrcNames": false, + "tgtObjectAttributes": {}, + "runtimeParameterData": { + "@type": "mtTaskRuntimeParameterData", + "isConnectionRuntimeParameter": false, + "isObjectRuntimeParameter": false + }, + "overriddenFields": [] + }, + { + "@type": "mtTaskParameter", + "id": -1, + "name": "$bike_hires_same_station$", + "type": "TARGET", + "label": "bike_hires_same_station", + "uiProperties": { + "connectionParameterized": "false", + "objectParameterized": "false", + "visible": "false", + "defaultTargetUpdateColumns": "", + "supportApplyDDLChanges": "true", + "originalPath": "bike_hires_same_station" + }, + "targetConnectionId": "01CY6H0B000000000002", + "targetObject": "ACME/INFORMATICA/bike_hires_same_station", + 
"targetObjectLabel": "ACME/INFORMATICA/bike_hires_same_station", + "newFlatFile": false, + "newObject": true, + "showBusinessNames": false, + "naturalOrder": true, + "newObjectName": "ACME/INFORMATICA/bike_hires_same_station", + "truncateTarget": false, + "bulkApiDBTarget": false, + "operationType": "Insert", + "tgtFieldRefs": {}, + "targetRefsV2": {}, + "targetUpdateColumns": [], + "extendedObject": { + "@type": "extendedObject", + "singleMode": true, + "filters": [], + "sortFields": [] + }, + "runtimeAttrs": {}, + "isRESTModernSource": true, + "isFileList": false, + "handleSpecialChars": false, + "handleDecimalRoundOff": false, + "frsAsset": false, + "dynamicFileName": false, + "excludeDynamicFileNameField": false, + "currentlyProcessedFileName": false, + "retainFieldMetadata": false, + "objectName": "ACME/INFORMATICA/bike_hires_same_station", + "objectLabel": "ACME/INFORMATICA/bike_hires_same_station", + "useExactSrcNames": false, + "tgtObjectAttributes": { + "tableType": "TABLE", + "parent": "ACME/INFORMATICA" + }, + "runtimeParameterData": { + "@type": "mtTaskRuntimeParameterData", + "isConnectionRuntimeParameter": false, + "isObjectRuntimeParameter": false + }, + "targetSchemaProviderType": "SELECTED_OBJECT", + "overriddenFields": [] + } + ], + "inOutParameters": [], + "references": [ + { + "@type": "reference", + "refObjectId": "01CY6H0B000000000002", + "refType": "connection" + } + ], + "sequences": [] +} diff --git a/tests/informatica/responses/get_saas__public__core__v3__objects_8d406d40.json b/tests/informatica/responses/get_saas__public__core__v3__objects_8d406d40.json new file mode 100644 index 00000000..4deb5b18 --- /dev/null +++ b/tests/informatica/responses/get_saas__public__core__v3__objects_8d406d40.json @@ -0,0 +1,16 @@ +{ + "count": 2, + "objects": [ + { + "id": "71e4gvVQh8lclejElAZ2qZ", + "path": "Default/Extract same station bike hires", + "type": "DTEMPLATE", + "description": "desc in Informatica", + "updatedBy": "metaphor-dev", + 
"updateTime": "2024-07-25T11:34:09Z", + "tags": [], + "sourceControl": null, + "customAttributes": null + } + ] +} diff --git a/tests/informatica/responses/get_saas__public__core__v3__objects__71e4gvVQh8lclejElAZ2qZ__references_d6d6c585.json b/tests/informatica/responses/get_saas__public__core__v3__objects__71e4gvVQh8lclejElAZ2qZ__references_d6d6c585.json new file mode 100644 index 00000000..cf74c801 --- /dev/null +++ b/tests/informatica/responses/get_saas__public__core__v3__objects__71e4gvVQh8lclejElAZ2qZ__references_d6d6c585.json @@ -0,0 +1,14 @@ +{ + "count": 1, + "@id": "71e4gvVQh8lclejElAZ2qZ", + "references": [ + { + "id": "gdm2ALESzgxjhPkzOGvns7", + "appContextId": "01CY6H0B000000000002", + "path": "Snowflake", + "documentType": "SAAS_CONNECTION", + "description": null, + "lastUpdatedTime": "2024-07-15T10:00:34Z" + } + ] +} diff --git a/tests/informatica/responses/get_saas__public__core__v3__objects__8WzoMOwPAofia8FrbiDU2e__references_6bb91724.json b/tests/informatica/responses/get_saas__public__core__v3__objects__8WzoMOwPAofia8FrbiDU2e__references_6bb91724.json new file mode 100644 index 00000000..b9819430 --- /dev/null +++ b/tests/informatica/responses/get_saas__public__core__v3__objects__8WzoMOwPAofia8FrbiDU2e__references_6bb91724.json @@ -0,0 +1,30 @@ +{ + "count": 3, + "@id": "8WzoMOwPAofia8FrbiDU2e", + "references": [ + { + "id": "17N0kgL3NlAltRFBDEQQss", + "appContextId": "01CY6H0L00000000001C", + "path": "Add-On Bundles/EDI Gateway V2.1/mplt_IB_ACK_Processor", + "documentType": "SAAS_CUSTOM_FUNC", + "description": null, + "lastUpdatedTime": "2024-07-15T09:28:50Z" + }, + { + "id": "i7GegbaTs0GfaD7gkHFuAN", + "appContextId": "01CY6H0L00000000000T", + "path": "Add-On Bundles/EDI Gateway V2.1/mplt_IB_GS_Processor", + "documentType": "SAAS_CUSTOM_FUNC", + "description": null, + "lastUpdatedTime": "2024-07-15T09:28:47Z" + }, + { + "id": "a2ZNly9miTRjLlJgRtyOAq", + "appContextId": "01CY6H0L000000000011", + "path": "Add-On Bundles/EDI Gateway 
V2.1/mplt_IB_ISA_Processor", + "documentType": "SAAS_CUSTOM_FUNC", + "description": null, + "lastUpdatedTime": "2024-07-15T09:28:49Z" + } + ] +} diff --git a/tests/informatica/responses/get_saas__public__core__v3__objects__gdm2ALESzgxjhPkzOGvns7__references_56133b21.json b/tests/informatica/responses/get_saas__public__core__v3__objects__gdm2ALESzgxjhPkzOGvns7__references_56133b21.json new file mode 100644 index 00000000..668e75d2 --- /dev/null +++ b/tests/informatica/responses/get_saas__public__core__v3__objects__gdm2ALESzgxjhPkzOGvns7__references_56133b21.json @@ -0,0 +1,30 @@ +{ + "count": 3, + "@id": "gdm2ALESzgxjhPkzOGvns7", + "references": [ + { + "id": "6A1MCUiynMDdWhYNCI0Oc0", + "appContextId": null, + "path": "Default/Data Transfer Task1", + "documentType": "DTT", + "description": "", + "lastUpdatedTime": "2024-07-15T10:02:25Z" + }, + { + "id": "71e4gvVQh8lclejElAZ2qZ", + "appContextId": "01CY6H1700000000000W", + "path": "Default/Extract same station bike hires", + "documentType": "MAPPING", + "description": "desc in Informatica", + "lastUpdatedTime": "2024-07-25T11:34:09Z" + }, + { + "id": "da7K7KnK6w9lGoUogqtcg8", + "appContextId": "01CY6H0Z000000000002", + "path": "Default/Extract same station bike hires task", + "documentType": "MCT", + "description": null, + "lastUpdatedTime": "2024-07-25T11:34:09Z" + } + ] +} diff --git a/tests/informatica/responses/get_saas__public__core__v3__objects_b34053ea.json b/tests/informatica/responses/get_saas__public__core__v3__objects_b34053ea.json new file mode 100644 index 00000000..37d8f064 --- /dev/null +++ b/tests/informatica/responses/get_saas__public__core__v3__objects_b34053ea.json @@ -0,0 +1,4 @@ +{ + "count": 2, + "objects": [] +} diff --git a/tests/informatica/responses/get_saas__public__core__v3__objects_b96dd6a7.json b/tests/informatica/responses/get_saas__public__core__v3__objects_b96dd6a7.json new file mode 100644 index 00000000..64f6ac48 --- /dev/null +++ 
b/tests/informatica/responses/get_saas__public__core__v3__objects_b96dd6a7.json @@ -0,0 +1,16 @@ +{ + "count": 2, + "objects": [ + { + "id": "8WzoMOwPAofia8FrbiDU2e", + "path": "Add-On Bundles/EDI Gateway V2.1/B2B_Gateway_EDI_Inbound_Mapping", + "type": "DTEMPLATE", + "description": "", + "updatedBy": "bundle-license-notifier", + "updateTime": "2024-07-15T09:28:52Z", + "tags": [], + "sourceControl": null, + "customAttributes": null + } + ] +} diff --git a/tests/informatica/responses/post_saas__public__core__v3__login_03b80555.json b/tests/informatica/responses/post_saas__public__core__v3__login_03b80555.json new file mode 100644 index 00000000..ae7d245e --- /dev/null +++ b/tests/informatica/responses/post_saas__public__core__v3__login_03b80555.json @@ -0,0 +1,18 @@ +{ + "products": [ + { + "name": "Integration Cloud", + "baseApiUrl": "https://usw5.dm-us.informaticacloud.com/saas" + } + ], + "userInfo": { + "sessionId": "a2ww3bpo7kWiulCQEgX5E9", + "id": "0ZGLGtzmytfe5ZJI6kZcwN", + "name": "", + "parentOrgId": "52ZSTB0IDK6dXxaEQLUaQu", + "orgId": "5Mp4BTqelMFlR89CcPgf49", + "orgName": "", + "groups": {}, + "status": "Active" + } +} diff --git a/tests/informatica/test_config.py b/tests/informatica/test_config.py new file mode 100644 index 00000000..172a083d --- /dev/null +++ b/tests/informatica/test_config.py @@ -0,0 +1,15 @@ +from metaphor.common.base_config import OutputConfig +from metaphor.informatica.config import InformaticaRunConfig + + +def test_yaml_config(test_root_dir): + config = InformaticaRunConfig.from_yaml_file( + f"{test_root_dir}/informatica/config.yml" + ) + + assert config == InformaticaRunConfig( + base_url="https://test", + user="user", + password="password", + output=OutputConfig(), + ) diff --git a/tests/informatica/test_extractor.py b/tests/informatica/test_extractor.py new file mode 100644 index 00000000..e9866fb2 --- /dev/null +++ b/tests/informatica/test_extractor.py @@ -0,0 +1,93 @@ +from unittest.mock import MagicMock, patch + +import 
pytest + +from metaphor.common.base_config import OutputConfig +from metaphor.common.event_util import EventUtil +from metaphor.informatica.config import InformaticaRunConfig +from metaphor.informatica.extractor import InformaticaExtractor +from tests.test_utils import load_json + + +class MockResponse: + def __init__(self, json_data, status_code=200): + self.json_data = json_data + self.status_code = status_code + + def json(self): + return self.json_data + + def raise_for_status(self): + return + + +@patch("requests.get") +@patch("requests.post") +@pytest.mark.asyncio +async def test_extractor( + mock_post_method: MagicMock, mock_get_method: MagicMock, test_root_dir: str +): + mock_post_method.side_effect = [ + MockResponse( + load_json( + f"{test_root_dir}/informatica/responses/post_saas__public__core__v3__login_03b80555.json" + ) + ), + ] + + mock_get_method.side_effect = [ + MockResponse( + load_json( + f"{test_root_dir}/informatica/responses/get_saas__api__v2__connection___da2aeee5.json" + ) + ), + MockResponse( + load_json( + f"{test_root_dir}/informatica/responses/get_saas__api__v2__connection__01CY6H0B000000000002_550571c1.json" + ) + ), + MockResponse( + load_json( + f"{test_root_dir}/informatica/responses/get_saas__public__core__v3__objects_8d406d40.json" + ) + ), + MockResponse( + load_json( + f"{test_root_dir}/informatica/responses/get_saas__public__core__v3__objects_b96dd6a7.json" + ) + ), + MockResponse( + load_json( + f"{test_root_dir}/informatica/responses/get_saas__public__core__v3__objects_b34053ea.json" + ) + ), + MockResponse( + load_json( + f"{test_root_dir}/informatica/responses/get_saas__public__core__v3__objects__71e4gvVQh8lclejElAZ2qZ__references_d6d6c585.json" + ) + ), + MockResponse( + load_json( + f"{test_root_dir}/informatica/responses/get_saas__public__core__v3__objects__8WzoMOwPAofia8FrbiDU2e__references_6bb91724.json" + ) + ), + MockResponse( + load_json( + 
f"{test_root_dir}/informatica/responses/get_saas__public__core__v3__objects__gdm2ALESzgxjhPkzOGvns7__references_56133b21.json" + ) + ), + MockResponse( + load_json( + f"{test_root_dir}/informatica/responses/get_saas__api__v2__mapping__01CY6H1700000000000W_e499f963.json" + ) + ), + ] + + config = InformaticaRunConfig( + output=OutputConfig(), base_url="", user="", password="" + ) + extractor = InformaticaExtractor(config) + + events = [EventUtil.trim_event(e) for e in await extractor.extract()] + + assert events == load_json(f"{test_root_dir}/informatica/expected.json") diff --git a/tests/informatica/test_utils.py b/tests/informatica/test_utils.py new file mode 100644 index 00000000..7f5321c7 --- /dev/null +++ b/tests/informatica/test_utils.py @@ -0,0 +1,63 @@ +from metaphor.informatica.models import ConnectionDetail, ConnectorParams +from metaphor.informatica.utils import init_dataset_logical_id, parse_error +from metaphor.models.metadata_change_event import DataPlatform, DatasetLogicalID + + +def test_parse_error(): + assert parse_error('{"code": 1}') == {"code": 1} + assert parse_error("[}]") is None + + +def test_init_dataset_logical_id(): + assert init_dataset_logical_id( + "DB/SCHEMA/TABLE", + ConnectionDetail( + id="id", + type="TOOLKIT_CCI", + connectorGuid="com.infa.adapter.snowflake", + connParams=ConnectorParams(account="account"), + ), + ) == DatasetLogicalID( + name="db.schema.table", + platform=DataPlatform.SNOWFLAKE, + account="account", + ) + + assert ( + init_dataset_logical_id( + "DB.SCHEMA.TABLE", + ConnectionDetail( + id="id", + type="TOOLKIT_CCI", + connectorGuid="com.infa.adapter.snowflake", + connParams=ConnectorParams(account="account"), + ), + ) + is None + ) + + assert ( + init_dataset_logical_id( + "DB/SCHEMA/TABLE", + ConnectionDetail( + id="id", + type="MYSQL", + connectorGuid="com.infa.adapter.snowflake", + connParams=ConnectorParams(account="account"), + ), + ) + is None + ) + + assert ( + init_dataset_logical_id( + "DB/SCHEMA/TABLE", 
+ ConnectionDetail( + id="id", + type="TOOLKIT_CCI", + connectorGuid="com.infa.adapter.bigQuery", + connParams=ConnectorParams(account="account"), + ), + ) + is None + ) diff --git a/tests/kafka/__init__.py b/tests/kafka/__init__.py new file mode 100644 index 00000000..e69de29b