Skip to content

Commit

Permalink
feat(ingest): add preset source (datahub-project#10954)
Browse files Browse the repository at this point in the history
Co-authored-by: MARK CHENG <[email protected]>
Co-authored-by: hwmarkcheng <[email protected]>
  • Loading branch information
3 people authored Oct 10, 2024
1 parent 0414443 commit f147b51
Show file tree
Hide file tree
Showing 7 changed files with 1,068 additions and 10 deletions.
1 change: 1 addition & 0 deletions metadata-ingestion/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -722,6 +722,7 @@
"snowflake-summary = datahub.ingestion.source.snowflake.snowflake_summary:SnowflakeSummarySource",
"snowflake-queries = datahub.ingestion.source.snowflake.snowflake_queries:SnowflakeQueriesSource",
"superset = datahub.ingestion.source.superset:SupersetSource",
"preset = datahub.ingestion.source.preset:PresetSource",
"tableau = datahub.ingestion.source.tableau.tableau:TableauSource",
"openapi = datahub.ingestion.source.openapi:OpenApiSource",
"metabase = datahub.ingestion.source.metabase:MetabaseSource",
Expand Down
114 changes: 114 additions & 0 deletions metadata-ingestion/src/datahub/ingestion/source/preset.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
import logging
from typing import Dict, Optional

import requests
from pydantic.class_validators import root_validator, validator
from pydantic.fields import Field

from datahub.emitter.mce_builder import DEFAULT_ENV
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.api.decorators import (
SourceCapability,
SupportStatus,
capability,
config_class,
platform_name,
support_status,
)
from datahub.ingestion.source.state.stale_entity_removal_handler import (
StaleEntityRemovalSourceReport,
StatefulStaleMetadataRemovalConfig,
)
from datahub.ingestion.source.superset import SupersetConfig, SupersetSource
from datahub.utilities import config_clean

logger = logging.getLogger(__name__)


class PresetConfig(SupersetConfig):
    # Configuration for the Preset source, layered on top of SupersetConfig.
    # NOTE: documented with comments rather than a class docstring on purpose;
    # a docstring on a pydantic model may surface in its generated schema.

    # --- Preset endpoints -------------------------------------------------
    manager_uri: str = Field(
        default="https://api.app.preset.io", description="Preset.io API URL"
    )
    connect_uri: str = Field(default="", description="Preset workspace URL.")
    display_uri: Optional[str] = Field(
        default=None,
        description="optional URL to use in links (if `connect_uri` is only for ingestion)",
    )

    # --- Credentials sent to the manager API's auth endpoint ---------------
    api_key: Optional[str] = Field(default=None, description="Preset.io API key.")
    api_secret: Optional[str] = Field(default=None, description="Preset.io API secret.")

    # Configuration for stateful ingestion
    stateful_ingestion: Optional[StatefulStaleMetadataRemovalConfig] = Field(
        default=None, description="Preset Stateful Ingestion Config."
    )

    options: Dict = Field(default={}, description="")
    env: str = Field(
        default=DEFAULT_ENV,
        description="Environment to use in namespace when constructing URNs",
    )
    database_alias: Dict[str, str] = Field(
        default={},
        description="Can be used to change mapping for database names in superset to what you have in datahub",
    )

    @validator("connect_uri", "display_uri")
    def remove_trailing_slash(cls, v):
        """Normalise both URL fields through ``config_clean.remove_trailing_slashes``."""
        cleaned = config_clean.remove_trailing_slashes(v)
        return cleaned

    @root_validator
    def default_display_uri_to_connect_uri(cls, values):
        """Fall back to ``connect_uri`` for links when ``display_uri`` is unset."""
        if values.get("display_uri") is None:
            values["display_uri"] = values.get("connect_uri")
        return values


@platform_name("Preset")
@config_class(PresetConfig)
@support_status(SupportStatus.TESTING)
@capability(
    SourceCapability.DELETION_DETECTION, "Optionally enabled via stateful_ingestion"
)
class PresetSource(SupersetSource):
    """
    Variation of the Superset plugin that works with Preset.io (Apache Superset SaaS).
    """

    config: PresetConfig
    report: StaleEntityRemovalSourceReport
    platform = "preset"

    def __init__(self, ctx: PipelineContext, config: PresetConfig):
        """Build the source from a pipeline context and a Preset configuration."""
        # Diagnostic only: kept at debug level so normal runs are not noisy.
        logger.debug("ctx is %s", ctx)

        super().__init__(ctx, config)
        # NOTE(review): the parent initializer appears to set these as well;
        # they are re-assigned here so the attributes carry the Preset types.
        self.config = config
        self.report = StaleEntityRemovalSourceReport()

    def login(self):
        """Authenticate against the Preset manager API and return a session.

        Posts the configured API key/secret to ``{manager_uri}/v1/auth/``,
        stores the returned token on ``self.access_token`` and returns a
        ``requests.Session`` whose default headers carry the bearer token.

        Raises:
            requests.exceptions.RequestException: if the auth request cannot
                be sent or the manager API answers with an HTTP error status.
        """
        auth_url = f"{self.config.manager_uri}/v1/auth/"
        credentials = {
            "name": self.config.api_key,
            "secret": self.config.api_secret,
        }
        try:
            login_response = requests.post(auth_url, json=credentials)
            # Fail here on 4xx/5xx so bad credentials surface as an HTTP error
            # instead of an opaque KeyError while reading the token below.
            login_response.raise_for_status()
        except requests.exceptions.RequestException as e:
            logger.error(f"Failed to authenticate with Preset: {e}")
            raise

        self.access_token = login_response.json()["payload"]["access_token"]
        logger.debug("Got access token from Preset")

        requests_session = requests.Session()
        requests_session.headers.update(
            {
                "Authorization": f"Bearer {self.access_token}",
                "Content-Type": "application/json",
                "Accept": "*/*",
            }
        )

        # Test the connection. A failed probe is only logged (best effort);
        # the session is still returned to the caller.
        test_response = requests_session.get(f"{self.config.connect_uri}/version")
        if not test_response.ok:
            logger.error(
                "Unable to connect to workspace (HTTP %s)", test_response.status_code
            )
        return requests_session
28 changes: 18 additions & 10 deletions metadata-ingestion/src/datahub/ingestion/source/superset.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,11 @@ class SupersetConfig(
)
username: Optional[str] = Field(default=None, description="Superset username.")
password: Optional[str] = Field(default=None, description="Superset password.")

api_key: Optional[str] = Field(default=None, description="Preset.io API key.")
api_secret: Optional[str] = Field(default=None, description="Preset.io API secret.")
manager_uri: str = Field(
default="https://api.app.preset.io", description="Preset.io API URL"
)
# Configuration for stateful ingestion
stateful_ingestion: Optional[StatefulStaleMetadataRemovalConfig] = Field(
default=None, description="Superset Stateful Ingestion Config."
Expand Down Expand Up @@ -179,7 +183,14 @@ def __init__(self, ctx: PipelineContext, config: SupersetConfig):
super().__init__(config, ctx)
self.config = config
self.report = StaleEntityRemovalSourceReport()
if self.config.domain:
self.domain_registry = DomainRegistry(
cached_domains=[domain_id for domain_id in self.config.domain],
graph=self.ctx.graph,
)
self.session = self.login()

def login(self) -> requests.Session:
login_response = requests.post(
f"{self.config.connect_uri}/api/v1/security/login",
json={
Expand All @@ -193,26 +204,23 @@ def __init__(self, ctx: PipelineContext, config: SupersetConfig):
self.access_token = login_response.json()["access_token"]
logger.debug("Got access token from superset")

self.session = requests.Session()
self.session.headers.update(
requests_session = requests.Session()
requests_session.headers.update(
{
"Authorization": f"Bearer {self.access_token}",
"Content-Type": "application/json",
"Accept": "*/*",
}
)

if self.config.domain:
self.domain_registry = DomainRegistry(
cached_domains=[domain_id for domain_id in self.config.domain],
graph=self.ctx.graph,
)

# Test the connection
test_response = self.session.get(f"{self.config.connect_uri}/api/v1/dashboard/")
test_response = requests_session.get(
f"{self.config.connect_uri}/api/v1/dashboard/"
)
if test_response.status_code == 200:
pass
# TODO(Gabe): how should we message about this error?
return requests_session

@classmethod
def create(cls, config_dict: dict, ctx: PipelineContext) -> Source:
Expand Down
Loading

0 comments on commit f147b51

Please sign in to comment.