SNOW-1478982: Create table with Timestamp? #506
Comments
Any info on this?
Hi, I experimented with the permutations of the ..
def theTime(engine):
    connection = engine.connect()
    return connection.execute("SELECT CURRENT_TIMESTAMP();")
..
TABLE_ID = Column(Integer, primary_key=True, autoincrement=True)
TABLE_TIMESTAMP_ONE = Column(DateTime, default=datetime.now)
TABLE_TIMESTAMP_TWO = Column(DateTime, default=func.now()) # func.CURRENT_TIMESTAMP() also doesn't work
TABLE_TIMESTAMP_THREE = Column(DateTime, default=theTime(engine))
but none of them seem to populate their cells upon inserting a value. I then tried
which at least allowed me to proceed to the next error. The SQL generated was
CREATE TABLE sqlalchemy506 (
"TABLE_ID" INTEGER NOT NULL AUTOINCREMENT,
"TABLE_TIMESTAMP_ONE" datetime DEFAULT 'CURRENT_TIMESTAMP',
PRIMARY KEY ("TABLE_ID")
)
failed to execute on the Snowflake backend, due to
which makes sense because
default = self.get_column_default_string(column)
if default is not None:
    defaulttrimmed = default.replace("'", "").replace('"', '')
    colspec.append("DEFAULT " + defaulttrimmed)
    #colspec.append("DEFAULT " + default)
which, when running the script (below), resulted in the SQL
CREATE TABLE sqlalchemy506 (
"TABLE_ID" INTEGER NOT NULL AUTOINCREMENT,
"TIMESTAMP_ONE" datetime DEFAULT CURRENT_TIMESTAMP,
"STRING_ONE" VARCHAR DEFAULT CURRENT_USER,
PRIMARY KEY ("TABLE_ID")
)
and when inserting a value into the table which has just been created, it populates with the expected values:
insert into test_db.public.sqlalchemy506 (table_id) values (1);
select * from test_db.public.sqlalchemy506;
TABLE_ID TIMESTAMP_ONE STRING_ONE
1 2024-06-27 06:46:33.994 ADMIN
Now, I'm not sure if this is how it's supposed to work, or if I'm doing it wrong in the first place with how
Finally, the script I used:
# cat test.py
from sqlalchemy import Column, DateTime, Integer, Text, String, select, func, DefaultClause
from sqlalchemy.orm import declarative_base
from sqlalchemy import create_engine
from datetime import datetime
from os import environ

Base = declarative_base()

engine = create_engine(
    'snowflake://{user}:{password}@{account_identifier}/{database}/{schema}?warehouse={warehouse}'.format(
        account_identifier=environ["SFACCOUNT"],
        user=environ["SFUSER"],
        password=environ["SFPASS"],
        database=environ["SFDB"],
        warehouse=environ["SFWH"],
        schema=environ["SFSCHEMA"]
    ))

def theTime(engine):
    connection = engine.connect()
    return connection.execute("SELECT CURRENT_TIMESTAMP();")

class My_Table(Base):
    __tablename__ = 'sqlalchemy506'
    TABLE_ID = Column(Integer, primary_key=True, autoincrement=True)
    TIMESTAMP_ONE = Column(DateTime, server_default=DefaultClause("CURRENT_TIMESTAMP"))
    STRING_ONE = Column(String, server_default=DefaultClause("CURRENT_USER"))
    #TABLE_TIMESTAMP_ONE = Column(DateTime, default=datetime.now)
    #TABLE_TIMESTAMP_TWO = Column(DateTime, default=func.now())
    #TABLE_TIMESTAMP_THREE = Column(DateTime, default=theTime(engine))

def main(engine):
    Base.metadata.create_all(engine)

if __name__ == "__main__":
    main(engine)
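
A possible explanation for the quoted DEFAULT 'CURRENT_TIMESTAMP' in the first generated CREATE TABLE statement: SQLAlchemy renders a plain string passed to server_default (or to DefaultClause) as a quoted string literal, whereas a text() construct or a func expression is rendered as a SQL expression. The sketch below only compiles the DDL with SQLAlchemy's generic dialect, not the Snowflake one (an assumption made so that it runs without a connection), so type names and function rendering may differ on Snowflake. The table and column names are hypothetical.

```python
from sqlalchemy import Column, DateTime, Integer, MetaData, Table, func, text
from sqlalchemy.schema import CreateTable

metadata = MetaData()

# Hypothetical table used only to inspect the generated DDL.
demo = Table(
    "server_default_demo",
    metadata,
    Column("id", Integer, primary_key=True),
    # Plain string: rendered as a quoted string literal.
    Column("ts_quoted", DateTime, server_default="CURRENT_TIMESTAMP"),
    # text(): rendered verbatim, so the database sees a SQL expression.
    Column("ts_verbatim", DateTime, server_default=text("CURRENT_TIMESTAMP")),
    # SQL function object: also rendered as an expression.
    Column("ts_func", DateTime, server_default=func.now()),
    # default= is applied by SQLAlchemy at INSERT time and does not appear in the DDL.
    Column("ts_client", DateTime, default=func.now()),
)

# Compile against the generic dialect; no database connection is needed.
ddl = str(CreateTable(demo))
print(ddl)
```

If the Snowflake dialect handles these constructs the same way, passing text("CURRENT_TIMESTAMP") instead of a plain string might avoid the need to strip quotes inside the dialect. That has not been checked against Snowflake in this sketch.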
Okay, apparently my approach was not correct. My colleague shared the following solution:
from sqlalchemy import (
    Column,
    Integer,
    String,
    func,
)
from sqlalchemy.orm import Session, declarative_base
from snowflake.sqlalchemy import TIMESTAMP_NTZ

Base = declarative_base()

class TWTS(Base):
    __tablename__ = "table_with_timestamp"
    pk = Column(Integer, primary_key=True)
    name = Column(String(30))
    # server_default puts the default into the table definition,
    # so the database fills in the value on insert
    created = Column(TIMESTAMP_NTZ, server_default=func.now())

    def __repr__(self) -> str:
        return f"TWTS({self.pk=}, {self.name=}, {self.created=})"

# engine_testaccount is assumed to be an existing SQLAlchemy engine connected to Snowflake
Base.metadata.create_all(engine_testaccount)
session = Session(bind=engine_testaccount)
r1 = TWTS(pk=1, name="edward")
r2 = TWTS(pk=2, name="eddy")
# the timestamp is not set on the Python side before the insert
assert r1.created is None
assert r2.created is None
session.add(r1)
session.add(r2)
session.commit()
rows = session.query(TWTS).all()
assert len(rows) == 2
for row in rows:
    print(row)
Hope it helps!
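
For anyone who wants to try the server_default=func.now() pattern without Snowflake credentials, here is a minimal sketch of the same idea against an in-memory SQLite database. Swapping in SQLite and the generic DateTime type for TIMESTAMP_NTZ is an assumption made purely so the example is self-contained; the class and table names are hypothetical, and Snowflake may format or type the stored value differently.

```python
from sqlalchemy import Column, DateTime, Integer, String, create_engine, func
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()


class Event(Base):
    # Hypothetical table mirroring the structure of the solution above.
    __tablename__ = "event_with_timestamp"
    pk = Column(Integer, primary_key=True)
    name = Column(String(30))
    # The default lives in the table definition, so the database fills it in.
    created = Column(DateTime, server_default=func.now())


# In-memory SQLite stands in for Snowflake here (assumption).
engine = create_engine("sqlite:///:memory:")
Base.metadata.create_all(engine)

with Session(engine) as session:
    row = Event(pk=1, name="edward")
    # Nothing is set on the Python side before the insert.
    assert row.created is None
    session.add(row)
    session.commit()
    # Reload the row; the timestamp was populated by the database.
    created = session.query(Event).one().created

print(created)
```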
Closing this for now, as a solution has been provided and there is also a PR adding the example, but please do comment if you need further help.
I am creating table definitions with SQLAlchemy in Python. I am able to successfully create a table with an autoincrementing primary key and some other fields. However, I haven't been able to find any documentation on how to add a timestamp field that is populated when a new record is added. For reference, this is how you would create this field directly in a Snowflake worksheet.
This is my Python code...
Can someone let me know how I could do this?