From 1707707fd6ddc5e7fd1f0fb8967d19660a1e5bac Mon Sep 17 00:00:00 2001
From: Jianzhun Du
Date: Thu, 12 Sep 2024 13:56:30 -0700
Subject: [PATCH 1/3] drop unused temp tables eagerly on garbage collection
 instead of in a background cleanup thread

---
 src/snowflake/snowpark/_internal/telemetry.py |  52 ++++++++
 .../_internal/temp_table_auto_cleaner.py      |  89 ++++++-------
 src/snowflake/snowpark/session.py             |  17 +--
 tests/integ/test_temp_table_cleanup.py        | 117 ++++++++++++------
 tests/unit/test_session.py                    |   1 +
 5 files changed, 177 insertions(+), 99 deletions(-)

diff --git a/src/snowflake/snowpark/_internal/telemetry.py b/src/snowflake/snowpark/_internal/telemetry.py
index 8b9ef2acccb..aef60828334 100644
--- a/src/snowflake/snowpark/_internal/telemetry.py
+++ b/src/snowflake/snowpark/_internal/telemetry.py
@@ -79,6 +79,20 @@ class TelemetryField(Enum):
     QUERY_PLAN_HEIGHT = "query_plan_height"
     QUERY_PLAN_NUM_DUPLICATE_NODES = "query_plan_num_duplicate_nodes"
     QUERY_PLAN_COMPLEXITY = "query_plan_complexity"
+    # temp table cleanup
+    TYPE_TEMP_TABLE_CLEANUP = "snowpark_temp_table_cleanup"
+    NUM_TEMP_TABLES_CLEANED = "num_temp_tables_cleaned"
+    NUM_TEMP_TABLES_CREATED = "num_temp_tables_created"
+    TEMP_TABLE_CLEANER_ENABLED = "temp_table_cleaner_enabled"
+    TYPE_TEMP_TABLE_CLEANUP_ABNORMAL_EXCEPTION = (
+        "snowpark_temp_table_cleanup_abnormal_exception"
+    )
+    TEMP_TABLE_CLEANUP_ABNORMAL_EXCEPTION_TABLE_NAME = (
+        "temp_table_cleanup_abnormal_exception_table_name"
+    )
+    TEMP_TABLE_CLEANUP_ABNORMAL_EXCEPTION_MESSAGE = (
+        "temp_table_cleanup_abnormal_exception_message"
+    )
 
 
 # These DataFrame APIs call other DataFrame APIs
@@ -464,3 +478,41 @@ def send_large_query_optimization_skipped_telemetry(
             },
         }
         self.send(message)
+
+    def send_temp_table_cleanup_telemetry(
+        self,
+        session_id: str,
+        temp_table_cleaner_enabled: bool,
+        num_temp_tables_cleaned: int,
+        num_temp_tables_created: int,
+    ) -> None:
+        message = {
+            **self._create_basic_telemetry_data(
+                TelemetryField.TYPE_TEMP_TABLE_CLEANUP.value
+            ),
+            TelemetryField.KEY_DATA.value: {
+                TelemetryField.SESSION_ID.value: session_id,
+                TelemetryField.TEMP_TABLE_CLEANER_ENABLED.value: temp_table_cleaner_enabled,
+                TelemetryField.NUM_TEMP_TABLES_CLEANED.value: num_temp_tables_cleaned,
+                TelemetryField.NUM_TEMP_TABLES_CREATED.value: num_temp_tables_created,
+            },
+        }
+        self.send(message)
+
+    def send_temp_table_cleanup_abnormal_exception_telemetry(
+        self,
+        session_id: str,
+        table_name: str,
+        exception_message: str,
+    ) -> None:
+        message = {
+            **self._create_basic_telemetry_data(
+                TelemetryField.TYPE_TEMP_TABLE_CLEANUP_ABNORMAL_EXCEPTION.value
+            ),
+            TelemetryField.KEY_DATA.value: {
+                TelemetryField.SESSION_ID.value: session_id,
+                TelemetryField.TEMP_TABLE_CLEANUP_ABNORMAL_EXCEPTION_TABLE_NAME.value: table_name,
+                TelemetryField.TEMP_TABLE_CLEANUP_ABNORMAL_EXCEPTION_MESSAGE.value: exception_message,
+            },
+        }
+        self.send(message)
diff --git a/src/snowflake/snowpark/_internal/temp_table_auto_cleaner.py b/src/snowflake/snowpark/_internal/temp_table_auto_cleaner.py
index b9055c6fc58..4fa17498d34 100644
--- a/src/snowflake/snowpark/_internal/temp_table_auto_cleaner.py
+++ b/src/snowflake/snowpark/_internal/temp_table_auto_cleaner.py
@@ -4,9 +4,7 @@
 import logging
 import weakref
 from collections import defaultdict
-from queue import Empty, Queue
-from threading import Event, Thread
-from typing import TYPE_CHECKING, Dict, Optional
+from typing import TYPE_CHECKING, Dict
 
 from snowflake.snowpark._internal.analyzer.snowflake_plan_node import SnowflakeTable
 
@@ -33,12 +31,6 @@ def __init__(self, session: "Session") -> None:
         # to its reference count for later temp table management
         # this dict will still be maintained even if the cleaner is stopped (`stop()` is called)
         self.ref_count_map: Dict[str, int] = defaultdict(int)
-        # unused temp table will be put into the queue for cleanup
-        self.queue: Queue = Queue()
-        # thread for removing temp tables (running DROP TABLE sql)
-        self.cleanup_thread: Optional[Thread] = None
-        # An event managing a flag that indicates whether the cleaner is started
-        self.stop_event = Event()
 
     def add(self, table: SnowflakeTable) -> None:
         self.ref_count_map[table.name] += 1
@@ -46,61 +38,60 @@ def add(self, table: SnowflakeTable) -> None:
         # and this table will be dropped finally
         _ = weakref.finalize(table, self._delete_ref_count, table.name)
 
-    def _delete_ref_count(self, name: str) -> None:
+    def _delete_ref_count(self, name: str) -> None:  # pragma: no cover
         """
         Decrements the reference count of a temporary table,
         and if the count reaches zero, puts this table in the queue for cleanup.
         """
         self.ref_count_map[name] -= 1
         if self.ref_count_map[name] == 0:
-            self.ref_count_map.pop(name)
-            # clean up
-            self.queue.put(name)
+            if self.session.auto_clean_up_temp_table_enabled:
+                self.drop_table(name)
         elif self.ref_count_map[name] < 0:
             logging.debug(
                 f"Unexpected reference count {self.ref_count_map[name]} for table {name}"
             )
 
-    def process_cleanup(self) -> None:
-        while not self.stop_event.is_set():
-            try:
-                # it's non-blocking after timeout and become interruptable with stop_event
-                # it will raise an `Empty` exception if queue is empty after timeout,
-                # then we catch this exception and avoid breaking loop
-                table_name = self.queue.get(timeout=1)
-                self.drop_table(table_name)
-            except Empty:
-                continue
-
-    def drop_table(self, name: str) -> None:
+    def drop_table(self, name: str) -> None:  # pragma: no cover
         common_log_text = f"temp table {name} in session {self.session.session_id}"
-        logging.debug(f"Cleanup Thread: Ready to drop {common_log_text}")
+        logging.debug(f"Ready to drop {common_log_text}")
+        query_id = None
         try:
-            # TODO SNOW-1556553: Remove this workaround once multi-threading of Snowpark session is supported
-            with self.session._conn._conn.cursor() as cursor:
-                cursor.execute(
-                    f"drop table if exists {name} /* internal query to drop unused temp table */",
-                    _statement_params={DROP_TABLE_STATEMENT_PARAM_NAME: name},
+            async_job = self.session.sql(
+                f"drop table if exists {name} /* internal query to drop unused temp table */",
+            )._internal_collect_with_tag_no_telemetry(
+                block=False, statement_params={DROP_TABLE_STATEMENT_PARAM_NAME: name}
+            )
+            query_id = async_job.query_id
+            logging.debug(f"Dropping {common_log_text} with query id {query_id}")
+        except Exception as ex:  # pragma: no cover
+            warning_message = f"Failed to drop {common_log_text}, exception: {ex}"
+            logging.warning(warning_message)
+            if query_id is None:
+                # If no query_id is available, it means the query hasn't been accepted by GS,
+                # and it won't appear in our job_etl_view, so we send separate telemetry to record it.
+                self.session._conn._telemetry_client.send_temp_table_cleanup_abnormal_exception_telemetry(
+                    self.session.session_id,
+                    name,
+                    str(ex),
                 )
-            logging.debug(f"Cleanup Thread: Successfully dropped {common_log_text}")
-        except Exception as ex:
-            logging.warning(
-                f"Cleanup Thread: Failed to drop {common_log_text}, exception: {ex}"
-            )  # pragma: no cover
-
-    def is_alive(self) -> bool:
-        return self.cleanup_thread is not None and self.cleanup_thread.is_alive()
-
-    def start(self) -> None:
-        self.stop_event.clear()
-        if not self.is_alive():
-            self.cleanup_thread = Thread(target=self.process_cleanup)
-            self.cleanup_thread.start()
 
     def stop(self) -> None:
         """
-        The cleaner will stop immediately and leave unfinished temp tables in the queue.
+        Sends the temp table cleanup telemetry; there is no background cleanup thread to stop anymore.
         """
-        self.stop_event.set()
-        if self.is_alive():
-            self.cleanup_thread.join()
+        self.session._conn._telemetry_client.send_temp_table_cleanup_telemetry(
+            self.session.session_id,
+            temp_table_cleaner_enabled=self.session.auto_clean_up_temp_table_enabled,
+            num_temp_tables_cleaned=self.num_temp_tables_cleaned,
+            num_temp_tables_created=self.num_temp_tables_created,
+        )
+
+    @property
+    def num_temp_tables_created(self) -> int:
+        return len(self.ref_count_map)
+
+    @property
+    def num_temp_tables_cleaned(self) -> int:
+        # TODO SNOW-1662536: we may need a separate counter for the number of tables cleaned when parameter is enabled
+        return sum(v == 0 for v in self.ref_count_map.values())
diff --git a/src/snowflake/snowpark/session.py b/src/snowflake/snowpark/session.py
index 8da0794f139..da6c8840666 100644
--- a/src/snowflake/snowpark/session.py
+++ b/src/snowflake/snowpark/session.py
@@ -582,9 +582,6 @@ def __init__(
         self._tmpdir_handler: Optional[tempfile.TemporaryDirectory] = None
         self._runtime_version_from_requirement: str = None
         self._temp_table_auto_cleaner: TempTableAutoCleaner = TempTableAutoCleaner(self)
-        if self._auto_clean_up_temp_table_enabled:
-            self._temp_table_auto_cleaner.start()
-
         _logger.info("Snowpark Session information: %s", self._session_info)
 
     def __enter__(self):
@@ -623,8 +620,8 @@ def close(self) -> None:
             raise SnowparkClientExceptionMessages.SERVER_FAILED_CLOSE_SESSION(str(ex))
         finally:
             try:
-                self._conn.close()
                 self._temp_table_auto_cleaner.stop()
+                self._conn.close()
                 _logger.info("Closed session: %s", self._session_id)
             finally:
                 _remove_session(self)
@@ -659,9 +656,10 @@ def auto_clean_up_temp_table_enabled(self) -> bool:
         The default value is ``False``.
 
         Note:
-            Even if this parameter is ``False``, Snowpark still records temporary tables when
-            their corresponding DataFrame are garbage collected. Therefore, if you turn it on in the middle of your session or after turning it off,
-            the target temporary tables will still be cleaned up accordingly.
+            Temporary tables will only be dropped if this parameter is enabled during garbage collection.
+            If a temporary table is no longer referenced when the parameter is on, it will be dropped during garbage collection.
+            However, if garbage collection occurs while the parameter is off, the table will not be removed.
+            Note that Python's garbage collection is triggered opportunistically, with no guaranteed timing.
""" return self._auto_clean_up_temp_table_enabled @@ -755,11 +753,6 @@ def auto_clean_up_temp_table_enabled(self, value: bool) -> None: self._session_id, value ) self._auto_clean_up_temp_table_enabled = value - is_alive = self._temp_table_auto_cleaner.is_alive() - if value and not is_alive: - self._temp_table_auto_cleaner.start() - elif not value and is_alive: - self._temp_table_auto_cleaner.stop() else: raise ValueError( "value for auto_clean_up_temp_table_enabled must be True or False!" diff --git a/tests/integ/test_temp_table_cleanup.py b/tests/integ/test_temp_table_cleanup.py index 4ac87661484..cdd97d49937 100644 --- a/tests/integ/test_temp_table_cleanup.py +++ b/tests/integ/test_temp_table_cleanup.py @@ -12,6 +12,7 @@ from snowflake.snowpark._internal.utils import ( TempObjectType, random_name_for_temp_object, + warning_dict, ) from snowflake.snowpark.functions import col from tests.utils import IS_IN_STORED_PROC @@ -25,40 +26,61 @@ WAIT_TIME = 1 +@pytest.fixture(autouse=True) +def setup(session): + auto_clean_up_temp_table_enabled = session.auto_clean_up_temp_table_enabled + session.auto_clean_up_temp_table_enabled = True + yield + session.auto_clean_up_temp_table_enabled = auto_clean_up_temp_table_enabled + + def test_basic(session): + session._temp_table_auto_cleaner.ref_count_map.clear() df1 = session.create_dataframe([[1, 2], [3, 4]], schema=["a", "b"]).cache_result() table_name = df1.table_name table_ids = table_name.split(".") df1.collect() assert session._temp_table_auto_cleaner.ref_count_map[table_name] == 1 + assert session._temp_table_auto_cleaner.num_temp_tables_created == 1 + assert session._temp_table_auto_cleaner.num_temp_tables_cleaned == 0 df2 = df1.select("*").filter(col("a") == 1) df2.collect() assert session._temp_table_auto_cleaner.ref_count_map[table_name] == 1 + assert session._temp_table_auto_cleaner.num_temp_tables_created == 1 + assert session._temp_table_auto_cleaner.num_temp_tables_cleaned == 0 df3 = df1.union_all(df2) df3.collect() assert session._temp_table_auto_cleaner.ref_count_map[table_name] == 1 + assert session._temp_table_auto_cleaner.num_temp_tables_created == 1 + assert session._temp_table_auto_cleaner.num_temp_tables_cleaned == 0 - session._temp_table_auto_cleaner.start() del df1 gc.collect() time.sleep(WAIT_TIME) assert session._table_exists(table_ids) assert session._temp_table_auto_cleaner.ref_count_map[table_name] == 1 + assert session._temp_table_auto_cleaner.num_temp_tables_created == 1 + assert session._temp_table_auto_cleaner.num_temp_tables_cleaned == 0 del df2 gc.collect() time.sleep(WAIT_TIME) assert session._table_exists(table_ids) assert session._temp_table_auto_cleaner.ref_count_map[table_name] == 1 + assert session._temp_table_auto_cleaner.num_temp_tables_created == 1 + assert session._temp_table_auto_cleaner.num_temp_tables_cleaned == 0 del df3 gc.collect() time.sleep(WAIT_TIME) assert not session._table_exists(table_ids) - assert table_name not in session._temp_table_auto_cleaner.ref_count_map + assert session._temp_table_auto_cleaner.ref_count_map[table_name] == 0 + assert session._temp_table_auto_cleaner.num_temp_tables_created == 1 + assert session._temp_table_auto_cleaner.num_temp_tables_cleaned == 1 def test_function(session): + session._temp_table_auto_cleaner.ref_count_map.clear() table_name = None def f(session: Session) -> None: @@ -68,13 +90,16 @@ def f(session: Session) -> None: nonlocal table_name table_name = df.table_name assert session._temp_table_auto_cleaner.ref_count_map[table_name] == 1 + assert 
session._temp_table_auto_cleaner.num_temp_tables_created == 1 + assert session._temp_table_auto_cleaner.num_temp_tables_cleaned == 0 - session._temp_table_auto_cleaner.start() f(session) gc.collect() time.sleep(WAIT_TIME) assert not session._table_exists(table_name.split(".")) assert session._temp_table_auto_cleaner.ref_count_map[table_name] == 0 + assert session._temp_table_auto_cleaner.num_temp_tables_created == 1 + assert session._temp_table_auto_cleaner.num_temp_tables_cleaned == 1 @pytest.mark.parametrize( @@ -86,33 +111,42 @@ def f(session: Session) -> None: ], ) def test_copy(session, copy_function): + session._temp_table_auto_cleaner.ref_count_map.clear() df1 = session.create_dataframe([[1, 2], [3, 4]], schema=["a", "b"]).cache_result() table_name = df1.table_name table_ids = table_name.split(".") df1.collect() assert session._temp_table_auto_cleaner.ref_count_map[table_name] == 1 + assert session._temp_table_auto_cleaner.num_temp_tables_created == 1 + assert session._temp_table_auto_cleaner.num_temp_tables_cleaned == 0 df2 = copy_function(df1).select("*").filter(col("a") == 1) df2.collect() assert session._temp_table_auto_cleaner.ref_count_map[table_name] == 2 + assert session._temp_table_auto_cleaner.num_temp_tables_created == 1 + assert session._temp_table_auto_cleaner.num_temp_tables_cleaned == 0 - session._temp_table_auto_cleaner.start() del df1 gc.collect() time.sleep(WAIT_TIME) assert session._table_exists(table_ids) assert session._temp_table_auto_cleaner.ref_count_map[table_name] == 1 + assert session._temp_table_auto_cleaner.num_temp_tables_created == 1 + assert session._temp_table_auto_cleaner.num_temp_tables_cleaned == 0 - session._temp_table_auto_cleaner.start() del df2 gc.collect() time.sleep(WAIT_TIME) assert not session._table_exists(table_ids) - assert table_name not in session._temp_table_auto_cleaner.ref_count_map + assert session._temp_table_auto_cleaner.ref_count_map[table_name] == 0 + assert session._temp_table_auto_cleaner.num_temp_tables_created == 1 + assert session._temp_table_auto_cleaner.num_temp_tables_cleaned == 1 @pytest.mark.skipif(IS_IN_STORED_PROC, reason="Cannot create session in SP") def test_reference_count_map_multiple_sessions(db_parameters, session): + session._temp_table_auto_cleaner.ref_count_map.clear() new_session = Session.builder.configs(db_parameters).create() + new_session.auto_clean_up_temp_table_enabled = True try: df1 = session.create_dataframe( [[1, 2], [3, 4]], schema=["a", "b"] @@ -120,43 +154,59 @@ def test_reference_count_map_multiple_sessions(db_parameters, session): table_name1 = df1.table_name table_ids1 = table_name1.split(".") assert session._temp_table_auto_cleaner.ref_count_map[table_name1] == 1 - assert new_session._temp_table_auto_cleaner.ref_count_map[table_name1] == 0 + assert session._temp_table_auto_cleaner.num_temp_tables_created == 1 + assert session._temp_table_auto_cleaner.num_temp_tables_cleaned == 0 + assert table_name1 not in new_session._temp_table_auto_cleaner.ref_count_map + assert new_session._temp_table_auto_cleaner.num_temp_tables_created == 0 + assert new_session._temp_table_auto_cleaner.num_temp_tables_cleaned == 0 df2 = new_session.create_dataframe( [[1, 2], [3, 4]], schema=["a", "b"] ).cache_result() table_name2 = df2.table_name table_ids2 = table_name2.split(".") - assert session._temp_table_auto_cleaner.ref_count_map[table_name2] == 0 + assert table_name2 not in session._temp_table_auto_cleaner.ref_count_map + assert session._temp_table_auto_cleaner.num_temp_tables_created == 1 + assert 
session._temp_table_auto_cleaner.num_temp_tables_cleaned == 0 assert new_session._temp_table_auto_cleaner.ref_count_map[table_name2] == 1 + assert new_session._temp_table_auto_cleaner.num_temp_tables_created == 1 + assert new_session._temp_table_auto_cleaner.num_temp_tables_cleaned == 0 - session._temp_table_auto_cleaner.start() del df1 gc.collect() time.sleep(WAIT_TIME) assert not session._table_exists(table_ids1) assert new_session._table_exists(table_ids2) assert session._temp_table_auto_cleaner.ref_count_map[table_name1] == 0 - assert new_session._temp_table_auto_cleaner.ref_count_map[table_name1] == 0 + assert session._temp_table_auto_cleaner.num_temp_tables_created == 1 + assert session._temp_table_auto_cleaner.num_temp_tables_cleaned == 1 + assert table_name1 not in new_session._temp_table_auto_cleaner.ref_count_map + assert new_session._temp_table_auto_cleaner.num_temp_tables_created == 1 + assert new_session._temp_table_auto_cleaner.num_temp_tables_cleaned == 0 - new_session._temp_table_auto_cleaner.start() del df2 gc.collect() time.sleep(WAIT_TIME) assert not new_session._table_exists(table_ids2) - assert session._temp_table_auto_cleaner.ref_count_map[table_name2] == 0 + assert table_name2 not in session._temp_table_auto_cleaner.ref_count_map + assert session._temp_table_auto_cleaner.num_temp_tables_created == 1 + assert session._temp_table_auto_cleaner.num_temp_tables_cleaned == 1 assert new_session._temp_table_auto_cleaner.ref_count_map[table_name2] == 0 + assert new_session._temp_table_auto_cleaner.num_temp_tables_created == 1 + assert new_session._temp_table_auto_cleaner.num_temp_tables_cleaned == 1 finally: new_session.close() def test_save_as_table_no_drop(session): - session._temp_table_auto_cleaner.start() + session._temp_table_auto_cleaner.ref_count_map.clear() def f(session: Session, temp_table_name: str) -> None: session.create_dataframe( [[1, 2], [3, 4]], schema=["a", "b"] ).write.save_as_table(temp_table_name, table_type="temp") - assert session._temp_table_auto_cleaner.ref_count_map[temp_table_name] == 0 + assert temp_table_name not in session._temp_table_auto_cleaner.ref_count_map + assert session._temp_table_auto_cleaner.num_temp_tables_created == 0 + assert session._temp_table_auto_cleaner.num_temp_tables_cleaned == 0 temp_table_name = random_name_for_temp_object(TempObjectType.TABLE) f(session, temp_table_name) @@ -165,34 +215,25 @@ def f(session: Session, temp_table_name: str) -> None: assert session._table_exists([temp_table_name]) -def test_start_stop(session): - session._temp_table_auto_cleaner.stop() - - df1 = session.create_dataframe([[1, 2], [3, 4]], schema=["a", "b"]).cache_result() - table_name = df1.table_name +def test_auto_clean_up_temp_table_enabled_parameter(db_parameters, session, caplog): + warning_dict.clear() + with caplog.at_level(logging.WARNING): + session.auto_clean_up_temp_table_enabled = False + assert session.auto_clean_up_temp_table_enabled is False + assert "auto_clean_up_temp_table_enabled is experimental" in caplog.text + df = session.create_dataframe([[1, 2], [3, 4]], schema=["a", "b"]).cache_result() + table_name = df.table_name table_ids = table_name.split(".") - assert session._temp_table_auto_cleaner.ref_count_map[table_name] == 1 - del df1 + del df gc.collect() - assert session._temp_table_auto_cleaner.ref_count_map[table_name] == 0 - assert not session._temp_table_auto_cleaner.queue.empty() - assert session._table_exists(table_ids) - - session._temp_table_auto_cleaner.start() time.sleep(WAIT_TIME) - assert 
session._temp_table_auto_cleaner.queue.empty() - assert not session._table_exists(table_ids) - - -def test_auto_clean_up_temp_table_enabled_parameter(db_parameters, session, caplog): - with caplog.at_level(logging.WARNING): - session.auto_clean_up_temp_table_enabled = True + assert session._table_exists(table_ids) + assert session._temp_table_auto_cleaner.ref_count_map[table_name] == 0 + assert session._temp_table_auto_cleaner.num_temp_tables_created == 1 + assert session._temp_table_auto_cleaner.num_temp_tables_cleaned == 1 + session.auto_clean_up_temp_table_enabled = True assert session.auto_clean_up_temp_table_enabled is True - assert "auto_clean_up_temp_table_enabled is experimental" in caplog.text - assert session._temp_table_auto_cleaner.is_alive() - session.auto_clean_up_temp_table_enabled = False - assert session.auto_clean_up_temp_table_enabled is False - assert not session._temp_table_auto_cleaner.is_alive() + with pytest.raises( ValueError, match="value for auto_clean_up_temp_table_enabled must be True or False!", diff --git a/tests/unit/test_session.py b/tests/unit/test_session.py index 262c9e82c44..370ee455d62 100644 --- a/tests/unit/test_session.py +++ b/tests/unit/test_session.py @@ -112,6 +112,7 @@ def test_used_scoped_temp_object(): def test_close_exception(): fake_connection = mock.create_autospec(ServerConnection) fake_connection._conn = mock.Mock() + fake_connection._telemetry_client = mock.Mock() fake_connection.is_closed = MagicMock(return_value=False) exception_msg = "Mock exception for session.cancel_all" fake_connection.run_query = MagicMock(side_effect=Exception(exception_msg)) From 8bef77a13dd98ed45161df50415797ce88b7eb78 Mon Sep 17 00:00:00 2001 From: Jianzhun Du Date: Thu, 12 Sep 2024 13:57:03 -0700 Subject: [PATCH 2/3] add test --- tests/integ/test_telemetry.py | 48 +++++++++++++++++++++++++++++++++++ 1 file changed, 48 insertions(+) diff --git a/tests/integ/test_telemetry.py b/tests/integ/test_telemetry.py index 7aaa5c9e5dd..39749de76f6 100644 --- a/tests/integ/test_telemetry.py +++ b/tests/integ/test_telemetry.py @@ -1223,3 +1223,51 @@ def send_telemetry(): data, type_, _ = telemetry_tracker.extract_telemetry_log_data(-1, send_telemetry) assert data == expected_data assert type_ == "snowpark_compilation_stage_statistics" + + +def test_temp_table_cleanup(session): + client = session._conn._telemetry_client + + def send_telemetry(): + client.send_temp_table_cleanup_telemetry( + session.session_id, + temp_table_cleaner_enabled=True, + num_temp_tables_cleaned=2, + num_temp_tables_created=5, + ) + + telemetry_tracker = TelemetryDataTracker(session) + + expected_data = { + "session_id": session.session_id, + "temp_table_cleaner_enabled": True, + "num_temp_tables_cleaned": 2, + "num_temp_tables_created": 5, + } + + data, type_, _ = telemetry_tracker.extract_telemetry_log_data(-1, send_telemetry) + assert data == expected_data + assert type_ == "snowpark_temp_table_cleanup" + + +def test_temp_table_cleanup_exception(session): + client = session._conn._telemetry_client + + def send_telemetry(): + client.send_temp_table_cleanup_abnormal_exception_telemetry( + session.session_id, + table_name="table_name_placeholder", + exception_message="exception_message_placeholder", + ) + + telemetry_tracker = TelemetryDataTracker(session) + + expected_data = { + "session_id": session.session_id, + "temp_table_cleanup_abnormal_exception_table_name": "table_name_placeholder", + "temp_table_cleanup_abnormal_exception_message": "exception_message_placeholder", + } + + data, type_, _ = 
telemetry_tracker.extract_telemetry_log_data(-1, send_telemetry) + assert data == expected_data + assert type_ == "snowpark_temp_table_cleanup_abnormal_exception" From f61a58ea41221e71f6f91c5ee0541e85d0a4b197 Mon Sep 17 00:00:00 2001 From: Jianzhun Du Date: Thu, 12 Sep 2024 14:55:52 -0700 Subject: [PATCH 3/3] add example --- src/snowflake/snowpark/session.py | 22 ++++++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/src/snowflake/snowpark/session.py b/src/snowflake/snowpark/session.py index da6c8840666..8ffd4081473 100644 --- a/src/snowflake/snowpark/session.py +++ b/src/snowflake/snowpark/session.py @@ -655,6 +655,28 @@ def auto_clean_up_temp_table_enabled(self) -> bool: :meth:`DataFrame.cache_result` in the current session when the DataFrame is no longer referenced (i.e., gets garbage collected). The default value is ``False``. + Example:: + + >>> import gc + >>> + >>> def f(session: Session) -> str: + ... df = session.create_dataframe( + ... [[1, 2], [3, 4]], schema=["a", "b"] + ... ).cache_result() + ... return df.table_name + ... + >>> session.auto_clean_up_temp_table_enabled = True + >>> table_name = f(session) + >>> assert table_name + >>> gc.collect() # doctest: +SKIP + >>> + >>> # The temporary table created by cache_result will be dropped when the DataFrame is no longer referenced + >>> # outside the function + >>> session.sql(f"show tables like '{table_name}'").count() + 0 + + >>> session.auto_clean_up_temp_table_enabled = False + Note: Temporary tables will only be dropped if this parameter is enabled during garbage collection. If a temporary table is no longer referenced when the parameter is on, it will be dropped during garbage collection.