Skip to content

Commit

Permalink
SNOW-1058245-sqlalchemy-20-support: refactor autoincrement test - part 2
Browse files Browse the repository at this point in the history
  • Loading branch information
sfc-gh-mraba committed Feb 21, 2024
1 parent 5c1d232 commit edfb780
Show file tree
Hide file tree
Showing 2 changed files with 134 additions and 87 deletions.
53 changes: 26 additions & 27 deletions tests/test_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -1435,37 +1435,36 @@ def test_autoincrement(engine_testaccount):
Column("name", String(39)),
)

insert_stmt = insert(users)
select_stmt = select(users.c.name)

try:
metadata.create_all(engine_testaccount)

with engine_testaccount.connect() as connection:
insert_stmt = insert(users)
select_stmt = select(users.c.name)

with connection.begin():
connection.execute(insert_stmt, ({"name": "sf1"}))
result = connection.execute(select_stmt).all()
assert result == [("sf1",)], result

connection.execute(insert_stmt, ({"name": "sf2"}, {"name": "sf3"}))
result = connection.execute(select_stmt).all()
assert result == [("sf1",), ("sf2",), ("sf3",)], result

connection.execute(insert_stmt, ({"name": "sf4"}))
result = connection.execute(select_stmt).all()
assert result == [("sf1",), ("sf2",), ("sf3",), ("sf4",)], result

seq = Sequence("id_seq")
nextid = connection.execute(seq)
connection.execute(insert_stmt, ({"uid": nextid, "name": "sf5"}))
result = connection.execute(select_stmt).all()
assert result == [
("sf1",),
("sf2",),
("sf3",),
("sf4",),
("sf5",),
], result
connection.execute(insert_stmt, ({"name": "sf1"}))
result = connection.execute(select_stmt).all()
assert result == [("sf1",)], result

connection.execute(insert_stmt, ({"name": "sf2"}, {"name": "sf3"}))
result = connection.execute(select_stmt).all()
assert result == [("sf1",), ("sf2",), ("sf3",)], result

connection.execute(insert_stmt, ({"name": "sf4"}))
result = connection.execute(select_stmt).all()
assert result == [("sf1",), ("sf2",), ("sf3",), ("sf4",)], result

seq = Sequence("id_seq")
nextid = connection.execute(seq)
connection.execute(insert_stmt, ({"uid": nextid, "name": "sf5"}))
result = connection.execute(select_stmt).all()
assert result == [
("sf1",),
("sf2",),
("sf3",),
("sf4",),
("sf5",),
], result

finally:
metadata.drop_all(engine_testaccount)
Expand Down
168 changes: 108 additions & 60 deletions tests/test_sequence.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,89 +2,137 @@
# Copyright (c) 2012-2023 Snowflake Computing Inc. All rights reserved.
#

from sqlalchemy import Column, Integer, MetaData, Sequence, String, Table, select
from sqlalchemy import (
Column,
Integer,
MetaData,
Sequence,
String,
Table,
insert,
select,
)


def test_table_with_sequence(engine_testaccount, db_parameters):
def test_table_with_sequence(engine_testaccount):
"""Snowflake does not guarantee generating sequence numbers without gaps.
The generated numbers are not necesairly contigous.
https://docs.snowflake.com/en/user-guide/querying-sequences
"""
# https://github.com/snowflakedb/snowflake-sqlalchemy/issues/124
test_table_name = "sequence"
test_sequence_name = f"{test_table_name}_id_seq"
metadata = MetaData()

sequence_table = Table(
test_table_name,
MetaData(),
metadata,
Column("id", Integer, Sequence(test_sequence_name), primary_key=True),
Column("data", String(39)),
)
sequence_table.create(engine_testaccount)

metadata.create_all(engine_testaccount)
seq = Sequence(test_sequence_name)

sequence_insert_stmt = insert(sequence_table)
sequence_select_stmt = select(sequence_table.c.data)

second_metadata = MetaData()
autoload_sequence_table = Table(
test_table_name,
second_metadata,
autoload_with=engine_testaccount,
)

try:
with engine_testaccount.connect() as conn:
with conn.begin():
conn.execute(sequence_table.insert(), [{"data": "test_insert_1"}])
select_stmt = select(sequence_table).order_by("id")
result = conn.execute(select_stmt).fetchall()
assert result == [(1, "test_insert_1")]
autoload_sequence_table = Table(
test_table_name, MetaData(), autoload_with=engine_testaccount
)
conn.execute(
autoload_sequence_table.insert(),
[{"data": "multi_insert_1"}, {"data": "multi_insert_2"}],
)
conn.execute(
autoload_sequence_table.insert(), [{"data": "test_insert_2"}]
)
nextid = conn.execute(seq)
conn.execute(
autoload_sequence_table.insert(),
[{"id": nextid, "data": "test_insert_seq"}],
)
result = conn.execute(select_stmt).fetchall()
assert result == [
(1, "test_insert_1"),
(2, "multi_insert_1"),
(3, "multi_insert_2"),
(4, "test_insert_2"),
(5, "test_insert_seq"),
]
conn.execute(sequence_insert_stmt, ({"data": "test_insert_1"}))
result = conn.execute(sequence_select_stmt).fetchall()
assert result == [("test_insert_1",)]

autoload_sequence_table = Table(
test_table_name,
second_metadata,
autoload_with=engine_testaccount,
)

conn.execute(
insert(autoload_sequence_table),
[
{"data": "multi_insert_1"},
{"data": "multi_insert_2"},
],
)
conn.execute(
insert(autoload_sequence_table),
[
{"data": "test_insert_2"},
],
)
nextid = conn.execute(seq)
conn.execute(
insert(autoload_sequence_table),
[{"id": nextid, "data": "test_insert_seq"}],
)
conn.commit()
result = conn.execute(sequence_select_stmt).fetchall()
assert result == [
("test_insert_1",),
("multi_insert_1",),
("multi_insert_2",),
("test_insert_2",),
("test_insert_seq",),
], result

finally:
sequence_table.drop(engine_testaccount)
seq.drop(engine_testaccount)
metadata.drop_all(engine_testaccount)


def test_table_with_autoincrement(engine_testaccount):
"""Snowflake does not guarantee generating sequence numbers without gaps.
def test_table_with_autoincrement(engine_testaccount, db_parameters):
The generated numbers are not necesairly contigous.
https://docs.snowflake.com/en/user-guide/querying-sequences
"""
# https://github.com/snowflakedb/snowflake-sqlalchemy/issues/124
test_table_name = "sequence"
metadata = MetaData()
autoincrement_table = Table(
test_table_name,
MetaData(),
metadata,
Column("id", Integer, autoincrement=True, primary_key=True),
Column("data", String(39)),
)
autoincrement_table.create(engine_testaccount)
metadata.create_all(engine_testaccount)

select_stmt = select(autoincrement_table.c.data)

try:
with engine_testaccount.connect() as conn:
with conn.begin():
conn.execute(autoincrement_table.insert(), [{"data": "test_insert_1"}])
select_stmt = select(autoincrement_table).order_by("id")
result = conn.execute(select_stmt).fetchall()
assert result == [(1, "test_insert_1")]
autoload_sequence_table = Table(
test_table_name, MetaData(), autoload_with=engine_testaccount
)
conn.execute(
autoload_sequence_table.insert(),
[{"data": "multi_insert_1"}, {"data": "multi_insert_2"}],
)
conn.execute(
autoload_sequence_table.insert(), [{"data": "test_insert_2"}]
)
result = conn.execute(select_stmt).fetchall()
assert result == [
(1, "test_insert_1"),
(2, "multi_insert_1"),
(3, "multi_insert_2"),
(4, "test_insert_2"),
]
conn.execute(insert(autoincrement_table), ({"data": "test_insert_1"}))
result = conn.execute(select_stmt).fetchall()
assert result == [("test_insert_1",)], result

autoload_sequence_table = Table(
test_table_name,
MetaData(),
autoload_with=engine_testaccount,
)
conn.execute(
insert(autoload_sequence_table),
[
{"data": "multi_insert_1"},
{"data": "multi_insert_2"},
],
)
conn.execute(insert(autoload_sequence_table), [{"data": "test_insert_2"}])
result = conn.execute(select_stmt).fetchall()
assert result == [
("test_insert_1",),
("multi_insert_1",),
("multi_insert_2",),
("test_insert_2",),
]
finally:
autoincrement_table.drop(engine_testaccount)
metadata.drop_all(engine_testaccount)

0 comments on commit edfb780

Please sign in to comment.