Skip to content

Commit

Permalink
make unit tests work for bk
Browse files Browse the repository at this point in the history
  • Loading branch information
JamieDeMaria committed Feb 8, 2023
1 parent 18ce05c commit 884306b
Showing 1 changed file with 41 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,11 @@
"password": os.getenv("SNOWFLAKE_BUILDKITE_PASSWORD", ""),
}

SHARED_BUILDKITE_SNOWFLAKE_CONF = {
"account": os.getenv("SNOWFLAKE_ACCOUNT", ""),
"user": "[email protected]",
"password": os.getenv("SNOWFLAKE_PASSWORD", ""),
}
# SHARED_BUILDKITE_SNOWFLAKE_CONF = {
# "account": os.getenv("SNOWFLAKE_ACCOUNT", ""),
# "user": "[email protected]",
# "password": os.getenv("SNOWFLAKE_PASSWORD", ""),
# }



Expand All @@ -65,8 +65,8 @@ def temporary_snowflake_table(schema_name: str, db_name: str, column_str: str) -
try:
yield table_name
finally:
pass
# conn.cursor().execute(f"drop table {schema_name}.{table_name}")
# pass
conn.cursor().execute(f"drop table {schema_name}.{table_name}")


def test_handle_output():
Expand Down Expand Up @@ -170,18 +170,18 @@ def io_manager_test_pipeline():
assert res.success


# @pytest.mark.skipif(not IS_BUILDKITE, reason="Requires access to the BUILDKITE snowflake DB")
@pytest.mark.skipif(not IS_BUILDKITE, reason="Requires access to the BUILDKITE snowflake DB")
def test_io_manager_with_snowflake_pandas_timestamp_data():
# with temporary_snowflake_table(
# schema_name="SNOWFLAKE_IO_MANAGER_SCHEMA",
# db_name="TEST_SNOWFLAKE_IO_MANAGER",
# column_str="foo string, date TIMESTAMP_NTZ(9)",
# ) as table_name:
with temporary_snowflake_table(
schema_name="JAMIE",
db_name="SANDBOX",
schema_name="SNOWFLAKE_IO_MANAGER_SCHEMA",
db_name="TEST_SNOWFLAKE_IO_MANAGER",
column_str="foo string, date TIMESTAMP_NTZ(9)",
) as table_name:
# with temporary_snowflake_table(
# schema_name="JAMIE",
# db_name="SANDBOX",
# column_str="foo string, date TIMESTAMP_NTZ(9)",
# ) as table_name:
time_df = pandas.DataFrame(
{
"foo": ["bar", "baz"],
Expand All @@ -195,7 +195,7 @@ def test_io_manager_with_snowflake_pandas_timestamp_data():
@op(
out={
table_name: Out(
io_manager_key="snowflake", metadata={"schema": "JAMIE"}
io_manager_key="snowflake", metadata={"schema": "SNOWFLAKE_IO_MANAGER_SCHEMA"}
)
}
)
Expand All @@ -215,7 +215,7 @@ def read_time_df(df: pandas.DataFrame):
"snowflake": {
"config": {
**SHARED_BUILDKITE_SNOWFLAKE_CONF,
"database": "SANDBOX",
"database": "TEST_SNOWFLAKE_IO_MANAGER",
}
}
}
Expand All @@ -228,18 +228,18 @@ def io_manager_timestamp_test_job():
assert res.success


# @pytest.mark.skipif(not IS_BUILDKITE, reason="Requires access to the BUILDKITE snowflake DB")
@pytest.mark.skipif(not IS_BUILDKITE, reason="Requires access to the BUILDKITE snowflake DB")
def test_io_manager_with_snowflake_pandas_timestamp_data_error():
# with temporary_snowflake_table(
# schema_name="SNOWFLAKE_IO_MANAGER_SCHEMA",
# db_name="TEST_SNOWFLAKE_IO_MANAGER",
# column_str="foo string, date TIMESTAMP_NTZ(9)",
# ) as table_name:
with temporary_snowflake_table(
schema_name="JAMIE",
db_name="SANDBOX",
schema_name="SNOWFLAKE_IO_MANAGER_SCHEMA",
db_name="TEST_SNOWFLAKE_IO_MANAGER",
column_str="foo string, date TIMESTAMP_NTZ(9)",
) as table_name:
# with temporary_snowflake_table(
# schema_name="JAMIE",
# db_name="SANDBOX",
# column_str="foo string, date TIMESTAMP_NTZ(9)",
# ) as table_name:
time_df = pandas.DataFrame(
{
"foo": ["bar", "baz"],
Expand All @@ -253,7 +253,7 @@ def test_io_manager_with_snowflake_pandas_timestamp_data_error():
@op(
out={
table_name: Out(
io_manager_key="snowflake", metadata={"schema": "JAMIE"}
io_manager_key="snowflake", metadata={"schema": "SNOWFLAKE_IO_MANAGER_SCHEMA"}
)
}
)
Expand All @@ -273,7 +273,7 @@ def read_time_df(df: pandas.DataFrame):
"snowflake": {
"config": {
**SHARED_BUILDKITE_SNOWFLAKE_CONF,
"database": "SANDBOX",
"database": "TEST_SNOWFLAKE_IO_MANAGER",
}
}
}
Expand All @@ -286,24 +286,24 @@ def io_manager_timestamp_test_job():
io_manager_timestamp_test_job.execute_in_process()


# @pytest.mark.skipif(not IS_BUILDKITE, reason="Requires access to the BUILDKITE snowflake DB")
@pytest.mark.skipif(not IS_BUILDKITE, reason="Requires access to the BUILDKITE snowflake DB")
def test_time_window_partitioned_asset(tmp_path):
# with temporary_snowflake_table(
# schema_name="SNOWFLAKE_IO_MANAGER_SCHEMA",
# db_name="TEST_SNOWFLAKE_IO_MANAGER",
# column_str="TIME TIMESTAMP_NTZ(9), A string, B int",
# ) as table_name:
with temporary_snowflake_table(
schema_name="JAMIE",
db_name="SANDBOX",
schema_name="SNOWFLAKE_IO_MANAGER_SCHEMA",
db_name="TEST_SNOWFLAKE_IO_MANAGER",
column_str="TIME TIMESTAMP_NTZ(9), A string, B int",
) as table_name:
# with temporary_snowflake_table(
# schema_name="JAMIE",
# db_name="SANDBOX",
# column_str="TIME TIMESTAMP_NTZ(9), A string, B int",
# ) as table_name:

@asset(
partitions_def=DailyPartitionsDefinition(start_date="2022-01-01"),
metadata={"partition_expr": "time"},
config_schema={"value": str},
key_prefix="JAMIE",
key_prefix="SNOWFLAKE_IO_MANAGER_SCHEMA",
name=table_name,
)
def daily_partitioned(context):
Expand All @@ -318,14 +318,14 @@ def daily_partitioned(context):
}
)

# asset_full_name = f"SNOWFLAKE_IO_MANAGER_SCHEMA__{table_name}"
# snowflake_table_path = f"SNOWFLAKE_IO_MANAGER_SCHEMA.{table_name}"
asset_full_name = f"JAMIE__{table_name}"
snowflake_table_path = f"JAMIE.{table_name}"
asset_full_name = f"SNOWFLAKE_IO_MANAGER_SCHEMA__{table_name}"
snowflake_table_path = f"SNOWFLAKE_IO_MANAGER_SCHEMA.{table_name}"
# asset_full_name = f"JAMIE__{table_name}"
# snowflake_table_path = f"JAMIE.{table_name}"

snowflake_config = {
**SHARED_BUILDKITE_SNOWFLAKE_CONF,
"database": "SANDBOX",
"database": "TEST_SNOWFLAKE_IO_MANAGER",
}
snowflake_conn = SnowflakeConnection(
snowflake_config, logging.getLogger("temporary_snowflake_table")
Expand Down

0 comments on commit 884306b

Please sign in to comment.