feat(integration/fivetran): Fivetran connector integration #9018

Merged

Commits (23)
abe52ae
Fivetran source connector initial code added
shubhamjagtap639 Oct 4, 2023
b04ff8c
Code to generate connector workunit added
shubhamjagtap639 Oct 4, 2023
3f392f4
Code to execute query through Sqlalchemy engine added
shubhamjagtap639 Oct 5, 2023
0c295e2
test cases for fivetran source added
shubhamjagtap639 Oct 10, 2023
b6897d1
fivetran source added in base dev requirement
shubhamjagtap639 Oct 10, 2023
7dddda3
Config description modified
shubhamjagtap639 Oct 10, 2023
81cf867
SnowflakeDestinationConfig class renamed to DestinationConfig
shubhamjagtap639 Oct 16, 2023
389c16f
Merge branch 'master' of https://github.com/shubhamjagtap639/datahub …
shubhamjagtap639 Oct 16, 2023
561d2b5
Test case golden file updated
shubhamjagtap639 Oct 16, 2023
d1a13f0
Code changes as per review comment
shubhamjagtap639 Oct 23, 2023
19234ee
Fivetran doc file added
shubhamjagtap639 Oct 25, 2023
2bba626
yml file modified
shubhamjagtap639 Oct 26, 2023
2e045d2
Merge branch 'master' into fivetran-connector-integration
shubhamjagtap639 Oct 26, 2023
f17aaf2
clarify example recipe
hsheth2 Oct 30, 2023
af11bcc
fix status aspects
hsheth2 Oct 30, 2023
5c2afd9
Merge branch 'master' of https://github.com/shubhamjagtap639/datahub …
shubhamjagtap639 Oct 31, 2023
1958c3e
Fivetran source integration changes
shubhamjagtap639 Oct 31, 2023
f1fdb04
Code changes as per review comments
shubhamjagtap639 Nov 1, 2023
c92d4fc
destination dataset urn corrected
shubhamjagtap639 Nov 1, 2023
42dd9ef
lint error fixed
shubhamjagtap639 Nov 1, 2023
5e0f3f5
source to database config added
shubhamjagtap639 Nov 2, 2023
b8883ff
always link to upstreams
hsheth2 Nov 3, 2023
425a74b
Merge branch 'master' into fivetran-connector-integration
hsheth2 Nov 8, 2023
3 changes: 3 additions & 0 deletions metadata-ingestion/setup.py
@@ -384,6 +384,7 @@
"powerbi-report-server": powerbi_report_server,
"vertica": sql_common | {"vertica-sqlalchemy-dialect[vertica-python]==0.0.8"},
"unity-catalog": databricks | sqllineage_lib,
"fivetran": {"requests"},
Collaborator:

this isn't really accurate - fivetran requires snowflake, right?
}

# This is mainly used to exclude plugins from the Docker image.
@@ -512,6 +513,7 @@
"nifi",
"vertica",
"mode",
"fivetran",
"kafka-connect",
]
if plugin
@@ -616,6 +618,7 @@
"unity-catalog = datahub.ingestion.source.unity.source:UnityCatalogSource",
"gcs = datahub.ingestion.source.gcs.gcs_source:GCSSource",
"sql-queries = datahub.ingestion.source.sql_queries:SqlQueriesSource",
"fivetran = datahub.ingestion.source.fivetran:FivetranSource",
],
"datahub.ingestion.transformer.plugins": [
"simple_remove_dataset_ownership = datahub.ingestion.transformer.remove_dataset_ownership:SimpleRemoveDatasetOwnership",
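A sketch of how the "fivetran" extra could be tightened per the review comment above. It assumes a snowflake_common dependency set already exists in this setup.py, which is an assumption rather than something shown in this diff:

# Hypothetical sketch of the extras entry: pull in the Snowflake/SQLAlchemy stack
# alongside requests, since the connector log tables are read from a Snowflake destination.
plugins = {
    # ...
    "fivetran": snowflake_common | {"requests"},
}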
13 changes: 12 additions & 1 deletion metadata-ingestion/src/datahub/ingestion/api/source_helpers.py
@@ -65,8 +65,10 @@ def auto_status_aspect(
For all entities that don't have a status aspect, add one with removed set to false.
"""

skip_entities: Set[str] = {"dataProcessInstance"}
all_urns: Set[str] = set()
status_urns: Set[str] = set()
skip_urns: Set[str] = set()
for wu in stream:
urn = wu.get_urn()
all_urns.add(urn)
@@ -89,9 +91,18 @@
else:
raise ValueError(f"Unexpected type {type(wu.metadata)}")

if (
not isinstance(wu.metadata, MetadataChangeEventClass)
and wu.metadata.entityType in skip_entities
):
# If an entity does not support the 'status' aspect, skip adding the status aspect for it.
Collaborator:

we have a map of exactly what entity types support what aspects - can we look this information up there instead?

Collaborator:

I can provide some pointers on how to do this

Contributor Author:

Pls provide some pointers on this

Collaborator:

Use the helper method from here #9120

# Entities like dataProcessInstance don't support the status aspect.
# If not skipped, this fails with: java.lang.RuntimeException: Unknown aspect status for entity dataProcessInstance
skip_urns.add(urn)

yield wu

for urn in sorted(all_urns - status_urns):
for urn in sorted(all_urns - status_urns - skip_urns):
yield MetadataChangeProposalWrapper(
entityUrn=urn,
aspect=StatusClass(removed=False),
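A minimal sketch of the lookup the reviewers discuss above, assuming a registry-style map of entity types to their supported aspects. The map contents and helper name below are illustrative stand-ins, not the actual helper added in #9120:

from typing import Dict, Set

# Illustrative registry only: DataHub tracks which aspects each entity type
# supports; a real lookup would come from that registry rather than this dict.
ENTITY_SUPPORTED_ASPECTS: Dict[str, Set[str]] = {
    "dataset": {"status", "datasetProperties"},
    "dataProcessInstance": {"dataProcessInstanceProperties"},  # no "status"
}

def entity_supports_aspect(entity_type: str, aspect_name: str) -> bool:
    return aspect_name in ENTITY_SUPPORTED_ASPECTS.get(entity_type, set())

# auto_status_aspect could then replace the hard-coded skip_entities check with:
#     if not entity_supports_aspect(wu.metadata.entityType, "status"):
#         skip_urns.add(urn)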
metadata-ingestion/src/datahub/ingestion/source/fivetran/__init__.py
@@ -0,0 +1 @@
from datahub.ingestion.source.fivetran.fivetran import FivetranSource
Collaborator:

this probably isn't necessary - make init empty, and make setup.py point directly at the fivetran source class instead of re-exporting it here
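What this suggestion would roughly look like, with the module path taken from the import above; the entry-point string is a sketch, not part of this diff:

# __init__.py would be left empty, and the setup.py plugin entry would point at
# the module that defines the class:
"fivetran = datahub.ingestion.source.fivetran.fivetran:FivetranSource",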

135 changes: 135 additions & 0 deletions metadata-ingestion/src/datahub/ingestion/source/fivetran/config.py
Collaborator:

we need to add fivetran to datahub/metadata-service/war/src/main/resources/boot/data_platforms.json (plus a fivetran logo/icon)

For example, a similar change was made here https://github.com/datahub-project/datahub/pull/7971/files

@@ -0,0 +1,135 @@
import logging
from dataclasses import dataclass, field as dataclass_field
from typing import Dict, List, Optional

import pydantic
from pydantic import Field
from pydantic.class_validators import root_validator
Collaborator:

Suggested change
from pydantic.class_validators import root_validator
from pydantic import root_validator


from datahub.configuration.common import AllowDenyPattern, ConfigModel
from datahub.configuration.source_common import DEFAULT_ENV, DatasetSourceConfigMixin
from datahub.ingestion.source.state.stale_entity_removal_handler import (
StaleEntityRemovalSourceReport,
StatefulStaleMetadataRemovalConfig,
)
from datahub.ingestion.source.state.stateful_ingestion_base import (
StatefulIngestionConfigBase,
)
from datahub.ingestion.source_config.sql.snowflake import BaseSnowflakeConfig

logger = logging.getLogger(__name__)


class Constant:
"""
keys used in fivetran plugin
"""

ORCHESTRATOR = "fivetran"
# table column name
SOURCE_SCHEMA_NAME = "source_schema_name"
SOURCE_TABLE_NAME = "source_table_name"
DESTINATION_SCHEMA_NAME = "destination_schema_name"
DESTINATION_TABLE_NAME = "destination_table_name"
SYNC_ID = "sync_id"
MESSAGE_DATA = "message_data"
TIME_STAMP = "time_stamp"
STATUS = "status"
USER_ID = "user_id"
GIVEN_NAME = "given_name"
FAMILY_NAME = "family_name"
CONNECTOR_ID = "connector_id"
CONNECTOR_NAME = "connector_name"
CONNECTOR_TYPE_ID = "connector_type_id"
PAUSED = "paused"
SYNC_FREQUENCY = "sync_frequency"
DESTINATION_ID = "destination_id"
CONNECTING_USER_ID = "connecting_user_id"
# Job status constants
SUCCESSFUL = "SUCCESSFUL"
FAILURE_WITH_TASK = "FAILURE_WITH_TASK"
CANCELED = "CANCELED"


SUPPORTED_DATA_PLATFORM_MAPPING = {
"postgres": "postgres",
"snowflake": "snowflake",
"mysql": "mysql",
}


class DestinationConfig(BaseSnowflakeConfig):
database: str = Field(description="The fivetran connector log database.")
log_schema: str = Field(description="The fivetran connector log schema.")


class FivetranLogConfig(ConfigModel):
destination_platform: str = pydantic.Field(
default="snowflake",
Collaborator:

For my understanding:

  1. Is snowflake the only supported destination?
  2. Is the destination platform related to the connector's destination?
  3. What would change in the DestinationConfig class when we want to support more destination platforms?

Contributor Author:

  1. No, there are other supported destinations, but for now we are starting with the snowflake destination.
  2. No, it is not every connector's destination. This destination platform only refers to the fivetran log connector destination from which we fetch all metadata.
  3. In the future, if we want to support more destination platforms, we will rename DestinationConfig to SnowflakeDestinationConfig, and there will be a separate class for each new destination platform, since different destination platforms require different configurations to create a connection.

description="The destination platform where fivetran connector log tables are dumped.",
)
destination_config: Optional[DestinationConfig] = pydantic.Field(
default=None,
description="If destination platform is 'snowflake', provide snowflake configuration.",
)

@root_validator(pre=True)
def validate_destination_platfrom_and_config(cls, values: Dict) -> Dict:
destination_platform = values["destination_platform"]
if destination_platform == "snowflake":
if "destination_config" not in values:
raise ValueError(
"If destination platform is 'snowflake', user must provide snowflake destination configuration in the recipe."
)
else:
raise ValueError(
f"Destination platform '{destination_platform}' is not yet supported."
)
return values


@dataclass
class FivetranSourceReport(StaleEntityRemovalSourceReport):
connectors_scanned: int = 0
filtered_connectors: List[str] = dataclass_field(default_factory=list)

def report_connectors_scanned(self, count: int = 1) -> None:
self.connectors_scanned += count

def report_connectors_dropped(self, model: str) -> None:
self.filtered_connectors.append(model)


class PlatformDetail(ConfigModel):
platform_instance: Optional[str] = pydantic.Field(
default=None,
description="The instance of the platform that all assets produced by this recipe belong to",
)
env: str = pydantic.Field(
default=DEFAULT_ENV,
description="The environment that all assets produced by DataHub platform ingestion source belong to",
)


class FivetranSourceConfig(StatefulIngestionConfigBase, DatasetSourceConfigMixin):
fivetran_log_config: FivetranLogConfig = pydantic.Field(
description="Fivetran log connector destination server configurations.",
)
connector_patterns: AllowDenyPattern = Field(
default=AllowDenyPattern.allow_all(),
description="Regex patterns for connectors to filter in ingestion.",
)
# Configuration for stateful ingestion
stateful_ingestion: Optional[StatefulStaleMetadataRemovalConfig] = pydantic.Field(
default=None, description="Airbyte Stateful Ingestion Config."
)
# Fivetran connector all sources to platform instance mapping
sources_to_platform_instance: Dict[str, PlatformDetail] = pydantic.Field(
default={},
description="A mapping of the connector's all sources dataset to platform instance. Use connector id as key.",
)
# Fivetran destination to platform instance mapping
destination_to_platform_instance: Dict[str, PlatformDetail] = pydantic.Field(
default={},
description="A mapping of destination dataset to platform instance. Use destination id as key.",
)
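For illustration, a minimal way this config might be instantiated from a recipe-style dict. All values are placeholders, and the Snowflake field names under destination_config (account_id, warehouse, username, password, role) are assumptions based on BaseSnowflakeConfig rather than fields shown in this diff:

from datahub.ingestion.source.fivetran.config import FivetranSourceConfig

config = FivetranSourceConfig.parse_obj(
    {
        "fivetran_log_config": {
            "destination_platform": "snowflake",
            "destination_config": {
                "account_id": "my_account",        # placeholder
                "warehouse": "my_warehouse",       # placeholder
                "username": "my_user",             # placeholder
                "password": "my_password",         # placeholder
                "role": "my_role",                 # placeholder
                "database": "fivetran_log_database",
                "log_schema": "fivetran_log",
            },
        },
        "connector_patterns": {"allow": ["postgres_connector.*"]},
    }
)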
@@ -0,0 +1,165 @@
import json
import logging
from dataclasses import dataclass
from typing import Any, Dict, List, Tuple

from sqlalchemy import create_engine

from datahub.ingestion.source.fivetran.config import (
Constant,
FivetranSourceConfig,
FivetranSourceReport,
)
from datahub.ingestion.source.fivetran.fivetran_query import FivetranLogQuery

logger: logging.Logger = logging.getLogger(__name__)


@dataclass
class Connector:
connector_id: str
connector_name: str
connector_type: str
paused: bool
sync_frequency: int
destination_id: str
user_name: str
source_tables: List[str]
destination_tables: List[str]
jobs: List["Job"]


@dataclass
class Job:
job_id: str
start_time: int
end_time: int
status: str


class FivetranLogDataDictionary:
def __init__(
self, config: FivetranSourceConfig, report: FivetranSourceReport
) -> None:
self.logger = logger
self.config = config
self.report = report
self.engine = self._get_log_destination_engine()

def _get_log_destination_engine(self) -> Any:
destination_platform = self.config.fivetran_log_config.destination_platform
engine = None
if destination_platform == "snowflake":
snowflake_destination_config = (
self.config.fivetran_log_config.destination_config
)
if snowflake_destination_config is not None:
engine = create_engine(
snowflake_destination_config.get_sql_alchemy_url(),
**snowflake_destination_config.get_options(),
)
engine.execute(
FivetranLogQuery.use_schema(
snowflake_destination_config.database,
snowflake_destination_config.log_schema,
)
)
return engine

def _query(self, query: str) -> List[Dict]:
logger.debug("Query : {}".format(query))
resp = self.engine.execute(query)
return [row for row in resp]

def _get_table_lineage(self, connector_id: str) -> Tuple[List[str], List[str]]:
table_lineage = self._query(
FivetranLogQuery.get_table_lineage_query(connector_id=connector_id)
)
source_tables: List[str] = []
destination_tables: List[str] = []
for each in table_lineage:
source_tables.append(
f"{each[Constant.SOURCE_SCHEMA_NAME]}.{each[Constant.SOURCE_TABLE_NAME]}"
)
destination_tables.append(
f"{each[Constant.DESTINATION_SCHEMA_NAME]}.{each[Constant.DESTINATION_TABLE_NAME]}"
)
return source_tables, destination_tables

def _get_jobs_list(self, connector_id: str) -> List[Job]:
jobs: List[Job] = []
sync_start_logs = {
row[Constant.SYNC_ID]: row
for row in self._query(
FivetranLogQuery.get_sync_start_logs_query(connector_id=connector_id)
)
}
sync_end_logs = {
row[Constant.SYNC_ID]: row
for row in self._query(
FivetranLogQuery.get_sync_end_logs_query(connector_id=connector_id)
)
}
for sync_id in sync_start_logs.keys():
if sync_end_logs.get(sync_id) is None:
# If there is no sync-end event log for this sync id, the sync is still in progress
Collaborator:

How about ingesting in-progress jobs as DataProcessInstance with status STARTED?

Contributor Author:

As we are not ingesting in-progress jobs in other sources like Airflow, I didn't ingest them here either.

continue

message_data = json.loads(sync_end_logs[sync_id][Constant.MESSAGE_DATA])
if type(message_data) is str:
# Sometimes message_data contains a JSON string nested inside a string.
# Ex: '"{\"status\":\"SUCCESSFUL\"}"'
# Hence, json.loads needs to be applied twice.
message_data = json.loads(message_data)

jobs.append(
Job(
job_id=sync_id,
start_time=round(
sync_start_logs[sync_id][Constant.TIME_STAMP].timestamp()
),
end_time=round(
sync_end_logs[sync_id][Constant.TIME_STAMP].timestamp()
),
status=message_data[Constant.STATUS],
)
)
return jobs

def _get_user_name(self, user_id: str) -> str:
user_details = self._query(FivetranLogQuery.get_user_query(user_id=user_id))[0]
return (
f"{user_details[Constant.GIVEN_NAME]} {user_details[Constant.FAMILY_NAME]}"
)

def get_connectors_list(self) -> List[Connector]:
connectors: List[Connector] = []
connector_list = self._query(FivetranLogQuery.get_connectors_query())
for connector in connector_list:
if not self.config.connector_patterns.allowed(
connector[Constant.CONNECTOR_NAME]
):
self.report.report_connectors_dropped(connector[Constant.CONNECTOR_ID])
continue

source_tables, destination_tables = self._get_table_lineage(
connector[Constant.CONNECTOR_ID]
)

connectors.append(
Connector(
connector_id=connector[Constant.CONNECTOR_ID],
connector_name=connector[Constant.CONNECTOR_NAME],
connector_type=connector[Constant.CONNECTOR_TYPE_ID],
paused=connector[Constant.PAUSED],
sync_frequency=connector[Constant.SYNC_FREQUENCY],
destination_id=connector[Constant.DESTINATION_ID],
user_name=self._get_user_name(
connector[Constant.CONNECTING_USER_ID]
),
source_tables=source_tables,
destination_tables=destination_tables,
jobs=self._get_jobs_list(connector[Constant.CONNECTOR_ID]),
)
)
return connectors
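The double json.loads in _get_jobs_list can be reproduced in isolation. This standalone snippet shows why a second decode is needed when the log stores a JSON object serialized inside a JSON string:

import json

# A value shaped like the one described in the code comment above:
# a JSON string whose content is itself a serialized JSON object.
raw = '"{\\"status\\":\\"SUCCESSFUL\\"}"'

message_data = json.loads(raw)  # first decode yields the str '{"status":"SUCCESSFUL"}'
if isinstance(message_data, str):
    message_data = json.loads(message_data)  # second decode yields a dict

assert message_data["status"] == "SUCCESSFUL"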