Skip to content

Commit

Permalink
update functionality
Browse files Browse the repository at this point in the history
  • Loading branch information
CrispenGari committed Feb 4, 2024
1 parent fa1bda8 commit d11f722
Show file tree
Hide file tree
Showing 11 changed files with 611 additions and 7,730 deletions.
8 changes: 8 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,11 @@
# loggs files

*.sql

# sqlite dababase
*.db


# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
Expand Down
7,513 changes: 273 additions & 7,240 deletions dataloom.sql

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions dataloom/constants/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from datetime import datetime

CURRENT_TIME_STAMP = datetime.now()
SQLITE_CURRENT_TIME_STAMP = 'datetime("now")'
instances = {
"postgres": {
"type": "postgres",
Expand Down
8 changes: 4 additions & 4 deletions dataloom/model/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
CreatedAtColumn,
UpdatedAtColumn,
)
from dataloom.constants import CURRENT_TIME_STAMP
from dataloom.constants import CURRENT_TIME_STAMP, SQLITE_CURRENT_TIME_STAMP


class Model:
Expand Down Expand Up @@ -280,7 +280,7 @@ def _get_update_by_pk_stm(cls, dialect: str, args: dict = {}):

if updatedAtColumName is not None:
placeholders.append(f'{updatedAtColumName} = {'?' if dialect == 'sqlite' else '%s'}')
values.append(CURRENT_TIME_STAMP)
values.append(SQLITE_CURRENT_TIME_STAMP if dialect =='sqlite' else CURRENT_TIME_STAMP)

if dialect == "postgres" or "mysql" or "sqlite":
sql = GetStatement(
Expand Down Expand Up @@ -337,7 +337,7 @@ def _get_update_one_stm(cls, dialect: str,

if updatedAtColumName is not None:
placeholders_of_new_values.append(f'{updatedAtColumName} = {'?' if dialect == 'sqlite' else '%s'}')
new_values.append(CURRENT_TIME_STAMP)
new_values.append(SQLITE_CURRENT_TIME_STAMP if dialect =='sqlite' else CURRENT_TIME_STAMP)

if dialect == "postgres" or "mysql" or "sqlite":
sql = GetStatement(
Expand Down Expand Up @@ -392,7 +392,7 @@ def _get_update_bulk_where_stm(cls, dialect: str,

if updatedAtColumName is not None:
placeholders_of_new_values.append(f'{updatedAtColumName} = {'?' if dialect == 'sqlite' else '%s'}')
new_values.append(CURRENT_TIME_STAMP)
new_values.append(SQLITE_CURRENT_TIME_STAMP if dialect =='sqlite' else CURRENT_TIME_STAMP)

if dialect == "postgres" or "mysql" or "sqlite":
sql = GetStatement(
Expand Down
7 changes: 4 additions & 3 deletions dataloom/model/statements.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
class MySqlStatements:

# updates
UPDATE_BY_PK_COMMAND = (
"UPDATE {table_name} SET {placeholder_values} WHERE {pk_name} = {pk};"
)
UPDATE_ONE_WHERE_COMMAND = """
UPDATE {table_name} SET {placeholder_values} WHERE {pk_name} = (
SELECT {pk_name} FROM {table_name} WHERE {placeholder_filters} LIMIT 1
UPDATE {table_name} SET {placeholder_values} WHERE {pk_name} IN (
SELECT {pk_name} FROM (
SELECT {pk_name} FROM {table_name} WHERE {placeholder_filters} LIMIT 1
) AS subquery
);
"""
UPDATE_BULK_WHERE_COMMAND = (
Expand Down
204 changes: 107 additions & 97 deletions dataloom/tests/mysql/test_update_mysql.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ class Post(Model):
title = Column(type="varchar", length=255, nullable=False)
# timestamps
createdAt = CreatedAtColumn()

# relations
userId = ForeignKeyColumn(
User, type="int", required=True, onDelete="CASCADE", onUpdate="CASCADE"
Expand All @@ -123,111 +124,120 @@ class Post(Model):
post = Post(title="What are you doing?", userId=userId)
_ = mysql_loom.insert_bulk([post for i in range(5)])
time.sleep(0.05)
res_1 = mysql_loom.update_one(Post, {"id": userId}, {"title": "What's up!"})
res_2 = mysql_loom.update_one(Post, {"id": 10}, {"title": "What's up!"})
res_1 = mysql_loom.update_one(Post, {"userId": userId}, {"title": "John"})
res_2 = mysql_loom.update_one(Post, {"userId": 2}, {"title": "John"})

post = mysql_loom.find_by_pk(Post, 1)

# with pytest.raises(Exception) as exc_info:
# mysql_loom.update_one(Post, {"userId": userId}, {"id": "Gari"})
with pytest.raises(Exception) as exc_info:
mysql_loom.update_one(Post, {"userId": userId}, {"userId": "Gari"})

# print(exc_info.value.errno, exc_info.value.msg)
# assert exc_info.value.errno == 1366
# exc_info.value.msg = "Incorrect integer value: 'Gari' for column 'id' at row 1"
assert exc_info.value.errno == 1366
exc_info.value.msg = "Incorrect integer value: 'Gari' for column 'id' at row 1"

# with pytest.raises(InvalidFiltersForTableColumnException) as exc_info:
# mysql_loom.update_one(Post, {"wrong_key": "@miller"}, {"Title": "hey"})
with pytest.raises(InvalidFiltersForTableColumnException) as exc_info:
mysql_loom.update_one(Post, {"wrong_key": "@miller"}, {"userId": 3})

# assert (
# str(exc_info.value)
# == "There are no column filter passed to perform the UPDATE ONE operation or you passed filters that does not match columns in table 'users'."
# )
assert (
str(exc_info.value)
== "There are no column filter passed to perform the UPDATE ONE operation or you passed filters that does not match columns in table 'posts'."
)

# with pytest.raises(InvalidColumnValuesException) as exc_info:
# mysql_loom.update_one(Post, {"userId": userId}, values={"loca": "miller"})
# assert (
# str(exc_info.value)
# == "There are no new values passed to perform the UPDATE ONE operation, or you don't have the CreatedAtColumn field in your table 'posts'."
# )
with pytest.raises(InvalidColumnValuesException) as exc_info:
mysql_loom.update_one(Post, {"userId": userId}, values={"loca": "miller"})
assert (
str(exc_info.value)
== "There are no new values passed to perform the UPDATE ONE operation, or you don't have the CreatedAtColumn field in your table 'posts'."
)

assert post.createdAt != post.updatedAt
post.title == "John"
assert res_1 == 1
assert res_2 == 0
conn.close()

# def test_update_bulk_fn(self):
# from dataloom import (
# Dataloom,
# Model,
# Column,
# PrimaryKeyColumn,
# CreatedAtColumn,
# UpdatedAtColumn,
# TableColumn,
# ForeignKeyColumn,
# InvalidFiltersForTableColumnException,
# InvalidColumnValuesException,
# )
# from dataloom.keys import MySQLConfig
# from typing import Optional
# import time, pytest

# mysql_loom = Dataloom(
# dialect="mysql",
# database=MySQLConfig.database,
# password=MySQLConfig.password,
# user=MySQLConfig.user,
# )

# class User(Model):
# __tablename__: Optional[TableColumn] = TableColumn(name="users")
# id = PrimaryKeyColumn(type="int", auto_increment=True)
# name = Column(type="text", nullable=False, default="Bob")
# username = Column(type="varchar", unique=True, length=255)

# # timestamps
# createdAt = CreatedAtColumn()
# updatedAt = UpdatedAtColumn()

# class Post(Model):
# __tablename__: Optional[TableColumn] = TableColumn(name="posts")
# id = PrimaryKeyColumn(
# type="int", auto_increment=True, nullable=False, unique=True
# )
# completed = Column(type="boolean", default=False)
# title = Column(type="varchar", length=255, nullable=False)
# # timestamps
# createdAt = CreatedAtColumn()
# # relations
# userId = ForeignKeyColumn(
# User, type="int", required=True, onDelete="CASCADE", onUpdate="CASCADE"
# )

# conn, _ = mysql_loom.connect_and_sync([Post, User], drop=True, force=True)

# user = User(username="@miller")
# userId = mysql_loom.insert_one(user)
# post = Post(title="What are you doing?", userId=userId)
# _ = mysql_loom.insert_bulk([post for i in range(5)])
# res_1 = mysql_loom.update_bulk(Post, {"userId": userId}, {"title": "John"})
# res_2 = mysql_loom.update_bulk(Post, {"userId": 2}, {"title": "John"})

# with pytest.raises(Exception) as exc_info:
# mysql_loom.update_bulk(Post, {"userId": userId}, {"userId": "Gari"})
# assert exc_info.value.pgcode == "22P02"
# with pytest.raises(InvalidFiltersForTableColumnException) as exc_info:
# mysql_loom.update_bulk(Post, {"wrong_key": "@miller"}, {"userId": 3})

# assert (
# str(exc_info.value)
# == "There are no column filter passed to perform the UPDATE ONE operation or you passed filters that does not match columns in table 'posts'."
# )

# with pytest.raises(InvalidColumnValuesException) as exc_info:
# mysql_loom.update_bulk(Post, {"userId": userId}, values={"loca": "miller"})
# assert (
# str(exc_info.value)
# == "There are no new values passed to perform the UPDATE ONE operation, or you don't have the CreatedAtColumn field in your table 'posts'."
# )
# assert res_1 == 5
# assert res_2 == 0
# conn.close()
def test_update_bulk_fn(self):
from dataloom import (
Dataloom,
Model,
Column,
PrimaryKeyColumn,
CreatedAtColumn,
UpdatedAtColumn,
TableColumn,
ForeignKeyColumn,
InvalidFiltersForTableColumnException,
InvalidColumnValuesException,
)
from dataloom.keys import MySQLConfig
from typing import Optional
import time, pytest

mysql_loom = Dataloom(
dialect="mysql",
database=MySQLConfig.database,
password=MySQLConfig.password,
user=MySQLConfig.user,
)

class User(Model):
__tablename__: Optional[TableColumn] = TableColumn(name="users")
id = PrimaryKeyColumn(type="int", auto_increment=True)
name = Column(type="text", nullable=False, default="Bob")
username = Column(type="varchar", unique=True, length=255)

# timestamps
createdAt = CreatedAtColumn()
updatedAt = UpdatedAtColumn()

class Post(Model):
__tablename__: Optional[TableColumn] = TableColumn(name="posts")
id = PrimaryKeyColumn(
type="int", auto_increment=True, nullable=False, unique=True
)
completed = Column(type="boolean", default=False)
title = Column(type="varchar", length=255, nullable=False)
# timestamps
createdAt = CreatedAtColumn()

# relations
userId = ForeignKeyColumn(
User, type="int", required=True, onDelete="CASCADE", onUpdate="CASCADE"
)

conn, _ = mysql_loom.connect_and_sync([Post, User], drop=True, force=True)

user = User(username="@miller")
userId = mysql_loom.insert_one(user)
post = Post(title="What are you doing?", userId=userId)
_ = mysql_loom.insert_bulk([post for i in range(5)])
time.sleep(0.05)
res_1 = mysql_loom.update_bulk(Post, {"userId": userId}, {"title": "John"})
res_2 = mysql_loom.update_bulk(Post, {"userId": 2}, {"title": "John"})

post = mysql_loom.find_by_pk(Post, 1)

with pytest.raises(Exception) as exc_info:
mysql_loom.update_bulk(Post, {"userId": userId}, {"userId": "Gari"})

assert exc_info.value.errno == 1366
exc_info.value.msg = "Incorrect integer value: 'Gari' for column 'id' at row 1"

with pytest.raises(InvalidFiltersForTableColumnException) as exc_info:
mysql_loom.update_bulk(Post, {"wrong_key": "@miller"}, {"userId": 3})

assert (
str(exc_info.value)
== "There are no column filter passed to perform the UPDATE ONE operation or you passed filters that does not match columns in table 'posts'."
)

with pytest.raises(InvalidColumnValuesException) as exc_info:
mysql_loom.update_bulk(Post, {"userId": userId}, values={"loca": "miller"})
assert (
str(exc_info.value)
== "There are no new values passed to perform the UPDATE ONE operation, or you don't have the CreatedAtColumn field in your table 'posts'."
)

post.title == "John"
assert res_1 == 5
assert res_2 == 0
conn.close()
Loading

0 comments on commit d11f722

Please sign in to comment.