Skip to content

Commit

Permalink
SNOW-1546396: use scoped temp tables for large query breakdown (#2197)
Browse files Browse the repository at this point in the history
<!---
Please answer these questions before creating your pull request. Thanks!
--->

1. Which Jira issue is this PR addressing? Make sure that there is an
accompanying issue for your PR.

   <!---
   In this section, please add a Snowflake Jira issue number.

   Note that if a corresponding GitHub issue exists, you should still
   include the Snowflake Jira issue number. For example, for GitHub
   issue #1400, you should add "SNOW-1335071" here.
   --->

   Fixes SNOW-1546396

2. Fill out the following pre-review checklist:

   - [ ] I am adding new automated test(s) to verify the correctness of my new code
   - [ ] If this test skips Local Testing mode, I'm requesting review from @snowflakedb/local-testing
   - [ ] I am adding new logging messages
   - [ ] I am adding a new telemetry message
   - [ ] I am adding new credentials
   - [ ] I am adding a new dependency
   - [ ] If this is a new feature/behavior, I'm adding the Local Testing parity changes.

3. Please describe how your code solves the related issue.

   Use scoped temp tables for large query breakdown: the generated statement is now `CREATE SCOPED TEMPORARY TABLE` instead of `CREATE TEMP TABLE`.
  • Loading branch information
sfc-gh-aalam authored Aug 30, 2024
1 parent 200c485 commit 60ec43a
Show file tree
Hide file tree
Showing 5 changed files with 34 additions and 18 deletions.
8 changes: 6 additions & 2 deletions src/snowflake/snowpark/_internal/analyzer/analyzer_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -856,6 +856,9 @@ def create_table_as_select_statement(
max_data_extension_time: Optional[int] = None,
change_tracking: Optional[bool] = None,
copy_grants: bool = False,
*,
use_scoped_temp_objects: bool = False,
is_generated: bool = False,
) -> str:
column_definition_sql = (
f"{LEFT_PARENTHESIS}{column_definition}{RIGHT_PARENTHESIS}"
Expand All @@ -877,8 +880,9 @@ def create_table_as_select_statement(
}
)
return (
f"{CREATE}{OR + REPLACE if replace else EMPTY_STRING} {table_type.upper()} {TABLE}"
f"{IF + NOT + EXISTS if not replace and not error else EMPTY_STRING} "
f"{CREATE}{OR + REPLACE if replace else EMPTY_STRING}"
f" {(get_temp_type_for_object(use_scoped_temp_objects, is_generated) if table_type.lower() in TEMPORARY_STRING_SET else table_type).upper()} "
f"{TABLE}{IF + NOT + EXISTS if not replace and not error else EMPTY_STRING} "
f"{table_name}{column_definition_sql}{cluster_by_clause}{options_statement}"
f"{COPY_GRANTS if copy_grants else EMPTY_STRING}{comment_sql} {AS}{project_statement([], child)}"
)
Expand Down
2 changes: 2 additions & 0 deletions src/snowflake/snowpark/_internal/analyzer/snowflake_plan.py
Original file line number Diff line number Diff line change
Expand Up @@ -929,6 +929,8 @@ def get_create_table_as_select_plan(child: SnowflakePlan, replace, error):
max_data_extension_time=max_data_extension_time,
change_tracking=change_tracking,
copy_grants=copy_grants,
use_scoped_temp_objects=use_scoped_temp_objects,
is_generated=is_generated,
),
child,
source_plan,
Expand Down
2 changes: 1 addition & 1 deletion tests/integ/compiler/test_query_generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ def test_table_create_from_large_query_breakdown(session, plan_source_generator)

assert (
queries[PlanQueryType.QUERIES][0].sql
== f" CREATE TEMP TABLE {table_name} AS SELECT * FROM (select 1 as a, 2 as b)"
== f" CREATE SCOPED TEMPORARY TABLE {table_name} AS SELECT * FROM (select 1 as a, 2 as b)"
)


Expand Down
6 changes: 4 additions & 2 deletions tests/integ/scala/test_snowflake_plan_suite.py
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,7 @@ def test_create_scoped_temp_table(session):
)
.queries[0]
.sql
== f" CREATE TEMP TABLE {temp_table_name} AS SELECT * FROM ( SELECT * FROM ({table_name}))"
== f" CREATE TEMPORARY TABLE {temp_table_name} AS SELECT * FROM ( SELECT * FROM ({table_name}))"
)
expected_sql = f' CREATE TEMPORARY TABLE {temp_table_name}("NUM" BIGINT, "STR" STRING(8))'
assert expected_sql in (
Expand All @@ -342,7 +342,9 @@ def test_create_scoped_temp_table(session):
.queries[0]
.sql
)
expected_sql = f" CREATE TEMPORARY TABLE {temp_table_name} AS SELECT"
expected_sql = (
f" CREATE SCOPED TEMPORARY TABLE {temp_table_name} AS SELECT"
)
assert expected_sql in (
session._plan_builder.save_as_table(
table_name=[temp_table_name],
Expand Down
34 changes: 21 additions & 13 deletions tests/integ/test_large_query_breakdown.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ def test_large_query_breakdown_with_cte_optimization(session):
check_result_with_and_without_breakdown(session, df4)

assert len(df4.queries["queries"]) == 2
assert df4.queries["queries"][0].startswith("CREATE TEMP TABLE")
assert df4.queries["queries"][0].startswith("CREATE SCOPED TEMPORARY TABLE")
assert df4.queries["queries"][1].startswith("WITH SNOWPARK_TEMP_CTE_")

assert len(df4.queries["post_actions"]) == 1
Expand All @@ -115,7 +115,7 @@ def test_save_as_table(session, large_query_df):

assert len(history.queries) == 4
assert history.queries[0].sql_text == "SELECT CURRENT_TRANSACTION()"
assert history.queries[1].sql_text.startswith("CREATE TEMP TABLE")
assert history.queries[1].sql_text.startswith("CREATE SCOPED TEMPORARY TABLE")
assert history.queries[2].sql_text.startswith(
f"CREATE OR REPLACE TABLE {table_name}"
)
Expand All @@ -135,7 +135,7 @@ def test_update_delete_merge(session, large_query_df):
t.update({"B": 0}, t.a == large_query_df.a, large_query_df)
assert len(history.queries) == 4
assert history.queries[0].sql_text == "SELECT CURRENT_TRANSACTION()"
assert history.queries[1].sql_text.startswith("CREATE TEMP TABLE")
assert history.queries[1].sql_text.startswith("CREATE SCOPED TEMPORARY TABLE")
assert history.queries[2].sql_text.startswith(f"UPDATE {table_name}")
assert history.queries[3].sql_text.startswith("DROP TABLE If EXISTS")

Expand All @@ -144,7 +144,7 @@ def test_update_delete_merge(session, large_query_df):
t.delete(t.a == large_query_df.a, large_query_df)
assert len(history.queries) == 4
assert history.queries[0].sql_text == "SELECT CURRENT_TRANSACTION()"
assert history.queries[1].sql_text.startswith("CREATE TEMP TABLE")
assert history.queries[1].sql_text.startswith("CREATE SCOPED TEMPORARY TABLE")
assert history.queries[2].sql_text.startswith(f"DELETE FROM {table_name} USING")
assert history.queries[3].sql_text.startswith("DROP TABLE If EXISTS")

Expand All @@ -157,7 +157,7 @@ def test_update_delete_merge(session, large_query_df):
)
assert len(history.queries) == 4
assert history.queries[0].sql_text == "SELECT CURRENT_TRANSACTION()"
assert history.queries[1].sql_text.startswith("CREATE TEMP TABLE")
assert history.queries[1].sql_text.startswith("CREATE SCOPED TEMPORARY TABLE")
assert history.queries[2].sql_text.startswith(f"MERGE INTO {table_name} USING")
assert history.queries[3].sql_text.startswith("DROP TABLE If EXISTS")

Expand All @@ -176,7 +176,7 @@ def test_copy_into_location(session, large_query_df):
)
assert len(history.queries) == 4, history.queries
assert history.queries[0].sql_text == "SELECT CURRENT_TRANSACTION()"
assert history.queries[1].sql_text.startswith("CREATE TEMP TABLE")
assert history.queries[1].sql_text.startswith("CREATE SCOPED TEMPORARY TABLE")
assert history.queries[2].sql_text.startswith(f"COPY INTO '{remote_file_path}'")
assert history.queries[3].sql_text.startswith("DROP TABLE If EXISTS")

Expand Down Expand Up @@ -215,7 +215,7 @@ def test_pivot_unpivot(session):

plan_queries = final_df.queries
assert len(plan_queries["queries"]) == 2
assert plan_queries["queries"][0].startswith("CREATE TEMP TABLE")
assert plan_queries["queries"][0].startswith("CREATE SCOPED TEMPORARY TABLE")

assert len(plan_queries["post_actions"]) == 1
assert plan_queries["post_actions"][0].startswith("DROP TABLE If EXISTS")
Expand All @@ -239,7 +239,7 @@ def test_sort(session):

plan_queries = final_df.queries
assert len(plan_queries["queries"]) == 2
assert plan_queries["queries"][0].startswith("CREATE TEMP TABLE")
assert plan_queries["queries"][0].startswith("CREATE SCOPED TEMPORARY TABLE")

assert len(plan_queries["post_actions"]) == 1
assert plan_queries["post_actions"][0].startswith("DROP TABLE If EXISTS")
Expand Down Expand Up @@ -283,7 +283,7 @@ def test_multiple_query_plan(session, large_query_df):
"CREATE OR REPLACE SCOPED TEMPORARY TABLE"
)
assert plan_queries["queries"][1].startswith("INSERT INTO")
assert plan_queries["queries"][2].startswith("CREATE TEMP TABLE")
assert plan_queries["queries"][2].startswith("CREATE SCOPED TEMPORARY TABLE")

assert len(plan_queries["post_actions"]) == 2
for query in plan_queries["post_actions"]:
Expand Down Expand Up @@ -349,7 +349,9 @@ def test_async_job_with_large_query_breakdown(session, large_query_df):
result = job.result()
assert result == large_query_df.collect()
assert len(large_query_df.queries["queries"]) == 2
assert large_query_df.queries["queries"][0].startswith("CREATE TEMP TABLE")
assert large_query_df.queries["queries"][0].startswith(
"CREATE SCOPED TEMPORARY TABLE"
)

assert len(large_query_df.queries["post_actions"]) == 1
assert large_query_df.queries["post_actions"][0].startswith(
Expand All @@ -365,7 +367,9 @@ def test_complexity_bounds_affect_num_partitions(session, large_query_df):
session._large_query_breakdown_enabled = True
assert len(large_query_df.queries["queries"]) == 2
assert len(large_query_df.queries["post_actions"]) == 1
assert large_query_df.queries["queries"][0].startswith("CREATE TEMP TABLE")
assert large_query_df.queries["queries"][0].startswith(
"CREATE SCOPED TEMPORARY TABLE"
)
assert large_query_df.queries["post_actions"][0].startswith(
"DROP TABLE If EXISTS"
)
Expand All @@ -374,8 +378,12 @@ def test_complexity_bounds_affect_num_partitions(session, large_query_df):
session._large_query_breakdown_enabled = True
assert len(large_query_df.queries["queries"]) == 3
assert len(large_query_df.queries["post_actions"]) == 2
assert large_query_df.queries["queries"][0].startswith("CREATE TEMP TABLE")
assert large_query_df.queries["queries"][1].startswith("CREATE TEMP TABLE")
assert large_query_df.queries["queries"][0].startswith(
"CREATE SCOPED TEMPORARY TABLE"
)
assert large_query_df.queries["queries"][1].startswith(
"CREATE SCOPED TEMPORARY TABLE"
)
assert large_query_df.queries["post_actions"][0].startswith(
"DROP TABLE If EXISTS"
)
Expand Down

0 comments on commit 60ec43a

Please sign in to comment.