Skip to content

Commit

Permalink
SNOW-1476313: Wait for result with AsyncJob.result("no_result") (#1764)
Browse files Browse the repository at this point in the history
  • Loading branch information
sfc-gh-yixie authored Jun 13, 2024
1 parent f2b23f2 commit 31c18e0
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 2 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

- Fixed a bug where python stored procedure with table return type fails when run in a task.
- Fixed a bug where df.dropna fails due to `RecursionError: maximum recursion depth exceeded` when the DataFrame has more than 500 columns.
- Fixed a bug where `AsyncJob.result("no_result")` doesn't wait for the query to finish execution.

### Snowpark Local Testing Updates

Expand Down
18 changes: 16 additions & 2 deletions src/snowflake/snowpark/async_job.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
#
# Copyright (c) 2012-2024 Snowflake Computing Inc. All rights reserved.
#

import time
from enum import Enum
from logging import getLogger
from typing import TYPE_CHECKING, Iterator, List, Literal, Optional, Union

import snowflake.snowpark
from snowflake.connector.cursor import ASYNC_RETRY_PATTERN
from snowflake.connector.errors import DatabaseError
from snowflake.connector.options import pandas
from snowflake.snowpark._internal.analyzer.analyzer_utils import result_scan_statement
Expand Down Expand Up @@ -318,7 +319,7 @@ def result(
type of :meth:`DataFrame.collect`.
- "row_iterator": returns an iterator of :class:`Row` objects, which is the same as
the return type of :meth:`DataFrame.to_local_iterator`.
- "row": returns a ``pandas.DataFrame``, which is the same as the return type of
- "pandas": returns a ``pandas.DataFrame``, which is the same as the return type of
:meth:`DataFrame.to_pandas`.
- "pandas_batches": returns an iterator of ``pandas.DataFrame`` s, which is the same
as the return type of :meth:`DataFrame.to_pandas_batches`.
Expand Down Expand Up @@ -355,6 +356,19 @@ def result(
)

if async_result_type == _AsyncResultType.NO_RESULT:
# The following section is copied from python connector.
# Later we should expose it from python connector and reuse it.
retry_pattern_pos = 0
while True:
status = self._session.connection.get_query_status(self.query_id)
if not self._session.connection.is_still_running(status):
break
time.sleep(
0.5 * ASYNC_RETRY_PATTERN[retry_pattern_pos]
) # Same wait as JDBC
# If we can advance in ASYNC_RETRY_PATTERN then do so
if retry_pattern_pos < (len(ASYNC_RETRY_PATTERN) - 1):
retry_pattern_pos += 1
result = None
elif async_result_type == _AsyncResultType.PANDAS:
result = self._session._conn._to_data_or_iter(
Expand Down
9 changes: 9 additions & 0 deletions tests/integ/scala/test_async_job_suite.py
Original file line number Diff line number Diff line change
Expand Up @@ -473,3 +473,12 @@ def test_async_job_to_df(session, create_async_job_from_query_id):
new_df = async_job.to_df()
assert "result_scan" in new_df.queries["queries"][0].lower()
Utils.check_answer(df, new_df)


def test_async_job_result_wait_no_result(session):
async_job = session.sql("select system$wait(3)").collect_nowait()
t0 = time()
result = async_job.result("no_result")
t1 = time()
assert t1 - t0 >= 3.0
assert result is None

0 comments on commit 31c18e0

Please sign in to comment.