-
Notifications
You must be signed in to change notification settings - Fork 3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(ingest): support view lineage for all sqlalchemy sources #9039
feat(ingest): support view lineage for all sqlalchemy sources #9039
Conversation
Additional Changes: 1. Support incremental lineage for all sqlalchemy sources 2. Keep column level lineage enabled and incremental lineage disabled by default 3. Monkey-patch hive dialect to extract hive view definitions to extract lineage 4. Fix incremental_lineage_helper for empty upstreams Pending Followup Changes: 1. Support postgres-like partial view definitions
dd85c59
to
75f63a2
Compare
@@ -283,7 +283,7 @@ class VersionedConfig(ConfigModel): | |||
|
|||
class LineageConfig(ConfigModel): | |||
incremental_lineage: bool = Field( | |||
default=True, | |||
default=False, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Incremental lineage requires presence of DataHubGraph, which is available by default only when using DataHub rest sink. We plan to keep this default enabled in managed ingestion.
graph, urn, lineage_aspect, wu.metadata.systemMetadata | ||
) | ||
elif lineage_aspect.upstreams: | ||
yield _convert_upstream_lineage_to_patch( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If there is a table level upstream only aspect with empty upstreams, we ignore it, as part of incremental lineage.
return self._create_upstream_lineage_workunit( | ||
dataset_identifier, upstreams, fine_upstreams | ||
) | ||
if upstreams: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should not emit upstream lineage if no upstreams are found.
@@ -90,19 +96,39 @@ def dbapi_get_columns_patched(self, connection, table_name, schema=None, **kw): | |||
logger.warning(f"Failed to patch method due to {e}") | |||
|
|||
|
|||
try: | |||
from pyhive.sqlalchemy_hive import HiveDialect |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Alternatively we can also move this code to acryl pyhive fork - https://github.com/acryldata/PyHive
This seemed simpler and easier to test this end to end. Open to suggestions here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This seems fine for now, and we can fix up when we refactor sql common next week
metadata-ingestion/src/datahub/ingestion/source/sql/sql_common.py
Outdated
Show resolved
Hide resolved
self._view_definition_cache = FileBackedDict[str]() | ||
else: | ||
self._view_definition_cache = {} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
moved use_file_backed_cache
related logic from teraform source here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Now that we fixed the file backed dict to support windows, imo we can probably drop this config flag
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you point me to the PR that fixes this ?
- primarily to reduce adverse effect on other sources, such as dbt which have their own flavour of incremental lineage implementation
HiveDialect.get_view_definition = get_view_definition_patched | ||
except ModuleNotFoundError: | ||
pass | ||
except Exception as e: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Failure to patch should cause the source to fail to load right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Right. Let me remove this exception handling.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
metadata-ingestion/src/datahub/ingestion/source/sql/sql_common.py
Outdated
Show resolved
Hide resolved
self._view_definition_cache = FileBackedDict[str]() | ||
else: | ||
self._view_definition_cache = {} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Now that we fixed the file backed dict to support windows, imo we can probably drop this config flag
@@ -71,6 +71,10 @@ def __init__(self, config, ctx, platform): | |||
super().__init__(config, ctx, platform) | |||
self.config: TwoTierSQLAlchemyConfig = config | |||
|
|||
def get_db_schema(self, dataset_identifier: str) -> Tuple[Optional[str], str]: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When we do the refactoring, let's put identifiers in a real data class instead of joining into a string in one place and then splitting in other places
There's one remaining thing around handling partial view definitions |
@mayurinehate Seeing a real issue in the smoke test - looks like sql_common needs a dependency on sqlparse
|
fallback to native postgres view lineage extraction for failed views
This is now added in this PR.
Fixed it. |
The cypress test failure appears unrelated ( |
Additional Changes:
Checklist