From 5a3b7077c26cd2fec648ebe49494bc639aeeb932 Mon Sep 17 00:00:00 2001 From: Crispen Gari Date: Fri, 2 Feb 2024 17:25:26 +0200 Subject: [PATCH] supporting-other-dialects --- dataloom/__init__.py | 26 +- dataloom/conn/__init__.py | 44 +- dataloom/constants/__init__.py | 10 +- dataloom/db/__init__.py | 625 +++++++++++------- dataloom/exceptions/__init__.py | 6 +- dataloom/model/__init__.py | 103 +++ dataloom/model/column.py | 145 +++- dataloom/model/model.py | 40 +- dataloom/model/statements.py | 29 +- dataloom/statements/__init__.py | 297 +++++++++ dataloom/tests/mysql/test_connection_mysql.py | 71 ++ .../tests/mysql/test_create_tables_mysql.py | 0 dataloom/tests/postgres/test_connection_pg.py | 72 ++ .../tests/sqlite3/test_connection_sqlite.py | 24 + dataloom/tests/test_connection.py | 72 +- dataloom/tests/test_create_table.py | 244 +++---- dataloom/tests/test_delete.py | 176 ++--- dataloom/tests/test_insert.py | 214 +++--- dataloom/tests/test_query.py | 120 ++-- dataloom/tests/test_update.py | 194 +++--- dataloom/types/__init__.py | 37 ++ hello.sql | 6 + hi.db | Bin 0 -> 24576 bytes playground.py | 59 +- requirements.txt | 23 + setup.py | 36 + 26 files changed, 1844 insertions(+), 829 deletions(-) create mode 100644 dataloom/statements/__init__.py create mode 100644 dataloom/tests/mysql/test_connection_mysql.py create mode 100644 dataloom/tests/mysql/test_create_tables_mysql.py create mode 100644 dataloom/tests/postgres/test_connection_pg.py create mode 100644 dataloom/tests/sqlite3/test_connection_sqlite.py create mode 100644 hello.sql create mode 100644 setup.py diff --git a/dataloom/__init__.py b/dataloom/__init__.py index fc446a0..c48cdef 100644 --- a/dataloom/__init__.py +++ b/dataloom/__init__.py @@ -1,3 +1,27 @@ from dataloom.db import Dataloom +from dataloom import exceptions +from dataloom.model import Model +from dataloom.model.column import ( + PrimaryKeyColumn, + CreatedAtColumn, + ForeignKeyColumn, + UpdatedAtColumn, + Column, +) -dataloom = Dataloom() +Dataloom = Dataloom +# Columns +PrimaryKeyColumn = PrimaryKeyColumn +CreatedAtColumn = CreatedAtColumn +ForeignKeyColumn = ForeignKeyColumn +UpdatedAtColumn = UpdatedAtColumn +Column = Column + + +# exceptions +PkNotDefinedException = exceptions.PkNotDefinedException +TooManyPkException = exceptions.TooManyPkException +UnsupportedDialectException = exceptions.UnsupportedDialectException + +# models +Model = Model diff --git a/dataloom/conn/__init__.py b/dataloom/conn/__init__.py index 443427a..fa98c28 100644 --- a/dataloom/conn/__init__.py +++ b/dataloom/conn/__init__.py @@ -1,8 +1,12 @@ from dataclasses import dataclass, field -from dataloom.exceptions import UnsupportedDialect +from dataloom.exceptions import UnsupportedDialectException from sqlite3.dbapi2 import Connection from os import PathLike from typing_extensions import TypeAlias +from typing import Optional + +from dataloom.constants import instances + StrOrBytesPath: TypeAlias = str | bytes | PathLike[str] | PathLike[bytes] @@ -16,12 +20,12 @@ def connection_options[T](self) -> T: @dataclass(kw_only=True) class PostgresDialect(Dialect): # "postgres://postgres:postgres@localhost:5432/db" - connection_string: str | None = field(default=None) + connection_string: Optional[str] = field(default=None) database: str - user: str | None = field(default="postgres") - host: str | None = field(default="localhost") - port: int | None = field(default=5432) - password: str | None = field(default="postgres") + user: Optional[str] = field(default=instances["postgres"].get("user")) + host: Optional[str] = field(default=instances["postgres"].get("host")) + port: Optional[int] = field(default=instances["postgres"].get("port")) + password: Optional[str] = field(default=instances["postgres"].get("user")) def connection_options[T](self) -> T: return ( @@ -31,12 +35,12 @@ def connection_options[T](self) -> T: @dataclass(kw_only=True) class MySQLDialect(Dialect): - connection_string: str | None = field(default=None) + connection_string: Optional[str] = field(default=None) database: str - user: str | None = field(default="root") - host: str | None = field(default="localhost") - port: int | None = field(default=3306) - password: str | None = field(default="root") + user: Optional[str] = field(default=instances["mysql"].get("user")) + host: Optional[str] = field(default=instances["mysql"].get("host")) + port: Optional[int] = field(default=instances["mysql"].get("port")) + password: Optional[str] = field(default=instances["mysql"].get("password")) def connection_options[T](self) -> T: return ( @@ -46,14 +50,14 @@ def connection_options[T](self) -> T: @dataclass(kw_only=True) class SQLiteDialect(Dialect): - database: StrOrBytesPath = field(default="users.db") - timeout: float | None = field(default=None) - detect_types: int | None = field(default=None) - isolation_level: str | None = field(default=None) + database: StrOrBytesPath = field(default=instances["sqlite"].get("database")) + timeout: Optional[float] = field(default=None) + detect_types: Optional[int] = field(default=None) + isolation_level: Optional[str] = field(default=None) check_same_thread: bool = field(default=None) factory: type[Connection] | None = field(default=None) - cached_statements: int | None = field(default=None) - uri: bool | None = field(default=None) + cached_statements: Optional[int] = field(default=None) + uri: Optional[bool] = field(default=None) def connection_options[T](self) -> T: return vars(self) @@ -62,7 +66,9 @@ def connection_options[T](self) -> T: @dataclass class ConnectionOptionsFactory: @staticmethod - def get_connection_options(dialect, **kwargs): + def get_connection_options(**kwargs): + dialect = kwargs.get("dialect") + kwargs = {k: v for k, v in kwargs.items() if k != "dialect"} if dialect == "postgres": return { k: v @@ -78,6 +84,6 @@ def get_connection_options(dialect, **kwargs): if v is not None } else: - raise UnsupportedDialect( + raise UnsupportedDialectException( "The dialect passed is not supported the supported dialects are: {'postgres', 'mysql', 'sqlite'}" ) diff --git a/dataloom/constants/__init__.py b/dataloom/constants/__init__.py index a60a7c0..b377890 100644 --- a/dataloom/constants/__init__.py +++ b/dataloom/constants/__init__.py @@ -4,6 +4,14 @@ "port": 5432, "user": "postgres", "password": "postgres", + "host": "localhost" or "127.0.0.1", + }, + "mysql": { + "type": "mysql", + "port": 3306, + "user": "root", + "password": "root", "host": "127.0.0.1" or "localhost", - } + }, + "sqlite": {"database": "dataloom_instance.db", "type": "sqlite"}, } diff --git a/dataloom/db/__init__.py b/dataloom/db/__init__.py index 9c3a5ca..5634eb3 100644 --- a/dataloom/db/__init__.py +++ b/dataloom/db/__init__.py @@ -3,9 +3,10 @@ import sqlite3 import inspect from dataloom.constants import instances -from dataloom.model.statements import Statements -from dataloom.exceptions import UnsupportedDialect -from dataloom.model.model import Model +from dataloom.model.statements import PgStatements +from dataloom.exceptions import UnsupportedDialectException +from dataloom.model import Model +from dataloom.statements import GetStatement from dataloom.conn import ConnectionOptionsFactory from dataloom.model.column import ( Column, @@ -14,59 +15,52 @@ CreatedAtColumn, ForeignKeyColumn, ) +from typing import Optional class Dataloom: - conn = None - - def connect(self, dialect, **kwargs): - if dialect == "postgres": - options = ConnectionOptionsFactory.get_connection_options(dialect, **kwargs) - self.conn = psycopg2.connect(**options) - return self.conn - elif dialect == "mysql": - options = ConnectionOptionsFactory.get_connection_options(dialect, **kwargs) - self.conn = connector.connect(**options) - return self.conn - elif dialect == "sqlite": - options = ConnectionOptionsFactory.get_connection_options(dialect, **kwargs) - self.conn = ( - sqlite3.connect(options.get("database")) - if "database" in options - else sqlite3.connect(**options) - ) - return self.conn - else: - raise UnsupportedDialect( - "The dialect passed is not supported the supported dialects are: {'postgres', 'mysql', 'sqlite'}" - ) - - -class Database: def __init__( self, database: str, - dialect: str = "postgres", - user: str | None = None, - host: str | None = None, - port: int | None = None, - password: str | None = None, - logs: bool = True, + dialect: str, + user: Optional[str] = None, + host: Optional[str] = None, + port: Optional[int] = None, + password: Optional[str] = None, + logging: bool = True, ) -> None: - config = instances[dialect] - self.user = user if user else config["user"] - self.password = password if password else config["password"] - self.port = port if port else config["port"] - self.host = host if host else config["host"] self.database = database self.conn = None - self.logs = logs + self.logging = logging + self.dialect = dialect + try: + config = instances[dialect] + except KeyError: + raise UnsupportedDialectException( + "The dialect passed is not supported the supported dialects are: {'postgres', 'mysql', 'sqlite'}" + ) + + if dialect != "sqlite": + self.connection_options = { + "database": self.database, + "dialect": self.dialect, + "user": user if user else config["user"], + "host": host if host else config["host"], + "port": port if port else config["port"], + "password": password if password else config["password"], + } + else: + self.connection_options = { + "database": self.database, + "dialect": self.dialect, + } @property def tables(self): - res = self._execute_sql( - Statements.GET_TABLES.format(schema_name="public"), fetchall=True - ) + sql = GetStatement(self.dialect)._get_tables_command + res = self._execute_sql(sql, fetchall=True) + if self.dialect == "sqlite": + return [t[0] for t in res if not str(t[0]).lower().startswith("sqlite_")] return [t[0] for t in res] def _execute_sql( @@ -81,19 +75,50 @@ def _execute_sql( affected_rows: bool = False, ): # do we need to log the executed SQL? - if self.logs: + if self.logging: print(sql) - - with self.conn.cursor() as cursor: - if args is None: + if self.dialect == "postgres": + with self.conn.cursor() as cursor: + if args is None: + cursor.execute(sql) + else: + if bulk: + cursor.executemany(sql, vars_list=args) + else: + cursor.execute(sql, vars=args) + # options + if bulk or affected_rows: + result = cursor.rowcount + else: + if fetchmany: + result = cursor.fetchmany() + elif fetchall: + result = cursor.fetchall() + elif fetchone: + result = cursor.fetchone() + else: + result = None + if mutation: + self.conn.commit() + elif self.dialect == "mysql": + with self.conn.cursor(buffered=True) as cursor: cursor.execute(sql) + if bulk or affected_rows: + result = cursor.rowcount else: - ( - cursor.executemany(sql, vars_list=args) - if bulk - else cursor.execute(sql, vars=args) - ) - # options + if fetchmany: + result = cursor.fetchmany() + elif fetchall: + result = cursor.fetchall() + elif fetchone: + result = cursor.fetchone() + else: + result = None + if mutation: + self.conn.commit() + elif self.dialect == "sqlite": + cursor = self.conn.cursor() + cursor.execute(sql) if bulk or affected_rows: result = cursor.rowcount else: @@ -107,209 +132,327 @@ def _execute_sql( result = None if mutation: self.conn.commit() + else: + raise UnsupportedDialectException( + "The dialect passed is not supported the supported dialects are: {'postgres', 'mysql', 'sqlite'}" + ) return result def connect(self): - try: - self.conn = psycopg2.connect( - host=self.host, - database=self.database, - user=self.user, - password=self.password, - port=self.port, + if self.dialect == "postgres": + options = ConnectionOptionsFactory.get_connection_options( + **self.connection_options ) - return self.conn - except Exception as e: - raise Exception(e) - - def connect_and_sync( - self, models: list[Model], drop=False, force=False, alter=False - ): - try: - self.conn = psycopg2.connect( - host=self.host, - database=self.database, - user=self.user, - password=self.password, - port=self.port, + with psycopg2.connect(**options) as conn: + self.conn = conn + elif self.dialect == "mysql": + options = ConnectionOptionsFactory.get_connection_options( + **self.connection_options ) - for model in models: - if drop or force: - self._execute_sql(model._drop_sql()) - self._execute_sql(model._create_sql()) - elif alter: - pass - else: - self._execute_sql(model._create_sql(ignore_exists=True)) - return self.conn, self.tables + self.conn = connector.connect(**options) - except Exception as e: - raise Exception(e) + elif self.dialect == "sqlite": + options = ConnectionOptionsFactory.get_connection_options( + **self.connection_options + ) + if "database" in options: + with sqlite3.connect(options.get("database")) as conn: + self.conn = conn + else: + with sqlite3.connect(**options) as conn: + self.conn = conn + else: + raise UnsupportedDialectException( + "The dialect passed is not supported the supported dialects are: {'postgres', 'mysql', 'sqlite'}" + ) + return self.conn def sync(self, models: list[Model], drop=False, force=False, alter=False): try: for model in models: if drop or force: - self._execute_sql(model._drop_sql()) - self._execute_sql(model._create_sql()) + self._execute_sql(model._drop_sql(dialect=self.dialect)) + self._execute_sql(model._create_sql(dialect=self.dialect)) elif alter: pass else: - self._execute_sql(model._create_sql(ignore_exists=True)) - + self._execute_sql( + model._create_sql(dialect=self.dialect, ignore_exists=True) + ) return self.tables except Exception as e: raise Exception(e) - def create(self, instance: Model): - sql, values = instance._get_insert_one_stm() - row = self._execute_sql(sql, args=tuple(values), fetchone=True) - return row[0] - - def create_bulk(self, instances: list[Model]): - columns = None - placeholders = None - data = list() - for instance in instances: - ( - column_names, - placeholder_values, - _values, - ) = instance._get_insert_bulk_attrs() - if columns is None: - columns = column_names - if placeholders is None: - placeholders = placeholder_values - - data.append(_values) - sql, values = instance._get_insert_bulk_smt(placeholders, columns, data) - row_count = self._execute_sql(sql, args=tuple(values), fetchall=True, bulk=True) - return row_count - - def find_all(self, instance: Model): - fields = list() - for name, field in inspect.getmembers(instance): - if isinstance(field, Column): - fields.append(name) - sql, _, __ = instance._get_select_where_stm(fields) - data = list() - rows = self._execute_sql(sql, fetchall=True) - for row in rows: - res = dict(zip(fields, row)) - data.append(instance(**res)) - return data - - def find_many(self, instance: Model, filters: dict = {}): - fields = list() - for name, field in inspect.getmembers(instance): - if isinstance(field, Column): - fields.append(name) - sql, _, params = instance._get_select_where_stm(fields, filters) - data = list() - rows = self._execute_sql(sql, args=params, fetchall=True) - for row in rows: - res = dict(zip(fields, row)) - data.append(instance(**res)) - return data - - def find_by_pk(self, instance: Model, pk, options: dict = {}): - # what is the name of the primary key column? - """ - SELECT - posts.post_id, - posts.content, - posts.created_at, - users.user_id, - users.username - FROM - posts - JOIN - users ON posts.user_id = users.user_id - WHERE - posts.post_id = 1; -- Replace 1 with the specific post_id you are interested in - """ - pk_name = "id" - fields = list() - for name, field in inspect.getmembers(instance): - if ( - isinstance(field, Column) - or isinstance(field, ForeignKeyColumn) - or isinstance(field, CreatedAtColumn) - or isinstance(field, UpdatedAtColumn) - ): - fields.append(name) - elif isinstance(field, PrimaryKeyColumn): - pk_name = name - fields.append(name) - sql, fields = instance._get_select_by_pk_stm(pk, pk_name, fields=fields) - row = self._execute_sql(sql, fetchone=True) - return None if row is None else instance(**dict(zip(fields, row))) - - def find_one(self, instance: Model, filters: dict = {}): - fields = list() - for name, field in inspect.getmembers(instance): - if isinstance(field, Column): - fields.append(name) - sql, _, params = instance._get_select_where_stm(fields, filters) - row = self._execute_sql(sql, args=params, fetchone=True) - return None if row is None else instance(**dict(zip(fields, row))) - - def delete_bulk(self, instance: Model, filters: dict = {}): - sql, params = instance._get_delete_bulk_where_stm(filters) - affected_rows = self._execute_sql( - sql, args=params, affected_rows=True, fetchall=True - ) - return affected_rows - - def delete_one(self, instance: Model, filters: dict = {}): - pk = None - for name, field in inspect.getmembers(instance): - if isinstance(field, PrimaryKeyColumn): - pk = name - sql, params = instance._get_delete_where_stm(pk=pk, args=filters) - affected_rows = self._execute_sql(sql, args=params, affected_rows=True) - return affected_rows - - def delete_by_pk(self, instance: Model, pk): - # what is the name of the primary key column? - pk_name = "id" - for name, field in inspect.getmembers(instance): - if isinstance(field, PrimaryKeyColumn): - pk_name = name - - sql, pk = instance._get_delete_by_pk_stm(pk, pk_name) - affected_rows = self._execute_sql( - sql, args=(pk,), affected_rows=True, fetchall=True - ) - return affected_rows - - def update_by_pk(self, instance: Model, pk, values: dict = {}): - pk_name = "id" - for name, field in inspect.getmembers(instance): - if isinstance(field, PrimaryKeyColumn): - pk_name = name - sql, values = instance._get_update_by_pk_stm(pk_name, values) - values.append(pk) - affected_rows = self._execute_sql(sql, args=values, affected_rows=True) - return affected_rows - - def update_one(self, instance: Model, filters: dict = {}, values: dict = {}): - pk_name = "id" - for name, field in inspect.getmembers(instance): - if isinstance(field, PrimaryKeyColumn): - pk_name = name - sql, new_values, filter_values = instance._get_update_one_stm( - pk_name, filters, values - ) - args = [*new_values, *filter_values] - affected_rows = self._execute_sql(sql, args=args, affected_rows=True) - return affected_rows - - def update_bulk(self, instance: Model, filters: dict = {}, values: dict = {}): - sql, new_values, filter_values = instance._get_update_bulk_where_stm( - filters, values - ) - args = [*new_values, *filter_values] - affected_rows = self._execute_sql(sql, args=args, affected_rows=True) - return affected_rows + +# class Database: +# def __init__( +# self, +# database: str, +# dialect: str = "postgres", +# user: str | None = None, +# host: str | None = None, +# port: int | None = None, +# password: str | None = None, +# logs: bool = True, +# ) -> None: +# config = instances[dialect] +# self.user = user if user else config["user"] +# self.password = password if password else config["password"] +# self.port = port if port else config["port"] +# self.host = host if host else config["host"] +# self.database = database +# self.conn = None +# self.logs = logs + +# @property +# def tables(self): +# res = self._execute_sql( +# PgStatements.GET_TABLES.format(schema_name="public"), fetchall=True +# ) +# return [t[0] for t in res] + +# def _execute_sql( +# self, +# sql, +# args=None, +# fetchone=False, +# fetchmany=False, +# fetchall=False, +# mutation=True, +# bulk: bool = False, +# affected_rows: bool = False, +# ): +# # do we need to log the executed SQL? +# if self.logs: +# print(sql) + +# with self.conn.cursor() as cursor: +# if args is None: +# cursor.execute(sql) +# else: +# ( +# cursor.executemany(sql, vars_list=args) +# if bulk +# else cursor.execute(sql, vars=args) +# ) +# # options +# if bulk or affected_rows: +# result = cursor.rowcount +# else: +# if fetchmany: +# result = cursor.fetchmany() +# elif fetchall: +# result = cursor.fetchall() +# elif fetchone: +# result = cursor.fetchone() +# else: +# result = None +# if mutation: +# self.conn.commit() + +# return result + +# def connect(self): +# try: +# self.conn = psycopg2.connect( +# host=self.host, +# database=self.database, +# user=self.user, +# password=self.password, +# port=self.port, +# ) +# return self.conn +# except Exception as e: +# raise Exception(e) + +# def connect_and_sync( +# self, models: list[Model], drop=False, force=False, alter=False +# ): +# try: +# self.conn = psycopg2.connect( +# host=self.host, +# database=self.database, +# user=self.user, +# password=self.password, +# port=self.port, +# ) +# for model in models: +# if drop or force: +# self._execute_sql(model._drop_sql()) +# self._execute_sql(model._create_sql()) +# elif alter: +# pass +# else: +# self._execute_sql(model._create_sql(ignore_exists=True)) +# return self.conn, self.tables + +# except Exception as e: +# raise Exception(e) + +# def sync(self, models: list[Model], drop=False, force=False, alter=False): +# try: +# for model in models: +# if drop or force: +# self._execute_sql(model._drop_sql()) +# self._execute_sql(model._create_sql()) +# elif alter: +# pass +# else: +# self._execute_sql(model._create_sql(ignore_exists=True)) + +# return self.tables +# except Exception as e: +# raise Exception(e) + +# def create(self, instance: Model): +# sql, values = instance._get_insert_one_stm() +# row = self._execute_sql(sql, args=tuple(values), fetchone=True) +# return row[0] + +# def create_bulk(self, instances: list[Model]): +# columns = None +# placeholders = None +# data = list() +# for instance in instances: +# ( +# column_names, +# placeholder_values, +# _values, +# ) = instance._get_insert_bulk_attrs() +# if columns is None: +# columns = column_names +# if placeholders is None: +# placeholders = placeholder_values + +# data.append(_values) +# sql, values = instance._get_insert_bulk_smt(placeholders, columns, data) +# row_count = self._execute_sql(sql, args=tuple(values), fetchall=True, bulk=True) +# return row_count + +# def find_all(self, instance: Model): +# fields = list() +# for name, field in inspect.getmembers(instance): +# if isinstance(field, Column): +# fields.append(name) +# sql, _, __ = instance._get_select_where_stm(fields) +# data = list() +# rows = self._execute_sql(sql, fetchall=True) +# for row in rows: +# res = dict(zip(fields, row)) +# data.append(instance(**res)) +# return data + +# def find_many(self, instance: Model, filters: dict = {}): +# fields = list() +# for name, field in inspect.getmembers(instance): +# if isinstance(field, Column): +# fields.append(name) +# sql, _, params = instance._get_select_where_stm(fields, filters) +# data = list() +# rows = self._execute_sql(sql, args=params, fetchall=True) +# for row in rows: +# res = dict(zip(fields, row)) +# data.append(instance(**res)) +# return data + +# def find_by_pk(self, instance: Model, pk, options: dict = {}): +# # what is the name of the primary key column? +# """ +# SELECT +# posts.post_id, +# posts.content, +# posts.created_at, +# users.user_id, +# users.username +# FROM +# posts +# JOIN +# users ON posts.user_id = users.user_id +# WHERE +# posts.post_id = 1; -- Replace 1 with the specific post_id you are interested in +# """ +# pk_name = "id" +# fields = list() +# for name, field in inspect.getmembers(instance): +# if ( +# isinstance(field, Column) +# or isinstance(field, ForeignKeyColumn) +# or isinstance(field, CreatedAtColumn) +# or isinstance(field, UpdatedAtColumn) +# ): +# fields.append(name) +# elif isinstance(field, PrimaryKeyColumn): +# pk_name = name +# fields.append(name) +# sql, fields = instance._get_select_by_pk_stm(pk, pk_name, fields=fields) +# row = self._execute_sql(sql, fetchone=True) +# return None if row is None else instance(**dict(zip(fields, row))) + +# def find_one(self, instance: Model, filters: dict = {}): +# fields = list() +# for name, field in inspect.getmembers(instance): +# if isinstance(field, Column): +# fields.append(name) +# sql, _, params = instance._get_select_where_stm(fields, filters) +# row = self._execute_sql(sql, args=params, fetchone=True) +# return None if row is None else instance(**dict(zip(fields, row))) + +# def delete_bulk(self, instance: Model, filters: dict = {}): +# sql, params = instance._get_delete_bulk_where_stm(filters) +# affected_rows = self._execute_sql( +# sql, args=params, affected_rows=True, fetchall=True +# ) +# return affected_rows + +# def delete_one(self, instance: Model, filters: dict = {}): +# pk = None +# for name, field in inspect.getmembers(instance): +# if isinstance(field, PrimaryKeyColumn): +# pk = name +# sql, params = instance._get_delete_where_stm(pk=pk, args=filters) +# affected_rows = self._execute_sql(sql, args=params, affected_rows=True) +# return affected_rows + +# def delete_by_pk(self, instance: Model, pk): +# # what is the name of the primary key column? +# pk_name = "id" +# for name, field in inspect.getmembers(instance): +# if isinstance(field, PrimaryKeyColumn): +# pk_name = name + +# sql, pk = instance._get_delete_by_pk_stm(pk, pk_name) +# affected_rows = self._execute_sql( +# sql, args=(pk,), affected_rows=True, fetchall=True +# ) +# return affected_rows + +# def update_by_pk(self, instance: Model, pk, values: dict = {}): +# pk_name = "id" +# for name, field in inspect.getmembers(instance): +# if isinstance(field, PrimaryKeyColumn): +# pk_name = name +# sql, values = instance._get_update_by_pk_stm(pk_name, values) +# values.append(pk) +# affected_rows = self._execute_sql(sql, args=values, affected_rows=True) +# return affected_rows + +# def update_one(self, instance: Model, filters: dict = {}, values: dict = {}): +# pk_name = "id" +# for name, field in inspect.getmembers(instance): +# if isinstance(field, PrimaryKeyColumn): +# pk_name = name +# sql, new_values, filter_values = instance._get_update_one_stm( +# pk_name, filters, values +# ) +# args = [*new_values, *filter_values] +# affected_rows = self._execute_sql(sql, args=args, affected_rows=True) +# return affected_rows + +# def update_bulk(self, instance: Model, filters: dict = {}, values: dict = {}): +# sql, new_values, filter_values = instance._get_update_bulk_where_stm( +# filters, values +# ) +# args = [*new_values, *filter_values] +# affected_rows = self._execute_sql(sql, args=args, affected_rows=True) +# return affected_rows diff --git a/dataloom/exceptions/__init__.py b/dataloom/exceptions/__init__.py index 5eccfd4..5c330bc 100644 --- a/dataloom/exceptions/__init__.py +++ b/dataloom/exceptions/__init__.py @@ -6,5 +6,9 @@ class TooManyPkException(Exception): pass -class UnsupportedDialect(ValueError): +class UnsupportedDialectException(ValueError): + pass + + +class UnsupportedTypeException(ValueError): pass diff --git a/dataloom/model/__init__.py b/dataloom/model/__init__.py index e69de29..6943579 100644 --- a/dataloom/model/__init__.py +++ b/dataloom/model/__init__.py @@ -0,0 +1,103 @@ +from dataloom.exceptions import UnsupportedDialectException +from dataloom.statements import GetStatement +import inspect +from dataloom.model.column import ( + PrimaryKeyColumn, + TableColumn, +) + + +class Model: + def __init__(self, **args) -> None: + self._data = {} + for k, v in args.items(): + self._data[k] = v + + def __getattribute__(self, key: str): + _data = object.__getattribute__(self, "_data") + if key in _data: + return _data[key] + return object.__getattribute__(self, key) + + @classmethod + def _create_sql(cls, dialect: str, ignore_exists=True): + sql = GetStatement( + dialect=dialect, model=cls, table_name=cls._get_table_name() + )._get_create_table_command + return sql + + @classmethod + def _get_table_name(self): + __tablename__ = None + for _, field in inspect.getmembers(self): + if isinstance(field, TableColumn): + __tablename__ = field.name + return ( + f"{self.__name__.lower()}" if __tablename__ is None else f"{__tablename__}" + ) + + @classmethod + def _get_pk_attributes(cls, dialect: str): + pk = None + pk_type = "BIGSERIAL" + for name, field in inspect.getmembers(cls): + if isinstance(field, PrimaryKeyColumn): + pk = name + pk_type = field.sql_type(dialect) + return pk, pk_type + + @classmethod + def _drop_sql(cls, dialect: str): + if dialect == "postgres" or "mysql" or "sqlite": + sql = GetStatement( + dialect=dialect, model=cls, table_name=cls._get_table_name() + )._get_drop_table_command.format(table_name=cls._get_table_name()) + + else: + raise UnsupportedDialectException( + "The dialect passed is not supported the supported dialects are: {'postgres', 'mysql', 'sqlite'}" + ) + return sql + + +# class IModel[T](ABC): + +# @abstractmethod +# def find_one(self, filters: dict = {}) -> T: +# raise NotImplemented + +# @abstractmethod +# def create(self, TModel: T) -> None: +# raise NotImplemented + + +# @dataclass(kw_only=True) +# class Model[T](IModel[T]): + +# def _get_pk_attributes(self): +# pk = None +# pk_type = "BIGSERIAL" +# for name, field in inspect.getmembers(self.model): +# if isinstance(field, PrimaryKeyColumn): +# pk = name +# pk_type = field.sql_type +# return pk, pk_type + + +# def __create_table(self) -> None: +# [dialect, cursor, _] = self.instance + +# self._execute_sql(sql) + +# def __init__[Y](self, model: T, instance: Y) -> None: +# super().__init__() +# self.model = model +# self.instance = instance +# self.logging = instance[-1] +# self.__create_table() + +# def create(self, TModel: T) -> None: +# pass + +# def find_one(self, filters: dict = {}) -> T: +# pass diff --git a/dataloom/model/column.py b/dataloom/model/column.py index 7f839fe..e15f7c5 100644 --- a/dataloom/model/column.py +++ b/dataloom/model/column.py @@ -1,4 +1,6 @@ -from dataloom.types import POSTGRES_SQL_TYPES +from dataloom.types import POSTGRES_SQL_TYPES, MYSQL_SQL_TYPES, SQLITE3_SQL_TYPES +from dataclasses import dataclass +from dataloom.exceptions import UnsupportedTypeException, UnsupportedDialectException class CreatedAtColumn: @@ -23,6 +25,11 @@ def updated_at(self): ) +@dataclass +class TableColumn: + name: str + + class ForeignKeyColumn: def __init__( self, @@ -38,18 +45,50 @@ def __init__( self.onUpdate = onUpdate self.type = type - @property - def sql_type(self): - if self.type in POSTGRES_SQL_TYPES: - return POSTGRES_SQL_TYPES[self.type] + def sql_type(self, dialect: str): + if dialect == "postgres": + if self.type in POSTGRES_SQL_TYPES: + return ( + f"{POSTGRES_SQL_TYPES[self.type]}({self.length})" + if self.length + else POSTGRES_SQL_TYPES[self.type] + ) + else: + types = POSTGRES_SQL_TYPES.keys() + raise UnsupportedTypeException( + f"Unsupported column type: {self.type} for dialect '{dialect}' supported types are ({', '.join(types)})" + ) + + elif dialect == "mysql": + if self.type in MYSQL_SQL_TYPES: + return ( + f"{MYSQL_SQL_TYPES[self.type]}({self.length})" + if self.length + else MYSQL_SQL_TYPES[self.type] + ) + else: + types = MYSQL_SQL_TYPES.keys() + raise UnsupportedTypeException( + f"Unsupported column type: {self.type} for dialect '{dialect}' supported types are ({', '.join(types)})" + ) + elif dialect == "sqlite": + if self.type in SQLITE3_SQL_TYPES: + return SQLITE3_SQL_TYPES[self.type] + else: + types = SQLITE3_SQL_TYPES.keys() + raise UnsupportedTypeException( + f"Unsupported column type: {self.type} for dialect '{dialect}' supported types are ({', '.join(types)})" + ) else: - raise ValueError(f"Unsupported column type: {self.type}") + raise UnsupportedDialectException( + "The dialect passed is not supported the supported dialects are: {'postgres', 'mysql', 'sqlite'}" + ) class PrimaryKeyColumn: def __init__( self, - type: str = "bigserial", + type: str, length: int | None = None, auto_increment: bool = False, nullable: bool = False, @@ -77,18 +116,46 @@ def unique_constraint(self): def nullable_constraint(self): return "NOT NULL" if not self.nullable else "" - @property - def sql_type(self): - if self.type in POSTGRES_SQL_TYPES: - if self.auto_increment: - return "BIGSERIAL" - return ( - f"{POSTGRES_SQL_TYPES[self.type]}({self.length})" - if self.length - else POSTGRES_SQL_TYPES[self.type] + def sql_type(self, dialect: str): + if dialect == "postgres": + if self.type in POSTGRES_SQL_TYPES: + if self.auto_increment: + return "BIGSERIAL" + return ( + f"{POSTGRES_SQL_TYPES[self.type]}({self.length})" + if self.length + else POSTGRES_SQL_TYPES[self.type] + ) + else: + types = POSTGRES_SQL_TYPES.keys() + raise UnsupportedTypeException( + f"Unsupported column type: {self.type} for dialect '{dialect}' supported types are ({', '.join(types)})" ) + + elif dialect == "mysql": + if self.type in MYSQL_SQL_TYPES: + return ( + f"{MYSQL_SQL_TYPES[self.type]}({self.length})" + if self.length + else MYSQL_SQL_TYPES[self.type] + ) + else: + types = MYSQL_SQL_TYPES.keys() + raise UnsupportedTypeException( + f"Unsupported column type: {self.type} for dialect '{dialect}' supported types are ({', '.join(types)})" + ) + elif dialect == "sqlite": + if self.type in SQLITE3_SQL_TYPES: + return SQLITE3_SQL_TYPES[self.type] + else: + types = SQLITE3_SQL_TYPES.keys() + raise UnsupportedTypeException( + f"Unsupported column type: {self.type} for dialect '{dialect}' supported types are ({', '.join(types)})" + ) else: - raise ValueError(f"Unsupported column type: {self.type}") + raise UnsupportedDialectException( + "The dialect passed is not supported the supported dialects are: {'postgres', 'mysql', 'sqlite'}" + ) class Column: @@ -124,13 +191,41 @@ def default_constraint(self): "DEFAULT '{default}'".format(default=self.default) if self.default else "" ) - @property - def sql_type(self): - if self.type in POSTGRES_SQL_TYPES: - return ( - f"{POSTGRES_SQL_TYPES[self.type]}({self.length})" - if self.length - else POSTGRES_SQL_TYPES[self.type] + def sql_type(self, dialect: str): + if dialect == "postgres": + if self.type in POSTGRES_SQL_TYPES: + return ( + f"{POSTGRES_SQL_TYPES[self.type]}({self.length})" + if self.length + else POSTGRES_SQL_TYPES[self.type] + ) + else: + types = POSTGRES_SQL_TYPES.keys() + raise UnsupportedTypeException( + f"Unsupported column type: {self.type} for dialect '{dialect}' supported types are ({', '.join(types)})" ) + + elif dialect == "mysql": + if self.type in MYSQL_SQL_TYPES: + return ( + f"{MYSQL_SQL_TYPES[self.type]}({self.length})" + if self.length + else MYSQL_SQL_TYPES[self.type] + ) + else: + types = MYSQL_SQL_TYPES.keys() + raise UnsupportedTypeException( + f"Unsupported column type: {self.type} for dialect '{dialect}' supported types are ({', '.join(types)})" + ) + elif dialect == "sqlite": + if self.type in SQLITE3_SQL_TYPES: + return SQLITE3_SQL_TYPES[self.type] + else: + types = SQLITE3_SQL_TYPES.keys() + raise UnsupportedTypeException( + f"Unsupported column type: {self.type} for dialect '{dialect}' supported types are ({', '.join(types)})" + ) else: - raise ValueError(f"Unsupported column type: {self.type}") + raise UnsupportedDialectException( + "The dialect passed is not supported the supported dialects are: {'postgres', 'mysql', 'sqlite'}" + ) diff --git a/dataloom/model/model.py b/dataloom/model/model.py index 8cd9512..e295314 100644 --- a/dataloom/model/model.py +++ b/dataloom/model/model.py @@ -6,7 +6,7 @@ ForeignKeyColumn, PrimaryKeyColumn, ) -from dataloom.model.statements import Statements +from dataloom.model.statements import PgStatements from dataloom.exceptions import * import inspect from datetime import datetime @@ -30,10 +30,6 @@ def __getattribute__(self, key: str): return _data[key] return object.__getattribute__(self, key) - # def __setattr__(self, __name: str, __value: Any) -> None: - # if __name in self._data: - # self._data[__name] = __value - @classmethod def _get_name(cls): __tablename__ = None @@ -58,7 +54,7 @@ def _get_pk_attributes(cls): @classmethod def _drop_sql(cls): - sql = Statements.DROP_TABLE.format(table_name=cls._get_name()) + sql = PgStatements.DROP_TABLE.format(table_name=cls._get_name()) return sql @classmethod @@ -130,11 +126,11 @@ def _create_sql(cls, ignore_exists=True): fields = [*user_fields, *predefined_fields] fields_name = ", ".join(f for f in [" ".join(field) for field in fields]) sql = ( - Statements.CREATE_NEW_TABLE.format( + PgStatements.CREATE_NEW_TABLE.format( table_name=cls._get_name(), fields_name=fields_name ) if not ignore_exists - else Statements.CREATE_NEW_TABLE_IF_NOT_EXITS.format( + else PgStatements.CREATE_NEW_TABLE_IF_NOT_EXITS.format( table_name=cls._get_name(), fields_name=fields_name ) ) @@ -178,7 +174,7 @@ def _get_insert_one_stm(self): placeholders.append("%s") elif isinstance(field, PrimaryKeyColumn): pk = f'"{_name}"' - sql = Statements.INSERT_COMMAND_ONE.format( + sql = PgStatements.INSERT_COMMAND_ONE.format( table_name=self.__class__._get_name(), column_name=", ".join([f'"{f}"' for f in fields]), placeholder_values=", ".join(placeholders), @@ -197,7 +193,7 @@ def _get_select_by_pk_stm(cls, pk, pk_name: str = "id", fields: list = []): or isinstance(field, PrimaryKeyColumn) ) and name not in fields: fields.append(name) - sql = Statements.SELECT_BY_PK.format( + sql = PgStatements.SELECT_BY_PK.format( column_names=", ".join([f'"{f}"' for f in fields]), table_name=cls._get_name(), pk=pk, @@ -207,7 +203,7 @@ def _get_select_by_pk_stm(cls, pk, pk_name: str = "id", fields: list = []): @classmethod def _get_delete_by_pk_stm(cls, pk, pk_name: str = "id"): - sql = Statements.DELETE_BY_PK.format( + sql = PgStatements.DELETE_BY_PK.format( table_name=cls._get_name(), pk="%s", # mask it to avoid SQL Injection pk_name=pk_name, @@ -218,7 +214,7 @@ def _get_delete_by_pk_stm(cls, pk, pk_name: str = "id"): def _get_insert_bulk_smt(cls, placeholders, columns, data): column_names = columns placeholders = placeholders - sql = Statements.INSERT_COMMAND_MANY.format( + sql = PgStatements.INSERT_COMMAND_MANY.format( column_names=column_names, table_name=cls._get_name(), placeholder_values=placeholders, @@ -236,7 +232,7 @@ def _get_select_one_stm(cls, pk, pk_name: str = "id", fields: list = []): or isinstance(field, PrimaryKeyColumn) ) and name not in fields: fields.append(name) - sql = Statements.SELECT_BY_PK.format( + sql = PgStatements.SELECT_BY_PK.format( column_names=", ".join([f'"{f}"' for f in fields]), table_name=cls._get_name(), pk=pk, @@ -262,12 +258,12 @@ def _get_select_where_stm(cls, fields: list = [], args: dict = {}): filters.append(f"{key} = %s") params.append(value) if len(filters) == 0: - sql = Statements.SELECT_COMMAND.format( + sql = PgStatements.SELECT_COMMAND.format( column_names=", ".join([f'"{f}"' for f in fields]), table_name=cls._get_name(), ) else: - sql = Statements.SELECT_WHERE_COMMAND.format( + sql = PgStatements.SELECT_WHERE_COMMAND.format( column_names=", ".join([f'"{f}"' for f in fields]), table_name=cls._get_name(), filters=" AND ".join(filters), @@ -282,11 +278,11 @@ def _get_delete_where_stm(cls, pk: str = "id", args: dict = {}): filters.append(f"{key} = %s") params.append(value) if len(filters) == 0: - sql = Statements.DELETE_ALL_COMMAND.format( + sql = PgStatements.DELETE_ALL_COMMAND.format( table_name=cls._get_name(), ) else: - sql = Statements.DELETE_ONE_WHERE_COMMAND.format( + sql = PgStatements.DELETE_ONE_WHERE_COMMAND.format( table_name=cls._get_name(), filters=" AND ".join(filters), pk_name=pk ) return sql, params @@ -299,11 +295,11 @@ def _get_delete_bulk_where_stm(cls, args: dict = {}): filters.append(f"{key} = %s") params.append(value) if len(filters) == 0: - sql = Statements.DELETE_ALL_COMMAND.format( + sql = PgStatements.DELETE_ALL_COMMAND.format( table_name=cls._get_name(), ) else: - sql = Statements.DELETE_BULK_WHERE_COMMAND.format( + sql = PgStatements.DELETE_BULK_WHERE_COMMAND.format( table_name=cls._get_name(), filters=" AND ".join(filters), ) @@ -326,7 +322,7 @@ def _get_update_by_pk_stm(cls, pk_name: str = "id", args: dict = {}): placeholders.append(f'"{updatedAtColumName}" = %s') values.append(current_time_stamp) - sql = Statements.UPDATE_BY_PK_COMMAND.format( + sql = PgStatements.UPDATE_BY_PK_COMMAND.format( table_name=cls._get_name(), pk="%s", pk_name=pk_name, @@ -358,7 +354,7 @@ def _get_update_one_stm( placeholder_values.append(f'"{updatedAtColumName}" = %s') values.append(current_time_stamp) - sql = Statements.UPDATE_BY_ONE_COMMAND.format( + sql = PgStatements.UPDATE_BY_ONE_COMMAND.format( table_name=cls._get_name(), pk_name=pk_name, placeholder_values=", ".join(placeholder_values), @@ -388,7 +384,7 @@ def _get_update_bulk_where_stm(cls, filters: dict = {}, args: dict = {}): placeholder_values.append(f'"{updatedAtColumName}" = %s') values.append(current_time_stamp) - sql = Statements.UPDATE_BULK_WHERE_COMMAND.format( + sql = PgStatements.UPDATE_BULK_WHERE_COMMAND.format( table_name=cls._get_name(), placeholder_values=", ".join(placeholder_values), placeholder_filters=", ".join([i[0] for i in placeholder_filters]), diff --git a/dataloom/model/statements.py b/dataloom/model/statements.py index 52042ae..f71845b 100644 --- a/dataloom/model/statements.py +++ b/dataloom/model/statements.py @@ -1,4 +1,31 @@ -class Statements: +class MySqlStatements: + + # dropping table + DROP_TABLE = "DROP TABLE IF EXISTS {table_name};" + # getting tables + GET_TABLES = "SHOW TABLES;" + + # creating table + CREATE_NEW_TABLE = "CREATE TABLE {table_name} ({fields_name});" + CREATE_NEW_TABLE_IF_NOT_EXITS = ( + "CREATE TABLE IF NOT EXISTS {table_name} ({fields_name});" + ) + + +class Sqlite3Statements: + # dropping table + DROP_TABLE = "DROP TABLE IF EXISTS {table_name};" + # getting tables + GET_TABLES = "SELECT name FROM sqlite_master WHERE type='{type}';" + + # creating table + CREATE_NEW_TABLE = "CREATE TABLE {table_name} ({fields_name});" + CREATE_NEW_TABLE_IF_NOT_EXITS = ( + "CREATE TABLE IF NOT EXISTS {table_name} ({fields_name});" + ) + + +class PgStatements: # updates UPDATE_BY_PK_COMMAND = ( "UPDATE {table_name} SET {placeholder_values} WHERE {pk_name} = {pk};" diff --git a/dataloom/statements/__init__.py b/dataloom/statements/__init__.py new file mode 100644 index 0000000..8c8cb96 --- /dev/null +++ b/dataloom/statements/__init__.py @@ -0,0 +1,297 @@ +from dataclasses import dataclass +from dataloom.model.column import ( + PrimaryKeyColumn, + Column, + CreatedAtColumn, + ForeignKeyColumn, + UpdatedAtColumn, +) +import inspect +from typing import Optional +from dataloom.model.statements import * +import re +from dataloom.exceptions import ( + UnsupportedDialectException, + PkNotDefinedException, + TooManyPkException, +) + + +@dataclass(kw_only=True) +class GetStatement[T](): + def __init__( + self, + dialect: str, + model: Optional[T] = None, + table_name: Optional[str] = None, + ignore_exists: bool = True, + ) -> None: + self.dialect = dialect + self.model = model + self.table_name = table_name + self.ignore_exists = ignore_exists + + @property + def _get_drop_table_command(self) -> Optional[str]: + if self.dialect == "postgres": + sql = PgStatements.DROP_TABLE.format(table_name=f'"{self.table_name}"') + elif self.dialect == "mysql": + sql = MySqlStatements.DROP_TABLE.format(table_name=f"`{self.table_name}`") + elif self.dialect == "sqlite": + sql = Sqlite3Statements.DROP_TABLE.format(table_name=self.table_name) + else: + raise UnsupportedDialectException( + "The dialect passed is not supported the supported dialects are: {'postgres', 'mysql', 'sqlite'}" + ) + return sql + + @property + def _get_tables_command(self) -> Optional[str]: + if self.dialect == "postgres": + sql = PgStatements.GET_TABLES.format(schema_name="public") + elif self.dialect == "mysql": + sql = MySqlStatements.GET_TABLES + elif self.dialect == "sqlite": + sql = Sqlite3Statements.GET_TABLES.format(type="table") + else: + raise UnsupportedDialectException( + "The dialect passed is not supported the supported dialects are: {'postgres', 'mysql', 'sqlite'}" + ) + return sql + + @property + def _get_create_table_command(self) -> Optional[str]: + # is the primary key defined in this table? + pks = list() + user_fields = list() + predefined_fields = list() + if self.dialect == "postgres": + for name, field in inspect.getmembers(self.model): + if isinstance(field, PrimaryKeyColumn): + pks.append(f'"{name}"') + _values = re.sub( + r"\s+", + " ", + "{_type} PRIMARY KEY {unique} {nullable} {default} ".format( + _type=field.sql_type(self.dialect), + default=field.default_constraint, + nullable=field.nullable_constraint, + unique=field.unique_constraint, + ).strip(), + ) + user_fields.append((f'"{name}"', _values)) + elif isinstance(field, Column): + _values = re.sub( + r"\s+", + " ", + "{_type} {unique} {nullable} {default} ".format( + _type=field.sql_type(self.dialect), + unique=field.unique_constraint, + nullable=field.nullable_constraint, + default=field.default_constraint, + ).strip(), + ) + user_fields.append((f'"{name}"', _values)) + elif isinstance(field, CreatedAtColumn): + predefined_fields.append((f'"{name}"', field.created_at)) + elif isinstance(field, UpdatedAtColumn): + predefined_fields.append((f'"{name}"', field.updated_at)) + elif isinstance(field, ForeignKeyColumn): + # qns: + # 1. what is the pk in the parent table? + # 2. what is the type of the parent table pk? + # 3. what is the name of the parent table? + pk, pk_type = field.table._get_pk_attributes() + parent_table_name = "field.table._get_name()" + predefined_fields.append( + ( + f'"{name}"', + '{pk_type} {nullable} REFERENCES {parent_table_name}("{pk}") ON DELETE {onDelete} ON UPDATE {onUpdate}'.format( + onDelete=field.onDelete, + onUpdate=field.onUpdate, + pk_type=pk_type, + parent_table_name=f'"{parent_table_name}"', + pk=pk, + nullable="NOT NULL" if field.required else "NULL", + ), + ) + ) + + # do we have a single primary key or not? + if len(pks) == 0: + raise PkNotDefinedException( + "Your table does not have a primary key column." + ) + if len(pks) > 1: + raise TooManyPkException( + f"You have defined many field as primary keys which is not allowed. Fields ({', '.join(pks)}) are primary keys." + ) + fields = [*user_fields, *predefined_fields] + fields_name = ", ".join(f for f in [" ".join(field) for field in fields]) + sql = ( + PgStatements.CREATE_NEW_TABLE.format( + table_name=f'"{self.table_name}"', fields_name=fields_name + ) + if not self.ignore_exists + else PgStatements.CREATE_NEW_TABLE_IF_NOT_EXITS.format( + table_name=f'"{self.table_name}"', fields_name=fields_name + ) + ) + return sql + elif self.dialect == "mysql": + for name, field in inspect.getmembers(self.model): + if isinstance(field, PrimaryKeyColumn): + pks.append(f"`{name}`") + _values = re.sub( + r"\s+", + " ", + "{_type} PRIMARY KEY {auto_increment} {unique} {nullable} {default} ".format( + _type=field.sql_type(self.dialect), + default=field.default_constraint, + nullable=field.nullable_constraint, + unique=field.unique_constraint, + auto_increment=( + "AUTO_INCREMENT" if field.auto_increment else "" + ), + ).strip(), + ) + user_fields.append((f"`{name}`", _values)) + elif isinstance(field, Column): + _values = re.sub( + r"\s+", + " ", + "{_type} {unique} {nullable} {default} ".format( + _type=field.sql_type(self.dialect), + unique=field.unique_constraint, + nullable=field.nullable_constraint, + default=field.default_constraint, + ).strip(), + ) + user_fields.append((f"`{name}`", _values)) + elif isinstance(field, CreatedAtColumn): + predefined_fields.append((f"`{name}`", field.created_at)) + elif isinstance(field, UpdatedAtColumn): + predefined_fields.append((f"`{name}`", field.updated_at)) + elif isinstance(field, ForeignKeyColumn): + # qns: + # 1. what is the pk in the parent table? + # 2. what is the type of the parent table pk? + # 3. what is the name of the parent table? + pk, pk_type = field.table._get_pk_attributes() + parent_table_name = field.table._get_name() + predefined_fields.append( + ( + f"`{name}`", + "{pk_type} {nullable} REFERENCES {parent_table_name}(`{pk}`) ON DELETE {onDelete} ON UPDATE {onUpdate}".format( + onDelete=field.onDelete, + onUpdate=field.onUpdate, + pk_type=pk_type, + parent_table_name=f"`{parent_table_name}`", + pk=pk, + nullable="NOT NULL" if field.required else "NULL", + ), + ) + ) + + # do we have a single primary key or not? + if len(pks) == 0: + raise PkNotDefinedException( + "Your table does not have a primary key column." + ) + if len(pks) > 1: + raise TooManyPkException( + f"You have defined many field as primary keys which is not allowed. Fields ({', '.join(pks)}) are primary keys." + ) + fields = [*user_fields, *predefined_fields] + fields_name = ", ".join(f for f in [" ".join(field) for field in fields]) + sql = ( + MySqlStatements.CREATE_NEW_TABLE.format( + table_name=f"`{self.table_name}`", fields_name=fields_name + ) + if not self.ignore_exists + else MySqlStatements.CREATE_NEW_TABLE_IF_NOT_EXITS.format( + table_name=f"`{self.table_name}`", fields_name=fields_name + ) + ) + return sql + + elif self.dialect == "sqlite": + for name, field in inspect.getmembers(self.model): + if isinstance(field, PrimaryKeyColumn): + pks.append(f"`{name}`") + _values = re.sub( + r"\s+", + " ", + "{_type} PRIMARY KEY {auto_increment} {unique} {nullable} {default} ".format( + _type=field.sql_type(self.dialect), + default=field.default_constraint, + nullable=field.nullable_constraint, + unique=field.unique_constraint, + auto_increment=( + "AUTOINCREMENT" if field.auto_increment else "" + ), + ).strip(), + ) + user_fields.append((f"`{name}`", _values)) + elif isinstance(field, Column): + _values = re.sub( + r"\s+", + " ", + "{_type} {unique} {nullable} {default} ".format( + _type=field.sql_type(self.dialect), + unique=field.unique_constraint, + nullable=field.nullable_constraint, + default=field.default_constraint, + ).strip(), + ) + user_fields.append((f"`{name}`", _values)) + elif isinstance(field, CreatedAtColumn): + predefined_fields.append((f"`{name}`", field.created_at)) + elif isinstance(field, UpdatedAtColumn): + predefined_fields.append((f"`{name}`", field.updated_at)) + elif isinstance(field, ForeignKeyColumn): + # qns: + # 1. what is the pk in the parent table? + # 2. what is the type of the parent table pk? + # 3. what is the name of the parent table? + pk, pk_type = field.table._get_pk_attributes() + parent_table_name = field.table._get_name() + predefined_fields.append( + ( + f"`{name}`", + "{pk_type} {nullable} REFERENCES {parent_table_name}(`{pk}`) ON DELETE {onDelete} ON UPDATE {onUpdate}".format( + onDelete=field.onDelete, + onUpdate=field.onUpdate, + pk_type=pk_type, + parent_table_name=f"`{parent_table_name}`", + pk=pk, + nullable="NOT NULL" if field.required else "NULL", + ), + ) + ) + + # do we have a single primary key or not? + if len(pks) == 0: + raise PkNotDefinedException( + "Your table does not have a primary key column." + ) + if len(pks) > 1: + raise TooManyPkException( + f"You have defined many field as primary keys which is not allowed. Fields ({', '.join(pks)}) are primary keys." + ) + fields = [*user_fields, *predefined_fields] + fields_name = ", ".join(f for f in [" ".join(field) for field in fields]) + sql = ( + MySqlStatements.CREATE_NEW_TABLE.format( + table_name=f"`{self.table_name}`", fields_name=fields_name + ) + if not self.ignore_exists + else MySqlStatements.CREATE_NEW_TABLE_IF_NOT_EXITS.format( + table_name=f"`{self.table_name}`", fields_name=fields_name + ) + ) + return sql + else: + raise UnsupportedDialectException( + "The dialect passed is not supported the supported dialects are: {'postgres', 'mysql', 'sqlite'}" + ) diff --git a/dataloom/tests/mysql/test_connection_mysql.py b/dataloom/tests/mysql/test_connection_mysql.py new file mode 100644 index 0000000..db2deed --- /dev/null +++ b/dataloom/tests/mysql/test_connection_mysql.py @@ -0,0 +1,71 @@ +import pytest +from mysql import connector + + +class TestConnectionMySQL: + def test_connect_with_non_existing_database(self): + from dataloom import Dataloom + + mysql_loom = Dataloom( + dialect="mysql", database="non-exists", password="root", user="root" + ) + with pytest.raises(connector.errors.ProgrammingError) as exc_info: + conn = mysql_loom.connect() + conn.close() + assert exc_info.value.msg == "Unknown database 'non-exists'" + assert exc_info.value.errno == 1049 + + def test_connect_with_wrong_password(self): + from dataloom import Dataloom + + mysql_loom = Dataloom( + dialect="mysql", database="hi", password="user", user="root" + ) + with pytest.raises(connector.errors.ProgrammingError) as exc_info: + conn = mysql_loom.connect() + conn.close() + assert ( + exc_info.value.msg + == "Access denied for user 'root'@'localhost' (using password: YES)" + ) + assert exc_info.value.errno == 1045 + + def test_connect_with_wrong_user(self): + from dataloom import Dataloom + + mysql_loom = Dataloom( + dialect="mysql", database="hi", password="root", user="hey" + ) + with pytest.raises(connector.errors.ProgrammingError) as exc_info: + conn = mysql_loom.connect() + conn.close() + assert ( + exc_info.value.msg + == "Access denied for user 'hey'@'localhost' (using password: YES)" + ) + assert exc_info.value.errno == 1045 + + def test_connect_with_wrong_dialect(self): + from dataloom import Dataloom, UnsupportedDialectException + + with pytest.raises(UnsupportedDialectException) as exc_info: + mysql_loom = Dataloom( + dialect="peew", database="hi", password="user", user="root" + ) + conn = mysql_loom.connect() + conn.close() + + assert ( + str(exc_info.value) + == "The dialect passed is not supported the supported dialects are: {'postgres', 'mysql', 'sqlite'}" + ) + + def test_connect_correct_connection(self): + from dataloom import Dataloom + + mysql_loom = Dataloom( + dialect="mysql", database="hi", password="root", user="root" + ) + conn = mysql_loom.connect() + conn.close() + assert conn is not None diff --git a/dataloom/tests/mysql/test_create_tables_mysql.py b/dataloom/tests/mysql/test_create_tables_mysql.py new file mode 100644 index 0000000..e69de29 diff --git a/dataloom/tests/postgres/test_connection_pg.py b/dataloom/tests/postgres/test_connection_pg.py new file mode 100644 index 0000000..f55908e --- /dev/null +++ b/dataloom/tests/postgres/test_connection_pg.py @@ -0,0 +1,72 @@ +import pytest + + +class TestConnectionPG: + def test_connect_with_non_existing_database(self): + from dataloom import Dataloom + + pg_loom = Dataloom( + dialect="postgres", database="mew", password="root", user="postgres" + ) + with pytest.raises(Exception) as exc_info: + conn = pg_loom.connect() + conn.close() + assert ( + str(exc_info.value.args[0]).strip() + == 'connection to server at "localhost" (::1), port 5432 failed: FATAL: database "mew" does not exist' + ) + + def test_connect_with_wrong_password(self): + from dataloom import Dataloom + + pg_loom = Dataloom( + dialect="postgres", database="hi", password="root-", user="postgres" + ) + with pytest.raises(Exception) as exc_info: + conn = pg_loom.connect() + conn.close() + + assert ( + str(exc_info.value.args[0]).strip() + == 'connection to server at "localhost" (::1), port 5432 failed: FATAL: password authentication failed for user "postgres"' + ) + + def test_connect_with_wrong_user(self): + from dataloom import Dataloom + + pg_loom = Dataloom( + dialect="postgres", database="hi", password="root", user="postgre-u" + ) + with pytest.raises(Exception) as exc_info: + conn = pg_loom.connect() + conn.close() + + assert ( + str(exc_info.value.args[0]).strip() + == 'connection to server at "localhost" (::1), port 5432 failed: FATAL: password authentication failed for user "postgre-u"' + ) + + def test_connect_with_wrong_dialect(self): + from dataloom import Dataloom, UnsupportedDialectException + + with pytest.raises(UnsupportedDialectException) as exc_info: + pg_loom = Dataloom( + dialect="peew", database="hi", password="root", user="postgres" + ) + conn = pg_loom.connect() + conn.close() + assert ( + str(exc_info.value) + == "The dialect passed is not supported the supported dialects are: {'postgres', 'mysql', 'sqlite'}" + ) + + def test_connect_correct_connection(self): + from dataloom import Dataloom + + pg_loom = Dataloom( + dialect="postgres", database="hi", password="root", user="postgres" + ) + conn = pg_loom.connect() + conn.close() + + assert conn is not None diff --git a/dataloom/tests/sqlite3/test_connection_sqlite.py b/dataloom/tests/sqlite3/test_connection_sqlite.py new file mode 100644 index 0000000..ea608e9 --- /dev/null +++ b/dataloom/tests/sqlite3/test_connection_sqlite.py @@ -0,0 +1,24 @@ +import pytest + + +class TestConnectionSQLite: + def test_connect_with_wrong_dialect(self): + from dataloom import Dataloom, UnsupportedDialectException + + with pytest.raises(UnsupportedDialectException) as exc_info: + sqlite_loom = Dataloom(dialect="hay", database="hi.db") + conn = sqlite_loom.connect() + conn.close() + + assert ( + str(exc_info.value) + == "The dialect passed is not supported the supported dialects are: {'postgres', 'mysql', 'sqlite'}" + ) + + def test_connect_correct_connection(self): + from dataloom import Dataloom + + sqlite_loom = Dataloom(dialect="sqlite", database="hi.db") + conn = sqlite_loom.connect() + conn.close() + assert conn is not None diff --git a/dataloom/tests/test_connection.py b/dataloom/tests/test_connection.py index 05bba6e..61cfaf5 100644 --- a/dataloom/tests/test_connection.py +++ b/dataloom/tests/test_connection.py @@ -1,36 +1,36 @@ -class TestConnectionPG: - def test_connect(self): - from dataloom.db import Database - from dataloom.keys import password, database, user - - db = Database(database, password=password, user=user) - conn = db.connect() - assert conn.status == 1 - conn.close() - - def test_connect_sync(self): - from dataloom.db import Database - from dataloom.keys import password, database, user - from dataloom.model.model import Model - from dataloom.model.column import Column, PrimaryKeyColumn - - class User(Model): - __tablename__ = "users" - id = PrimaryKeyColumn(type="bigint", auto_increment=True) - username = Column(type="text", nullable=False) - name = Column(type="varchar", unique=False, length=255) - - class Post(Model): - __tablename__ = "posts" - - id = PrimaryKeyColumn(type="bigint", auto_increment=True) - title = Column(type="text", nullable=False, default="Hello there!!") - - db = Database(database, password=password, user=user) - conn, tables = db.connect_and_sync([User, Post], drop=True, force=True) - - assert len(tables) == 2 - assert conn.status == 1 - assert tables == ["users", "posts"] - - conn.close() +# class TestConnectionPG: +# def test_connect(self): +# from dataloom.db import Database +# from dataloom.keys import password, database, user + +# db = Database(database, password=password, user=user) +# conn = db.connect() +# assert conn.status == 1 +# conn.close() + +# def test_connect_sync(self): +# from dataloom.db import Database +# from dataloom.keys import password, database, user +# from dataloom.model.model import Model +# from dataloom.model.column import Column, PrimaryKeyColumn + +# class User(Model): +# __tablename__ = "users" +# id = PrimaryKeyColumn(type="bigint", auto_increment=True) +# username = Column(type="text", nullable=False) +# name = Column(type="varchar", unique=False, length=255) + +# class Post(Model): +# __tablename__ = "posts" + +# id = PrimaryKeyColumn(type="bigint", auto_increment=True) +# title = Column(type="text", nullable=False, default="Hello there!!") + +# db = Database(database, password=password, user=user) +# conn, tables = db.connect_and_sync([User, Post], drop=True, force=True) + +# assert len(tables) == 2 +# assert conn.status == 1 +# assert tables == ["users", "posts"] + +# conn.close() diff --git a/dataloom/tests/test_create_table.py b/dataloom/tests/test_create_table.py index ff0dcbb..5caaa0c 100644 --- a/dataloom/tests/test_create_table.py +++ b/dataloom/tests/test_create_table.py @@ -1,122 +1,122 @@ -class TestCreatingTablePG: - def test_2_pk_error(self): - from dataloom.db import Database - from dataloom.model.column import Column, PrimaryKeyColumn - from dataloom.model.model import Model - from dataloom.keys import password, database, user - import pytest - - db = Database(database, password=password, user=user) - conn = db.connect() - - class User(Model): - __tablename__ = "users" - _id = PrimaryKeyColumn(type="bigint", auto_increment=True) - id = PrimaryKeyColumn(type="bigint", auto_increment=True) - username = Column(type="text", nullable=False, default="Hello there!!") - name = Column(type="varchar", unique=True, length=255) - - with pytest.raises(Exception) as exc_info: - db.sync([User], drop=True, force=True) - - assert ( - str(exc_info.value) - == 'You have defined many field as primary keys which is not allowed. Fields ("_id", "id") are primary keys.' - ) - conn.close() - - def test_no_pk_error(self): - from dataloom.db import Database - from dataloom.model.column import Column - from dataloom.model.model import Model - from dataloom.keys import password, database, user - import pytest - - db = Database(database, password=password, user=user) - conn = db.connect() - - class User(Model): - __tablename__ = "users" - username = Column(type="text", nullable=False, default="Hello there!!") - name = Column(type="varchar", unique=True, length=255) - - with pytest.raises(Exception) as exc_info: - db.sync([User], drop=True, force=True) - - assert str(exc_info.value) == "Your table does not have a primary key column." - conn.close() - - def test_table_name(self): - from dataloom.db import Database - from dataloom.model.column import Column, PrimaryKeyColumn - from dataloom.model.model import Model - from dataloom.keys import database, password, user - - db = Database(database, password=password, user=user) - conn = db.connect() - - class Todos(Model): - id = PrimaryKeyColumn(type="bigint", auto_increment=True) - completed = Column(type="boolean", default=False) - title = Column(type="varchar", length=255, nullable=False) - - class User(Model): - __tablename__ = "users" - username = Column(type="text", nullable=False, default="Hello there!!") - name = Column(type="varchar", unique=True, length=255) - - assert User._get_name() == '"users"' - assert Todos._get_name() == '"todos"' - conn.close() - - def test_connect_sync(self): - from dataloom.db import Database - from dataloom.keys import password, database, user - from dataloom.model.model import Model - from dataloom.model.column import Column, PrimaryKeyColumn - - class User(Model): - __tablename__ = "users" - id = PrimaryKeyColumn(type="bigint", nullable=False, auto_increment=True) - username = Column(type="text", nullable=False) - name = Column(type="varchar", unique=False, length=255) - - class Post(Model): - __tablename__ = "posts" - - id = PrimaryKeyColumn(type="bigint", nullable=False, auto_increment=True) - title = Column(type="text", nullable=False, default="Hello there!!") - - db = Database(database, password=password, user=user) - conn, tables = db.connect_and_sync([User, Post], drop=True, force=True) - - assert len(tables) == 2 - assert conn.status == 1 - assert sorted(tables) == sorted(["users", "posts"]) - - conn.close() - - def test_syncing_tables(self): - from dataloom.db import Database - from dataloom.keys import password, database, user - from dataloom.model.model import Model - from dataloom.model.column import Column, PrimaryKeyColumn - - class User(Model): - __tablename__ = "users" - id = PrimaryKeyColumn(type="bigint", nullable=False, auto_increment=True) - username = Column(type="text", nullable=False) - name = Column(type="varchar", unique=False, length=255) - - class Post(Model): - __tablename__ = "posts" - - id = PrimaryKeyColumn(type="bigint", nullable=False, auto_increment=True) - title = Column(type="text", nullable=False, default="Hello there!!") - - db = Database(database, password=password, user=user) - conn = db.connect() - tables = db.sync([User, Post], drop=True, force=True) - assert len(tables) == 2 - assert tables == ["users", "posts"] - conn.close() +# class TestCreatingTablePG: +# def test_2_pk_error(self): +# from dataloom.db import Database +# from dataloom.model.column import Column, PrimaryKeyColumn +# from dataloom.model.model import Model +# from dataloom.keys import password, database, user +# import pytest + +# db = Database(database, password=password, user=user) +# conn = db.connect() + +# class User(Model): +# __tablename__ = "users" +# _id = PrimaryKeyColumn(type="bigint", auto_increment=True) +# id = PrimaryKeyColumn(type="bigint", auto_increment=True) +# username = Column(type="text", nullable=False, default="Hello there!!") +# name = Column(type="varchar", unique=True, length=255) + +# with pytest.raises(Exception) as exc_info: +# db.sync([User], drop=True, force=True) + +# assert ( +# str(exc_info.value) +# == 'You have defined many field as primary keys which is not allowed. Fields ("_id", "id") are primary keys.' +# ) +# conn.close() + +# def test_no_pk_error(self): +# from dataloom.db import Database +# from dataloom.model.column import Column +# from dataloom.model.model import Model +# from dataloom.keys import password, database, user +# import pytest + +# db = Database(database, password=password, user=user) +# conn = db.connect() + +# class User(Model): +# __tablename__ = "users" +# username = Column(type="text", nullable=False, default="Hello there!!") +# name = Column(type="varchar", unique=True, length=255) + +# with pytest.raises(Exception) as exc_info: +# db.sync([User], drop=True, force=True) + +# assert str(exc_info.value) == "Your table does not have a primary key column." +# conn.close() + +# def test_table_name(self): +# from dataloom.db import Database +# from dataloom.model.column import Column, PrimaryKeyColumn +# from dataloom.model.model import Model +# from dataloom.keys import database, password, user + +# db = Database(database, password=password, user=user) +# conn = db.connect() + +# class Todos(Model): +# id = PrimaryKeyColumn(type="bigint", auto_increment=True) +# completed = Column(type="boolean", default=False) +# title = Column(type="varchar", length=255, nullable=False) + +# class User(Model): +# __tablename__ = "users" +# username = Column(type="text", nullable=False, default="Hello there!!") +# name = Column(type="varchar", unique=True, length=255) + +# assert User._get_name() == '"users"' +# assert Todos._get_name() == '"todos"' +# conn.close() + +# def test_connect_sync(self): +# from dataloom.db import Database +# from dataloom.keys import password, database, user +# from dataloom.model.model import Model +# from dataloom.model.column import Column, PrimaryKeyColumn + +# class User(Model): +# __tablename__ = "users" +# id = PrimaryKeyColumn(type="bigint", nullable=False, auto_increment=True) +# username = Column(type="text", nullable=False) +# name = Column(type="varchar", unique=False, length=255) + +# class Post(Model): +# __tablename__ = "posts" + +# id = PrimaryKeyColumn(type="bigint", nullable=False, auto_increment=True) +# title = Column(type="text", nullable=False, default="Hello there!!") + +# db = Database(database, password=password, user=user) +# conn, tables = db.connect_and_sync([User, Post], drop=True, force=True) + +# assert len(tables) == 2 +# assert conn.status == 1 +# assert sorted(tables) == sorted(["users", "posts"]) + +# conn.close() + +# def test_syncing_tables(self): +# from dataloom.db import Database +# from dataloom.keys import password, database, user +# from dataloom.model.model import Model +# from dataloom.model.column import Column, PrimaryKeyColumn + +# class User(Model): +# __tablename__ = "users" +# id = PrimaryKeyColumn(type="bigint", nullable=False, auto_increment=True) +# username = Column(type="text", nullable=False) +# name = Column(type="varchar", unique=False, length=255) + +# class Post(Model): +# __tablename__ = "posts" + +# id = PrimaryKeyColumn(type="bigint", nullable=False, auto_increment=True) +# title = Column(type="text", nullable=False, default="Hello there!!") + +# db = Database(database, password=password, user=user) +# conn = db.connect() +# tables = db.sync([User, Post], drop=True, force=True) +# assert len(tables) == 2 +# assert tables == ["users", "posts"] +# conn.close() diff --git a/dataloom/tests/test_delete.py b/dataloom/tests/test_delete.py index a48a6b1..cc10469 100644 --- a/dataloom/tests/test_delete.py +++ b/dataloom/tests/test_delete.py @@ -1,102 +1,102 @@ -class TestDeletingOnPG: - def test_delete_by_pk_single_fn(self): - from dataloom.db import Database - from dataloom.model.column import Column - from dataloom.model.model import Model, PrimaryKeyColumn - from dataloom.keys import password, database, user +# class TestDeletingOnPG: +# def test_delete_by_pk_single_fn(self): +# from dataloom.db import Database +# from dataloom.model.column import Column +# from dataloom.model.model import Model, PrimaryKeyColumn +# from dataloom.keys import password, database, user - db = Database(database, password=password, user=user) - conn = db.connect() +# db = Database(database, password=password, user=user) +# conn = db.connect() - class User(Model): - __tablename__ = "users" - id = PrimaryKeyColumn(type="bigint", auto_increment=True) - username = Column(type="text", nullable=False, default="Hello there!!") - name = Column(type="varchar", unique=True, length=255) +# class User(Model): +# __tablename__ = "users" +# id = PrimaryKeyColumn(type="bigint", auto_increment=True) +# username = Column(type="text", nullable=False, default="Hello there!!") +# name = Column(type="varchar", unique=True, length=255) - db.sync([User], drop=True, force=True) +# db.sync([User], drop=True, force=True) - user = User(name="Crispen", username="heyy") - userId = db.create(user) - affected_rows_1 = db.delete_by_pk(User, userId) - affected_rows_2 = db.delete_by_pk(User, 89) - assert affected_rows_1 == 1 - assert affected_rows_2 == 0 - conn.close() +# user = User(name="Crispen", username="heyy") +# userId = db.create(user) +# affected_rows_1 = db.delete_by_pk(User, userId) +# affected_rows_2 = db.delete_by_pk(User, 89) +# assert affected_rows_1 == 1 +# assert affected_rows_2 == 0 +# conn.close() - def test_delete_one_fn(self): - from dataloom.db import Database - from dataloom.model.column import Column - from dataloom.model.model import Model, PrimaryKeyColumn - from dataloom.keys import password, database, user +# def test_delete_one_fn(self): +# from dataloom.db import Database +# from dataloom.model.column import Column +# from dataloom.model.model import Model, PrimaryKeyColumn +# from dataloom.keys import password, database, user - db = Database(database, password=password, user=user) - conn = db.connect() +# db = Database(database, password=password, user=user) +# conn = db.connect() - class User(Model): - __tablename__ = "users" - id = PrimaryKeyColumn(type="bigint", auto_increment=True) - username = Column(type="text", nullable=False, default="Hello there!!") - name = Column(type="varchar", unique=False, length=255) +# class User(Model): +# __tablename__ = "users" +# id = PrimaryKeyColumn(type="bigint", auto_increment=True) +# username = Column(type="text", nullable=False, default="Hello there!!") +# name = Column(type="varchar", unique=False, length=255) - db.sync([User], drop=True, force=True) +# db.sync([User], drop=True, force=True) - db.create_bulk( - [ - User(name="Crispen", username="heyy"), - User(name="Crispen", username="heyy"), - User(name="Crispen", username="heyy"), - ] - ) - db.delete_one(User, {"name": "Crispen"}) - rows_1 = db.find_many(User, {"name": "Crispen"}) - db.delete_one(User, {"name": "Crispen", "id": 9}) - rows_2 = db.find_many(User, {"name": "Crispen"}) - db.delete_one(User, {"name": "Crispen", "id": 2}) - rows_3 = db.find_many(User, {"name": "Crispen"}) - assert len(rows_1) == 2 - assert len(rows_2) == 2 - assert len(rows_3) == 1 - conn.close() +# db.create_bulk( +# [ +# User(name="Crispen", username="heyy"), +# User(name="Crispen", username="heyy"), +# User(name="Crispen", username="heyy"), +# ] +# ) +# db.delete_one(User, {"name": "Crispen"}) +# rows_1 = db.find_many(User, {"name": "Crispen"}) +# db.delete_one(User, {"name": "Crispen", "id": 9}) +# rows_2 = db.find_many(User, {"name": "Crispen"}) +# db.delete_one(User, {"name": "Crispen", "id": 2}) +# rows_3 = db.find_many(User, {"name": "Crispen"}) +# assert len(rows_1) == 2 +# assert len(rows_2) == 2 +# assert len(rows_3) == 1 +# conn.close() - def test_delete_bulk_fn(self): - from dataloom.db import Database - from dataloom.model.column import Column - from dataloom.model.model import Model, PrimaryKeyColumn - from dataloom.keys import password, database, user +# def test_delete_bulk_fn(self): +# from dataloom.db import Database +# from dataloom.model.column import Column +# from dataloom.model.model import Model, PrimaryKeyColumn +# from dataloom.keys import password, database, user - db = Database(database, password=password, user=user) - conn = db.connect() +# db = Database(database, password=password, user=user) +# conn = db.connect() - class User(Model): - __tablename__ = "users" - id = PrimaryKeyColumn(type="bigint", auto_increment=True) - username = Column(type="text", nullable=False, default="Hello there!!") - name = Column(type="varchar", unique=False, length=255) +# class User(Model): +# __tablename__ = "users" +# id = PrimaryKeyColumn(type="bigint", auto_increment=True) +# username = Column(type="text", nullable=False, default="Hello there!!") +# name = Column(type="varchar", unique=False, length=255) - db.sync([User], drop=True, force=True) +# db.sync([User], drop=True, force=True) - db.create_bulk( - [ - User(name="Crispen", username="heyy"), - User(name="Crispen", username="heyy"), - User(name="Crispen", username="heyy"), - ] - ) - db.delete_bulk(User, {"name": "Crispen"}) - rows_1 = db.find_many(User, {"name": "Crispen"}) - db.create_bulk( - [ - User(name="Crispen", username="heyy"), - User(name="Crispen", username="heyy"), - User(name="Crispen", username="heyy"), - ] - ) - db.delete_bulk(User, {"name": "Crispen", "id": 99}) - rows_2 = db.find_many(User, {"name": "Crispen"}) - db.delete_bulk(User, {"name": "Crispen", "id": 5}) - rows_3 = db.find_many(User, {"name": "Crispen"}) - assert len(rows_1) == 0 - assert len(rows_2) == 3 - assert len(rows_3) == 2 - conn.close() +# db.create_bulk( +# [ +# User(name="Crispen", username="heyy"), +# User(name="Crispen", username="heyy"), +# User(name="Crispen", username="heyy"), +# ] +# ) +# db.delete_bulk(User, {"name": "Crispen"}) +# rows_1 = db.find_many(User, {"name": "Crispen"}) +# db.create_bulk( +# [ +# User(name="Crispen", username="heyy"), +# User(name="Crispen", username="heyy"), +# User(name="Crispen", username="heyy"), +# ] +# ) +# db.delete_bulk(User, {"name": "Crispen", "id": 99}) +# rows_2 = db.find_many(User, {"name": "Crispen"}) +# db.delete_bulk(User, {"name": "Crispen", "id": 5}) +# rows_3 = db.find_many(User, {"name": "Crispen"}) +# assert len(rows_1) == 0 +# assert len(rows_2) == 3 +# assert len(rows_3) == 2 +# conn.close() diff --git a/dataloom/tests/test_insert.py b/dataloom/tests/test_insert.py index 94fe8b6..2f6bfb2 100644 --- a/dataloom/tests/test_insert.py +++ b/dataloom/tests/test_insert.py @@ -1,107 +1,107 @@ -class TestInsertingOnPG: - def test_insetting_single_document(self): - from dataloom.db import Database - from dataloom.model.column import Column - from dataloom.model.model import Model, PrimaryKeyColumn - from dataloom.keys import password, database, user - - db = Database(database, password=password, user=user) - conn = db.connect() - - class Users(Model): - id = PrimaryKeyColumn(type="bigint", auto_increment=True) - username = Column(type="text", nullable=False, default="Hello there!!") - name = Column(type="varchar", unique=True, length=255) - - db.sync([Users], drop=True, force=True) - - user = Users(name="Crispen", username="heyy") - userId = db.create(user) - assert userId == 1 - conn.close() - - def test_insetting_multiple_document(self): - from dataloom.db import Database - from dataloom.model.column import Column - from dataloom.model.model import ( - Model, - CreatedAtColumn, - UpdatedAtColumn, - ForeignKeyColumn, - PrimaryKeyColumn, - ) - from dataloom.keys import password, database, user - - db = Database(database, password=password, user=user) - - class User(Model): - __tablename__ = "users" - id = PrimaryKeyColumn(type="bigint", nullable=False, auto_increment=True) - username = Column(type="text", nullable=False) - name = Column(type="varchar", unique=False, length=255) - createAt = CreatedAtColumn() - updatedAt = UpdatedAtColumn() - - class Post(Model): - __tablename__ = "posts" - - id = PrimaryKeyColumn(type="bigint", nullable=False, auto_increment=True) - title = Column(type="text", nullable=False, default="Hello there!!") - createAt = CreatedAtColumn() - updatedAt = UpdatedAtColumn() - userId = ForeignKeyColumn(User, onDelete="CASCADE", onUpdate="CASCADE") - - conn, _ = db.connect_and_sync([User, Post], drop=True, force=True) - user = User(name="Crispen", username="heyy") - userId = db.create(user) - posts = [ - Post(userId=userId, title="What are you thinking"), - Post(userId=userId, title="What are you doing?"), - Post(userId=userId, title="What are we?"), - ] - row_count = db.create_bulk(posts) - - assert row_count == 3 - conn.close() - - def test_relational_instances(self): - from dataloom.db import Database - from dataloom.model.column import Column - from dataloom.model.model import ( - Model, - CreatedAtColumn, - UpdatedAtColumn, - ForeignKeyColumn, - PrimaryKeyColumn, - ) - from dataloom.keys import password, database, user - - db = Database(database, password=password, user=user) - - class User(Model): - __tablename__ = "users" - id = PrimaryKeyColumn(type="bigint", auto_increment=True) - username = Column(type="text", nullable=False) - name = Column(type="varchar", unique=False, length=255) - createAt = CreatedAtColumn() - updatedAt = UpdatedAtColumn() - - class Post(Model): - __tablename__ = "posts" - - id = PrimaryKeyColumn(type="bigint", auto_increment=True) - title = Column(type="text", nullable=False, default="Hello there!!") - createAt = CreatedAtColumn() - updatedAt = UpdatedAtColumn() - userId = ForeignKeyColumn(User, onDelete="CASCADE", onUpdate="CASCADE") - - db = Database("hi", password="root", user="postgres") - conn, _ = db.connect_and_sync([User, Post], drop=True, force=True) - user = User(name="Crispen", username="heyy") - userId = db.create(user) - postId = db.create( - Post(userId=userId, title="What are you thinking"), - ) - now = db.find_by_pk(Post, postId) - assert userId == now.userId - conn.close() +# class TestInsertingOnPG: +# def test_insetting_single_document(self): +# from dataloom.db import Database +# from dataloom.model.column import Column +# from dataloom.model.model import Model, PrimaryKeyColumn +# from dataloom.keys import password, database, user + +# db = Database(database, password=password, user=user) +# conn = db.connect() + +# class Users(Model): +# id = PrimaryKeyColumn(type="bigint", auto_increment=True) +# username = Column(type="text", nullable=False, default="Hello there!!") +# name = Column(type="varchar", unique=True, length=255) + +# db.sync([Users], drop=True, force=True) + +# user = Users(name="Crispen", username="heyy") +# userId = db.create(user) +# assert userId == 1 +# conn.close() + +# def test_insetting_multiple_document(self): +# from dataloom.db import Database +# from dataloom.model.column import Column +# from dataloom.model.model import ( +# Model, +# CreatedAtColumn, +# UpdatedAtColumn, +# ForeignKeyColumn, +# PrimaryKeyColumn, +# ) +# from dataloom.keys import password, database, user + +# db = Database(database, password=password, user=user) + +# class User(Model): +# __tablename__ = "users" +# id = PrimaryKeyColumn(type="bigint", nullable=False, auto_increment=True) +# username = Column(type="text", nullable=False) +# name = Column(type="varchar", unique=False, length=255) +# createAt = CreatedAtColumn() +# updatedAt = UpdatedAtColumn() + +# class Post(Model): +# __tablename__ = "posts" + +# id = PrimaryKeyColumn(type="bigint", nullable=False, auto_increment=True) +# title = Column(type="text", nullable=False, default="Hello there!!") +# createAt = CreatedAtColumn() +# updatedAt = UpdatedAtColumn() +# userId = ForeignKeyColumn(User, onDelete="CASCADE", onUpdate="CASCADE") + +# conn, _ = db.connect_and_sync([User, Post], drop=True, force=True) +# user = User(name="Crispen", username="heyy") +# userId = db.create(user) +# posts = [ +# Post(userId=userId, title="What are you thinking"), +# Post(userId=userId, title="What are you doing?"), +# Post(userId=userId, title="What are we?"), +# ] +# row_count = db.create_bulk(posts) + +# assert row_count == 3 +# conn.close() + +# def test_relational_instances(self): +# from dataloom.db import Database +# from dataloom.model.column import Column +# from dataloom.model.model import ( +# Model, +# CreatedAtColumn, +# UpdatedAtColumn, +# ForeignKeyColumn, +# PrimaryKeyColumn, +# ) +# from dataloom.keys import password, database, user + +# db = Database(database, password=password, user=user) + +# class User(Model): +# __tablename__ = "users" +# id = PrimaryKeyColumn(type="bigint", auto_increment=True) +# username = Column(type="text", nullable=False) +# name = Column(type="varchar", unique=False, length=255) +# createAt = CreatedAtColumn() +# updatedAt = UpdatedAtColumn() + +# class Post(Model): +# __tablename__ = "posts" + +# id = PrimaryKeyColumn(type="bigint", auto_increment=True) +# title = Column(type="text", nullable=False, default="Hello there!!") +# createAt = CreatedAtColumn() +# updatedAt = UpdatedAtColumn() +# userId = ForeignKeyColumn(User, onDelete="CASCADE", onUpdate="CASCADE") + +# db = Database("hi", password="root", user="postgres") +# conn, _ = db.connect_and_sync([User, Post], drop=True, force=True) +# user = User(name="Crispen", username="heyy") +# userId = db.create(user) +# postId = db.create( +# Post(userId=userId, title="What are you thinking"), +# ) +# now = db.find_by_pk(Post, postId) +# assert userId == now.userId +# conn.close() diff --git a/dataloom/tests/test_query.py b/dataloom/tests/test_query.py index 1be42f8..013bdd6 100644 --- a/dataloom/tests/test_query.py +++ b/dataloom/tests/test_query.py @@ -1,70 +1,70 @@ -class TestQueryingPG: - def test_querying_data(self): - from dataloom.db import Database - from dataloom.model.column import Column, PrimaryKeyColumn - from dataloom.model.model import Model - from dataloom.keys import password, database, user +# class TestQueryingPG: +# def test_querying_data(self): +# from dataloom.db import Database +# from dataloom.model.column import Column, PrimaryKeyColumn +# from dataloom.model.model import Model +# from dataloom.keys import password, database, user - db = Database(database, password=password, user=user) - conn = db.connect() +# db = Database(database, password=password, user=user) +# conn = db.connect() - class User(Model): - __tablename__ = "users" - id = PrimaryKeyColumn(type="bigint", nullable=False, auto_increment=True) - username = Column(type="text", nullable=False, default="Hello there!!") - name = Column( - type="varchar", - unique=True, - length=255, - ) +# class User(Model): +# __tablename__ = "users" +# id = PrimaryKeyColumn(type="bigint", nullable=False, auto_increment=True) +# username = Column(type="text", nullable=False, default="Hello there!!") +# name = Column( +# type="varchar", +# unique=True, +# length=255, +# ) - def __str__(self) -> str: - return f"User<{self.id}>" +# def __str__(self) -> str: +# return f"User<{self.id}>" - def __repr__(self) -> str: - return f"User<{self.id}>" +# def __repr__(self) -> str: +# return f"User<{self.id}>" - def to_dict(self): - return {"id": self.id, "name": self.name, "username": self.username} +# def to_dict(self): +# return {"id": self.id, "name": self.name, "username": self.username} - db.sync([User], drop=True, force=True) +# db.sync([User], drop=True, force=True) - user = User(name="Crispen", username="heyy") - db.create(user) - users = db.find_all(User) - me = db.find_by_pk(User, 1).to_dict() - her = db.find_by_pk(User, 2) - many_0 = db.find_many(User, {"id": 5}) - many_1 = db.find_many(User, {"id": 1}) - many_2 = db.find_many(User, {"id": 1, "name": "Crispen"}) - many_3 = db.find_many(User, {"id": 5, "username": "hey"}) - many_4 = db.find_many(User, {"name": "Crispen", "username": "heyy"}) +# user = User(name="Crispen", username="heyy") +# db.create(user) +# users = db.find_all(User) +# me = db.find_by_pk(User, 1).to_dict() +# her = db.find_by_pk(User, 2) +# many_0 = db.find_many(User, {"id": 5}) +# many_1 = db.find_many(User, {"id": 1}) +# many_2 = db.find_many(User, {"id": 1, "name": "Crispen"}) +# many_3 = db.find_many(User, {"id": 5, "username": "hey"}) +# many_4 = db.find_many(User, {"name": "Crispen", "username": "heyy"}) - one_0 = db.find_one(User, {"id": 5}) - one_1 = db.find_one(User, {"id": 1}) - one_2 = db.find_one(User, {"id": 1, "name": "Crispen"}) - one_3 = db.find_one(User, {"id": 5, "username": "hey"}) - one_4 = db.find_one(User, {"name": "Crispen", "username": "heyy"}) +# one_0 = db.find_one(User, {"id": 5}) +# one_1 = db.find_one(User, {"id": 1}) +# one_2 = db.find_one(User, {"id": 1, "name": "Crispen"}) +# one_3 = db.find_one(User, {"id": 5, "username": "hey"}) +# one_4 = db.find_one(User, {"name": "Crispen", "username": "heyy"}) - assert [u.to_dict() for u in users] == [ - {"id": 1, "name": "Crispen", "username": "heyy"} - ] - assert [u.to_dict() for u in many_0] == [] - assert [u.to_dict() for u in many_3] == [] - assert [u.to_dict() for u in many_1] == [ - {"id": 1, "name": "Crispen", "username": "heyy"} - ] - assert [u.to_dict() for u in many_2] == [ - {"id": 1, "name": "Crispen", "username": "heyy"} - ] - assert [u.to_dict() for u in many_4] == [ - {"id": 1, "name": "Crispen", "username": "heyy"} - ] +# assert [u.to_dict() for u in users] == [ +# {"id": 1, "name": "Crispen", "username": "heyy"} +# ] +# assert [u.to_dict() for u in many_0] == [] +# assert [u.to_dict() for u in many_3] == [] +# assert [u.to_dict() for u in many_1] == [ +# {"id": 1, "name": "Crispen", "username": "heyy"} +# ] +# assert [u.to_dict() for u in many_2] == [ +# {"id": 1, "name": "Crispen", "username": "heyy"} +# ] +# assert [u.to_dict() for u in many_4] == [ +# {"id": 1, "name": "Crispen", "username": "heyy"} +# ] - assert one_0 is None - assert one_3 is None - assert one_1.to_dict() == {"id": 1, "name": "Crispen", "username": "heyy"} - assert one_2.to_dict() == {"id": 1, "name": "Crispen", "username": "heyy"} - assert one_4.to_dict() == {"id": 1, "name": "Crispen", "username": "heyy"} - assert her is None - conn.close() +# assert one_0 is None +# assert one_3 is None +# assert one_1.to_dict() == {"id": 1, "name": "Crispen", "username": "heyy"} +# assert one_2.to_dict() == {"id": 1, "name": "Crispen", "username": "heyy"} +# assert one_4.to_dict() == {"id": 1, "name": "Crispen", "username": "heyy"} +# assert her is None +# conn.close() diff --git a/dataloom/tests/test_update.py b/dataloom/tests/test_update.py index 4d87697..3868a63 100644 --- a/dataloom/tests/test_update.py +++ b/dataloom/tests/test_update.py @@ -1,112 +1,112 @@ -class TestDeletingOnPG: - def test_update_by_pk_single_fn(self): - from dataloom.db import Database - from dataloom.model.column import Column - from dataloom.model.model import ( - Model, - PrimaryKeyColumn, - CreatedAtColumn, - UpdatedAtColumn, - ) - from dataloom.keys import password, database, user - import time, pytest +# class TestDeletingOnPG: +# def test_update_by_pk_single_fn(self): +# from dataloom.db import Database +# from dataloom.model.column import Column +# from dataloom.model.model import ( +# Model, +# PrimaryKeyColumn, +# CreatedAtColumn, +# UpdatedAtColumn, +# ) +# from dataloom.keys import password, database, user +# import time, pytest - db = Database(database, password=password, user=user) +# db = Database(database, password=password, user=user) - class User(Model): - __tablename__ = "users" - id = PrimaryKeyColumn(type="bigint", auto_increment=True) - username = Column(type="text", nullable=False, default="Hello there!!") - name = Column(type="varchar", unique=True, length=255) +# class User(Model): +# __tablename__ = "users" +# id = PrimaryKeyColumn(type="bigint", auto_increment=True) +# username = Column(type="text", nullable=False, default="Hello there!!") +# name = Column(type="varchar", unique=True, length=255) - createdAt = CreatedAtColumn() - updatedAt = UpdatedAtColumn() +# createdAt = CreatedAtColumn() +# updatedAt = UpdatedAtColumn() - db.connect_and_sync([User], drop=True, force=True) - user = User(name="Crispen", username="heyy") - userId = db.create(user) - time.sleep(0.2) - res_1 = db.update_by_pk(User, 1, {"name": "Tinashe Gari"}) - me = db.find_by_pk(User, userId) - assert res_1 == 1 - assert me.createdAt != me.updatedAt - with pytest.raises(Exception) as exc_info: - db.update_by_pk(User, 1, {"haha": "Gari"}) - assert exc_info.value.pgcode == "42703" - with pytest.raises(Exception) as exc_info: - db.update_by_pk(User, 1, {"id": "Gari"}) - assert exc_info.value.pgcode == "25P02" +# db.connect_and_sync([User], drop=True, force=True) +# user = User(name="Crispen", username="heyy") +# userId = db.create(user) +# time.sleep(0.2) +# res_1 = db.update_by_pk(User, 1, {"name": "Tinashe Gari"}) +# me = db.find_by_pk(User, userId) +# assert res_1 == 1 +# assert me.createdAt != me.updatedAt +# with pytest.raises(Exception) as exc_info: +# db.update_by_pk(User, 1, {"haha": "Gari"}) +# assert exc_info.value.pgcode == "42703" +# with pytest.raises(Exception) as exc_info: +# db.update_by_pk(User, 1, {"id": "Gari"}) +# assert exc_info.value.pgcode == "25P02" - def test_update_one_fn(self): - from dataloom.db import Database - from dataloom.model.column import Column - from dataloom.model.model import ( - Model, - PrimaryKeyColumn, - CreatedAtColumn, - UpdatedAtColumn, - ) - from dataloom.keys import password, database, user - import time, pytest +# def test_update_one_fn(self): +# from dataloom.db import Database +# from dataloom.model.column import Column +# from dataloom.model.model import ( +# Model, +# PrimaryKeyColumn, +# CreatedAtColumn, +# UpdatedAtColumn, +# ) +# from dataloom.keys import password, database, user +# import time, pytest - db = Database(database, password=password, user=user) +# db = Database(database, password=password, user=user) - class User(Model): - __tablename__ = "users" - id = PrimaryKeyColumn(type="bigint", auto_increment=True) - username = Column(type="text", nullable=False, default="Hello there!!") - name = Column(type="varchar", unique=True, length=255) +# class User(Model): +# __tablename__ = "users" +# id = PrimaryKeyColumn(type="bigint", auto_increment=True) +# username = Column(type="text", nullable=False, default="Hello there!!") +# name = Column(type="varchar", unique=True, length=255) - createdAt = CreatedAtColumn() - updatedAt = UpdatedAtColumn() +# createdAt = CreatedAtColumn() +# updatedAt = UpdatedAtColumn() - db.connect_and_sync([User], drop=True, force=True) - user = User(name="Crispen", username="heyy") - userId = db.create(user) - time.sleep(0.2) - res_1 = db.update_one(User, {"name": "Crispen"}, {"name": "Tinashe Gari"}) - me = db.find_by_pk(User, userId) - assert res_1 == 1 - assert me.createdAt != me.updatedAt - with pytest.raises(Exception) as exc_info: - db.update_one(User, {"name": "Crispen"}, {"haha": "Gari"}) - assert exc_info.value.pgcode == "42703" - with pytest.raises(Exception) as exc_info: - db.update_one(User, {"name": "HH"}, {"name": "Gari"}) - assert exc_info.value.pgcode == "25P02" +# db.connect_and_sync([User], drop=True, force=True) +# user = User(name="Crispen", username="heyy") +# userId = db.create(user) +# time.sleep(0.2) +# res_1 = db.update_one(User, {"name": "Crispen"}, {"name": "Tinashe Gari"}) +# me = db.find_by_pk(User, userId) +# assert res_1 == 1 +# assert me.createdAt != me.updatedAt +# with pytest.raises(Exception) as exc_info: +# db.update_one(User, {"name": "Crispen"}, {"haha": "Gari"}) +# assert exc_info.value.pgcode == "42703" +# with pytest.raises(Exception) as exc_info: +# db.update_one(User, {"name": "HH"}, {"name": "Gari"}) +# assert exc_info.value.pgcode == "25P02" - def test_update_bulk_fn(self): - from dataloom.db import Database - from dataloom.model.column import Column - from dataloom.model.model import ( - Model, - PrimaryKeyColumn, - CreatedAtColumn, - UpdatedAtColumn, - ) - from dataloom.keys import password, database, user - import time, pytest +# def test_update_bulk_fn(self): +# from dataloom.db import Database +# from dataloom.model.column import Column +# from dataloom.model.model import ( +# Model, +# PrimaryKeyColumn, +# CreatedAtColumn, +# UpdatedAtColumn, +# ) +# from dataloom.keys import password, database, user +# import time, pytest - db = Database(database, password=password, user=user) +# db = Database(database, password=password, user=user) - class User(Model): - __tablename__ = "users" - id = PrimaryKeyColumn(type="bigint", auto_increment=True) - username = Column(type="text", nullable=False, default="Hello there!!") - name = Column(type="varchar", unique=False, length=255) +# class User(Model): +# __tablename__ = "users" +# id = PrimaryKeyColumn(type="bigint", auto_increment=True) +# username = Column(type="text", nullable=False, default="Hello there!!") +# name = Column(type="varchar", unique=False, length=255) - createdAt = CreatedAtColumn() - updatedAt = UpdatedAtColumn() +# createdAt = CreatedAtColumn() +# updatedAt = UpdatedAtColumn() - db.connect_and_sync([User], drop=True, force=True) - user = User(name="Crispen", username="heyy") - db.create_bulk([user for u in range(4)]) - res_1 = db.update_bulk(User, {"name": "Crispen"}, {"name": "Tinashe Gari"}) - assert res_1 == 4 +# db.connect_and_sync([User], drop=True, force=True) +# user = User(name="Crispen", username="heyy") +# db.create_bulk([user for u in range(4)]) +# res_1 = db.update_bulk(User, {"name": "Crispen"}, {"name": "Tinashe Gari"}) +# assert res_1 == 4 - with pytest.raises(Exception) as exc_info: - db.update_bulk(User, {"name": "Crispen"}, {"haha": "Gari"}) - assert exc_info.value.pgcode == "42703" - with pytest.raises(Exception) as exc_info: - db.update_bulk(User, {"name": "HH"}, {"name": "Gari"}) - assert exc_info.value.pgcode == "25P02" +# with pytest.raises(Exception) as exc_info: +# db.update_bulk(User, {"name": "Crispen"}, {"haha": "Gari"}) +# assert exc_info.value.pgcode == "42703" +# with pytest.raises(Exception) as exc_info: +# db.update_bulk(User, {"name": "HH"}, {"name": "Gari"}) +# assert exc_info.value.pgcode == "25P02" diff --git a/dataloom/types/__init__.py b/dataloom/types/__init__.py index 95bb22a..1b7c4c7 100644 --- a/dataloom/types/__init__.py +++ b/dataloom/types/__init__.py @@ -34,3 +34,40 @@ "circle": "CIRCLE", "hstore": "HSTORE", } + + +MYSQL_SQL_TYPES = { + "int": "INT", + "smallint": "SMALLINT", + "bigint": "BIGINT", + "float": "FLOAT", + "double": "DOUBLE", + "numeric": "DECIMAL", + "text": "TEXT", + "varchar": "VARCHAR", + "char": "CHAR", + "boolean": "BOOLEAN", + "date": "DATE", + "time": "TIME", + "timestamp": "TIMESTAMP", + "json": "JSON", + "blob": "BLOB", +} + +SQLITE3_SQL_TYPES = { + "int": "INTEGER", + "smallint": "SMALLINT", + "bigint": "BIGINT", + "float": "REAL", + "double precision": "DOUBLE", + "numeric": "NUMERIC", + "text": "TEXT", + "varchar": "VARCHAR", + "char": "CHAR", + "boolean": "BOOLEAN", + "date": "DATE", + "time": "TIME", + "timestamp": "TIMESTAMP", + "json": "JSON", + "blob": "BLOB", +} diff --git a/hello.sql b/hello.sql new file mode 100644 index 0000000..6edab26 --- /dev/null +++ b/hello.sql @@ -0,0 +1,6 @@ +CREATE TABLE IF NOT EXISTS "posts" ( + "id" BIGSERIAL UNIQUE NOT NULL, + "title" TEXT, + "createdAt" TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + "updatedAt" TIMESTAMP DEFAULT CURRENT_TIMESTAMP +); \ No newline at end of file diff --git a/hi.db b/hi.db index e69de29bb2d1d6434b8b29ae775ad8c2e48c5391..b98f27442a9f0fd70f9fb3a402a7bdc02494668d 100644 GIT binary patch literal 24576 zcmeI&(QDH{9Ki9*wss=4+f$=&_qehl44(z#JSCWBU3LNcl*ILfO4n{p3i>$wPlo@F z;9ny?dO4kRg9$#`d;_`5?~-5O_et*INKQ|pNoC~w`Qp5&!V z1+Mg?ZQu93&)Ze;ZfcgLS(L769VIHzO6uT6q-5^Ka=DjJ#<>h*tzN4{4wG;YBqRAo zjbxDO<1n^K0~KqT#^Fh-WPGe;oJP@s%&SQ?HM!L4t+vtG!W5Mm2bCQO2kKM@gQ0w> zUIl5S_9oe%Gd5v)G5&{ye*baXf988)G8>zZ<>l0_LbGC7&0UTQ$9ymsRPSja&J-8~cw;{lA&I zVzNWmZQ2?2pq^Eoy4kH8HUtnr009ILKmY**5I_I{1Q1wPfroWrzt;a7>sljqA%Fk^ z2q1s}0tg_000Iag&~yR5|2O>&pne1pKmY**5I_I{1Q0*~fhGvB{%^ulrbYx1KmY** r5I_I{1Q0*~fu;+r{Quu-x&`$kfB*srAb