Skip to content

Commit

Permalink
querying data from different dialects and docummentation updates
Browse files Browse the repository at this point in the history
  • Loading branch information
CrispenGari committed Feb 4, 2024
1 parent 90d97f1 commit fa1bda8
Show file tree
Hide file tree
Showing 18 changed files with 6,757 additions and 436 deletions.
5,850 changes: 5,850 additions & 0 deletions dataloom.sql

Large diffs are not rendered by default.

3 changes: 3 additions & 0 deletions dataloom/constants/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
from datetime import datetime

CURRENT_TIME_STAMP = datetime.now()
instances = {
"postgres": {
"type": "postgres",
Expand Down
69 changes: 20 additions & 49 deletions dataloom/db/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -315,29 +315,30 @@ def find_one(self, instance: Model, filters: dict = {}):
sql, fields, params = instance._get_select_where_stm(
dialect=self.dialect, args=filters
)
print(params)
row = self._execute_sql(sql, args=params, fetchone=True)
return None if row is None else instance(**dict(zip(fields, row)))

def update_by_pk(self, instance: Model, pk, values: dict = {}):
sql, args = instance._get_update_by_pk_stm(dialect=self.dialect, args=values)
args.append(pk)
affected_rows = self._execute_sql(sql, args=args, affected_rows=True)
return affected_rows

# """
# SELECT
# posts.post_id,
# posts.content,
# posts.created_at,
# users.user_id,
# users.username
# FROM
# posts
# JOIN
# users ON posts.user_id = users.user_id
# WHERE
# posts.post_id = 1; -- Replace 1 with the specific post_id you are interested in
# """
# fields = list()
# for name, field in inspect.getmembers(instance):
# if isinstance(field, Column):
# fields.append(name)
def update_one(self, instance: Model, filters: dict = {}, values: dict = {}):
sql, new_values, filter_values = instance._get_update_one_stm(
dialect=self.dialect, filters=filters, values=values
)
args = [*new_values, *filter_values]
affected_rows = self._execute_sql(sql, args=args, affected_rows=True)
return affected_rows

def update_bulk(self, instance: Model, filters: dict = {}, values: dict = {}):
sql, new_values, filter_values = instance._get_update_bulk_where_stm(
dialect=self.dialect, filters=filters, values=values
)
args = [*new_values, *filter_values]
affected_rows = self._execute_sql(sql, args=args, affected_rows=True)
return affected_rows


# def delete_bulk(self, instance: Model, filters: dict = {}):
Expand Down Expand Up @@ -368,33 +369,3 @@ def find_one(self, instance: Model, filters: dict = {}):
# sql, args=(pk,), affected_rows=True, fetchall=True
# )
# return affected_rows

# def update_by_pk(self, instance: Model, pk, values: dict = {}):
# pk_name = "id"
# for name, field in inspect.getmembers(instance):
# if isinstance(field, PrimaryKeyColumn):
# pk_name = name
# sql, values = instance._get_update_by_pk_stm(pk_name, values)
# values.append(pk)
# affected_rows = self._execute_sql(sql, args=values, affected_rows=True)
# return affected_rows

# def update_one(self, instance: Model, filters: dict = {}, values: dict = {}):
# pk_name = "id"
# for name, field in inspect.getmembers(instance):
# if isinstance(field, PrimaryKeyColumn):
# pk_name = name
# sql, new_values, filter_values = instance._get_update_one_stm(
# pk_name, filters, values
# )
# args = [*new_values, *filter_values]
# affected_rows = self._execute_sql(sql, args=args, affected_rows=True)
# return affected_rows

# def update_bulk(self, instance: Model, filters: dict = {}, values: dict = {}):
# sql, new_values, filter_values = instance._get_update_bulk_where_stm(
# filters, values
# )
# args = [*new_values, *filter_values]
# affected_rows = self._execute_sql(sql, args=args, affected_rows=True)
# return affected_rows
8 changes: 8 additions & 0 deletions dataloom/exceptions/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,11 @@ class UnsupportedDialectException(ValueError):

class UnsupportedTypeException(ValueError):
pass


class InvalidFiltersForTableColumnException(ValueError):
pass


class InvalidColumnValuesException(ValueError):
pass
157 changes: 157 additions & 0 deletions dataloom/model/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
CreatedAtColumn,
UpdatedAtColumn,
)
from dataloom.constants import CURRENT_TIME_STAMP


class Model:
Expand Down Expand Up @@ -247,6 +248,162 @@ def _get_select_where_stm(cls, dialect: str, args: dict = {}):
)
return sql, fields, params

@classmethod
def _get_update_by_pk_stm(cls, dialect: str, args: dict = {}):
fields = []
# what is the pk name and updated column name?
pk_name = None
updatedAtColumName = None
for name, field in inspect.getmembers(cls):
if isinstance(field, Column):
fields.append(name)
elif isinstance(field, ForeignKeyColumn):
fields.append(name)
elif isinstance(field, PrimaryKeyColumn):
fields.append(name)
pk_name = f'"{name}"' if dialect == "postgres" else f"`{name}`"
elif isinstance(field, CreatedAtColumn):
fields.append(name)
elif isinstance(field, UpdatedAtColumn):
fields.append(name)
updatedAtColumName = (
f'"{name}"' if dialect == "postgres" else f"`{name}`"
)
values = list()
placeholders = list()
for key, value in args.items():
if key in fields:
placeholders.append(
f'"{key}" = %s' if dialect == "postgres" else f"`{key}` = {'%s' if dialect == 'mysql' else '?'}"
)
values.append(value)

if updatedAtColumName is not None:
placeholders.append(f'{updatedAtColumName} = {'?' if dialect == 'sqlite' else '%s'}')
values.append(CURRENT_TIME_STAMP)

if dialect == "postgres" or "mysql" or "sqlite":
sql = GetStatement(
dialect=dialect, model=cls, table_name=cls._get_table_name()
)._get_update_by_pk_command(placeholders=placeholders, pk_name=pk_name)
else:
raise UnsupportedDialectException(
"The dialect passed is not supported the supported dialects are: {'postgres', 'mysql', 'sqlite'}"
)
return sql, values


@classmethod
def _get_update_one_stm(cls, dialect: str,
filters: dict={}, values: dict={}
):
fields = []
# what is the pk name and updated column name?
pk_name = None
updatedAtColumName = None
for name, field in inspect.getmembers(cls):
if isinstance(field, Column):
fields.append(name)
elif isinstance(field, ForeignKeyColumn):
fields.append(name)
elif isinstance(field, PrimaryKeyColumn):
fields.append(name)
pk_name = f'"{name}"' if dialect == "postgres" else f"`{name}`"
elif isinstance(field, CreatedAtColumn):
fields.append(name)
elif isinstance(field, UpdatedAtColumn):
fields.append(name)
updatedAtColumName = (
f'"{name}"' if dialect == "postgres" else f"`{name}`"
)

new_values = []
placeholders_of_new_values = []
placeholder_filter_values = []
placeholder_filters = []

for key, value in filters.items():
if key in fields:
placeholder_filters.append(
f'"{key}" = %s' if dialect == "postgres" else f"`{key}` = {'%s' if dialect == 'mysql' else '?'}"
)
placeholder_filter_values.append(value)
for key, value in values.items():
if key in fields:
placeholders_of_new_values.append(
f'"{key}" = %s' if dialect == "postgres" else f"`{key}` = {'%s' if dialect == 'mysql' else '?'}"
)
new_values.append(value)

if updatedAtColumName is not None:
placeholders_of_new_values.append(f'{updatedAtColumName} = {'?' if dialect == 'sqlite' else '%s'}')
new_values.append(CURRENT_TIME_STAMP)

if dialect == "postgres" or "mysql" or "sqlite":
sql = GetStatement(
dialect=dialect, model=cls, table_name=cls._get_table_name()
)._get_update_one_command(pk_name=pk_name,placeholders_of_new_values=placeholders_of_new_values, placeholder_filters=placeholder_filters)
else:
raise UnsupportedDialectException(
"The dialect passed is not supported the supported dialects are: {'postgres', 'mysql', 'sqlite'}"
)
return sql, new_values, placeholder_filter_values

@classmethod
def _get_update_bulk_where_stm(cls, dialect: str,
filters: dict={}, values: dict={}
):
fields = []
# what is updated column name?

updatedAtColumName = None
for name, field in inspect.getmembers(cls):
if isinstance(field, Column):
fields.append(name)
elif isinstance(field, ForeignKeyColumn):
fields.append(name)
elif isinstance(field, PrimaryKeyColumn):
fields.append(name)
elif isinstance(field, CreatedAtColumn):
fields.append(name)
elif isinstance(field, UpdatedAtColumn):
fields.append(name)
updatedAtColumName = (
f'"{name}"' if dialect == "postgres" else f"`{name}`"
)

new_values = []
placeholders_of_new_values = []
placeholder_filter_values = []
placeholder_filters = []

for key, value in filters.items():
if key in fields:
placeholder_filters.append(
f'"{key}" = %s' if dialect == "postgres" else f"`{key}` = {'%s' if dialect == 'mysql' else '?'}"
)
placeholder_filter_values.append(value)
for key, value in values.items():
if key in fields:
placeholders_of_new_values.append(
f'"{key}" = %s' if dialect == "postgres" else f"`{key}` = {'%s' if dialect == 'mysql' else '?'}"
)
new_values.append(value)

if updatedAtColumName is not None:
placeholders_of_new_values.append(f'{updatedAtColumName} = {'?' if dialect == 'sqlite' else '%s'}')
new_values.append(CURRENT_TIME_STAMP)

if dialect == "postgres" or "mysql" or "sqlite":
sql = GetStatement(
dialect=dialect, model=cls, table_name=cls._get_table_name()
)._get_update_bulk_command(placeholders_of_new_values=placeholders_of_new_values, placeholder_filters=placeholder_filters)
else:
raise UnsupportedDialectException(
"The dialect passed is not supported the supported dialects are: {'postgres', 'mysql', 'sqlite'}"
)
return sql, new_values, placeholder_filter_values


# class IModel[T](ABC):

Expand Down
Loading

0 comments on commit fa1bda8

Please sign in to comment.