SNOW-1641644: Drop temp table directly at garbage collection instead of using multi-threading #2214
Conversation
@@ -643,11 +639,6 @@ def auto_clean_up_temp_table_enabled(self) -> bool:
        When setting this parameter to ``True``, Snowpark will automatically clean up temporary tables created by
        :meth:`DataFrame.cache_result` in the current session when the DataFrame is no longer referenced (i.e., gets garbage collected).
        The default value is ``False``.

        Note:
We can still provide this guarantee by checking all entries of the count map to see whether any count reaches 0 during garbage collection. But since we're no longer using a watch thread, I don't think this guarantee makes much sense; we only need to make a best effort to clean up temp tables when this parameter is enabled. Alternatively, we can easily add this guarantee later if a customer requests it.
This behavior is unrelated to the threading work; it is a behavior of the cleaner. If we don't have this, what we're saying is that we only clean up temp tables whose reference count reaches 0 after the cleaner is started. Let's make sure this is documented clearly somewhere.
We can say that temporary tables will only be dropped when this parameter is turned on during garbage collection, and that garbage collection in Python is triggered opportunistically with no guaranteed timing. I think that's also clear. Let me know what you think.
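The point about opportunistic garbage collection can be illustrated with a minimal, Snowpark-free sketch (the table name and finalizer class are hypothetical stand-ins, not the actual cleaner code):

```python
import gc

dropped = []

class TempTableRef:
    """Hypothetical stand-in for a cached DataFrame holding a temp table."""
    def __del__(self):
        # The DROP would be issued only when Python actually finalizes the object.
        dropped.append("SNOWPARK_TEMP_TABLE")

ref = TempTableRef()
del ref       # last reference gone; CPython finalizes immediately here, but the
              # language only guarantees finalization happens "eventually"
gc.collect()  # an explicit collection makes the timing deterministic
```

The drop is tied to finalization, not to the moment the last reference disappears, which is exactly why the timing cannot be guaranteed in general.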
@@ -54,53 +46,26 @@ def _delete_ref_count(self, name: str) -> None:
        self.ref_count_map[name] -= 1
        if self.ref_count_map[name] == 0:
            self.ref_count_map.pop(name)
Let's not pop here. If we keep the stop method, we can send telemetry on close for the temp tables we didn't clean up (both the ones whose reference count is 0 and the ones whose count is not 0).
I think we can send telemetry here with 1) the number of temp tables cleaned up (entries whose value is 0) and 2) the total number of temp tables created by cache_result (the length of the dict), when closing the session. Then we can understand what percentage of temp tables created by cache_result get cleaned up.
That's actually a good point; I added such telemetry and corresponding tests.
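A sketch of what such close-time telemetry could compute, assuming entries stay in the map with count 0 instead of being popped (the counter names mirror attributes used in the tests later in this thread, but the map contents here are made up):

```python
# Hypothetical ref_count_map kept by the cleaner: temp table name -> live refs.
# Entries with count 0 were cleaned up; nonzero entries are still referenced.
ref_count_map = {"TMP_A": 0, "TMP_B": 0, "TMP_C": 2}

num_temp_tables_created = len(ref_count_map)  # every cache_result table stays in the map
num_temp_tables_cleaned = sum(1 for count in ref_count_map.values() if count == 0)

telemetry_payload = {
    "num_temp_tables_created": num_temp_tables_created,
    "num_temp_tables_cleaned": num_temp_tables_cleaned,
}
```

The ratio of the two counters gives the cleanup percentage the comment above asks for.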
        except Exception as ex:
            logging.warning(
-                f"Cleanup Thread: Failed to drop {common_log_text}, exception: {ex}"
+                f"Failed to drop {common_log_text}, exception: {ex}"
We can probably record telemetry later for failed table drops.
Actually, we might not need telemetry for this case: we can group by temp table name from the statement params and then find whether the corresponding query succeeded or not.
You can only do this for queries that reached the server, not for ones that don't even have a query id.
I think what we can do here is check whether the async job is not None and whether it has a query_id; if not, we can send a separate telemetry event.
The async job will never be None; only the query id might be None. I added such telemetry.
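A minimal sketch of the query_id check being discussed (the job class and helper function are hypothetical; only the `query_id is None` condition comes from the thread):

```python
class DropJob:
    """Hypothetical stand-in for the async job returned by the DROP query."""
    def __init__(self, query_id=None):
        self.query_id = query_id

def should_send_exception_telemetry(job):
    # The job object itself is always present; a missing query_id means the
    # query never reached the server, so it cannot be found in query history
    # and must be reported through a separate telemetry event.
    return job.query_id is None
```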
src/snowflake/snowpark/session.py (outdated)
-        Even if this parameter is ``False``, Snowpark still records temporary tables when
-        their corresponding DataFrame are garbage collected. Therefore, if you turn it on in the middle of your session or after turning it off,
-        the target temporary tables will still be cleaned up accordingly.
+        The temporary tables will only be dropped when this parameter is turned on during garbage collection. That is, if the
The wording of this note is very hard to read. Can we take another pass at it?
Yes, fixed. To clarify: a table is dropped only if garbage collection runs while this parameter is on. But garbage collection might not happen as soon as the table object is no longer referenced; that timing is controlled by Python. So if a user turns the parameter on, a table object becomes unreferenced but garbage collection doesn't occur, and the user then turns the parameter off, the table will never be dropped.
The most recent write-up is much clearer. Thanks for this update! I would also add a simple example script showing where a temp table gets created, where it becomes unreferenced, and the scope within which it may get dropped, assuming (1) Python garbage collection is triggered and (2) the parameter is on.
If this scenario can play out both because of explicitly calling cache_result and because of other (implicit) reasons for temp table creation, then we can have an example for each case.
yes, added!
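The turn-off scenario described above can be reproduced with a small, Snowpark-free simulation (the class and parameter dict are hypothetical stand-ins; a reference cycle is used so finalization waits for the cyclic collector rather than happening at `del`):

```python
import gc

gc.disable()  # make collection timing explicit for the demonstration
dropped = []

class CachedTable:
    """Hypothetical stand-in for a DataFrame returned by cache_result()."""
    def __init__(self, name, params):
        self.name = name
        self.params = params
        self._cycle = self  # reference cycle delays finalization until gc runs

    def __del__(self):
        # The drop happens only if the parameter is on when GC finally runs.
        if self.params["auto_clean_up_temp_table_enabled"]:
            dropped.append(self.name)

params = {"auto_clean_up_temp_table_enabled": True}
t = CachedTable("TMP_1", params)
del t                                               # unreferenced, but not yet collected
params["auto_clean_up_temp_table_enabled"] = False  # user turns the parameter off
gc.collect()                                        # GC runs with the parameter off
gc.enable()
```

After the collection, `dropped` is still empty: the table became unreferenced while the parameter was on, but because GC only ran after it was turned off, the table is never dropped.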
class TempTableCleanupTelemetryField(Enum):
    TYPE_TEMP_TABLE_CLEANUP = "snowpark_temp_table_cleanup"
That probably doesn't belong in the new compilation stage; it's more suitable for the current telemetry constants file.
Sorry, I didn't realize it was in the new compilation stage; moved.
del df2
gc.collect()
time.sleep(WAIT_TIME)
assert session._table_exists(table_ids)
assert session._temp_table_auto_cleaner.ref_count_map[table_name] == 1
assert session._temp_table_auto_cleaner.num_temp_tables_created == 1
assert session._temp_table_auto_cleaner.num_temp_tables_cleaned == 0

del df3
gc.collect()
time.sleep(WAIT_TIME)
What is the wait time here? A fixed sleep sounds like it will be flaky by default; I think we can just do a while-loop check here with a timeout of 5 minutes.
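A polling helper along those lines (a generic sketch, not the project's actual test utility; the cleaner reference in the usage comment is assumed):

```python
import time

def wait_until(predicate, timeout=300.0, interval=1.0):
    """Poll `predicate` until it returns True or `timeout` seconds elapse."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        if predicate():
            return True
        time.sleep(interval)
    return predicate()  # one last check at the deadline

# e.g. wait_until(lambda: cleaner.num_temp_tables_cleaned == 1, timeout=300)
```

Replacing `time.sleep(WAIT_TIME)` with such a loop keeps the test fast in the common case while still tolerating slow garbage collection.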
src/snowflake/snowpark/session.py (outdated)
@@ -609,8 +606,12 @@ def close(self) -> None:
            raise SnowparkClientExceptionMessages.SERVER_FAILED_CLOSE_SESSION(str(ex))
        finally:
            try:
                self._conn._telemetry_client.send_temp_table_cleanup_telemetry(
I would suggest adding back the stop() method, moving the telemetry sending into it, and sending telemetry on every stop.
good idea, added back
self.stop_event.set()
if self.is_alive():
    self.cleanup_thread.join()

def reset_reference_count_map(self) -> None:
Is that only for testing purposes? Can you just access the attribute directly to clear the map in tests?
removed
warning_message = f"Failed to drop {common_log_text}, exception: {ex}"
logging.warning(warning_message)
if query_id is None:
    self.session._conn._telemetry_client.send_temp_table_cleanup_exception_telemetry(
Add a comment here:
"""
If no query_id is available, the query hasn't been accepted by GS, and it won't appear in our job_etl_view; send a separate telemetry event for recording.
"""
done
if self.is_alive():
    self.cleanup_thread.join()
self.session._conn._telemetry_client.send_temp_table_cleanup_telemetry(
    self.session.session_id,
Also record the parameter value, so we know whether this telemetry was sent due to session close or due to the parameter being turned off.
done
}
self.send(message)

def send_temp_table_cleanup_exception_telemetry(
We can call this "abnormal_exception", since we are not sending this for all exceptions.
done
if query_id is None:
    self.session._conn._telemetry_client.send_temp_table_cleanup_exception_telemetry(
        self.session.session_id,
        warning_message,
Can we separate the table name and the exception message, so we don't need to parse the fields for information?
done
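A sketch of the separated payload (the function and field names are hypothetical; the point from the comment above is keeping the table name and the exception text as distinct fields):

```python
def build_cleanup_exception_payload(session_id, table_name, exc):
    # Separate fields mean downstream analysis needs no string parsing.
    return {
        "session_id": session_id,
        "temp_table_name": table_name,
        "exception_message": str(exc),
    }
```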
LGTM.
Which Jira issue is this PR addressing? Make sure that there is an accompanying issue to your PR.
Fixes SNOW-1641644
Fill out the following pre-review checklist:
Please describe how your code solves the related issue.
see details in discussion