feat(integration/fivetran): Fivetran connector integration #9018
Conversation
I would like to suggest a few changes to code structure:
- Separate the current `data_classes.py` into `data_classes.py` (classes `Connector`, `Job`) and `fivetran_log_api.py` (class `FivetranLogDataDictionary`, or rename it to `FivetranPlatformConnectorApi`). A rough sketch of this split follows below.
- Classes in `fivetran_log_api` act as the interface for interacting with the Fivetran log API and return the models defined in `data_classes.py`. I prefer to have separate public methods for each data model extraction rather than nested calls to the Fivetran log APIs, but I don't feel strongly here.
- All orchestration logic required to generate metadata events is handled by `FivetranSource`, e.g. pattern-based filtering. It would help not to pass the entire `FivetranSourceConfig` to `fivetran_api` but only what's required to invoke the API. That usually helps with separation of responsibilities.
- If the logic in `FivetranSource` becomes too complex or long, we can introduce an extractor layer in the middle that shares the orchestration responsibility for a particular feature.
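A minimal sketch of what that split could look like; class and method names beyond the ones mentioned above are assumptions for illustration, not the PR's actual code:

```python
# data_classes.py: plain data models returned by the log API layer (sketch).
from dataclasses import dataclass
from typing import List


@dataclass
class Job:
    job_id: str
    start_time: int
    end_time: int
    status: str


@dataclass
class Connector:
    connector_id: str
    connector_name: str
    connector_type: str
    jobs: List[Job]


# fivetran_log_api.py: thin interface over the Fivetran log tables (sketch).
class FivetranLogAPI:
    def __init__(self, engine) -> None:
        self.engine = engine  # SQLAlchemy engine for the log destination

    def get_connectors(self) -> List[Connector]:
        """One public method per data model, instead of nested API calls."""
        raise NotImplementedError  # query the connector log table and map rows to Connector
```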
```python
class FivetranLogConfig(ConfigModel):
    destination_platform: str = pydantic.Field(
        default="snowflake",
```
For my understanding:
- Is snowflake the only supported destination?
- Is the destination platform related to the connector's destination?
- What would change in the DestinationConfig class when we want to support more destination platforms?
- No, there are other supported destinations, but for now we are just starting with the snowflake destination.
- No, not all connectors' destinations. This destination platform only relates to the fivetran log connector destination from which we are fetching all metadata.
- In the future, if we want to support more destination platforms, we will rename DestinationConfig to SnowflakeDestinationConfig. There will be a separate class for each new destination platform, since two different destination platforms require different configurations to create a connection (see the sketch below).
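A hedged sketch of how that split could look once a second destination is added; the BigQuery class and all field names are purely illustrative assumptions:

```python
from typing import Optional

import pydantic
from datahub.configuration.common import ConfigModel


class SnowflakeDestinationConfig(ConfigModel):
    account_id: str
    warehouse: str
    database: str
    # ... other snowflake connection fields


class BigQueryDestinationConfig(ConfigModel):  # hypothetical future destination
    project_id: str
    dataset: str


class FivetranLogConfig(ConfigModel):
    destination_platform: str = pydantic.Field(
        default="snowflake",
        description="Destination platform where the fivetran connector log tables are stored.",
    )
    snowflake_destination_config: Optional[SnowflakeDestinationConfig] = None
    bigquery_destination_config: Optional[BigQueryDestinationConfig] = None
```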
```python
}
for sync_id in sync_start_logs.keys():
    if sync_end_logs.get(sync_id) is None:
        # If no sync-end event log for this sync id that means sync is still in progress
```
How about ingesting in-progress jobs as DataProcessInstance with status STARTED?
Since we are not ingesting in-progress jobs in other sources like Airflow, I didn't ingest them here either.
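For reference, a minimal sketch of what emitting an in-progress run could look like; this is not what the PR does, the urn and timestamp are placeholders, and the aspect/field names are my recollection of the DataHub SDK, so treat them as assumptions:

```python
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.metadata.schema_classes import (
    DataProcessInstanceRunEventClass,
    DataProcessRunStatusClass,
)

# Emit a STARTED run event for a sync that has a sync-start log but no sync-end log yet.
mcp = MetadataChangeProposalWrapper(
    entityUrn="urn:li:dataProcessInstance:fivetran-sync-1234",  # placeholder id
    aspect=DataProcessInstanceRunEventClass(
        timestampMillis=1699000000000,  # placeholder sync-start time
        status=DataProcessRunStatusClass.STARTED,
    ),
)
```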
Some initial thoughts, but overall looks good
I haven't actually tested this locally yet though
```python
def _get_log_destination_engine(self) -> Any:
    destination_platform = self.fivetran_log_config.destination_platform
    engine = None
    if destination_platform == "snowflake":
```
as long as the config type has a get_sql_alchemy_url and get_options method, we should be able to call create_engine() right? what's the reason we need to do this per destination platform?
Yes, creating a SQLAlchemy engine from the SQLAlchemy URL to execute the queries is common to the other destinations as well, so we will definitely need the common get_sql_alchemy_url method.
We can remove the get_options method, as it isn't strictly required.
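A minimal sketch of the destination-agnostic version being discussed, assuming the destination config object exposes get_sql_alchemy_url() and get_options():

```python
from typing import Any

from sqlalchemy import create_engine
from sqlalchemy.engine import Engine


def get_log_destination_engine(destination_config: Any) -> Engine:
    # No per-platform branching needed: any config type that exposes these two
    # methods can produce a working engine.
    return create_engine(
        destination_config.get_sql_alchemy_url(),
        **destination_config.get_options(),
    )
```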
```python
@staticmethod
def get_connectors_query() -> str:
    return """
    SELECT connector_id as "CONNECTOR_ID",
```
do these queries work across multiple underlying databases?
Haven't checked that, but syntactically it should work, as most of the destinations use SQL as the query language.
If there is some syntax that is not common, we can modify the logic here in the future to use a different query for each destination.
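A rough sketch of that future shape, assuming a simple per-destination registry of query classes; the class names and query text are illustrative only:

```python
class SnowflakeFivetranLogQuery:
    def get_connectors_query(self) -> str:
        # Snowflake-flavoured SQL (illustrative, not the PR's actual query)
        return 'SELECT connector_id AS "CONNECTOR_ID" FROM connector'


class BigQueryFivetranLogQuery:
    def get_connectors_query(self) -> str:
        # Alternative flavour, if some syntax turns out not to be portable
        return "SELECT connector_id AS CONNECTOR_ID FROM connector"


QUERY_BY_DESTINATION = {
    "snowflake": SnowflakeFivetranLogQuery,
    "bigquery": BigQueryFivetranLogQuery,
}


def get_query_class(destination_platform: str):
    return QUERY_BY_DESTINATION[destination_platform]()
```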
```python
    not isinstance(wu.metadata, MetadataChangeEventClass)
    and wu.metadata.entityType in skip_entities
):
    # If any entity does not support aspect 'status' then skip that entity from adding status aspect.
```
we have a map of exactly what entity types support what aspects - can we look this information up there instead?
I can provide some pointers on how to do this
Please provide some pointers on this.
Use the helper method from here #9120
```diff
@@ -0,0 +1 @@
+from datahub.ingestion.source.fivetran.fivetran import FivetranSource
```
this probably isn't necessary - make `__init__.py` empty, and make setup.py point directly at the fivetran source class instead of re-exporting it here
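Something along these lines in setup.py; a sketch, with the entry-point group name assumed to match the existing source plugin registrations in that file:

```python
entry_points = {
    "datahub.ingestion.source.plugins": [
        # Point directly at the source class so fivetran/__init__.py can stay empty.
        "fivetran = datahub.ingestion.source.fivetran.fivetran:FivetranSource",
    ],
}
```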
```md
Source and destination are mapped to Dataset as an Input and Output of Connector.

## Snowflake destination Configuration Guide
```
explicitly call out that this only works with snowflake for now
metadata-ingestion/setup.py
```python
"powerbi-report-server": powerbi_report_server,
"vertica": sql_common | {"vertica-sqlalchemy-dialect[vertica-python]==0.0.8.1"},
"unity-catalog": databricks | sqllineage_lib,
"fivetran": {"requests"},
```
this isn't really accurate - fivetran requires snowflake, right?
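A minimal sketch of the kind of change being suggested; whether a snowflake_common dependency set already exists in this setup.py, and what it contains, is an assumption:

```python
# Reuse the snowflake dependency set so `pip install 'acryl-datahub[fivetran]'`
# also installs the Snowflake SQLAlchemy driver needed to read the log tables.
snowflake_common = {"snowflake-sqlalchemy"}  # assumed to be defined earlier in setup.py

plugins = {
    "fivetran": snowflake_common | {"requests"},
}
```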
we need to add fivetran to datahub/metadata-service/war/src/main/resources/boot/data_platforms.json (plus a fivetran logo/icon)
For example, a similar change was made here https://github.com/datahub-project/datahub/pull/7971/files
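For illustration, a rough sketch of what such an entry might contain; the field names are modeled on existing entries in that file and should be treated as assumptions, not a verified schema:

```python
import json

fivetran_platform_entry = {
    "urn": "urn:li:dataPlatform:fivetran",
    "aspect": {
        "name": "fivetran",
        "displayName": "Fivetran",
        "type": "OTHERS",
        "datasetNameDelimiter": ".",
        "logoUrl": "/assets/platforms/fivetranlogo.png",  # logo path is a placeholder
    },
}

print(json.dumps(fivetran_platform_entry, indent=2))
```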
```python
import pydantic
from pydantic import Field
from pydantic.class_validators import root_validator
```
Suggested change:
```diff
-from pydantic.class_validators import root_validator
+from pydantic import root_validator
```
```python
    continue

message_data = json.loads(sync_end_logs[sync_id][Constant.MESSAGE_DATA])
if type(message_data) is str:
```
use `isinstance`, not `type(...) is ...`
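A small sketch of the suggested change, assuming the message data can arrive as a JSON string that itself needs a second decode; the payload below is made up:

```python
import json

raw_message_data = '"{\\"status\\": \\"SUCCESSFUL\\"}"'  # hypothetical double-encoded payload

message_data = json.loads(raw_message_data)
if isinstance(message_data, str):  # isinstance instead of `type(...) is str`
    message_data = json.loads(message_data)

print(message_data["status"])
```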
```python
user_name=self._get_user_name(
    connector[Constant.CONNECTING_USER_ID]
),
table_lineage=self._get_table_lineage(
```
they said column lineage went live a few weeks ago - how easy would it be to capture that too?
```python
    )
)
output_dataset_urn_list.append(
    DatasetUrn.create_from_ids(
```
it looks like this produces urns of this form:
`urn:li:dataset:(urn:li:dataPlatform:snowflake,[<optional_platform_instance>.]<schema>.<table>,PROD)`
However, that's not correct - snowflake urns (and in fact, urns for all "three-tier" sources) should be:
`urn:li:dataset:(urn:li:dataPlatform:snowflake,[<optional_platform_instance>.]<database>.<schema>.<table>,PROD)`
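A quick sketch of building a correctly shaped three-tier urn with the SDK helper; the database/schema/table names are placeholders:

```python
from datahub.emitter.mce_builder import make_dataset_urn

urn = make_dataset_urn(
    platform="snowflake",
    name="analytics_db.public.orders",  # <database>.<schema>.<table>
    env="PROD",
)
print(urn)
# urn:li:dataset:(urn:li:dataPlatform:snowflake,analytics_db.public.orders,PROD)
```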
```python
):
    yield mcp.as_workunit()

def _get_connector_workunit(
```
make this plural

Suggested change:
```diff
-def _get_connector_workunit(
+def _get_connector_workunits(
```
Checklist