Skip to content

Commit

Permalink
Merge branch 'main' into marslan/sc-29346/use-system-information-sche…
Browse files Browse the repository at this point in the history
…ma-in-unity-catalog
  • Loading branch information
mars-lan committed Oct 28, 2024
2 parents d325bf7 + db18615 commit 7ab9f36
Show file tree
Hide file tree
Showing 23 changed files with 4,555 additions and 120 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -72,11 +72,11 @@ Each connector is placed under its own directory under [metaphor](./metaphor) an
| [monte_carlo](metaphor/monte_carlo/) | Data monitor |
| [mssql](metaphor/mssql/) | Schema |
| [mysql](metaphor/mysql/) | Schema, description |
| [openapi](metaphor/openapi/) | API, description |
| [oracle](metaphor/oracle/) | Schema, description, queries |
| [notion](metaphor/notion/) | Document embeddings |
| [postgresql](metaphor/postgresql/) | Schema, description, statistics |
| [postgresql.profile](metaphor/postgresql/profile/) | Data profile |
| [postgresql.usage](metaphor/postgresql/usage/) | Usage |
| [power_bi](metaphor/power_bi/) | Dashboard, lineage |
| [quick_sight](metaphor/quick_sight/) | Dashboard, lineage |
| [redshift](metaphor/redshift/) | Schema, description, statistics, queries |
Expand Down
8 changes: 6 additions & 2 deletions metaphor/common/event_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

from metaphor import models # type: ignore
from metaphor.models.metadata_change_event import (
API,
Dashboard,
Dataset,
ExternalSearchDocument,
Expand All @@ -26,6 +27,7 @@
logger.setLevel(logging.INFO)

ENTITY_TYPES = Union[
API,
Dashboard,
Dataset,
ExternalSearchDocument,
Expand Down Expand Up @@ -57,9 +59,11 @@ def _build_event(**kwargs) -> MetadataChangeEvent:
return MetadataChangeEvent(**kwargs)

@staticmethod
def build_event(entity: ENTITY_TYPES):
def build_event(entity: ENTITY_TYPES): # noqa: C901
"""Build MCE given an entity"""
if type(entity) is Dashboard:
if type(entity) is API:
return EventUtil._build_event(api=entity)
elif type(entity) is Dashboard:
return EventUtil._build_event(dashboard=entity)
elif type(entity) is Dataset:
return EventUtil._build_event(dataset=entity)
Expand Down
54 changes: 54 additions & 0 deletions metaphor/openapi/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
# OpenAPI Connector

This connector extracts APIs from an OpenAPI Specification JSON. The following OAS versions are supported:

- OpenAPI Specification 3.1.0
- OpenAPI Specification 3.0.0
- Swagger 2.0

## Config File

Create a YAML config file based on the following template.

### Required Configurations

Configure the connector with the base URL and exactly one source for the OAS JSON, using either

```yaml
base_url: <url> # BaseUrl for endpoints in OAS
openapi_json_path: <path> # path to OAS JSON file
```
or
```yaml
base_url: <url> # BaseUrl for endpoints in OAS
openapi_json_url: <url> # URL of OAS
```
### Optional Configurations
If accessing the OAS JSON requires authentication, please include an optional auth configuration.
```yaml
auth:
  basic_auth:
    user: <user>
    password: <password>
```
#### Output Destination
See [Output Config](../common/docs/output.md) for more information on the optional `output` config.

## Testing

Follow the [installation](../../README.md) instructions to install `metaphor-connectors` in your environment (or virtualenv). Make sure to include the `openapi` or `all` extra.

Run the following command to test the connector locally:

```shell
metaphor openapi <config_file>
```

Manually verify the output after the run finishes.
6 changes: 6 additions & 0 deletions metaphor/openapi/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
from metaphor.common.cli import cli_main
from metaphor.openapi.extractor import OpenAPIExtractor


def main(config_file: str):
    """Run the OpenAPI connector using the given config file.

    Args:
        config_file: Path to the connector's config file, handed to
            ``cli_main`` together with the extractor class.
    """
    cli_main(OpenAPIExtractor, config_file)
32 changes: 32 additions & 0 deletions metaphor/openapi/config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
from typing import Optional

from pydantic import FilePath, HttpUrl, model_validator
from pydantic.dataclasses import dataclass

from metaphor.common.base_config import BaseConfig
from metaphor.common.dataclass import ConnectorConfig
from metaphor.common.utils import must_set_exactly_one


@dataclass(config=ConnectorConfig)
class BasicAuth:
    """User name and password pair for basic authentication."""

    # Account name sent with the request.
    user: str
    # Secret paired with ``user``.
    password: str


@dataclass(config=ConnectorConfig)
class OpenAPIAuthConfig:
    """Authentication options for the OpenAPI connector."""

    # Basic-auth credentials; left as None when no credentials are configured.
    basic_auth: Optional[BasicAuth] = None


@dataclass(config=ConnectorConfig)
class OpenAPIRunConfig(BaseConfig):
    """Run configuration for the OpenAPI connector.

    Exactly one of ``openapi_json_path`` and ``openapi_json_url`` must be
    set; this is checked by ``have_path_or_url`` after model construction.
    """

    # Base URL used to build endpoint URLs.
    base_url: HttpUrl
    # Local path of the OAS JSON file.
    openapi_json_path: Optional[FilePath] = None
    # URL from which the OAS JSON is downloaded.
    openapi_json_url: Optional[HttpUrl] = None
    # Optional authentication settings.
    auth: Optional[OpenAPIAuthConfig] = None

    @model_validator(mode="after")
    def have_path_or_url(self) -> "OpenAPIRunConfig":
        """Validate that exactly one OAS source (path or URL) is configured."""
        source_fields = ["openapi_json_path", "openapi_json_url"]
        must_set_exactly_one(self.__dict__, source_fields)
        return self
194 changes: 194 additions & 0 deletions metaphor/openapi/extractor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,194 @@
import json
from collections import OrderedDict
from typing import Collection, List, Optional
from urllib.parse import urljoin

import requests

from metaphor.common.base_extractor import BaseExtractor
from metaphor.common.event_util import ENTITY_TYPES
from metaphor.common.logger import get_logger
from metaphor.common.utils import md5_digest
from metaphor.models.crawler_run_metadata import Platform
from metaphor.models.metadata_change_event import (
API,
APILogicalID,
APIPlatform,
AssetPlatform,
AssetStructure,
Hierarchy,
HierarchyInfo,
HierarchyLogicalID,
HierarchyType,
OpenAPI,
OpenAPIMethod,
OpenAPISpecification,
OperationType,
)
from metaphor.openapi.config import OpenAPIRunConfig

# Module-level logger used by the extractor below.
logger = get_logger()


class OpenAPIExtractor(BaseExtractor):
    """OpenAPI metadata extractor.

    Loads an OpenAPI Specification (OAS) JSON document from a local file or
    over HTTP and turns it into ``Hierarchy`` entities (one for the document,
    one per named tag) followed by one ``API`` entity per path.
    """

    _description = "OpenAPI metadata crawler"
    _platform = Platform.OPEN_API

    @staticmethod
    def from_config_file(config_file: str) -> "OpenAPIExtractor":
        """Create an extractor from a YAML config file."""
        return OpenAPIExtractor(OpenAPIRunConfig.from_yaml_file(config_file))

    def __init__(self, config: OpenAPIRunConfig):
        """Store the run configuration and prepare the HTTP session."""
        super().__init__(config)

        self._base_url = str(config.base_url)
        # Identifier for this API, derived from the base URL; used as the
        # hierarchy path segment and as the root directory of each endpoint.
        self._api_id = md5_digest(self._base_url.encode("utf-8"))
        self._openapi_json_path = config.openapi_json_path
        # Keep an unset URL as None: str(None) would give the truthy "None".
        self._openapi_json_url = (
            str(config.openapi_json_url) if config.openapi_json_url else None
        )
        self._auth = config.auth
        self._init_session()

    async def extract(self) -> Collection[ENTITY_TYPES]:
        """Fetch the OAS document and convert it into entities.

        Returns:
            Hierarchies followed by APIs, or an empty list when the OAS JSON
            could not be obtained (or is empty).
        """
        logger.info(
            f"Fetching metadata from {self._openapi_json_path or self._openapi_json_url}"
        )

        openapi_json = self._get_openapi_json()

        if not openapi_json:
            logger.error("Unable to get OAS json")
            return []

        apis = self._extract_apis(openapi_json)
        hierarchies = self._extract_hierarchies(openapi_json)

        return hierarchies + apis

    def _init_session(self):
        """Create the requests session and attach basic auth if configured."""
        self._requests_session = requests.sessions.Session()

        if not self._auth:
            return

        if self._auth.basic_auth:
            basic_auth = self._auth.basic_auth
            self._requests_session.auth = (basic_auth.user, basic_auth.password)

    def _get_openapi_json(self) -> Optional[dict]:
        """Load the OAS JSON from the configured file path or URL.

        The file path takes precedence when set.

        Returns:
            The parsed JSON document, or ``None`` when no URL is available or
            the HTTP response status is not 200.
        """
        if self._openapi_json_path:
            # JSON documents are UTF-8; do not depend on the locale default.
            with open(self._openapi_json_path, "r", encoding="utf-8") as f:
                return json.load(f)

        if not self._openapi_json_url:
            return None

        # to have full control of HTTP header
        # (a None value is meant to drop the session default for that header)
        headers = OrderedDict(
            {
                "User-Agent": None,
                "Accept": "application/json",
                "Connection": None,
                "Accept-Encoding": None,
            }
        )
        resp = self._requests_session.get(self._openapi_json_url, headers=headers)

        if resp.status_code != 200:
            return None

        return resp.json()

    def _extract_apis(self, openapi: dict) -> List[API]:
        """Build one ``API`` entity per entry under ``paths``.

        The server URL is taken from the path item's first server, else the
        document's first server, else the empty string. A server that does
        not start with "http" is resolved against the configured base URL.
        """
        apis: List[API] = []
        servers = openapi.get("servers")

        # "paths" may be absent; treat that as a document with no endpoints.
        for path, path_item in (openapi.get("paths") or {}).items():
            path_servers = path_item.get("servers")
            if path_servers:
                server = path_servers[0]["url"]
            elif servers:
                server = servers[0]["url"]
            else:
                server = ""

            if not server.startswith("http"):
                endpoint_url = urljoin(self._base_url, server + path)
            else:
                endpoint_url = urljoin(server + "/", f"./{path}")

            # Every endpoint lives under the API's root directory; the first
            # tag, when present, adds one more level. Built step by step
            # because `a + b if c else []` parses as `(a + b) if c else []`,
            # which would drop the root directory for untagged endpoints.
            directories = [self._api_id]
            first_tag = self._get_first_tag(path_item)
            if first_tag:
                directories.append(first_tag)

            endpoint = API(
                logical_id=APILogicalID(
                    name=endpoint_url, platform=APIPlatform.OPEN_API
                ),
                open_api=OpenAPI(path=path, methods=self._extract_methods(path_item)),
                structure=AssetStructure(
                    directories=directories,
                    name=path,
                ),
            )
            apis.append(endpoint)
        return apis

    def _get_first_tag(self, path_item: dict) -> Optional[str]:
        """Return the first tag of the first path-item entry that has tags.

        Returns ``None`` when no entry carries a non-empty ``tags`` list.
        """
        for item in path_item.values():
            # Path items also hold non-operation values (strings, lists);
            # only mappings can carry a "tags" list.
            if not isinstance(item, dict):
                continue
            tags = item.get("tags")
            if tags:
                return tags[0]
        return None

    def _extract_methods(self, path_item: dict) -> List[OpenAPIMethod]:
        """Collect an ``OpenAPIMethod`` for each recognised HTTP method key.

        Keys that do not map to an ``OperationType`` are skipped.
        """

        def to_operation_type(method: str) -> Optional[OperationType]:
            try:
                return OperationType(method.upper())
            except ValueError:
                return None

        methods: List[OpenAPIMethod] = []
        for method, item in path_item.items():
            if operation_type := to_operation_type(method):
                methods.append(
                    OpenAPIMethod(
                        # Empty strings are normalised to None.
                        summary=item.get("summary") or None,
                        description=item.get("description") or None,
                        type=operation_type,
                    )
                )
        return methods

    def _extract_hierarchies(self, openapi: dict) -> List[Hierarchy]:
        """Build the document-level hierarchy plus one hierarchy per tag.

        The document-level hierarchy is named after ``info.title`` and holds
        the whole specification serialised as JSON. Tags without a name are
        skipped.
        """
        hierarchies: List[Hierarchy] = []

        title = openapi["info"]["title"]
        hierarchies.append(
            Hierarchy(
                logical_id=HierarchyLogicalID(
                    path=[AssetPlatform.OPEN_API.value, self._api_id],
                ),
                hierarchy_info=HierarchyInfo(
                    name=title,
                    open_api=OpenAPISpecification(definition=json.dumps(openapi)),
                    type=HierarchyType.OPEN_API,
                ),
            )
        )

        for tag in openapi.get("tags") or []:
            name = tag.get("name")
            description = tag.get("description")

            if not name:
                continue

            hierarchies.append(
                Hierarchy(
                    logical_id=HierarchyLogicalID(
                        path=[AssetPlatform.OPEN_API.value, self._api_id, name],
                    ),
                    hierarchy_info=HierarchyInfo(
                        name=name,
                        description=description,
                        type=HierarchyType.OPEN_API,
                    ),
                )
            )

        return hierarchies
Loading

0 comments on commit 7ab9f36

Please sign in to comment.