Skip to content

Commit

Permalink
SNOW-1846847 Add support for autocommit (#555)
Browse files Browse the repository at this point in the history
* Add support for autocommit
  • Loading branch information
sfc-gh-jvasquezrojas authored Dec 16, 2024
1 parent 0471c1f commit d3e40d1
Show file tree
Hide file tree
Showing 3 changed files with 192 additions and 0 deletions.
1 change: 1 addition & 0 deletions DESCRIPTION.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ Source code is also available at:
- v1.7.1(December 02, 2024)
- Add support for partition by to copy into <location>
- Fix BOOLEAN type not found in snowdialect
- Add support for autocommit Isolation Level

- v1.7.0(November 21, 2024)
- Add support for dynamic tables and required options
Expand Down
34 changes: 34 additions & 0 deletions src/snowflake/sqlalchemy/snowdialect.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import operator
import re
from collections import defaultdict
from enum import Enum
from functools import reduce
from typing import Any, Collection, Optional
from urllib.parse import unquote_plus
Expand Down Expand Up @@ -59,6 +60,11 @@
_ENABLE_SQLALCHEMY_AS_APPLICATION_NAME = True


class SnowflakeIsolationLevel(Enum):
READ_COMMITTED = "READ COMMITTED"
AUTOCOMMIT = "AUTOCOMMIT"


class SnowflakeDialect(default.DefaultDialect):
name = DIALECT_NAME
driver = "snowflake"
Expand Down Expand Up @@ -139,6 +145,13 @@ class SnowflakeDialect(default.DefaultDialect):

supports_identity_columns = True

def __init__(
self,
isolation_level: Optional[str] = SnowflakeIsolationLevel.READ_COMMITTED.value,
**kwargs: Any,
):
super().__init__(isolation_level=isolation_level, **kwargs)

@classmethod
def dbapi(cls):
return cls.import_dbapi()
Expand Down Expand Up @@ -216,6 +229,27 @@ def has_table(self, connection, table_name, schema=None, **kw):
"""
return self._has_object(connection, "TABLE", table_name, schema)

def get_isolation_level_values(self, dbapi_connection):
return [
SnowflakeIsolationLevel.READ_COMMITTED.value,
SnowflakeIsolationLevel.AUTOCOMMIT.value,
]

def do_rollback(self, dbapi_connection):
dbapi_connection.rollback()

def do_commit(self, dbapi_connection):
dbapi_connection.commit()

def get_default_isolation_level(self, dbapi_conn):
return SnowflakeIsolationLevel.READ_COMMITTED.value

def set_isolation_level(self, dbapi_connection, level):
if level == SnowflakeIsolationLevel.AUTOCOMMIT.value:
dbapi_connection.autocommit(True)
else:
dbapi_connection.autocommit(False)

@reflection.cache
def has_sequence(self, connection, sequence_name, schema=None, **kw):
"""
Expand Down
157 changes: 157 additions & 0 deletions tests/test_transactions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
#
# Copyright (c) 2012-2023 Snowflake Computing Inc. All rights reserved.
#

from sqlalchemy import Column, Integer, MetaData, String, select, text

from snowflake.sqlalchemy import SnowflakeTable

CURRENT_TRANSACTION = text("SELECT CURRENT_TRANSACTION()")


def test_connect_read_commited(engine_testaccount, assert_text_in_buf):
metadata = MetaData()
table_name = "test_connect_read_commited"

test_table_1 = SnowflakeTable(
table_name,
metadata,
Column("id", Integer, primary_key=True),
Column("name", String),
cluster_by=["id", text("id > 5")],
)

metadata.create_all(engine_testaccount)
try:
with engine_testaccount.connect().execution_options(
isolation_level="READ COMMITTED"
) as connection:
result = connection.execute(CURRENT_TRANSACTION).fetchall()
assert result[0] == (None,), result
ins = test_table_1.insert().values(id=1, name="test")
connection.execute(ins)
result = connection.execute(CURRENT_TRANSACTION).fetchall()
assert result[0] != (
None,
), "AUTOCOMMIT DISABLED, transaction should be started"

with engine_testaccount.connect() as conn:
s = select(test_table_1)
results = conn.execute(s).fetchall()
assert len(results) == 0, results # No insert commited
assert_text_in_buf("ROLLBACK", occurrences=1)
finally:
metadata.drop_all(engine_testaccount)


def test_begin_read_commited(engine_testaccount, assert_text_in_buf):
metadata = MetaData()
table_name = "test_begin_read_commited"

test_table_1 = SnowflakeTable(
table_name,
metadata,
Column("id", Integer, primary_key=True),
Column("name", String),
cluster_by=["id", text("id > 5")],
)

metadata.create_all(engine_testaccount)
try:
with engine_testaccount.connect().execution_options(
isolation_level="READ COMMITTED"
) as connection, connection.begin():
result = connection.execute(CURRENT_TRANSACTION).fetchall()
assert result[0] == (None,), result
ins = test_table_1.insert().values(id=1, name="test")
connection.execute(ins)
result = connection.execute(CURRENT_TRANSACTION).fetchall()
assert result[0] != (
None,
), "AUTOCOMMIT DISABLED, transaction should be started"

with engine_testaccount.connect() as conn:
s = select(test_table_1)
results = conn.execute(s).fetchall()
assert len(results) == 1, results # Insert commited
assert_text_in_buf("COMMIT", occurrences=2)
finally:
metadata.drop_all(engine_testaccount)


def test_connect_autocommit(engine_testaccount, assert_text_in_buf):
metadata = MetaData()
table_name = "test_connect_autocommit"

test_table_1 = SnowflakeTable(
table_name,
metadata,
Column("id", Integer, primary_key=True),
Column("name", String),
cluster_by=["id", text("id > 5")],
)

metadata.create_all(engine_testaccount)
try:
with engine_testaccount.connect().execution_options(
isolation_level="AUTOCOMMIT"
) as connection:
result = connection.execute(CURRENT_TRANSACTION).fetchall()
assert result[0] == (None,), result
ins = test_table_1.insert().values(id=1, name="test")
connection.execute(ins)
result = connection.execute(CURRENT_TRANSACTION).fetchall()
assert result[0] == (
None,
), "Autocommit enabled, transaction should not be started"

with engine_testaccount.connect() as conn:
s = select(test_table_1)
results = conn.execute(s).fetchall()
assert len(results) == 1, results
assert_text_in_buf(
"ROLLBACK using DBAPI connection.rollback(), DBAPI should ignore due to autocommit mode",
occurrences=1,
)

finally:
metadata.drop_all(engine_testaccount)


def test_begin_autocommit(engine_testaccount, assert_text_in_buf):
metadata = MetaData()
table_name = "test_begin_autocommit"

test_table_1 = SnowflakeTable(
table_name,
metadata,
Column("id", Integer, primary_key=True),
Column("name", String),
cluster_by=["id", text("id > 5")],
)

metadata.create_all(engine_testaccount)
try:
with engine_testaccount.connect().execution_options(
isolation_level="AUTOCOMMIT"
) as connection, connection.begin():
result = connection.execute(CURRENT_TRANSACTION).fetchall()
assert result[0] == (None,), result
ins = test_table_1.insert().values(id=1, name="test")
connection.execute(ins)
result = connection.execute(CURRENT_TRANSACTION).fetchall()
assert result[0] == (
None,
), "Autocommit enabled, transaction should not be started"

with engine_testaccount.connect() as conn:
s = select(test_table_1)
results = conn.execute(s).fetchall()
assert len(results) == 1, results
assert_text_in_buf(
"COMMIT using DBAPI connection.commit(), DBAPI should ignore due to autocommit mode",
occurrences=1,
)

finally:
metadata.drop_all(engine_testaccount)

0 comments on commit d3e40d1

Please sign in to comment.