Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(ingest/mysql): support stored procedures #12356

Draft
wants to merge 5 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
112 changes: 111 additions & 1 deletion metadata-ingestion/src/datahub/ingestion/source/sql/mariadb.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,10 @@
import logging
from typing import Dict, Iterable, List

from pydantic.fields import Field
from sqlalchemy.engine import Inspector
from sqlalchemy.engine.base import Connection

from datahub.ingestion.api.decorators import (
SourceCapability,
SupportStatus,
Expand All @@ -6,11 +13,24 @@
platform_name,
support_status,
)
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.source.sql.mysql import MySQLConfig, MySQLSource
from datahub.ingestion.source.sql.mysql.job_models import (
MySQLDataFlow,
MySQLProcedureContainer,
MySQLStoredProcedure,
)
from datahub.ingestion.source.sql.mysql.source import MySQLConnectionConfig

logger: logging.Logger = logging.getLogger(__name__)


class MariaDBConnectionConfig(MySQLConnectionConfig):
host_port: str = Field(default="localhost:3306", description="MariaDB host URL.")


@platform_name("MariaDB")
@config_class(MySQLConfig)
@config_class(MariaDBConnectionConfig)
@support_status(SupportStatus.CERTIFIED)
@capability(SourceCapability.PLATFORM_INSTANCE, "Enabled by default")
@capability(SourceCapability.DOMAINS, "Supported via the `domain` config field")
Expand All @@ -19,3 +39,93 @@
class MariaDBSource(MySQLSource):
def get_platform(self):
return "mariadb"

def _get_stored_procedures(
self,
conn: Connection,
db_name: str,
schema: str,
) -> List[Dict[str, str]]:
stored_procedures_data = conn.execute(

Check warning on line 49 in metadata-ingestion/src/datahub/ingestion/source/sql/mariadb.py

View check run for this annotation

Codecov / codecov/patch

metadata-ingestion/src/datahub/ingestion/source/sql/mariadb.py#L49

Added line #L49 was not covered by tests
f"""
SELECT
ROUTINE_SCHEMA,
ROUTINE_NAME,
ROUTINE_DEFINITION,
ROUTINE_COMMENT,
CREATED,
LAST_ALTERED,
SQL_DATA_ACCESS,
SECURITY_TYPE,
DEFINER
FROM information_schema.ROUTINES
WHERE ROUTINE_TYPE = 'PROCEDURE'
AND ROUTINE_SCHEMA = '{schema}'
"""
)

procedures_list = []
for row in stored_procedures_data:

Check warning on line 68 in metadata-ingestion/src/datahub/ingestion/source/sql/mariadb.py

View check run for this annotation

Codecov / codecov/patch

metadata-ingestion/src/datahub/ingestion/source/sql/mariadb.py#L67-L68

Added lines #L67 - L68 were not covered by tests
# For MariaDB, always try to get the procedure definition using SHOW CREATE
# as it's more reliable for large procedures
try:
create_proc = conn.execute(

Check warning on line 72 in metadata-ingestion/src/datahub/ingestion/source/sql/mariadb.py

View check run for this annotation

Codecov / codecov/patch

metadata-ingestion/src/datahub/ingestion/source/sql/mariadb.py#L71-L72

Added lines #L71 - L72 were not covered by tests
f"SHOW CREATE PROCEDURE `{schema}`.`{row['ROUTINE_NAME']}`"
).fetchone()
code = (

Check warning on line 75 in metadata-ingestion/src/datahub/ingestion/source/sql/mariadb.py

View check run for this annotation

Codecov / codecov/patch

metadata-ingestion/src/datahub/ingestion/source/sql/mariadb.py#L75

Added line #L75 was not covered by tests
create_proc[2] if create_proc else row["ROUTINE_DEFINITION"]
) # MariaDB returns (Procedure, body, something)
except Exception as e:
logger.warning(

Check warning on line 79 in metadata-ingestion/src/datahub/ingestion/source/sql/mariadb.py

View check run for this annotation

Codecov / codecov/patch

metadata-ingestion/src/datahub/ingestion/source/sql/mariadb.py#L78-L79

Added lines #L78 - L79 were not covered by tests
f"Failed to get procedure definition for {schema}.{row['ROUTINE_NAME']}: {e}"
)
code = row["ROUTINE_DEFINITION"]

Check warning on line 82 in metadata-ingestion/src/datahub/ingestion/source/sql/mariadb.py

View check run for this annotation

Codecov / codecov/patch

metadata-ingestion/src/datahub/ingestion/source/sql/mariadb.py#L82

Added line #L82 was not covered by tests

procedures_list.append(

Check warning on line 84 in metadata-ingestion/src/datahub/ingestion/source/sql/mariadb.py

View check run for this annotation

Codecov / codecov/patch

metadata-ingestion/src/datahub/ingestion/source/sql/mariadb.py#L84

Added line #L84 was not covered by tests
dict(
routine_schema=schema,
routine_name=row["ROUTINE_NAME"],
code=code,
)
)
return procedures_list

Check warning on line 91 in metadata-ingestion/src/datahub/ingestion/source/sql/mariadb.py

View check run for this annotation

Codecov / codecov/patch

metadata-ingestion/src/datahub/ingestion/source/sql/mariadb.py#L91

Added line #L91 was not covered by tests

def loop_stored_procedures(
self,
inspector: Inspector,
schema: str, # In two-tier this is actually the database name
sql_config: MySQLConfig,
) -> Iterable[MetadataWorkUnit]:
"""
Override to ensure MariaDB source is set correctly while maintaining two-tier structure.
"""
db_name = self.get_db_name(inspector)
procedure_flow_name = f"{db_name}.stored_procedures"
mariadb_procedure_container = MySQLProcedureContainer(

Check warning on line 104 in metadata-ingestion/src/datahub/ingestion/source/sql/mariadb.py

View check run for this annotation

Codecov / codecov/patch

metadata-ingestion/src/datahub/ingestion/source/sql/mariadb.py#L102-L104

Added lines #L102 - L104 were not covered by tests
name=procedure_flow_name,
env=sql_config.env,
db=db_name,
platform_instance=sql_config.platform_instance,
source="mariadb",
)
data_flow = MySQLDataFlow(entity=mariadb_procedure_container)

Check warning on line 111 in metadata-ingestion/src/datahub/ingestion/source/sql/mariadb.py

View check run for this annotation

Codecov / codecov/patch

metadata-ingestion/src/datahub/ingestion/source/sql/mariadb.py#L111

Added line #L111 was not covered by tests

with inspector.engine.connect() as conn:
procedures_data = self._get_stored_procedures(conn, db_name, schema)
procedures: List[MySQLStoredProcedure] = []

Check warning on line 115 in metadata-ingestion/src/datahub/ingestion/source/sql/mariadb.py

View check run for this annotation

Codecov / codecov/patch

metadata-ingestion/src/datahub/ingestion/source/sql/mariadb.py#L113-L115

Added lines #L113 - L115 were not covered by tests

for procedure_data in procedures_data:
procedure_full_name = f"{db_name}.{procedure_data['routine_name']}"
if not self.config.procedure_pattern.allowed(procedure_full_name):
self.report.report_dropped(procedure_full_name)
continue
procedures.append(

Check warning on line 122 in metadata-ingestion/src/datahub/ingestion/source/sql/mariadb.py

View check run for this annotation

Codecov / codecov/patch

metadata-ingestion/src/datahub/ingestion/source/sql/mariadb.py#L117-L122

Added lines #L117 - L122 were not covered by tests
MySQLStoredProcedure(
flow=mariadb_procedure_container, **procedure_data
)
)

if procedures:
yield from self.construct_flow_workunits(data_flow=data_flow)
for procedure in procedures:
yield from self._process_stored_procedure(conn, procedure)

Check warning on line 131 in metadata-ingestion/src/datahub/ingestion/source/sql/mariadb.py

View check run for this annotation

Codecov / codecov/patch

metadata-ingestion/src/datahub/ingestion/source/sql/mariadb.py#L128-L131

Added lines #L128 - L131 were not covered by tests
98 changes: 0 additions & 98 deletions metadata-ingestion/src/datahub/ingestion/source/sql/mysql.py

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
from datahub.ingestion.source.sql.mysql.source import MySQLConfig, MySQLSource
Loading
Loading