diff --git a/dataloom/__init__.py b/dataloom/__init__.py
index fc446a0..c48cdef 100644
--- a/dataloom/__init__.py
+++ b/dataloom/__init__.py
@@ -1,3 +1,27 @@
 from dataloom.db import Dataloom
+from dataloom import exceptions
+from dataloom.model import Model
+from dataloom.model.column import (
+    PrimaryKeyColumn,
+    CreatedAtColumn,
+    ForeignKeyColumn,
+    UpdatedAtColumn,
+    Column,
+)
 
-dataloom = Dataloom()
+Dataloom = Dataloom
+# Columns
+PrimaryKeyColumn = PrimaryKeyColumn
+CreatedAtColumn = CreatedAtColumn
+ForeignKeyColumn = ForeignKeyColumn
+UpdatedAtColumn = UpdatedAtColumn
+Column = Column
+
+
+# exceptions
+PkNotDefinedException = exceptions.PkNotDefinedException
+TooManyPkException = exceptions.TooManyPkException
+UnsupportedDialectException = exceptions.UnsupportedDialectException
+
+# models
+Model = Model
diff --git a/dataloom/conn/__init__.py b/dataloom/conn/__init__.py
index 443427a..fa98c28 100644
--- a/dataloom/conn/__init__.py
+++ b/dataloom/conn/__init__.py
@@ -1,8 +1,12 @@
 from dataclasses import dataclass, field
-from dataloom.exceptions import UnsupportedDialect
+from dataloom.exceptions import UnsupportedDialectException
 from sqlite3.dbapi2 import Connection
 from os import PathLike
 from typing_extensions import TypeAlias
+from typing import Optional
+
+from dataloom.constants import instances
+
 StrOrBytesPath: TypeAlias = str | bytes | PathLike[str] | PathLike[bytes]
@@ -16,12 +20,12 @@ def connection_options[T](self) -> T:
 @dataclass(kw_only=True)
 class PostgresDialect(Dialect):
     # "postgres://postgres:postgres@localhost:5432/db"
-    connection_string: str | None = field(default=None)
+    connection_string: Optional[str] = field(default=None)
     database: str
-    user: str | None = field(default="postgres")
-    host: str | None = field(default="localhost")
-    port: int | None = field(default=5432)
-    password: str | None = field(default="postgres")
+    user: Optional[str] = field(default=instances["postgres"].get("user"))
+    host: Optional[str] = field(default=instances["postgres"].get("host"))
+    port: Optional[int] = field(default=instances["postgres"].get("port"))
+    password: Optional[str] = field(default=instances["postgres"].get("password"))
 
     def connection_options[T](self) -> T:
         return (
@@ -31,12 +35,12 @@ def connection_options[T](self) -> T:
 
 @dataclass(kw_only=True)
 class MySQLDialect(Dialect):
-    connection_string: str | None = field(default=None)
+    connection_string: Optional[str] = field(default=None)
     database: str
-    user: str | None = field(default="root")
-    host: str | None = field(default="localhost")
-    port: int | None = field(default=3306)
-    password: str | None = field(default="root")
+    user: Optional[str] = field(default=instances["mysql"].get("user"))
+    host: Optional[str] = field(default=instances["mysql"].get("host"))
+    port: Optional[int] = field(default=instances["mysql"].get("port"))
+    password: Optional[str] = field(default=instances["mysql"].get("password"))
 
     def connection_options[T](self) -> T:
         return (
@@ -46,14 +50,14 @@ def connection_options[T](self) -> T:
 
 @dataclass(kw_only=True)
 class SQLiteDialect(Dialect):
-    database: StrOrBytesPath = field(default="users.db")
-    timeout: float | None = field(default=None)
-    detect_types: int | None = field(default=None)
-    isolation_level: str | None = field(default=None)
+    database: StrOrBytesPath = field(default=instances["sqlite"].get("database"))
+    timeout: Optional[float] = field(default=None)
+    detect_types: Optional[int] = field(default=None)
+    isolation_level: Optional[str] = field(default=None)
     check_same_thread: bool = field(default=None)
     factory: type[Connection] | None = field(default=None)
-    cached_statements: int | None = field(default=None)
-    uri: bool | None = field(default=None)
+    cached_statements: Optional[int] = field(default=None)
+    uri: Optional[bool] = field(default=None)
 
     def connection_options[T](self) -> T:
         return vars(self)
@@ -62,7 +66,9 @@ def connection_options[T](self) -> T:
 
 @dataclass
 class ConnectionOptionsFactory:
     @staticmethod
-    def get_connection_options(dialect, **kwargs):
+    def get_connection_options(**kwargs):
+        dialect = kwargs.get("dialect")
+        kwargs = {k: v for k, v in kwargs.items() if k != "dialect"}
         if dialect == "postgres":
             return {
                 k: v
@@ -78,6 +84,6 @@ def get_connection_options(dialect, **kwargs):
                 if v is not None
             }
         else:
-            raise UnsupportedDialect(
+            raise UnsupportedDialectException(
                 "The dialect passed is not supported the supported dialects are: {'postgres', 'mysql', 'sqlite'}"
             )
diff --git a/dataloom/constants/__init__.py b/dataloom/constants/__init__.py
index a60a7c0..b377890 100644
--- a/dataloom/constants/__init__.py
+++ b/dataloom/constants/__init__.py
@@ -4,6 +4,14 @@
         "port": 5432,
         "user": "postgres",
         "password": "postgres",
+        "host": "localhost",
+    },
+    "mysql": {
+        "type": "mysql",
+        "port": 3306,
+        "user": "root",
+        "password": "root",
         "host": "127.0.0.1" or "localhost",
-    }
+    },
+    "sqlite": {"database": "dataloom_instance.db", "type": "sqlite"},
 }
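For context: after this refactor `get_connection_options` expects the dialect inside `**kwargs`, pops it, and filters out `None` values before the options dict is handed to the driver. A minimal sketch of a call (the literal credential values here are made up for illustration):

    from dataloom.conn import ConnectionOptionsFactory

    options = ConnectionOptionsFactory.get_connection_options(
        dialect="postgres", database="hi", user="postgres", password="root"
    )
    # "dialect" has been stripped and None-valued keys dropped, so e.g.
    # psycopg2.connect(**options) only receives parameters that were set.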
config["port"] - self.host = host if host else config["host"] self.database = database self.conn = None - self.logs = logs + self.logging = logging + self.dialect = dialect + try: + config = instances[dialect] + except KeyError: + raise UnsupportedDialectException( + "The dialect passed is not supported the supported dialects are: {'postgres', 'mysql', 'sqlite'}" + ) + + if dialect != "sqlite": + self.connection_options = { + "database": self.database, + "dialect": self.dialect, + "user": user if user else config["user"], + "host": host if host else config["host"], + "port": port if port else config["port"], + "password": password if password else config["password"], + } + else: + self.connection_options = { + "database": self.database, + "dialect": self.dialect, + } @property def tables(self): - res = self._execute_sql( - Statements.GET_TABLES.format(schema_name="public"), fetchall=True - ) + sql = GetStatement(self.dialect)._get_tables_command + res = self._execute_sql(sql, fetchall=True) + if self.dialect == "sqlite": + return [t[0] for t in res if not str(t[0]).lower().startswith("sqlite_")] return [t[0] for t in res] def _execute_sql( @@ -81,19 +75,50 @@ def _execute_sql( affected_rows: bool = False, ): # do we need to log the executed SQL? - if self.logs: + if self.logging: print(sql) - - with self.conn.cursor() as cursor: - if args is None: + if self.dialect == "postgres": + with self.conn.cursor() as cursor: + if args is None: + cursor.execute(sql) + else: + if bulk: + cursor.executemany(sql, vars_list=args) + else: + cursor.execute(sql, vars=args) + # options + if bulk or affected_rows: + result = cursor.rowcount + else: + if fetchmany: + result = cursor.fetchmany() + elif fetchall: + result = cursor.fetchall() + elif fetchone: + result = cursor.fetchone() + else: + result = None + if mutation: + self.conn.commit() + elif self.dialect == "mysql": + with self.conn.cursor(buffered=True) as cursor: cursor.execute(sql) + if bulk or affected_rows: + result = cursor.rowcount else: - ( - cursor.executemany(sql, vars_list=args) - if bulk - else cursor.execute(sql, vars=args) - ) - # options + if fetchmany: + result = cursor.fetchmany() + elif fetchall: + result = cursor.fetchall() + elif fetchone: + result = cursor.fetchone() + else: + result = None + if mutation: + self.conn.commit() + elif self.dialect == "sqlite": + cursor = self.conn.cursor() + cursor.execute(sql) if bulk or affected_rows: result = cursor.rowcount else: @@ -107,209 +132,327 @@ def _execute_sql( result = None if mutation: self.conn.commit() + else: + raise UnsupportedDialectException( + "The dialect passed is not supported the supported dialects are: {'postgres', 'mysql', 'sqlite'}" + ) return result def connect(self): - try: - self.conn = psycopg2.connect( - host=self.host, - database=self.database, - user=self.user, - password=self.password, - port=self.port, + if self.dialect == "postgres": + options = ConnectionOptionsFactory.get_connection_options( + **self.connection_options ) - return self.conn - except Exception as e: - raise Exception(e) - - def connect_and_sync( - self, models: list[Model], drop=False, force=False, alter=False - ): - try: - self.conn = psycopg2.connect( - host=self.host, - database=self.database, - user=self.user, - password=self.password, - port=self.port, + with psycopg2.connect(**options) as conn: + self.conn = conn + elif self.dialect == "mysql": + options = ConnectionOptionsFactory.get_connection_options( + **self.connection_options ) - for model in models: - if drop or force: - 
self._execute_sql(model._drop_sql()) - self._execute_sql(model._create_sql()) - elif alter: - pass - else: - self._execute_sql(model._create_sql(ignore_exists=True)) - return self.conn, self.tables + self.conn = connector.connect(**options) - except Exception as e: - raise Exception(e) + elif self.dialect == "sqlite": + options = ConnectionOptionsFactory.get_connection_options( + **self.connection_options + ) + if "database" in options: + with sqlite3.connect(options.get("database")) as conn: + self.conn = conn + else: + with sqlite3.connect(**options) as conn: + self.conn = conn + else: + raise UnsupportedDialectException( + "The dialect passed is not supported the supported dialects are: {'postgres', 'mysql', 'sqlite'}" + ) + return self.conn def sync(self, models: list[Model], drop=False, force=False, alter=False): try: for model in models: if drop or force: - self._execute_sql(model._drop_sql()) - self._execute_sql(model._create_sql()) + self._execute_sql(model._drop_sql(dialect=self.dialect)) + self._execute_sql(model._create_sql(dialect=self.dialect)) elif alter: pass else: - self._execute_sql(model._create_sql(ignore_exists=True)) - + self._execute_sql( + model._create_sql(dialect=self.dialect, ignore_exists=True) + ) return self.tables except Exception as e: raise Exception(e) - def create(self, instance: Model): - sql, values = instance._get_insert_one_stm() - row = self._execute_sql(sql, args=tuple(values), fetchone=True) - return row[0] - - def create_bulk(self, instances: list[Model]): - columns = None - placeholders = None - data = list() - for instance in instances: - ( - column_names, - placeholder_values, - _values, - ) = instance._get_insert_bulk_attrs() - if columns is None: - columns = column_names - if placeholders is None: - placeholders = placeholder_values - - data.append(_values) - sql, values = instance._get_insert_bulk_smt(placeholders, columns, data) - row_count = self._execute_sql(sql, args=tuple(values), fetchall=True, bulk=True) - return row_count - - def find_all(self, instance: Model): - fields = list() - for name, field in inspect.getmembers(instance): - if isinstance(field, Column): - fields.append(name) - sql, _, __ = instance._get_select_where_stm(fields) - data = list() - rows = self._execute_sql(sql, fetchall=True) - for row in rows: - res = dict(zip(fields, row)) - data.append(instance(**res)) - return data - - def find_many(self, instance: Model, filters: dict = {}): - fields = list() - for name, field in inspect.getmembers(instance): - if isinstance(field, Column): - fields.append(name) - sql, _, params = instance._get_select_where_stm(fields, filters) - data = list() - rows = self._execute_sql(sql, args=params, fetchall=True) - for row in rows: - res = dict(zip(fields, row)) - data.append(instance(**res)) - return data - - def find_by_pk(self, instance: Model, pk, options: dict = {}): - # what is the name of the primary key column? 
- """ - SELECT - posts.post_id, - posts.content, - posts.created_at, - users.user_id, - users.username - FROM - posts - JOIN - users ON posts.user_id = users.user_id - WHERE - posts.post_id = 1; -- Replace 1 with the specific post_id you are interested in - """ - pk_name = "id" - fields = list() - for name, field in inspect.getmembers(instance): - if ( - isinstance(field, Column) - or isinstance(field, ForeignKeyColumn) - or isinstance(field, CreatedAtColumn) - or isinstance(field, UpdatedAtColumn) - ): - fields.append(name) - elif isinstance(field, PrimaryKeyColumn): - pk_name = name - fields.append(name) - sql, fields = instance._get_select_by_pk_stm(pk, pk_name, fields=fields) - row = self._execute_sql(sql, fetchone=True) - return None if row is None else instance(**dict(zip(fields, row))) - - def find_one(self, instance: Model, filters: dict = {}): - fields = list() - for name, field in inspect.getmembers(instance): - if isinstance(field, Column): - fields.append(name) - sql, _, params = instance._get_select_where_stm(fields, filters) - row = self._execute_sql(sql, args=params, fetchone=True) - return None if row is None else instance(**dict(zip(fields, row))) - - def delete_bulk(self, instance: Model, filters: dict = {}): - sql, params = instance._get_delete_bulk_where_stm(filters) - affected_rows = self._execute_sql( - sql, args=params, affected_rows=True, fetchall=True - ) - return affected_rows - - def delete_one(self, instance: Model, filters: dict = {}): - pk = None - for name, field in inspect.getmembers(instance): - if isinstance(field, PrimaryKeyColumn): - pk = name - sql, params = instance._get_delete_where_stm(pk=pk, args=filters) - affected_rows = self._execute_sql(sql, args=params, affected_rows=True) - return affected_rows - - def delete_by_pk(self, instance: Model, pk): - # what is the name of the primary key column? 
- pk_name = "id" - for name, field in inspect.getmembers(instance): - if isinstance(field, PrimaryKeyColumn): - pk_name = name - - sql, pk = instance._get_delete_by_pk_stm(pk, pk_name) - affected_rows = self._execute_sql( - sql, args=(pk,), affected_rows=True, fetchall=True - ) - return affected_rows - - def update_by_pk(self, instance: Model, pk, values: dict = {}): - pk_name = "id" - for name, field in inspect.getmembers(instance): - if isinstance(field, PrimaryKeyColumn): - pk_name = name - sql, values = instance._get_update_by_pk_stm(pk_name, values) - values.append(pk) - affected_rows = self._execute_sql(sql, args=values, affected_rows=True) - return affected_rows - - def update_one(self, instance: Model, filters: dict = {}, values: dict = {}): - pk_name = "id" - for name, field in inspect.getmembers(instance): - if isinstance(field, PrimaryKeyColumn): - pk_name = name - sql, new_values, filter_values = instance._get_update_one_stm( - pk_name, filters, values - ) - args = [*new_values, *filter_values] - affected_rows = self._execute_sql(sql, args=args, affected_rows=True) - return affected_rows - - def update_bulk(self, instance: Model, filters: dict = {}, values: dict = {}): - sql, new_values, filter_values = instance._get_update_bulk_where_stm( - filters, values - ) - args = [*new_values, *filter_values] - affected_rows = self._execute_sql(sql, args=args, affected_rows=True) - return affected_rows + +# class Database: +# def __init__( +# self, +# database: str, +# dialect: str = "postgres", +# user: str | None = None, +# host: str | None = None, +# port: int | None = None, +# password: str | None = None, +# logs: bool = True, +# ) -> None: +# config = instances[dialect] +# self.user = user if user else config["user"] +# self.password = password if password else config["password"] +# self.port = port if port else config["port"] +# self.host = host if host else config["host"] +# self.database = database +# self.conn = None +# self.logs = logs + +# @property +# def tables(self): +# res = self._execute_sql( +# PgStatements.GET_TABLES.format(schema_name="public"), fetchall=True +# ) +# return [t[0] for t in res] + +# def _execute_sql( +# self, +# sql, +# args=None, +# fetchone=False, +# fetchmany=False, +# fetchall=False, +# mutation=True, +# bulk: bool = False, +# affected_rows: bool = False, +# ): +# # do we need to log the executed SQL? 
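For context, the reworked `Dataloom` class is now constructed per dialect and then connected and synced explicitly; a minimal sketch grounded in the new tests below (the database name and the `User`/`Post` models are placeholders defined elsewhere):

    from dataloom import Dataloom

    pg_loom = Dataloom(
        dialect="postgres", database="hi", password="root", user="postgres"
    )
    conn = pg_loom.connect()
    tables = pg_loom.sync([User, Post], drop=True, force=True)
    conn.close()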
diff --git a/dataloom/exceptions/__init__.py b/dataloom/exceptions/__init__.py
index 5eccfd4..5c330bc 100644
--- a/dataloom/exceptions/__init__.py
+++ b/dataloom/exceptions/__init__.py
@@ -6,5 +6,9 @@ class TooManyPkException(Exception):
     pass
 
 
-class UnsupportedDialect(ValueError):
+class UnsupportedDialectException(ValueError):
+    pass
+
+
+class UnsupportedTypeException(ValueError):
     pass
diff --git a/dataloom/model/__init__.py b/dataloom/model/__init__.py
index e69de29..6943579 100644
--- a/dataloom/model/__init__.py
+++ b/dataloom/model/__init__.py
@@ -0,0 +1,103 @@
+from dataloom.exceptions import UnsupportedDialectException
+from dataloom.statements import GetStatement
+import inspect
+from dataloom.model.column import (
+    PrimaryKeyColumn,
+    TableColumn,
+)
+
+
+class Model:
+    def __init__(self, **args) -> None:
+        self._data = {}
+        for k, v in args.items():
+            self._data[k] = v
+
+    def __getattribute__(self, key: str):
+        _data = object.__getattribute__(self, "_data")
+        if key in _data:
+            return _data[key]
+        return object.__getattribute__(self, key)
+
+    @classmethod
+    def _create_sql(cls, dialect: str, ignore_exists=True):
+        sql = GetStatement(
+            dialect=dialect,
+            model=cls,
+            table_name=cls._get_table_name(),
+            ignore_exists=ignore_exists,
+        )._get_create_table_command
+        return sql
+
+    @classmethod
+    def _get_table_name(cls):
+        __tablename__ = None
+        for _, field in inspect.getmembers(cls):
+            if isinstance(field, TableColumn):
+                __tablename__ = field.name
+        return (
+            f"{cls.__name__.lower()}" if __tablename__ is None else f"{__tablename__}"
+        )
+
+    @classmethod
+    def _get_pk_attributes(cls, dialect: str):
+        pk = None
+        pk_type = "BIGSERIAL"
+        for name, field in inspect.getmembers(cls):
+            if isinstance(field, PrimaryKeyColumn):
+                pk = name
+                pk_type = field.sql_type(dialect)
+        return pk, pk_type
+
+    @classmethod
+    def _drop_sql(cls, dialect: str):
+        if dialect in ("postgres", "mysql", "sqlite"):
+            sql = GetStatement(
+                dialect=dialect, model=cls, table_name=cls._get_table_name()
+            )._get_drop_table_command
+        else:
+            raise UnsupportedDialectException(
+                "The dialect passed is not supported the supported dialects are: {'postgres', 'mysql', 'sqlite'}"
+            )
+        return sql
+
+
+# class IModel[T](ABC):
+
+#     @abstractmethod
+#     def find_one(self, filters: dict = {}) -> T:
+#         raise NotImplemented
+
+#     @abstractmethod
+#     def create(self, TModel: T) -> None:
+#         raise NotImplemented
+
+
+# @dataclass(kw_only=True)
+# class Model[T](IModel[T]):
+
+#     def _get_pk_attributes(self):
+#         pk = None
+#         pk_type = "BIGSERIAL"
+#         for name, field in inspect.getmembers(self.model):
+#             if isinstance(field, PrimaryKeyColumn):
+#                 pk = name
+#                 pk_type = field.sql_type
+#         return pk, pk_type
+
+#     def __create_table(self) -> None:
+#         [dialect, cursor, _] = self.instance
+
+#         self._execute_sql(sql)
+
+#     def __init__[Y](self, model: T, instance: Y) -> None:
+#         super().__init__()
+#         self.model = model
+#         self.instance = instance
+#         self.logging = instance[-1]
+#         self.__create_table()
+
+#     def create(self, TModel: T) -> None:
+#         pass
+
+#     def find_one(self, filters: dict = {}) -> T:
+#         pass
diff --git a/dataloom/model/column.py b/dataloom/model/column.py
index 7f839fe..e15f7c5 100644
--- a/dataloom/model/column.py
+++ b/dataloom/model/column.py
@@ -1,4 +1,6 @@
-from dataloom.types import POSTGRES_SQL_TYPES
+from dataloom.types import POSTGRES_SQL_TYPES, MYSQL_SQL_TYPES, SQLITE3_SQL_TYPES
+from dataclasses import dataclass
+from dataloom.exceptions import UnsupportedTypeException, UnsupportedDialectException
 
 
 class CreatedAtColumn:
@@ -23,6 +25,11 @@ def updated_at(self):
         )
 
 
+@dataclass
+class TableColumn:
+    name: str
+
+
 class ForeignKeyColumn:
     def __init__(
         self,
@@ -38,18 +45,50 @@ def __init__(
         self.onUpdate = onUpdate
         self.type = type
 
-    @property
-    def sql_type(self):
-        if self.type in POSTGRES_SQL_TYPES:
-            return POSTGRES_SQL_TYPES[self.type]
+    def sql_type(self, dialect: str):
+        if dialect == "postgres":
+            if self.type in POSTGRES_SQL_TYPES:
+                return (
+                    f"{POSTGRES_SQL_TYPES[self.type]}({self.length})"
+                    if self.length
+                    else POSTGRES_SQL_TYPES[self.type]
+                )
+            else:
+                types = POSTGRES_SQL_TYPES.keys()
+                raise UnsupportedTypeException(
+                    f"Unsupported column type: {self.type} for dialect '{dialect}'; supported types are ({', '.join(types)})"
+                )
+        elif dialect == "mysql":
+            if self.type in MYSQL_SQL_TYPES:
+                return (
+                    f"{MYSQL_SQL_TYPES[self.type]}({self.length})"
+                    if self.length
+                    else MYSQL_SQL_TYPES[self.type]
+                )
+            else:
+                types = MYSQL_SQL_TYPES.keys()
+                raise UnsupportedTypeException(
+                    f"Unsupported column type: {self.type} for dialect '{dialect}'; supported types are ({', '.join(types)})"
+                )
+        elif dialect == "sqlite":
+            if self.type in SQLITE3_SQL_TYPES:
+                return SQLITE3_SQL_TYPES[self.type]
+            else:
+                types = SQLITE3_SQL_TYPES.keys()
+                raise UnsupportedTypeException(
+                    f"Unsupported column type: {self.type} for dialect '{dialect}'; supported types are ({', '.join(types)})"
+                )
         else:
-            raise ValueError(f"Unsupported column type: {self.type}")
+            raise UnsupportedDialectException(
+                "The dialect passed is not supported the supported dialects are: {'postgres', 'mysql', 'sqlite'}"
+            )
 
 
 class PrimaryKeyColumn:
     def __init__(
         self,
-        type: str = "bigserial",
+        type: str,
         length: int | None = None,
         auto_increment: bool = False,
         nullable: bool = False,
@@ -77,18 +116,46 @@ def unique_constraint(self):
     def nullable_constraint(self):
         return "NOT NULL" if not self.nullable else ""
 
-    @property
-    def sql_type(self):
-        if self.type in POSTGRES_SQL_TYPES:
-            if self.auto_increment:
-                return "BIGSERIAL"
-            return (
-                f"{POSTGRES_SQL_TYPES[self.type]}({self.length})"
-                if self.length
-                else POSTGRES_SQL_TYPES[self.type]
-            )
+    def sql_type(self, dialect: str):
+        if dialect == "postgres":
+            if self.type in POSTGRES_SQL_TYPES:
+                if self.auto_increment:
+                    return "BIGSERIAL"
+                return (
+                    f"{POSTGRES_SQL_TYPES[self.type]}({self.length})"
+                    if self.length
+                    else POSTGRES_SQL_TYPES[self.type]
+                )
+            else:
+                types = POSTGRES_SQL_TYPES.keys()
+                raise UnsupportedTypeException(
+                    f"Unsupported column type: {self.type} for dialect '{dialect}'; supported types are ({', '.join(types)})"
+                )
+        elif dialect == "mysql":
+            if self.type in MYSQL_SQL_TYPES:
+                return (
+                    f"{MYSQL_SQL_TYPES[self.type]}({self.length})"
+                    if self.length
+                    else MYSQL_SQL_TYPES[self.type]
+                )
+            else:
+                types = MYSQL_SQL_TYPES.keys()
+                raise UnsupportedTypeException(
+                    f"Unsupported column type: {self.type} for dialect '{dialect}'; supported types are ({', '.join(types)})"
+                )
+        elif dialect == "sqlite":
+            if self.type in SQLITE3_SQL_TYPES:
+                return SQLITE3_SQL_TYPES[self.type]
+            else:
+                types = SQLITE3_SQL_TYPES.keys()
+                raise UnsupportedTypeException(
+                    f"Unsupported column type: {self.type} for dialect '{dialect}'; supported types are ({', '.join(types)})"
+                )
         else:
-            raise ValueError(f"Unsupported column type: {self.type}")
+            raise UnsupportedDialectException(
+                "The dialect passed is not supported the supported dialects are: {'postgres', 'mysql', 'sqlite'}"
+            )
 
 
 class Column:
@@ -124,13 +191,41 @@ def default_constraint(self):
             "DEFAULT '{default}'".format(default=self.default) if self.default else ""
         )
 
-    @property
-    def sql_type(self):
-        if self.type in POSTGRES_SQL_TYPES:
-            return (
-                f"{POSTGRES_SQL_TYPES[self.type]}({self.length})"
-                if self.length
-                else POSTGRES_SQL_TYPES[self.type]
-            )
+    def sql_type(self, dialect: str):
+        if dialect == "postgres":
+            if self.type in POSTGRES_SQL_TYPES:
+                return (
+                    f"{POSTGRES_SQL_TYPES[self.type]}({self.length})"
+                    if self.length
+                    else POSTGRES_SQL_TYPES[self.type]
+                )
+            else:
+                types = POSTGRES_SQL_TYPES.keys()
+                raise UnsupportedTypeException(
+                    f"Unsupported column type: {self.type} for dialect '{dialect}'; supported types are ({', '.join(types)})"
+                )
+        elif dialect == "mysql":
+            if self.type in MYSQL_SQL_TYPES:
+                return (
+                    f"{MYSQL_SQL_TYPES[self.type]}({self.length})"
+                    if self.length
+                    else MYSQL_SQL_TYPES[self.type]
+                )
+            else:
+                types = MYSQL_SQL_TYPES.keys()
+                raise UnsupportedTypeException(
+                    f"Unsupported column type: {self.type} for dialect '{dialect}'; supported types are ({', '.join(types)})"
+                )
+        elif dialect == "sqlite":
+            if self.type in SQLITE3_SQL_TYPES:
+                return SQLITE3_SQL_TYPES[self.type]
+            else:
+                types = SQLITE3_SQL_TYPES.keys()
+                raise UnsupportedTypeException(
+                    f"Unsupported column type: {self.type} for dialect '{dialect}'; supported types are ({', '.join(types)})"
+                )
         else:
-            raise ValueError(f"Unsupported column type: {self.type}")
+            raise UnsupportedDialectException(
+                "The dialect passed is not supported the supported dialects are: {'postgres', 'mysql', 'sqlite'}"
+            )
diff --git a/dataloom/model/model.py b/dataloom/model/model.py
index 8cd9512..e295314 100644
--- a/dataloom/model/model.py
+++ b/dataloom/model/model.py
@@ -6,7 +6,7 @@
     ForeignKeyColumn,
     PrimaryKeyColumn,
 )
-from dataloom.model.statements import Statements
+from dataloom.model.statements import PgStatements
 from dataloom.exceptions import *
 import inspect
 from datetime import datetime
@@ -30,10 +30,6 @@ def __getattribute__(self, key: str):
             return _data[key]
         return object.__getattribute__(self, key)
 
-    # def __setattr__(self, __name: str, __value: Any) -> None:
-    #     if __name in self._data:
-    #         self._data[__name] = __value
-
     @classmethod
     def _get_name(cls):
         __tablename__ = None
@@ -58,7 +54,7 @@ def _get_pk_attributes(cls):
 
     @classmethod
     def _drop_sql(cls):
-        sql = Statements.DROP_TABLE.format(table_name=cls._get_name())
+        sql = PgStatements.DROP_TABLE.format(table_name=cls._get_name())
         return sql
 
     @classmethod
@@ -130,11 +126,11 @@ def _create_sql(cls, ignore_exists=True):
         fields = [*user_fields, *predefined_fields]
         fields_name = ", ".join(f for f in [" ".join(field) for field in fields])
         sql = (
-            Statements.CREATE_NEW_TABLE.format(
+            PgStatements.CREATE_NEW_TABLE.format(
                 table_name=cls._get_name(), fields_name=fields_name
             )
             if not ignore_exists
-            else Statements.CREATE_NEW_TABLE_IF_NOT_EXITS.format(
+            else PgStatements.CREATE_NEW_TABLE_IF_NOT_EXITS.format(
                 table_name=cls._get_name(), fields_name=fields_name
             )
         )
@@ -178,7 +174,7 @@ def _get_insert_one_stm(self):
                 placeholders.append("%s")
             elif isinstance(field, PrimaryKeyColumn):
                 pk = f'"{_name}"'
-        sql = Statements.INSERT_COMMAND_ONE.format(
+        sql = PgStatements.INSERT_COMMAND_ONE.format(
             table_name=self.__class__._get_name(),
             column_name=", ".join([f'"{f}"' for f in fields]),
             placeholder_values=", ".join(placeholders),
@@ -197,7 +193,7 @@ def _get_select_by_pk_stm(cls, pk, pk_name: str = "id", fields: list = []):
                 or isinstance(field, PrimaryKeyColumn)
             ) and name not in fields:
                 fields.append(name)
-        sql = Statements.SELECT_BY_PK.format(
+        sql = PgStatements.SELECT_BY_PK.format(
             column_names=", ".join([f'"{f}"' for f in fields]),
             table_name=cls._get_name(),
             pk=pk,
@@ -207,7 +203,7 @@ def _get_select_by_pk_stm(cls, pk, pk_name: str = "id", fields: list = []):
 
     @classmethod
     def _get_delete_by_pk_stm(cls, pk, pk_name: str = "id"):
-        sql = Statements.DELETE_BY_PK.format(
+        sql = PgStatements.DELETE_BY_PK.format(
             table_name=cls._get_name(),
             pk="%s",  # mask it to avoid SQL Injection
             pk_name=pk_name,
@@ -218,7 +214,7 @@ def _get_insert_bulk_smt(cls, placeholders, columns, data):
         column_names = columns
         placeholders = placeholders
-        sql = Statements.INSERT_COMMAND_MANY.format(
+        sql = PgStatements.INSERT_COMMAND_MANY.format(
             column_names=column_names,
             table_name=cls._get_name(),
             placeholder_values=placeholders,
@@ -236,7 +232,7 @@ def _get_select_one_stm(cls, pk, pk_name: str = "id", fields: list = []):
                 or isinstance(field, PrimaryKeyColumn)
             ) and name not in fields:
                 fields.append(name)
-        sql = Statements.SELECT_BY_PK.format(
+        sql = PgStatements.SELECT_BY_PK.format(
             column_names=", ".join([f'"{f}"' for f in fields]),
             table_name=cls._get_name(),
             pk=pk,
@@ -262,12 +258,12 @@ def _get_select_where_stm(cls, fields: list = [], args: dict = {}):
                 filters.append(f"{key} = %s")
                 params.append(value)
         if len(filters) == 0:
-            sql = Statements.SELECT_COMMAND.format(
+            sql = PgStatements.SELECT_COMMAND.format(
                 column_names=", ".join([f'"{f}"' for f in fields]),
                 table_name=cls._get_name(),
             )
         else:
-            sql = Statements.SELECT_WHERE_COMMAND.format(
+            sql = PgStatements.SELECT_WHERE_COMMAND.format(
                 column_names=", ".join([f'"{f}"' for f in fields]),
                 table_name=cls._get_name(),
                 filters=" AND ".join(filters),
@@ -282,11 +278,11 @@ def _get_delete_where_stm(cls, pk: str = "id", args: dict = {}):
                 filters.append(f"{key} = %s")
                 params.append(value)
         if len(filters) == 0:
-            sql = Statements.DELETE_ALL_COMMAND.format(
+            sql = PgStatements.DELETE_ALL_COMMAND.format(
                 table_name=cls._get_name(),
             )
         else:
-            sql = Statements.DELETE_ONE_WHERE_COMMAND.format(
+            sql = PgStatements.DELETE_ONE_WHERE_COMMAND.format(
                 table_name=cls._get_name(), filters=" AND ".join(filters), pk_name=pk
             )
         return sql, params
@@ -299,11 +295,11 @@ def _get_delete_bulk_where_stm(cls, args: dict = {}):
                 filters.append(f"{key} = %s")
                 params.append(value)
         if len(filters) == 0:
-            sql = Statements.DELETE_ALL_COMMAND.format(
+            sql = PgStatements.DELETE_ALL_COMMAND.format(
                 table_name=cls._get_name(),
             )
         else:
-            sql = Statements.DELETE_BULK_WHERE_COMMAND.format(
+            sql = PgStatements.DELETE_BULK_WHERE_COMMAND.format(
                 table_name=cls._get_name(),
                 filters=" AND ".join(filters),
             )
@@ -326,7 +322,7 @@ def _get_update_by_pk_stm(cls, pk_name: str = "id", args: dict = {}):
                 placeholders.append(f'"{updatedAtColumName}" = %s')
                 values.append(current_time_stamp)
 
-        sql = Statements.UPDATE_BY_PK_COMMAND.format(
+        sql = PgStatements.UPDATE_BY_PK_COMMAND.format(
             table_name=cls._get_name(),
             pk="%s",
             pk_name=pk_name,
@@ -358,7 +354,7 @@ def _get_update_one_stm(
                 placeholder_values.append(f'"{updatedAtColumName}" = %s')
                 values.append(current_time_stamp)
 
-        sql = Statements.UPDATE_BY_ONE_COMMAND.format(
+        sql = PgStatements.UPDATE_BY_ONE_COMMAND.format(
             table_name=cls._get_name(),
             pk_name=pk_name,
             placeholder_values=", ".join(placeholder_values),
@@ -388,7 +384,7 @@ def _get_update_bulk_where_stm(cls, filters: dict = {}, args: dict = {}):
                 placeholder_values.append(f'"{updatedAtColumName}" = %s')
                 values.append(current_time_stamp)
 
-        sql = Statements.UPDATE_BULK_WHERE_COMMAND.format(
+        sql = PgStatements.UPDATE_BULK_WHERE_COMMAND.format(
             table_name=cls._get_name(),
             placeholder_values=", ".join(placeholder_values),
             placeholder_filters=", ".join([i[0] for i in placeholder_filters]),
diff --git a/dataloom/model/statements.py b/dataloom/model/statements.py
index 52042ae..f71845b 100644
--- a/dataloom/model/statements.py
+++ b/dataloom/model/statements.py
@@ -1,4 +1,31 @@
-class Statements:
+class MySqlStatements:
+
+    # dropping table
+    DROP_TABLE = "DROP TABLE IF EXISTS {table_name};"
+    # getting tables
+    GET_TABLES = "SHOW TABLES;"
+
+    # creating table
+    CREATE_NEW_TABLE = "CREATE TABLE {table_name} ({fields_name});"
+    CREATE_NEW_TABLE_IF_NOT_EXITS = (
+        "CREATE TABLE IF NOT EXISTS {table_name} ({fields_name});"
+    )
+
+
+class Sqlite3Statements:
+    # dropping table
+    DROP_TABLE = "DROP TABLE IF EXISTS {table_name};"
+    # getting tables
+    GET_TABLES = "SELECT name FROM sqlite_master WHERE type='{type}';"
+
+    # creating table
+    CREATE_NEW_TABLE = "CREATE TABLE {table_name} ({fields_name});"
+    CREATE_NEW_TABLE_IF_NOT_EXITS = (
+        "CREATE TABLE IF NOT EXISTS {table_name} ({fields_name});"
+    )
+
+
+class PgStatements:
     # updates
     UPDATE_BY_PK_COMMAND = (
         "UPDATE {table_name} SET {placeholder_values} WHERE {pk_name} = {pk};"
     )
diff --git a/dataloom/statements/__init__.py b/dataloom/statements/__init__.py
new file mode 100644
index 0000000..8c8cb96
--- /dev/null
+++ b/dataloom/statements/__init__.py
@@ -0,0 +1,297 @@
+from dataloom.model.column import (
+    PrimaryKeyColumn,
+    Column,
+    CreatedAtColumn,
+    ForeignKeyColumn,
+    UpdatedAtColumn,
+)
+import inspect
+from typing import Optional
+from dataloom.model.statements import MySqlStatements, PgStatements, Sqlite3Statements
+import re
+from dataloom.exceptions import (
+    UnsupportedDialectException,
+    PkNotDefinedException,
+    TooManyPkException,
+)
+
+
+class GetStatement[T]:
+    def __init__(
+        self,
+        dialect: str,
+        model: Optional[T] = None,
+        table_name: Optional[str] = None,
+        ignore_exists: bool = True,
+    ) -> None:
+        self.dialect = dialect
+        self.model = model
+        self.table_name = table_name
+        self.ignore_exists = ignore_exists
+
+    @property
+    def _get_drop_table_command(self) -> Optional[str]:
+        if self.dialect == "postgres":
+            sql = PgStatements.DROP_TABLE.format(table_name=f'"{self.table_name}"')
+        elif self.dialect == "mysql":
+            sql = MySqlStatements.DROP_TABLE.format(table_name=f"`{self.table_name}`")
+        elif self.dialect == "sqlite":
+            sql = Sqlite3Statements.DROP_TABLE.format(table_name=self.table_name)
+        else:
+            raise UnsupportedDialectException(
+                "The dialect passed is not supported the supported dialects are: {'postgres', 'mysql', 'sqlite'}"
+            )
+        return sql
+
+    @property
+    def _get_tables_command(self) -> Optional[str]:
+        if self.dialect == "postgres":
+            sql = PgStatements.GET_TABLES.format(schema_name="public")
+        elif self.dialect == "mysql":
+            sql = MySqlStatements.GET_TABLES
+        elif self.dialect == "sqlite":
+            sql = Sqlite3Statements.GET_TABLES.format(type="table")
+        else:
+            raise UnsupportedDialectException(
+                "The dialect passed is not supported the supported dialects are: {'postgres', 'mysql', 'sqlite'}"
+            )
+        return sql
+
+    @property
+    def _get_create_table_command(self) -> Optional[str]:
+        # is the primary key defined in this table?
+        pks = list()
+        user_fields = list()
+        predefined_fields = list()
+        if self.dialect == "postgres":
+            for name, field in inspect.getmembers(self.model):
+                if isinstance(field, PrimaryKeyColumn):
+                    pks.append(f'"{name}"')
+                    _values = re.sub(
+                        r"\s+",
+                        " ",
+                        "{_type} PRIMARY KEY {unique} {nullable} {default} ".format(
+                            _type=field.sql_type(self.dialect),
+                            default=field.default_constraint,
+                            nullable=field.nullable_constraint,
+                            unique=field.unique_constraint,
+                        ).strip(),
+                    )
+                    user_fields.append((f'"{name}"', _values))
+                elif isinstance(field, Column):
+                    _values = re.sub(
+                        r"\s+",
+                        " ",
+                        "{_type} {unique} {nullable} {default} ".format(
+                            _type=field.sql_type(self.dialect),
+                            unique=field.unique_constraint,
+                            nullable=field.nullable_constraint,
+                            default=field.default_constraint,
+                        ).strip(),
+                    )
+                    user_fields.append((f'"{name}"', _values))
+                elif isinstance(field, CreatedAtColumn):
+                    predefined_fields.append((f'"{name}"', field.created_at))
+                elif isinstance(field, UpdatedAtColumn):
+                    predefined_fields.append((f'"{name}"', field.updated_at))
+                elif isinstance(field, ForeignKeyColumn):
+                    # qns:
+                    # 1. what is the pk in the parent table?
+                    # 2. what is the type of the parent table pk?
+                    # 3. what is the name of the parent table?
+                    pk, pk_type = field.table._get_pk_attributes()
+                    parent_table_name = field.table._get_name()
+                    predefined_fields.append(
+                        (
+                            f'"{name}"',
+                            '{pk_type} {nullable} REFERENCES {parent_table_name}("{pk}") ON DELETE {onDelete} ON UPDATE {onUpdate}'.format(
+                                onDelete=field.onDelete,
+                                onUpdate=field.onUpdate,
+                                pk_type=pk_type,
+                                parent_table_name=f'"{parent_table_name}"',
+                                pk=pk,
+                                nullable="NOT NULL" if field.required else "NULL",
+                            ),
+                        )
+                    )
+
+            # do we have a single primary key or not?
+            if len(pks) == 0:
+                raise PkNotDefinedException(
+                    "Your table does not have a primary key column."
+                )
+            if len(pks) > 1:
+                raise TooManyPkException(
+                    f"You have defined many fields as primary keys, which is not allowed. Fields ({', '.join(pks)}) are primary keys."
+                )
+            fields = [*user_fields, *predefined_fields]
+            fields_name = ", ".join(f for f in [" ".join(field) for field in fields])
+            sql = (
+                PgStatements.CREATE_NEW_TABLE.format(
+                    table_name=f'"{self.table_name}"', fields_name=fields_name
+                )
+                if not self.ignore_exists
+                else PgStatements.CREATE_NEW_TABLE_IF_NOT_EXITS.format(
+                    table_name=f'"{self.table_name}"', fields_name=fields_name
+                )
+            )
+            return sql
+        elif self.dialect == "mysql":
+            for name, field in inspect.getmembers(self.model):
+                if isinstance(field, PrimaryKeyColumn):
+                    pks.append(f"`{name}`")
+                    _values = re.sub(
+                        r"\s+",
+                        " ",
+                        "{_type} PRIMARY KEY {auto_increment} {unique} {nullable} {default} ".format(
+                            _type=field.sql_type(self.dialect),
+                            default=field.default_constraint,
+                            nullable=field.nullable_constraint,
+                            unique=field.unique_constraint,
+                            auto_increment=(
+                                "AUTO_INCREMENT" if field.auto_increment else ""
+                            ),
+                        ).strip(),
+                    )
+                    user_fields.append((f"`{name}`", _values))
+                elif isinstance(field, Column):
+                    _values = re.sub(
+                        r"\s+",
+                        " ",
+                        "{_type} {unique} {nullable} {default} ".format(
+                            _type=field.sql_type(self.dialect),
+                            unique=field.unique_constraint,
+                            nullable=field.nullable_constraint,
+                            default=field.default_constraint,
+                        ).strip(),
+                    )
+                    user_fields.append((f"`{name}`", _values))
+                elif isinstance(field, CreatedAtColumn):
+                    predefined_fields.append((f"`{name}`", field.created_at))
+                elif isinstance(field, UpdatedAtColumn):
+                    predefined_fields.append((f"`{name}`", field.updated_at))
+                elif isinstance(field, ForeignKeyColumn):
+                    # qns:
+                    # 1. what is the pk in the parent table?
+                    # 2. what is the type of the parent table pk?
+                    # 3. what is the name of the parent table?
+                    pk, pk_type = field.table._get_pk_attributes()
+                    parent_table_name = field.table._get_name()
+                    predefined_fields.append(
+                        (
+                            f"`{name}`",
+                            "{pk_type} {nullable} REFERENCES {parent_table_name}(`{pk}`) ON DELETE {onDelete} ON UPDATE {onUpdate}".format(
+                                onDelete=field.onDelete,
+                                onUpdate=field.onUpdate,
+                                pk_type=pk_type,
+                                parent_table_name=f"`{parent_table_name}`",
+                                pk=pk,
+                                nullable="NOT NULL" if field.required else "NULL",
+                            ),
+                        )
+                    )
+
+            # do we have a single primary key or not?
+            if len(pks) == 0:
+                raise PkNotDefinedException(
+                    "Your table does not have a primary key column."
+                )
+            if len(pks) > 1:
+                raise TooManyPkException(
+                    f"You have defined many fields as primary keys, which is not allowed. Fields ({', '.join(pks)}) are primary keys."
+                )
+            fields = [*user_fields, *predefined_fields]
+            fields_name = ", ".join(f for f in [" ".join(field) for field in fields])
+            sql = (
+                MySqlStatements.CREATE_NEW_TABLE.format(
+                    table_name=f"`{self.table_name}`", fields_name=fields_name
+                )
+                if not self.ignore_exists
+                else MySqlStatements.CREATE_NEW_TABLE_IF_NOT_EXITS.format(
+                    table_name=f"`{self.table_name}`", fields_name=fields_name
+                )
+            )
+            return sql
+
+        elif self.dialect == "sqlite":
+            for name, field in inspect.getmembers(self.model):
+                if isinstance(field, PrimaryKeyColumn):
+                    pks.append(f"`{name}`")
+                    _values = re.sub(
+                        r"\s+",
+                        " ",
+                        "{_type} PRIMARY KEY {auto_increment} {unique} {nullable} {default} ".format(
+                            _type=field.sql_type(self.dialect),
+                            default=field.default_constraint,
+                            nullable=field.nullable_constraint,
+                            unique=field.unique_constraint,
+                            auto_increment=(
+                                "AUTOINCREMENT" if field.auto_increment else ""
+                            ),
+                        ).strip(),
+                    )
+                    user_fields.append((f"`{name}`", _values))
+                elif isinstance(field, Column):
+                    _values = re.sub(
+                        r"\s+",
+                        " ",
+                        "{_type} {unique} {nullable} {default} ".format(
+                            _type=field.sql_type(self.dialect),
+                            unique=field.unique_constraint,
+                            nullable=field.nullable_constraint,
+                            default=field.default_constraint,
+                        ).strip(),
+                    )
+                    user_fields.append((f"`{name}`", _values))
+                elif isinstance(field, CreatedAtColumn):
+                    predefined_fields.append((f"`{name}`", field.created_at))
+                elif isinstance(field, UpdatedAtColumn):
+                    predefined_fields.append((f"`{name}`", field.updated_at))
+                elif isinstance(field, ForeignKeyColumn):
+                    # qns:
+                    # 1. what is the pk in the parent table?
+                    # 2. what is the type of the parent table pk?
+                    # 3. what is the name of the parent table?
+                    pk, pk_type = field.table._get_pk_attributes()
+                    parent_table_name = field.table._get_name()
+                    predefined_fields.append(
+                        (
+                            f"`{name}`",
+                            "{pk_type} {nullable} REFERENCES {parent_table_name}(`{pk}`) ON DELETE {onDelete} ON UPDATE {onUpdate}".format(
+                                onDelete=field.onDelete,
+                                onUpdate=field.onUpdate,
+                                pk_type=pk_type,
+                                parent_table_name=f"`{parent_table_name}`",
+                                pk=pk,
+                                nullable="NOT NULL" if field.required else "NULL",
+                            ),
+                        )
+                    )
+
+            # do we have a single primary key or not?
+            if len(pks) == 0:
+                raise PkNotDefinedException(
+                    "Your table does not have a primary key column."
+                )
+            if len(pks) > 1:
+                raise TooManyPkException(
+                    f"You have defined many fields as primary keys, which is not allowed. Fields ({', '.join(pks)}) are primary keys."
+                )
+            fields = [*user_fields, *predefined_fields]
+            fields_name = ", ".join(f for f in [" ".join(field) for field in fields])
+            sql = (
+                Sqlite3Statements.CREATE_NEW_TABLE.format(
+                    table_name=f"`{self.table_name}`", fields_name=fields_name
+                )
+                if not self.ignore_exists
+                else Sqlite3Statements.CREATE_NEW_TABLE_IF_NOT_EXITS.format(
+                    table_name=f"`{self.table_name}`", fields_name=fields_name
+                )
+            )
+            return sql
+        else:
+            raise UnsupportedDialectException(
+                "The dialect passed is not supported the supported dialects are: {'postgres', 'mysql', 'sqlite'}"
+            )
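For context, `GetStatement` is the dialect-aware statement builder that `Model._create_sql`, `Model._drop_sql`, and `Dataloom.tables` now delegate to. A minimal sketch of direct use (the `User` model is a placeholder defined elsewhere):

    from dataloom.statements import GetStatement

    GetStatement("sqlite")._get_tables_command
    # SELECT name FROM sqlite_master WHERE type='table';

    create_sql = GetStatement(
        dialect="postgres", model=User, table_name=User._get_table_name()
    )._get_create_table_command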
diff --git a/dataloom/tests/mysql/test_connection_mysql.py b/dataloom/tests/mysql/test_connection_mysql.py
new file mode 100644
index 0000000..db2deed
--- /dev/null
+++ b/dataloom/tests/mysql/test_connection_mysql.py
@@ -0,0 +1,71 @@
+import pytest
+from mysql import connector
+
+
+class TestConnectionMySQL:
+    def test_connect_with_non_existing_database(self):
+        from dataloom import Dataloom
+
+        mysql_loom = Dataloom(
+            dialect="mysql", database="non-exists", password="root", user="root"
+        )
+        with pytest.raises(connector.errors.ProgrammingError) as exc_info:
+            conn = mysql_loom.connect()
+            conn.close()
+        assert exc_info.value.msg == "Unknown database 'non-exists'"
+        assert exc_info.value.errno == 1049
+
+    def test_connect_with_wrong_password(self):
+        from dataloom import Dataloom
+
+        mysql_loom = Dataloom(
+            dialect="mysql", database="hi", password="user", user="root"
+        )
+        with pytest.raises(connector.errors.ProgrammingError) as exc_info:
+            conn = mysql_loom.connect()
+            conn.close()
+        assert (
+            exc_info.value.msg
+            == "Access denied for user 'root'@'localhost' (using password: YES)"
+        )
+        assert exc_info.value.errno == 1045
+
+    def test_connect_with_wrong_user(self):
+        from dataloom import Dataloom
+
+        mysql_loom = Dataloom(
+            dialect="mysql", database="hi", password="root", user="hey"
+        )
+        with pytest.raises(connector.errors.ProgrammingError) as exc_info:
+            conn = mysql_loom.connect()
+            conn.close()
+        assert (
+            exc_info.value.msg
+            == "Access denied for user 'hey'@'localhost' (using password: YES)"
+        )
+        assert exc_info.value.errno == 1045
+
+    def test_connect_with_wrong_dialect(self):
+        from dataloom import Dataloom, UnsupportedDialectException
+
+        with pytest.raises(UnsupportedDialectException) as exc_info:
+            mysql_loom = Dataloom(
+                dialect="peew", database="hi", password="user", user="root"
+            )
+            conn = mysql_loom.connect()
+            conn.close()
+
+        assert (
+            str(exc_info.value)
+            == "The dialect passed is not supported the supported dialects are: {'postgres', 'mysql', 'sqlite'}"
+        )
+
+    def test_connect_correct_connection(self):
+        from dataloom import Dataloom
+
+        mysql_loom = Dataloom(
+            dialect="mysql", database="hi", password="root", user="root"
+        )
+        conn = mysql_loom.connect()
+        conn.close()
+        assert conn is not None
diff --git a/dataloom/tests/mysql/test_create_tables_mysql.py b/dataloom/tests/mysql/test_create_tables_mysql.py
new file mode 100644
index 0000000..e69de29
diff --git a/dataloom/tests/postgres/test_connection_pg.py b/dataloom/tests/postgres/test_connection_pg.py
new file mode 100644
index 0000000..f55908e
--- /dev/null
+++ b/dataloom/tests/postgres/test_connection_pg.py
@@ -0,0 +1,72 @@
+import pytest
+
+
+class TestConnectionPG:
+    def test_connect_with_non_existing_database(self):
+        from dataloom import Dataloom
+
+        pg_loom = Dataloom(
+            dialect="postgres", database="mew", password="root", user="postgres"
+        )
+        with pytest.raises(Exception) as exc_info:
+            conn = pg_loom.connect()
+            conn.close()
+        assert (
+            str(exc_info.value.args[0]).strip()
+            == 'connection to server at "localhost" (::1), port 5432 failed: FATAL: database "mew" does not exist'
+        )
+
+    def test_connect_with_wrong_password(self):
+        from dataloom import Dataloom
+
+        pg_loom = Dataloom(
+            dialect="postgres", database="hi", password="root-", user="postgres"
+        )
+        with pytest.raises(Exception) as exc_info:
+            conn = pg_loom.connect()
+            conn.close()
+
+        assert (
+            str(exc_info.value.args[0]).strip()
+            == 'connection to server at "localhost" (::1), port 5432 failed: FATAL: password authentication failed for user "postgres"'
+        )
+
+    def test_connect_with_wrong_user(self):
+        from dataloom import Dataloom
+
+        pg_loom = Dataloom(
+            dialect="postgres", database="hi", password="root", user="postgre-u"
+        )
+        with pytest.raises(Exception) as exc_info:
+            conn = pg_loom.connect()
+            conn.close()
+
+        assert (
+            str(exc_info.value.args[0]).strip()
+            == 'connection to server at "localhost" (::1), port 5432 failed: FATAL: password authentication failed for user "postgre-u"'
+        )
+
+    def test_connect_with_wrong_dialect(self):
+        from dataloom import Dataloom, UnsupportedDialectException
+
+        with pytest.raises(UnsupportedDialectException) as exc_info:
+            pg_loom = Dataloom(
+                dialect="peew", database="hi", password="root", user="postgres"
+            )
+            conn = pg_loom.connect()
+            conn.close()
+        assert (
+            str(exc_info.value)
+            == "The dialect passed is not supported the supported dialects are: {'postgres', 'mysql', 'sqlite'}"
+        )
+
+    def test_connect_correct_connection(self):
+        from dataloom import Dataloom
+
+        pg_loom = Dataloom(
+            dialect="postgres", database="hi", password="root", user="postgres"
+        )
+        conn = pg_loom.connect()
+        conn.close()
+
+        assert conn is not None
diff --git a/dataloom/tests/sqlite3/test_connection_sqlite.py b/dataloom/tests/sqlite3/test_connection_sqlite.py
new file mode 100644
index 0000000..ea608e9
--- /dev/null
+++ b/dataloom/tests/sqlite3/test_connection_sqlite.py
@@ -0,0 +1,24 @@
+import pytest
+
+
+class TestConnectionSQLite:
+    def test_connect_with_wrong_dialect(self):
+        from dataloom import Dataloom, UnsupportedDialectException
+
+        with pytest.raises(UnsupportedDialectException) as exc_info:
+            sqlite_loom = Dataloom(dialect="hay", database="hi.db")
+            conn = sqlite_loom.connect()
+            conn.close()
+
+        assert (
+            str(exc_info.value)
+            == "The dialect passed is not supported the supported dialects are: {'postgres', 'mysql', 'sqlite'}"
+        )
+
+    def test_connect_correct_connection(self):
+        from dataloom import Dataloom
+
+        sqlite_loom = Dataloom(dialect="sqlite", database="hi.db")
+        conn = sqlite_loom.connect()
+        conn.close()
+        assert conn is not None
diff --git a/dataloom/tests/test_connection.py b/dataloom/tests/test_connection.py
index 05bba6e..61cfaf5 100644
--- a/dataloom/tests/test_connection.py
+++ b/dataloom/tests/test_connection.py
@@ -1,36 +1,36 @@
-class TestConnectionPG:
-    def test_connect(self):
-        from dataloom.db import Database
-        from dataloom.keys import password, database, user
-
-        db = Database(database, password=password, user=user)
-        conn = db.connect()
-        assert conn.status == 1
-        conn.close()
-
-    def test_connect_sync(self):
-        from dataloom.db import Database
-        from dataloom.keys import password, database, user
-        from dataloom.model.model import Model
-        from dataloom.model.column import Column, PrimaryKeyColumn
-
-        class User(Model):
-            __tablename__ = "users"
-            id = PrimaryKeyColumn(type="bigint", auto_increment=True)
-            username = Column(type="text", nullable=False)
-            name = Column(type="varchar", unique=False, length=255)
-
-        class Post(Model):
-            __tablename__ = "posts"
-
-            id = PrimaryKeyColumn(type="bigint", auto_increment=True)
-            title = Column(type="text", nullable=False, default="Hello there!!")
-
-        db = Database(database, password=password, user=user)
-        conn, tables = db.connect_and_sync([User, Post], drop=True, force=True)
-
-        assert len(tables) == 2
-        assert conn.status == 1
-        assert tables == ["users", "posts"]
-
-        conn.close()
diff --git a/dataloom/tests/test_create_table.py b/dataloom/tests/test_create_table.py
index ff0dcbb..5caaa0c 100644
--- a/dataloom/tests/test_create_table.py
+++ b/dataloom/tests/test_create_table.py
@@ -1,122 +1,122 @@
-class TestCreatingTablePG:
-    def test_2_pk_error(self):
-        from dataloom.db import Database
-        from dataloom.model.column import Column, PrimaryKeyColumn
-        from dataloom.model.model import Model
-        from dataloom.keys import password, database, user
-        import pytest
-
-        db = Database(database, password=password, user=user)
-        conn = db.connect()
-
-        class User(Model):
-            __tablename__ = "users"
-            _id = PrimaryKeyColumn(type="bigint", auto_increment=True)
-            id = PrimaryKeyColumn(type="bigint", auto_increment=True)
-            username = Column(type="text", nullable=False, default="Hello there!!")
-            name = Column(type="varchar", unique=True, length=255)
-
-        with pytest.raises(Exception) as exc_info:
-            db.sync([User], drop=True, force=True)
-
-        assert (
-            str(exc_info.value)
-            == 'You have defined many field as primary keys which is not allowed. Fields ("_id", "id") are primary keys.'
-        )
-        conn.close()
-
-    def test_no_pk_error(self):
-        from dataloom.db import Database
-        from dataloom.model.column import Column
-        from dataloom.model.model import Model
-        from dataloom.keys import password, database, user
-        import pytest
-
-        db = Database(database, password=password, user=user)
-        conn = db.connect()
-
-        class User(Model):
-            __tablename__ = "users"
-            username = Column(type="text", nullable=False, default="Hello there!!")
-            name = Column(type="varchar", unique=True, length=255)
-
-        with pytest.raises(Exception) as exc_info:
-            db.sync([User], drop=True, force=True)
-
-        assert str(exc_info.value) == "Your table does not have a primary key column."
-        conn.close()
-
-    def test_table_name(self):
-        from dataloom.db import Database
-        from dataloom.model.column import Column, PrimaryKeyColumn
-        from dataloom.model.model import Model
-        from dataloom.keys import database, password, user
-
-        db = Database(database, password=password, user=user)
-        conn = db.connect()
-
-        class Todos(Model):
-            id = PrimaryKeyColumn(type="bigint", auto_increment=True)
-            completed = Column(type="boolean", default=False)
-            title = Column(type="varchar", length=255, nullable=False)
-
-        class User(Model):
-            __tablename__ = "users"
-            username = Column(type="text", nullable=False, default="Hello there!!")
-            name = Column(type="varchar", unique=True, length=255)
-
-        assert User._get_name() == '"users"'
-        assert Todos._get_name() == '"todos"'
-        conn.close()
-
-    def test_connect_sync(self):
-        from dataloom.db import Database
-        from dataloom.keys import password, database, user
-        from dataloom.model.model import Model
-        from dataloom.model.column import Column, PrimaryKeyColumn
-
-        class User(Model):
-            __tablename__ = "users"
-            id = PrimaryKeyColumn(type="bigint", nullable=False, auto_increment=True)
-            username = Column(type="text", nullable=False)
-            name = Column(type="varchar", unique=False, length=255)
-
-        class Post(Model):
-            __tablename__ = "posts"
-
-            id = PrimaryKeyColumn(type="bigint", nullable=False, auto_increment=True)
-            title = Column(type="text", nullable=False, default="Hello there!!")
-
-        db = Database(database, password=password, user=user)
-        conn, tables = db.connect_and_sync([User, Post], drop=True, force=True)
-
-        assert len(tables) == 2
-        assert conn.status == 1
-        assert sorted(tables) == sorted(["users", "posts"])
-
-        conn.close()
-
-    def test_syncing_tables(self):
-        from dataloom.db import Database
-        from dataloom.keys import password, database, user
-        from dataloom.model.model import Model
-        from dataloom.model.column import Column, PrimaryKeyColumn
-
-        class User(Model):
-            __tablename__ = "users"
-            id = PrimaryKeyColumn(type="bigint", nullable=False, auto_increment=True)
-            username = Column(type="text", nullable=False)
-            name = Column(type="varchar", unique=False, length=255)
-
-        class Post(Model):
-            __tablename__ = "posts"
-
-            id = PrimaryKeyColumn(type="bigint", nullable=False, auto_increment=True)
-            title = Column(type="text", nullable=False, default="Hello there!!")
-
-        db = Database(database, password=password, user=user)
-        conn = db.connect()
-        tables = db.sync([User, Post], drop=True, force=True)
-        assert len(tables) == 2
-        assert tables == ["users", "posts"]
-        conn.close()
diff --git a/dataloom/tests/test_delete.py b/dataloom/tests/test_delete.py
index a48a6b1..cc10469 100644
--- a/dataloom/tests/test_delete.py
+++ b/dataloom/tests/test_delete.py
@@ -1,102 +1,102 @@
-class TestDeletingOnPG:
-    def
test_delete_by_pk_single_fn(self): - from dataloom.db import Database - from dataloom.model.column import Column - from dataloom.model.model import Model, PrimaryKeyColumn - from dataloom.keys import password, database, user +# class TestDeletingOnPG: +# def test_delete_by_pk_single_fn(self): +# from dataloom.db import Database +# from dataloom.model.column import Column +# from dataloom.model.model import Model, PrimaryKeyColumn +# from dataloom.keys import password, database, user - db = Database(database, password=password, user=user) - conn = db.connect() +# db = Database(database, password=password, user=user) +# conn = db.connect() - class User(Model): - __tablename__ = "users" - id = PrimaryKeyColumn(type="bigint", auto_increment=True) - username = Column(type="text", nullable=False, default="Hello there!!") - name = Column(type="varchar", unique=True, length=255) +# class User(Model): +# __tablename__ = "users" +# id = PrimaryKeyColumn(type="bigint", auto_increment=True) +# username = Column(type="text", nullable=False, default="Hello there!!") +# name = Column(type="varchar", unique=True, length=255) - db.sync([User], drop=True, force=True) +# db.sync([User], drop=True, force=True) - user = User(name="Crispen", username="heyy") - userId = db.create(user) - affected_rows_1 = db.delete_by_pk(User, userId) - affected_rows_2 = db.delete_by_pk(User, 89) - assert affected_rows_1 == 1 - assert affected_rows_2 == 0 - conn.close() +# user = User(name="Crispen", username="heyy") +# userId = db.create(user) +# affected_rows_1 = db.delete_by_pk(User, userId) +# affected_rows_2 = db.delete_by_pk(User, 89) +# assert affected_rows_1 == 1 +# assert affected_rows_2 == 0 +# conn.close() - def test_delete_one_fn(self): - from dataloom.db import Database - from dataloom.model.column import Column - from dataloom.model.model import Model, PrimaryKeyColumn - from dataloom.keys import password, database, user +# def test_delete_one_fn(self): +# from dataloom.db import Database +# from dataloom.model.column import Column +# from dataloom.model.model import Model, PrimaryKeyColumn +# from dataloom.keys import password, database, user - db = Database(database, password=password, user=user) - conn = db.connect() +# db = Database(database, password=password, user=user) +# conn = db.connect() - class User(Model): - __tablename__ = "users" - id = PrimaryKeyColumn(type="bigint", auto_increment=True) - username = Column(type="text", nullable=False, default="Hello there!!") - name = Column(type="varchar", unique=False, length=255) +# class User(Model): +# __tablename__ = "users" +# id = PrimaryKeyColumn(type="bigint", auto_increment=True) +# username = Column(type="text", nullable=False, default="Hello there!!") +# name = Column(type="varchar", unique=False, length=255) - db.sync([User], drop=True, force=True) +# db.sync([User], drop=True, force=True) - db.create_bulk( - [ - User(name="Crispen", username="heyy"), - User(name="Crispen", username="heyy"), - User(name="Crispen", username="heyy"), - ] - ) - db.delete_one(User, {"name": "Crispen"}) - rows_1 = db.find_many(User, {"name": "Crispen"}) - db.delete_one(User, {"name": "Crispen", "id": 9}) - rows_2 = db.find_many(User, {"name": "Crispen"}) - db.delete_one(User, {"name": "Crispen", "id": 2}) - rows_3 = db.find_many(User, {"name": "Crispen"}) - assert len(rows_1) == 2 - assert len(rows_2) == 2 - assert len(rows_3) == 1 - conn.close() +# db.create_bulk( +# [ +# User(name="Crispen", username="heyy"), +# User(name="Crispen", username="heyy"), +# 
User(name="Crispen", username="heyy"), +# ] +# ) +# db.delete_one(User, {"name": "Crispen"}) +# rows_1 = db.find_many(User, {"name": "Crispen"}) +# db.delete_one(User, {"name": "Crispen", "id": 9}) +# rows_2 = db.find_many(User, {"name": "Crispen"}) +# db.delete_one(User, {"name": "Crispen", "id": 2}) +# rows_3 = db.find_many(User, {"name": "Crispen"}) +# assert len(rows_1) == 2 +# assert len(rows_2) == 2 +# assert len(rows_3) == 1 +# conn.close() - def test_delete_bulk_fn(self): - from dataloom.db import Database - from dataloom.model.column import Column - from dataloom.model.model import Model, PrimaryKeyColumn - from dataloom.keys import password, database, user +# def test_delete_bulk_fn(self): +# from dataloom.db import Database +# from dataloom.model.column import Column +# from dataloom.model.model import Model, PrimaryKeyColumn +# from dataloom.keys import password, database, user - db = Database(database, password=password, user=user) - conn = db.connect() +# db = Database(database, password=password, user=user) +# conn = db.connect() - class User(Model): - __tablename__ = "users" - id = PrimaryKeyColumn(type="bigint", auto_increment=True) - username = Column(type="text", nullable=False, default="Hello there!!") - name = Column(type="varchar", unique=False, length=255) +# class User(Model): +# __tablename__ = "users" +# id = PrimaryKeyColumn(type="bigint", auto_increment=True) +# username = Column(type="text", nullable=False, default="Hello there!!") +# name = Column(type="varchar", unique=False, length=255) - db.sync([User], drop=True, force=True) +# db.sync([User], drop=True, force=True) - db.create_bulk( - [ - User(name="Crispen", username="heyy"), - User(name="Crispen", username="heyy"), - User(name="Crispen", username="heyy"), - ] - ) - db.delete_bulk(User, {"name": "Crispen"}) - rows_1 = db.find_many(User, {"name": "Crispen"}) - db.create_bulk( - [ - User(name="Crispen", username="heyy"), - User(name="Crispen", username="heyy"), - User(name="Crispen", username="heyy"), - ] - ) - db.delete_bulk(User, {"name": "Crispen", "id": 99}) - rows_2 = db.find_many(User, {"name": "Crispen"}) - db.delete_bulk(User, {"name": "Crispen", "id": 5}) - rows_3 = db.find_many(User, {"name": "Crispen"}) - assert len(rows_1) == 0 - assert len(rows_2) == 3 - assert len(rows_3) == 2 - conn.close() +# db.create_bulk( +# [ +# User(name="Crispen", username="heyy"), +# User(name="Crispen", username="heyy"), +# User(name="Crispen", username="heyy"), +# ] +# ) +# db.delete_bulk(User, {"name": "Crispen"}) +# rows_1 = db.find_many(User, {"name": "Crispen"}) +# db.create_bulk( +# [ +# User(name="Crispen", username="heyy"), +# User(name="Crispen", username="heyy"), +# User(name="Crispen", username="heyy"), +# ] +# ) +# db.delete_bulk(User, {"name": "Crispen", "id": 99}) +# rows_2 = db.find_many(User, {"name": "Crispen"}) +# db.delete_bulk(User, {"name": "Crispen", "id": 5}) +# rows_3 = db.find_many(User, {"name": "Crispen"}) +# assert len(rows_1) == 0 +# assert len(rows_2) == 3 +# assert len(rows_3) == 2 +# conn.close() diff --git a/dataloom/tests/test_insert.py b/dataloom/tests/test_insert.py index 94fe8b6..2f6bfb2 100644 --- a/dataloom/tests/test_insert.py +++ b/dataloom/tests/test_insert.py @@ -1,107 +1,107 @@ -class TestInsertingOnPG: - def test_insetting_single_document(self): - from dataloom.db import Database - from dataloom.model.column import Column - from dataloom.model.model import Model, PrimaryKeyColumn - from dataloom.keys import password, database, user - - db = Database(database, 
password=password, user=user) - conn = db.connect() - - class Users(Model): - id = PrimaryKeyColumn(type="bigint", auto_increment=True) - username = Column(type="text", nullable=False, default="Hello there!!") - name = Column(type="varchar", unique=True, length=255) - - db.sync([Users], drop=True, force=True) - - user = Users(name="Crispen", username="heyy") - userId = db.create(user) - assert userId == 1 - conn.close() - - def test_insetting_multiple_document(self): - from dataloom.db import Database - from dataloom.model.column import Column - from dataloom.model.model import ( - Model, - CreatedAtColumn, - UpdatedAtColumn, - ForeignKeyColumn, - PrimaryKeyColumn, - ) - from dataloom.keys import password, database, user - - db = Database(database, password=password, user=user) - - class User(Model): - __tablename__ = "users" - id = PrimaryKeyColumn(type="bigint", nullable=False, auto_increment=True) - username = Column(type="text", nullable=False) - name = Column(type="varchar", unique=False, length=255) - createAt = CreatedAtColumn() - updatedAt = UpdatedAtColumn() - - class Post(Model): - __tablename__ = "posts" - - id = PrimaryKeyColumn(type="bigint", nullable=False, auto_increment=True) - title = Column(type="text", nullable=False, default="Hello there!!") - createAt = CreatedAtColumn() - updatedAt = UpdatedAtColumn() - userId = ForeignKeyColumn(User, onDelete="CASCADE", onUpdate="CASCADE") - - conn, _ = db.connect_and_sync([User, Post], drop=True, force=True) - user = User(name="Crispen", username="heyy") - userId = db.create(user) - posts = [ - Post(userId=userId, title="What are you thinking"), - Post(userId=userId, title="What are you doing?"), - Post(userId=userId, title="What are we?"), - ] - row_count = db.create_bulk(posts) - - assert row_count == 3 - conn.close() - - def test_relational_instances(self): - from dataloom.db import Database - from dataloom.model.column import Column - from dataloom.model.model import ( - Model, - CreatedAtColumn, - UpdatedAtColumn, - ForeignKeyColumn, - PrimaryKeyColumn, - ) - from dataloom.keys import password, database, user - - db = Database(database, password=password, user=user) - - class User(Model): - __tablename__ = "users" - id = PrimaryKeyColumn(type="bigint", auto_increment=True) - username = Column(type="text", nullable=False) - name = Column(type="varchar", unique=False, length=255) - createAt = CreatedAtColumn() - updatedAt = UpdatedAtColumn() - - class Post(Model): - __tablename__ = "posts" - - id = PrimaryKeyColumn(type="bigint", auto_increment=True) - title = Column(type="text", nullable=False, default="Hello there!!") - createAt = CreatedAtColumn() - updatedAt = UpdatedAtColumn() - userId = ForeignKeyColumn(User, onDelete="CASCADE", onUpdate="CASCADE") - - db = Database("hi", password="root", user="postgres") - conn, _ = db.connect_and_sync([User, Post], drop=True, force=True) - user = User(name="Crispen", username="heyy") - userId = db.create(user) - postId = db.create( - Post(userId=userId, title="What are you thinking"), - ) - now = db.find_by_pk(Post, postId) - assert userId == now.userId - conn.close() +# class TestInsertingOnPG: +# def test_insetting_single_document(self): +# from dataloom.db import Database +# from dataloom.model.column import Column +# from dataloom.model.model import Model, PrimaryKeyColumn +# from dataloom.keys import password, database, user + +# db = Database(database, password=password, user=user) +# conn = db.connect() + +# class Users(Model): +# id = PrimaryKeyColumn(type="bigint", 
auto_increment=True) +# username = Column(type="text", nullable=False, default="Hello there!!") +# name = Column(type="varchar", unique=True, length=255) + +# db.sync([Users], drop=True, force=True) + +# user = Users(name="Crispen", username="heyy") +# userId = db.create(user) +# assert userId == 1 +# conn.close() + +# def test_insetting_multiple_document(self): +# from dataloom.db import Database +# from dataloom.model.column import Column +# from dataloom.model.model import ( +# Model, +# CreatedAtColumn, +# UpdatedAtColumn, +# ForeignKeyColumn, +# PrimaryKeyColumn, +# ) +# from dataloom.keys import password, database, user + +# db = Database(database, password=password, user=user) + +# class User(Model): +# __tablename__ = "users" +# id = PrimaryKeyColumn(type="bigint", nullable=False, auto_increment=True) +# username = Column(type="text", nullable=False) +# name = Column(type="varchar", unique=False, length=255) +# createAt = CreatedAtColumn() +# updatedAt = UpdatedAtColumn() + +# class Post(Model): +# __tablename__ = "posts" + +# id = PrimaryKeyColumn(type="bigint", nullable=False, auto_increment=True) +# title = Column(type="text", nullable=False, default="Hello there!!") +# createAt = CreatedAtColumn() +# updatedAt = UpdatedAtColumn() +# userId = ForeignKeyColumn(User, onDelete="CASCADE", onUpdate="CASCADE") + +# conn, _ = db.connect_and_sync([User, Post], drop=True, force=True) +# user = User(name="Crispen", username="heyy") +# userId = db.create(user) +# posts = [ +# Post(userId=userId, title="What are you thinking"), +# Post(userId=userId, title="What are you doing?"), +# Post(userId=userId, title="What are we?"), +# ] +# row_count = db.create_bulk(posts) + +# assert row_count == 3 +# conn.close() + +# def test_relational_instances(self): +# from dataloom.db import Database +# from dataloom.model.column import Column +# from dataloom.model.model import ( +# Model, +# CreatedAtColumn, +# UpdatedAtColumn, +# ForeignKeyColumn, +# PrimaryKeyColumn, +# ) +# from dataloom.keys import password, database, user + +# db = Database(database, password=password, user=user) + +# class User(Model): +# __tablename__ = "users" +# id = PrimaryKeyColumn(type="bigint", auto_increment=True) +# username = Column(type="text", nullable=False) +# name = Column(type="varchar", unique=False, length=255) +# createAt = CreatedAtColumn() +# updatedAt = UpdatedAtColumn() + +# class Post(Model): +# __tablename__ = "posts" + +# id = PrimaryKeyColumn(type="bigint", auto_increment=True) +# title = Column(type="text", nullable=False, default="Hello there!!") +# createAt = CreatedAtColumn() +# updatedAt = UpdatedAtColumn() +# userId = ForeignKeyColumn(User, onDelete="CASCADE", onUpdate="CASCADE") + +# db = Database("hi", password="root", user="postgres") +# conn, _ = db.connect_and_sync([User, Post], drop=True, force=True) +# user = User(name="Crispen", username="heyy") +# userId = db.create(user) +# postId = db.create( +# Post(userId=userId, title="What are you thinking"), +# ) +# now = db.find_by_pk(Post, postId) +# assert userId == now.userId +# conn.close() diff --git a/dataloom/tests/test_query.py b/dataloom/tests/test_query.py index 1be42f8..013bdd6 100644 --- a/dataloom/tests/test_query.py +++ b/dataloom/tests/test_query.py @@ -1,70 +1,70 @@ -class TestQueryingPG: - def test_querying_data(self): - from dataloom.db import Database - from dataloom.model.column import Column, PrimaryKeyColumn - from dataloom.model.model import Model - from dataloom.keys import password, database, user +# class TestQueryingPG: +# 
def test_querying_data(self): +# from dataloom.db import Database +# from dataloom.model.column import Column, PrimaryKeyColumn +# from dataloom.model.model import Model +# from dataloom.keys import password, database, user - db = Database(database, password=password, user=user) - conn = db.connect() +# db = Database(database, password=password, user=user) +# conn = db.connect() - class User(Model): - __tablename__ = "users" - id = PrimaryKeyColumn(type="bigint", nullable=False, auto_increment=True) - username = Column(type="text", nullable=False, default="Hello there!!") - name = Column( - type="varchar", - unique=True, - length=255, - ) +# class User(Model): +# __tablename__ = "users" +# id = PrimaryKeyColumn(type="bigint", nullable=False, auto_increment=True) +# username = Column(type="text", nullable=False, default="Hello there!!") +# name = Column( +# type="varchar", +# unique=True, +# length=255, +# ) - def __str__(self) -> str: - return f"User<{self.id}>" +# def __str__(self) -> str: +# return f"User<{self.id}>" - def __repr__(self) -> str: - return f"User<{self.id}>" +# def __repr__(self) -> str: +# return f"User<{self.id}>" - def to_dict(self): - return {"id": self.id, "name": self.name, "username": self.username} +# def to_dict(self): +# return {"id": self.id, "name": self.name, "username": self.username} - db.sync([User], drop=True, force=True) +# db.sync([User], drop=True, force=True) - user = User(name="Crispen", username="heyy") - db.create(user) - users = db.find_all(User) - me = db.find_by_pk(User, 1).to_dict() - her = db.find_by_pk(User, 2) - many_0 = db.find_many(User, {"id": 5}) - many_1 = db.find_many(User, {"id": 1}) - many_2 = db.find_many(User, {"id": 1, "name": "Crispen"}) - many_3 = db.find_many(User, {"id": 5, "username": "hey"}) - many_4 = db.find_many(User, {"name": "Crispen", "username": "heyy"}) +# user = User(name="Crispen", username="heyy") +# db.create(user) +# users = db.find_all(User) +# me = db.find_by_pk(User, 1).to_dict() +# her = db.find_by_pk(User, 2) +# many_0 = db.find_many(User, {"id": 5}) +# many_1 = db.find_many(User, {"id": 1}) +# many_2 = db.find_many(User, {"id": 1, "name": "Crispen"}) +# many_3 = db.find_many(User, {"id": 5, "username": "hey"}) +# many_4 = db.find_many(User, {"name": "Crispen", "username": "heyy"}) - one_0 = db.find_one(User, {"id": 5}) - one_1 = db.find_one(User, {"id": 1}) - one_2 = db.find_one(User, {"id": 1, "name": "Crispen"}) - one_3 = db.find_one(User, {"id": 5, "username": "hey"}) - one_4 = db.find_one(User, {"name": "Crispen", "username": "heyy"}) +# one_0 = db.find_one(User, {"id": 5}) +# one_1 = db.find_one(User, {"id": 1}) +# one_2 = db.find_one(User, {"id": 1, "name": "Crispen"}) +# one_3 = db.find_one(User, {"id": 5, "username": "hey"}) +# one_4 = db.find_one(User, {"name": "Crispen", "username": "heyy"}) - assert [u.to_dict() for u in users] == [ - {"id": 1, "name": "Crispen", "username": "heyy"} - ] - assert [u.to_dict() for u in many_0] == [] - assert [u.to_dict() for u in many_3] == [] - assert [u.to_dict() for u in many_1] == [ - {"id": 1, "name": "Crispen", "username": "heyy"} - ] - assert [u.to_dict() for u in many_2] == [ - {"id": 1, "name": "Crispen", "username": "heyy"} - ] - assert [u.to_dict() for u in many_4] == [ - {"id": 1, "name": "Crispen", "username": "heyy"} - ] +# assert [u.to_dict() for u in users] == [ +# {"id": 1, "name": "Crispen", "username": "heyy"} +# ] +# assert [u.to_dict() for u in many_0] == [] +# assert [u.to_dict() for u in many_3] == [] +# assert [u.to_dict() for u in many_1] 
== [ +# {"id": 1, "name": "Crispen", "username": "heyy"} +# ] +# assert [u.to_dict() for u in many_2] == [ +# {"id": 1, "name": "Crispen", "username": "heyy"} +# ] +# assert [u.to_dict() for u in many_4] == [ +# {"id": 1, "name": "Crispen", "username": "heyy"} +# ] - assert one_0 is None - assert one_3 is None - assert one_1.to_dict() == {"id": 1, "name": "Crispen", "username": "heyy"} - assert one_2.to_dict() == {"id": 1, "name": "Crispen", "username": "heyy"} - assert one_4.to_dict() == {"id": 1, "name": "Crispen", "username": "heyy"} - assert her is None - conn.close() +# assert one_0 is None +# assert one_3 is None +# assert one_1.to_dict() == {"id": 1, "name": "Crispen", "username": "heyy"} +# assert one_2.to_dict() == {"id": 1, "name": "Crispen", "username": "heyy"} +# assert one_4.to_dict() == {"id": 1, "name": "Crispen", "username": "heyy"} +# assert her is None +# conn.close() diff --git a/dataloom/tests/test_update.py b/dataloom/tests/test_update.py index 4d87697..3868a63 100644 --- a/dataloom/tests/test_update.py +++ b/dataloom/tests/test_update.py @@ -1,112 +1,112 @@ -class TestDeletingOnPG: - def test_update_by_pk_single_fn(self): - from dataloom.db import Database - from dataloom.model.column import Column - from dataloom.model.model import ( - Model, - PrimaryKeyColumn, - CreatedAtColumn, - UpdatedAtColumn, - ) - from dataloom.keys import password, database, user - import time, pytest +# class TestDeletingOnPG: +# def test_update_by_pk_single_fn(self): +# from dataloom.db import Database +# from dataloom.model.column import Column +# from dataloom.model.model import ( +# Model, +# PrimaryKeyColumn, +# CreatedAtColumn, +# UpdatedAtColumn, +# ) +# from dataloom.keys import password, database, user +# import time, pytest - db = Database(database, password=password, user=user) +# db = Database(database, password=password, user=user) - class User(Model): - __tablename__ = "users" - id = PrimaryKeyColumn(type="bigint", auto_increment=True) - username = Column(type="text", nullable=False, default="Hello there!!") - name = Column(type="varchar", unique=True, length=255) +# class User(Model): +# __tablename__ = "users" +# id = PrimaryKeyColumn(type="bigint", auto_increment=True) +# username = Column(type="text", nullable=False, default="Hello there!!") +# name = Column(type="varchar", unique=True, length=255) - createdAt = CreatedAtColumn() - updatedAt = UpdatedAtColumn() +# createdAt = CreatedAtColumn() +# updatedAt = UpdatedAtColumn() - db.connect_and_sync([User], drop=True, force=True) - user = User(name="Crispen", username="heyy") - userId = db.create(user) - time.sleep(0.2) - res_1 = db.update_by_pk(User, 1, {"name": "Tinashe Gari"}) - me = db.find_by_pk(User, userId) - assert res_1 == 1 - assert me.createdAt != me.updatedAt - with pytest.raises(Exception) as exc_info: - db.update_by_pk(User, 1, {"haha": "Gari"}) - assert exc_info.value.pgcode == "42703" - with pytest.raises(Exception) as exc_info: - db.update_by_pk(User, 1, {"id": "Gari"}) - assert exc_info.value.pgcode == "25P02" +# db.connect_and_sync([User], drop=True, force=True) +# user = User(name="Crispen", username="heyy") +# userId = db.create(user) +# time.sleep(0.2) +# res_1 = db.update_by_pk(User, 1, {"name": "Tinashe Gari"}) +# me = db.find_by_pk(User, userId) +# assert res_1 == 1 +# assert me.createdAt != me.updatedAt +# with pytest.raises(Exception) as exc_info: +# db.update_by_pk(User, 1, {"haha": "Gari"}) +# assert exc_info.value.pgcode == "42703" +# with pytest.raises(Exception) as exc_info: +# 
db.update_by_pk(User, 1, {"id": "Gari"}) +# assert exc_info.value.pgcode == "25P02" - def test_update_one_fn(self): - from dataloom.db import Database - from dataloom.model.column import Column - from dataloom.model.model import ( - Model, - PrimaryKeyColumn, - CreatedAtColumn, - UpdatedAtColumn, - ) - from dataloom.keys import password, database, user - import time, pytest +# def test_update_one_fn(self): +# from dataloom.db import Database +# from dataloom.model.column import Column +# from dataloom.model.model import ( +# Model, +# PrimaryKeyColumn, +# CreatedAtColumn, +# UpdatedAtColumn, +# ) +# from dataloom.keys import password, database, user +# import time, pytest - db = Database(database, password=password, user=user) +# db = Database(database, password=password, user=user) - class User(Model): - __tablename__ = "users" - id = PrimaryKeyColumn(type="bigint", auto_increment=True) - username = Column(type="text", nullable=False, default="Hello there!!") - name = Column(type="varchar", unique=True, length=255) +# class User(Model): +# __tablename__ = "users" +# id = PrimaryKeyColumn(type="bigint", auto_increment=True) +# username = Column(type="text", nullable=False, default="Hello there!!") +# name = Column(type="varchar", unique=True, length=255) - createdAt = CreatedAtColumn() - updatedAt = UpdatedAtColumn() +# createdAt = CreatedAtColumn() +# updatedAt = UpdatedAtColumn() - db.connect_and_sync([User], drop=True, force=True) - user = User(name="Crispen", username="heyy") - userId = db.create(user) - time.sleep(0.2) - res_1 = db.update_one(User, {"name": "Crispen"}, {"name": "Tinashe Gari"}) - me = db.find_by_pk(User, userId) - assert res_1 == 1 - assert me.createdAt != me.updatedAt - with pytest.raises(Exception) as exc_info: - db.update_one(User, {"name": "Crispen"}, {"haha": "Gari"}) - assert exc_info.value.pgcode == "42703" - with pytest.raises(Exception) as exc_info: - db.update_one(User, {"name": "HH"}, {"name": "Gari"}) - assert exc_info.value.pgcode == "25P02" +# db.connect_and_sync([User], drop=True, force=True) +# user = User(name="Crispen", username="heyy") +# userId = db.create(user) +# time.sleep(0.2) +# res_1 = db.update_one(User, {"name": "Crispen"}, {"name": "Tinashe Gari"}) +# me = db.find_by_pk(User, userId) +# assert res_1 == 1 +# assert me.createdAt != me.updatedAt +# with pytest.raises(Exception) as exc_info: +# db.update_one(User, {"name": "Crispen"}, {"haha": "Gari"}) +# assert exc_info.value.pgcode == "42703" +# with pytest.raises(Exception) as exc_info: +# db.update_one(User, {"name": "HH"}, {"name": "Gari"}) +# assert exc_info.value.pgcode == "25P02" - def test_update_bulk_fn(self): - from dataloom.db import Database - from dataloom.model.column import Column - from dataloom.model.model import ( - Model, - PrimaryKeyColumn, - CreatedAtColumn, - UpdatedAtColumn, - ) - from dataloom.keys import password, database, user - import time, pytest +# def test_update_bulk_fn(self): +# from dataloom.db import Database +# from dataloom.model.column import Column +# from dataloom.model.model import ( +# Model, +# PrimaryKeyColumn, +# CreatedAtColumn, +# UpdatedAtColumn, +# ) +# from dataloom.keys import password, database, user +# import time, pytest - db = Database(database, password=password, user=user) +# db = Database(database, password=password, user=user) - class User(Model): - __tablename__ = "users" - id = PrimaryKeyColumn(type="bigint", auto_increment=True) - username = Column(type="text", nullable=False, default="Hello there!!") - name = 
Column(type="varchar", unique=False, length=255) +# class User(Model): +# __tablename__ = "users" +# id = PrimaryKeyColumn(type="bigint", auto_increment=True) +# username = Column(type="text", nullable=False, default="Hello there!!") +# name = Column(type="varchar", unique=False, length=255) - createdAt = CreatedAtColumn() - updatedAt = UpdatedAtColumn() +# createdAt = CreatedAtColumn() +# updatedAt = UpdatedAtColumn() - db.connect_and_sync([User], drop=True, force=True) - user = User(name="Crispen", username="heyy") - db.create_bulk([user for u in range(4)]) - res_1 = db.update_bulk(User, {"name": "Crispen"}, {"name": "Tinashe Gari"}) - assert res_1 == 4 +# db.connect_and_sync([User], drop=True, force=True) +# user = User(name="Crispen", username="heyy") +# db.create_bulk([user for u in range(4)]) +# res_1 = db.update_bulk(User, {"name": "Crispen"}, {"name": "Tinashe Gari"}) +# assert res_1 == 4 - with pytest.raises(Exception) as exc_info: - db.update_bulk(User, {"name": "Crispen"}, {"haha": "Gari"}) - assert exc_info.value.pgcode == "42703" - with pytest.raises(Exception) as exc_info: - db.update_bulk(User, {"name": "HH"}, {"name": "Gari"}) - assert exc_info.value.pgcode == "25P02" +# with pytest.raises(Exception) as exc_info: +# db.update_bulk(User, {"name": "Crispen"}, {"haha": "Gari"}) +# assert exc_info.value.pgcode == "42703" +# with pytest.raises(Exception) as exc_info: +# db.update_bulk(User, {"name": "HH"}, {"name": "Gari"}) +# assert exc_info.value.pgcode == "25P02" diff --git a/dataloom/types/__init__.py b/dataloom/types/__init__.py index 95bb22a..1b7c4c7 100644 --- a/dataloom/types/__init__.py +++ b/dataloom/types/__init__.py @@ -34,3 +34,40 @@ "circle": "CIRCLE", "hstore": "HSTORE", } + + +MYSQL_SQL_TYPES = { + "int": "INT", + "smallint": "SMALLINT", + "bigint": "BIGINT", + "float": "FLOAT", + "double": "DOUBLE", + "numeric": "DECIMAL", + "text": "TEXT", + "varchar": "VARCHAR", + "char": "CHAR", + "boolean": "BOOLEAN", + "date": "DATE", + "time": "TIME", + "timestamp": "TIMESTAMP", + "json": "JSON", + "blob": "BLOB", +} + +SQLITE3_SQL_TYPES = { + "int": "INTEGER", + "smallint": "SMALLINT", + "bigint": "BIGINT", + "float": "REAL", + "double precision": "DOUBLE", + "numeric": "NUMERIC", + "text": "TEXT", + "varchar": "VARCHAR", + "char": "CHAR", + "boolean": "BOOLEAN", + "date": "DATE", + "time": "TIME", + "timestamp": "TIMESTAMP", + "json": "JSON", + "blob": "BLOB", +} diff --git a/hello.sql b/hello.sql new file mode 100644 index 0000000..6edab26 --- /dev/null +++ b/hello.sql @@ -0,0 +1,6 @@ +CREATE TABLE IF NOT EXISTS "posts" ( + "id" BIGSERIAL UNIQUE NOT NULL, + "title" TEXT, + "createdAt" TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + "updatedAt" TIMESTAMP DEFAULT CURRENT_TIMESTAMP +); \ No newline at end of file diff --git a/hi.db b/hi.db index e69de29..b98f274 100644 Binary files a/hi.db and b/hi.db differ diff --git a/playground.py b/playground.py index efc8b34..c2d433f 100644 --- a/playground.py +++ b/playground.py @@ -1,13 +1,56 @@ -from dataloom import dataloom +from dataloom import Dataloom -# conn = dataloom.connect("postgres", database="hi", password="root", user="postgres") -# conn = dataloom.connect("mysql", database="hi", password="root", user="root") -conn = dataloom.connect("sqlite", database="hi.db") +from dataloom.model import Model +from dataloom.model.column import ( + PrimaryKeyColumn, + Column, + CreatedAtColumn, + UpdatedAtColumn, + TableColumn, +) -print(dir(conn)) -if __name__ == "__main__": - conn.close() - pass +from typing import Optional + + +pg_loom = 
Dataloom(dialect="postgres", database="hi", password="root", user="postgres") +mysql_loom = Dataloom(dialect="mysql", database="hi", password="root", user="root") +sqlite_loom = Dataloom(dialect="sqlite", database="hi.db") + + +class Post(Model): + __tablename__: Optional[TableColumn] = TableColumn(name="posts") + id: Optional[PrimaryKeyColumn] = PrimaryKeyColumn(type="int", auto_increment=True) + title = Column(type="text") + createdAt = CreatedAtColumn() + updatedAt = UpdatedAtColumn() + + +class User(Model): + __tablename__: Optional[TableColumn] = TableColumn(name="users") + id: Optional[PrimaryKeyColumn] = PrimaryKeyColumn(type="int", auto_increment=True) + title = Column(type="text") + createdAt = CreatedAtColumn() + updatedAt = UpdatedAtColumn() + + +conn = sqlite_loom.connect() + +tables = sqlite_loom.sync([Post, User], drop=True, force=True) +print(tables) + +post = Post( + id=2, +) + + +# instance = [*db, dataloom.logging] + +# Post = Model[TypePost](TypePost, instance=instance) +# Post.create(TypePost(title="Hi")) + + +# dataloom.connect("mysql", database="hi", password="root", user="root") +# dataloom.connect("sqlite", database="hi.db") # from dataloom.db import Database diff --git a/requirements.txt b/requirements.txt index d74722d..19343f3 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,8 +1,31 @@ +certifi==2024.2.2 +charset-normalizer==3.3.2 colorama==0.4.6 +docutils==0.20.1 +idna==3.6 +importlib-metadata==7.0.1 iniconfig==2.0.0 +jaraco.classes==3.3.0 +keyring==24.3.0 +markdown-it-py==3.0.0 +mdurl==0.1.2 +more-itertools==10.2.0 mysql-connector-python==8.3.0 +nh3==0.2.15 packaging==23.2 +pkginfo==1.9.6 pluggy==1.4.0 psycopg2==2.9.9 +Pygments==2.17.2 pytest==8.0.0 +pywin32-ctypes==0.2.2 +readme-renderer==42.0 +requests==2.31.0 +requests-toolbelt==1.0.0 +rfc3986==2.0.0 +rich==13.7.0 +twine==4.0.2 typing_extensions==4.9.0 +urllib3==2.2.0 +wheel==0.42.0 +zipp==3.17.0 diff --git a/setup.py b/setup.py new file mode 100644 index 0000000..babf0d0 --- /dev/null +++ b/setup.py @@ -0,0 +1,36 @@ +from setuptools import setup, find_packages +import os +import codecs + +here = os.path.abspath(os.path.dirname(__file__)) + +with codecs.open(os.path.join(here, "README.md"), encoding="utf-8") as fh: + LON_DESCRIPTION = "\n" + fh.read() + +VERSION = "0.0.1" +DESCRIPTION = "dataloom stands as a bespoke Object-Relational Mapping (ORM) solution meticulously crafted to empower Python developers in efficiently managing diverse databases. Unlike conventional ORMs, Dataloom has been built from the ground up, providing native support for SQLite3, PostgreSQL, and MySQL. Navigate effortlessly between database engines while enjoying a tailored and performant ORM experience." +# setting up +setup( + name="dataloom", + version=VERSION, + author="Crispen Gari", + author_email="", + description=DESCRIPTION, + long_description_content_type="text/markdown", + long_description=LON_DESCRIPTION, + packages=find_packages(), + install_requires=[ + "mysql-connector-python==8.3.0", + "psycopg2==2.9.9", + "typing_extensions==4.9.0", + ], + keywords=["ORM", "database", "data management", "SQLAlchemy"], + classifiers=[ + "Development Status :: 1 - Planning", + "Intended Audience :: Developers", + "Programming Language :: Python :: 3.12", + "Operating System :: Unix", + "Operating System :: MacOS :: MacOS X", + "Operating System :: Microsoft :: Windows", + ], +)