From 31c18e0a20fa4f1a66b6fbb8957bf9ebddabf25f Mon Sep 17 00:00:00 2001
From: Yijun Xie
Date: Wed, 12 Jun 2024 23:52:24 -0700
Subject: [PATCH] SNOW-1476313: Wait for result with AsyncJob.result("no_result") (#1764)

---
 CHANGELOG.md                              |  1 +
 src/snowflake/snowpark/async_job.py       | 18 ++++++++++++++++--
 tests/integ/scala/test_async_job_suite.py |  9 +++++++++
 3 files changed, 26 insertions(+), 2 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 1f242c881d5..42b2f878c45 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -14,6 +14,7 @@
 - Fixed a bug where python stored procedure with table return type fails when run in a task.
 - Fixed a bug where df.dropna fails due to `RecursionError: maximum recursion depth exceeded` when the DataFrame has more than 500 columns.
+- Fixed a bug where `AsyncJob.result("no_result")` doesn't wait for the query to finish execution.
 
 ### Snowpark Local Testing Updates
 
diff --git a/src/snowflake/snowpark/async_job.py b/src/snowflake/snowpark/async_job.py
index 48f804bae98..1adebcc343e 100644
--- a/src/snowflake/snowpark/async_job.py
+++ b/src/snowflake/snowpark/async_job.py
@@ -1,12 +1,13 @@
 #
 # Copyright (c) 2012-2024 Snowflake Computing Inc. All rights reserved.
 #
-
+import time
 from enum import Enum
 from logging import getLogger
 from typing import TYPE_CHECKING, Iterator, List, Literal, Optional, Union
 
 import snowflake.snowpark
+from snowflake.connector.cursor import ASYNC_RETRY_PATTERN
 from snowflake.connector.errors import DatabaseError
 from snowflake.connector.options import pandas
 from snowflake.snowpark._internal.analyzer.analyzer_utils import result_scan_statement
@@ -318,7 +319,7 @@ def result(
                   type of :meth:`DataFrame.collect`.
                 - "row_iterator": returns an iterator of :class:`Row` objects, which is the same as the return
                   type of :meth:`DataFrame.to_local_iterator`.
-                - "row": returns a ``pandas.DataFrame``, which is the same as the return type of
+                - "pandas": returns a ``pandas.DataFrame``, which is the same as the return type of
                   :meth:`DataFrame.to_pandas`.
                 - "pandas_batches": returns an iterator of ``pandas.DataFrame`` s, which is the same as the return
                   type of :meth:`DataFrame.to_pandas_batches`.
@@ -355,6 +356,19 @@ def result(
         )
         if async_result_type == _AsyncResultType.NO_RESULT:
+            # The following section is copied from python connector.
+            # Later we should expose it from python connector and reuse it.
+            retry_pattern_pos = 0
+            while True:
+                status = self._session.connection.get_query_status(self.query_id)
+                if not self._session.connection.is_still_running(status):
+                    break
+                time.sleep(
+                    0.5 * ASYNC_RETRY_PATTERN[retry_pattern_pos]
+                )  # Same wait as JDBC
+                # If we can advance in ASYNC_RETRY_PATTERN then do so
+                if retry_pattern_pos < (len(ASYNC_RETRY_PATTERN) - 1):
+                    retry_pattern_pos += 1
             result = None
         elif async_result_type == _AsyncResultType.PANDAS:
             result = self._session._conn._to_data_or_iter(
diff --git a/tests/integ/scala/test_async_job_suite.py b/tests/integ/scala/test_async_job_suite.py
index e78f7f8ae69..e03cc836272 100644
--- a/tests/integ/scala/test_async_job_suite.py
+++ b/tests/integ/scala/test_async_job_suite.py
@@ -473,3 +473,12 @@ def test_async_job_to_df(session, create_async_job_from_query_id):
     new_df = async_job.to_df()
     assert "result_scan" in new_df.queries["queries"][0].lower()
     Utils.check_answer(df, new_df)
+
+
+def test_async_job_result_wait_no_result(session):
+    async_job = session.sql("select system$wait(3)").collect_nowait()
+    t0 = time()
+    result = async_job.result("no_result")
+    t1 = time()
+    assert t1 - t0 >= 3.0
+    assert result is None
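
A minimal usage sketch of the behavior this patch establishes, assuming an already-created Snowpark `session`; it mirrors the new integration test above and is not part of the patch itself:

from time import time

# Kick off a query asynchronously; system$wait(3) keeps it running for ~3 seconds.
async_job = session.sql("select system$wait(3)").collect_nowait()

start = time()
# With this fix, result("no_result") blocks until the query finishes
# instead of returning immediately.
result = async_job.result("no_result")
elapsed = time() - start

assert result is None   # "no_result" always returns None
assert elapsed >= 3.0   # the call waited for the query to complete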