From 2a121992f3a2561f01d8ab25165a38528d2919a0 Mon Sep 17 00:00:00 2001 From: Jianzhun Du <68252326+sfc-gh-jdu@users.noreply.github.com> Date: Wed, 4 Sep 2024 09:56:19 -0700 Subject: [PATCH] SNOW-1649210: Fix a bug where using `to_pandas_batches` with async jobs caused an error due to improper handling of waiting for asynchronous query completion. (#2211) 1. Which Jira issue is this PR addressing? Make sure that there is an accompanying issue to your PR. Fixes SNOW-1649210 2. Fill out the following pre-review checklist: - [x] I am adding a new automated test(s) to verify correctness of my new code - [ ] If this test skips Local Testing mode, I'm requesting review from @snowflakedb/local-testing - [ ] I am adding new logging messages - [ ] I am adding a new telemetry message - [ ] I am adding new credentials - [ ] I am adding a new dependency - [ ] If this is a new feature/behavior, I'm adding the Local Testing parity changes. 3. Please describe how your code solves the related issue. We should rely on connector's get_results_from_sfqid to wait for results instead of executing result scan directly in Snowpark. --- CHANGELOG.md | 8 ++++++++ .../snowpark/_internal/server_connection.py | 2 +- tests/integ/scala/test_async_job_suite.py | 13 +++++++++++++ 3 files changed, 22 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 88b091ad114..7ca0d402ecc 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,13 @@ # Release History +## 1.21.1 (2024-09-05) + +### Snowpark Python API Updates + +#### Bug Fixes + +- Fixed a bug where using `to_pandas_batches` with async jobs caused an error due to improper handling of waiting for asynchronous query completion. + ## 1.21.0 (2024-08-19) ### Snowpark Python API Updates diff --git a/src/snowflake/snowpark/_internal/server_connection.py b/src/snowflake/snowpark/_internal/server_connection.py index 22a394cda0f..498a1fe406d 100644 --- a/src/snowflake/snowpark/_internal/server_connection.py +++ b/src/snowflake/snowpark/_internal/server_connection.py @@ -465,7 +465,7 @@ def _to_data_or_iter( qid = results_cursor.sfqid if to_iter: new_cursor = results_cursor.connection.cursor() - new_cursor.execute(f"SELECT * FROM TABLE(RESULT_SCAN('{qid}'))") + new_cursor.get_results_from_sfqid(qid) results_cursor = new_cursor if to_pandas: diff --git a/tests/integ/scala/test_async_job_suite.py b/tests/integ/scala/test_async_job_suite.py index 6c7027d8d7d..798196e1017 100644 --- a/tests/integ/scala/test_async_job_suite.py +++ b/tests/integ/scala/test_async_job_suite.py @@ -487,3 +487,16 @@ def test_async_job_result_wait_no_result(session): t1 = time() assert t1 - t0 >= 3.0 assert result is None + + +@pytest.mark.parametrize( + "action", + [ + lambda df: df.to_local_iterator(block=False), + lambda df: df.to_pandas_batches(block=False), + ], +) +def test_iter_cursor_wait_for_result(session, action): + df = session.sql("call system$wait(5)") + async_job = action(df) + assert async_job.result() is not None