Skip to content

Commit

Permalink
feat: add validation for mapping values
Browse files Browse the repository at this point in the history
  • Loading branch information
HectorxH committed Oct 7, 2024
1 parent 8bcad29 commit 0aea99a
Show file tree
Hide file tree
Showing 10 changed files with 478 additions and 358 deletions.
9 changes: 0 additions & 9 deletions .pre-commit-config.yaml

This file was deleted.

18 changes: 11 additions & 7 deletions mapping.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#:schema ./mapping_schema.json
[[tables]]
from = "object_parquet"
to = "object"
from = "objects_parquet"
to = "objects"
[[tables.columns]]
from = "oid_parquet"
to = "oid"
Expand All @@ -16,14 +16,18 @@ to = "ndet"
from = "detections_parquet"
to = "detections"
[[tables.columns]]
from = "candid_parquet"
to = "candid"
[[tables.columns]]
from = "oid_parquet"
to = "oid"
[[tables.columns]]
from = "mag_parquet"
to = "mag"
[[tables.columns]]
from = "other_parquet"
to = "blahblah"
from = "candid_parquet"
to = "candid"

[[tables]]
from = "fake_parquet"
to = "fake_table"
[[tables.columns]]
from = "fake_column_parquet"
to = "fake_column_db"
1 change: 1 addition & 0 deletions mapping_schema.json
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{"$defs": {"ColumnMapping": {"properties": {"from": {"title": "From", "type": "string"}, "to": {"title": "To", "type": "string"}}, "required": ["from", "to"], "title": "ColumnMapping", "type": "object"}, "TableMappings": {"properties": {"from": {"title": "From", "type": "string"}, "to": {"title": "To", "type": "string"}, "columns": {"items": {"$ref": "#/$defs/ColumnMapping"}, "title": "Columns", "type": "array"}}, "required": ["from", "to", "columns"], "title": "TableMappings", "type": "object"}}, "properties": {"tables": {"items": {"$ref": "#/$defs/TableMappings"}, "title": "Tables", "type": "array"}}, "required": ["tables"], "title": "DBMappings", "type": "object"}
545 changes: 271 additions & 274 deletions poetry.lock

Large diffs are not rendered by default.

6 changes: 5 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ readme = "README.md"

[tool.poetry.scripts]
mdbl = "mdbl.cli:main"
mdbl_dev = "mdbl.cli:dev_utils"

[tool.poetry.dependencies]
python = "^3.12"
Expand All @@ -17,11 +18,11 @@ pydantic = "^2.9.1"
faker = "^28.4.1"
polars = "^1.7.1"
pyyaml = "^6.0.2"
psycopg = { extras = ["binary"], version = "^3.2.2" }

[tool.poetry.group.dev.dependencies]
ruff = "^0.6.4"
pytest = "^8.3.3"
pre-commit = "^3.8.0"

[build-system]
requires = ["poetry-core"]
Expand All @@ -31,3 +32,6 @@ build-backend = "poetry.core.masonry.api"
reportAny = false
reportUnknownMemberType = false
reportUnusedCallResult = false

[tool.ruff]
target-version = "py312"
4 changes: 4 additions & 0 deletions src/mdbl/__main__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
# Entry point for ``python -m mdbl``: delegates to the click CLI group,
# passing an empty dict as the initial click context object.
from mdbl.cli import main

if __name__ == "__main__":
    main(obj={})
114 changes: 87 additions & 27 deletions src/mdbl/cli.py
Original file line number Diff line number Diff line change
@@ -1,26 +1,51 @@
import json
from typing import BinaryIO, TextIO
from typing import Any, BinaryIO, TextIO

import click
import duckdb
import psycopg

import mdbl.mdbl as mdbl
from mdbl import utils
from mdbl.models.cli import ValidFileTypes
from mdbl.models.mappings import DBMappings


@click.group
def main():
@click.option("--db-name", required=True, type=str, envvar="MDBL_DB_NAME")
@click.option("--db-user", required=True, type=str, envvar="MDBL_DB_USER")
@click.option("--db-host", required=True, type=str, envvar="MDBL_DB_HOST")
@click.option("--db-port", required=True, type=int, envvar="MDBL_DB_PORT")
@click.option("--db-pass", required=True, type=str, envvar="MDBL_DB_PASS")
@click.pass_context
def main(
ctx: click.Context,
db_name: str,
db_user: str,
db_host: str,
db_port: int,
db_pass: str,
):
"""
Main Group
"""
pass
con = duckdb.connect()
con.install_extension("postgres")
con.load_extension("postgres")
con.sql(
f"CREATE SECRET (TYPE POSTGRES, HOST '{db_host}', PORT {db_port}, DATABASE {db_name}, USER '{db_user}', PASSWORD '{db_pass}');"
)
con.sql("ATTACH '' AS pg (TYPE POSTGRES);")
con.sql("USE pg;")

ctx.ensure_object(dict)
ctx.obj["con"] = con


@main.command()
@click.option("-f", "--file", type=click.File("w"))
@click.option("-i", "--indent", type=int, default=4)
def schema(file: TextIO | None, indent: int):
def generate_mapping_schema(file: TextIO | None, indent: int):
"""
Generates a JSON schema describing the format of the mapping file
to give better IDE support.
Expand All @@ -33,32 +58,67 @@ def schema(file: TextIO | None, indent: int):


@main.command()
def sql():
with duckdb.connect() as con:
con.install_extension("postgres")
con.load_extension("postgres")
con.sql(
"ATTACH 'dbname=postgres user=postgres password=postgres host=127.0.0.1' as postgres (TYPE POSTGRES)"
)
while True:
query: str = click.prompt("SQL")
try:
con.sql(query).show()
except click.ClickException as e:
click.echo(e)
return
except Exception as e:
click.secho(e, fg="yellow")


@main.command()
@click.option("-f", "--file", required=True, type=click.File("rb"))
@click.pass_obj
@click.option("-p", "--parquets", default="parquets", type=str)
@click.option("-m", "--mapping", required=True, type=click.File("rb"))
@click.option(
"-t",
"--file_type",
required=True,
default="TOML",
type=click.Choice(ValidFileTypes.possible_values(), case_sensitive=False),
)
def data_load(file: BinaryIO, file_type: str):
db_mappings = mdbl.read_mapping(file, ValidFileTypes(file_type))
mdbl.data_load(db_mappings)
def data_load(
obj: dict[str, Any],
parquets: str,
mapping: BinaryIO,
file_type: str,
):
con: duckdb.DuckDBPyConnection = obj["con"]
db_mappings = mdbl.read_mapping(mapping, ValidFileTypes(file_type))
try:
mdbl.check_missing_elements(con, db_mappings, parquets)
mdbl.data_load(con, db_mappings, parquets)
except IOError as e:
click.echo(e)
except ExceptionGroup as eg:
click.echo(eg.message)
for e in eg.exceptions:
click.echo(f"\t> {e}")
finally:
con.close()


# Separate click group for developer-only utilities; exposed as the
# ``mdbl_dev`` console script in pyproject.toml.
# (Intentionally no docstring: click would surface it as help text.)
@click.group
def dev_utils():
    pass


@dev_utils.command()
def generate_dummy_data():
    # Write fake parquet files for local testing (see mdbl.utils).
    utils.generate_dummy_parquets()


@dev_utils.command()
def recreate_tables():
    # Drop and recreate the ``objects`` and ``detections`` tables on the
    # local development Postgres. Hard-coded localhost credentials are for
    # this dev-only utility; not intended for any real deployment.
    with psycopg.connect(
        "dbname=postgres user=postgres password=postgres host=127.0.0.1"
    ) as con:
        with con.cursor() as cur:
            cur.execute("""
            DROP TABLE IF EXISTS objects CASCADE;
            CREATE TABLE objects (
                oid VARCHAR(255) PRIMARY KEY,
                firstmjd VARCHAR(255),
                ndet VARCHAR(255)
            )
            """)

            # detections references objects(oid), so objects must be created first.
            cur.execute("""
            DROP TABLE IF EXISTS detections CASCADE;
            CREATE TABLE detections (
                candid VARCHAR(255) PRIMARY KEY,
                oid VARCHAR(255) REFERENCES objects(oid),
                mag VARCHAR(255)
            )
            """)
110 changes: 91 additions & 19 deletions src/mdbl/mdbl.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import os
from typing import BinaryIO

import duckdb
Expand All @@ -6,7 +7,6 @@

from mdbl.models.cli import ValidFileTypes
from mdbl.models.mappings import DBMappings
from mdbl.utils import generate_dummy_parquets


def read_mapping(file: BinaryIO, file_type: ValidFileTypes) -> DBMappings:
Expand All @@ -19,7 +19,88 @@ def read_mapping(file: BinaryIO, file_type: ValidFileTypes) -> DBMappings:
return DBMappings.model_validate(data)


def data_load(db_mappings: DBMappings, folder: str = "parquets"):
class MissingColumnError(Exception):
    # Raised when a mapped column is absent from the database or the parquet files.
    pass


class MissingTableError(Exception):
    # Raised when a mapped table (or its parquet directory) does not exist.
    pass


def check_missing_elements(
    con: duckdb.DuckDBPyConnection, db_mappings: DBMappings, folder: str
):
    """
    Validate that every table/column referenced by ``db_mappings`` exists both
    in the attached database (the ``to`` side) and in the parquet files under
    ``folder`` (the ``from`` side).

    Args:
        con: DuckDB connection with the target database attached.
        db_mappings: Parsed mapping configuration to validate.
        folder: Directory containing one sub-directory of parquet files per
            source table.

    Raises:
        IOError: If ``folder`` does not exist.
        ExceptionGroup: Containing one MissingTableError / MissingColumnError
            per missing item — all problems are collected and reported at once
            instead of failing on the first.
    """
    if not os.path.isdir(folder):
        raise IOError(f"Folder '{folder}' does not exist.")

    table_rows = con.execute("SELECT name FROM (SHOW TABLES);").fetchall()
    db_tables = {row[0] for row in table_rows}

    db_columns: dict[str, set[str]] = {}
    for table in db_tables:
        # Table names come from the database itself (SHOW TABLES above),
        # so interpolating them into the query is safe here.
        column_rows = con.execute(
            f"SELECT column_name FROM (SHOW {table})"
        ).fetchall()
        db_columns[table] = {row[0] for row in column_rows}

    exceptions: list[MissingColumnError | MissingTableError] = []

    # Check for missing fields on db
    for table_mapping in db_mappings.tables:
        if table_mapping.to not in db_tables:
            exceptions.append(
                MissingTableError(
                    f"Table '{table_mapping.to}' does not exist in database."
                )
            )
            continue
        for column_mapping in table_mapping.columns:
            if column_mapping.to not in db_columns[table_mapping.to]:
                exceptions.append(
                    MissingColumnError(
                        f"Column '{column_mapping.to}' does not exist in table '{table_mapping.to}' in the database"
                    )
                )

    # Check for missing fields on parquets
    for table_mappings in db_mappings.tables:
        parquet_path = os.path.join(folder, table_mappings.from_)
        parquet_glob = os.path.join(parquet_path, "*.parquet")
        if not os.path.isdir(parquet_path):
            exceptions.append(
                MissingTableError(
                    f"Parquets '{table_mappings.from_}' not found in '{parquet_glob}'."
                )
            )
            continue

        # Bug fix: query through the provided connection instead of the
        # module-level duckdb default connection (``duckdb.execute``), which
        # is a different, implicitly-created connection.
        parquet_rows = con.execute(
            f"""
            SELECT column_name
            FROM (
                SHOW SELECT * FROM read_parquet('{parquet_glob}')
            );"""
        ).fetchall()
        parquet_columns = {row[0] for row in parquet_rows}
        for column_mapping in table_mappings.columns:
            if column_mapping.from_ not in parquet_columns:
                exceptions.append(
                    MissingColumnError(
                        # Bug fix: report the source column (``from_``), which
                        # is what was actually checked against the parquets,
                        # not the destination column (``to``).
                        f"Column '{column_mapping.from_}' does not exist on parquets '{parquet_glob}'."
                    )
                )

    if exceptions:
        raise ExceptionGroup(
            "While checking the mapping, found missing items",
            exceptions,
        )


def data_load(
con: duckdb.DuckDBPyConnection,
db_mappings: DBMappings,
folder: str = "parquets",
):
"""
Example:
Expand All @@ -39,7 +120,7 @@ def data_load(db_mappings: DBMappings, folder: str = "parquets"):
```
```sql
CREATE TABLE postgres.object AS (
INSERT INTO postgres.object (
SELECT
oid_parquet as oid,
firstmjd_parquet as firstmjd,
Expand All @@ -48,21 +129,12 @@ def data_load(db_mappings: DBMappings, folder: str = "parquets"):
);
```
"""
generate_dummy_parquets(folder=folder)

with duckdb.connect() as con:
con.install_extension("postgres")
con.load_extension("postgres")
con.sql(
"ATTACH 'dbname=postgres user=postgres password=postgres host=127.0.0.1' as postgres (TYPE POSTGRES)"
for table in db_mappings.tables:
aliases = ", ".join(
[f"{column.from_} as {column.to}" for column in table.columns]
)
query = f"SELECT {aliases} FROM read_parquet('{os.path.join(folder, table.from_, '*.parquet')}')"
con.sql(query).show()

for table in db_mappings.tables:
aliases = ", ".join(
[f"{column.from_} as {column.to}" for column in table.columns]
)
query = f"SELECT {aliases} FROM read_parquet('{folder}/{table.from_}/*.parquet')"
con.sql(query).show()

con.sql(f"CREATE OR REPLACE TABLE postgres.{table.to} AS {query}")
con.sql(f"SELECT * FROM postgres.{table.to}")
con.sql(f"INSERT INTO {table.to} BY NAME {query}")
con.sql(f"SELECT COUNT() as 'Total rows inserted' FROM {table.to}").show()
20 changes: 0 additions & 20 deletions src/mdbl/models/mappings.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,23 +16,3 @@ class TableMappings(BaseModel):

class DBMappings(BaseModel):
tables: list[TableMappings] = []


a = {
"tables": [
{
"from": "parquet_obj",
"to": "obj",
"columns": [{"from": "col_tabla", "to": "col_db"}],
},
{
"from": "parquet_det",
"to": "det",
"columns": [
{"from": "col_tabla", "to": "col_db"},
{"from": "col_tabla", "to": "col_db"},
{"from": "col_tabla", "to": "col_db"},
],
},
]
}
Loading

0 comments on commit 0aea99a

Please sign in to comment.