Skip to content

Commit

Permalink
enhance: handling abormal iterator cp file(milvus-io#2306)
Browse files Browse the repository at this point in the history
Signed-off-by: MrPresent-Han <[email protected]>
  • Loading branch information
MrPresent-Han committed Oct 23, 2024
1 parent 0fc6b28 commit f72a72b
Showing 1 changed file with 24 additions and 14 deletions.
38 changes: 24 additions & 14 deletions pymilvus/orm/iterator.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,13 +145,13 @@ def __seek_to_offset(self):

def __init_cp_file_handler(self) -> bool:
mode = "w"
if Path(self._cp_file_path).exists():
if self._cp_file_path.exists():
mode = "r+"
try:
self._cp_file_handler = Path(self._cp_file_path).open(mode) # noqa: SIM115
self._cp_file_handler = self._cp_file_path.open(mode)
except OSError as ose:
raise MilvusException(
message=f"Failed to open cp file for iterator:{self._cp_file_path}"
message=f"Failed to open cp file for iterator:{self._cp_file_path_str}"
) from ose
return mode == "r+"

Expand All @@ -163,18 +163,26 @@ def __save_mvcc_ts(self):
self._cp_file_handler.writelines(str(self._session_ts) + "\n")

def __save_pk_cursor(self):
if self._need_save_cp:
if self._need_save_cp and self._next_id is not None:
if not self._cp_file_path.exists():
self._cp_file_handler.close()
self._cp_file_handler = self._cp_file_path.open("w")
self._buffer_cursor_lines_number = 0
self.__save_mvcc_ts()
log.warning(
"iterator cp file is not existed any more, recreate for iteration, "
"do not remove this file manually!"
)
if self._buffer_cursor_lines_number >= 100:
self._cp_file_handler.seek(0)
self._cp_file_handler.truncate()
log.info(
"cursor lines in cp file has exceeded 100 lines, truncate the file and rewrite"
)
self._cp_file_handler.writelines(str(self._session_ts) + "\n")
assert_info(self._next_id is not None, "next_id should not be None when saving cp")
self._buffer_cursor_lines_number = 0
self._cp_file_handler.writelines(str(self._next_id) + "\n")
self._cp_file_handler.flush()
self._buffer_cursor_lines_number = 0
self._buffer_cursor_lines_number += 1

def __check_set_reduce_stop_for_best(self):
if self._kwargs.get(REDUCE_STOP_FOR_BEST, True):
Expand Down Expand Up @@ -226,13 +234,15 @@ def __setup_ts_by_request(self):

def __set_up_ts_cp(self):
self._buffer_cursor_lines_number = 0
self._cp_file_path = self._kwargs.get(ITERATOR_SESSION_CP_FILE, None)
self._cp_file_path_str = self._kwargs.get(ITERATOR_SESSION_CP_FILE, None)
self._cp_file_path = None
# no input cp_file, set up mvccTs by query request
if self._cp_file_path is None:
if self._cp_file_path_str is None:
self._need_save_cp = False
self.__setup_ts_by_request()
else:
self._need_save_cp = True
self._cp_file_path = Path(self._cp_file_path_str)
if not self.__init_cp_file_handler():
# input cp file is empty, set up mvccTs by query request
self.__setup_ts_by_request()
Expand All @@ -244,7 +254,7 @@ def __set_up_ts_cp(self):
line_count = len(lines)
if line_count < 2:
raise ParamError(
message=f"input cp file:{self._cp_file_path} should contain "
message=f"input cp file:{self._cp_file_path_str} should contain "
f"at least two lines, but only:{line_count} lines"
)
self._session_ts = int(lines[0])
Expand All @@ -254,7 +264,7 @@ def __set_up_ts_cp(self):
self._next_id = lines[self._buffer_cursor_lines_number].strip()
except OSError as ose:
raise MilvusException(
message=f"Failed to read cp info from file:{self._cp_file_path}"
message=f"Failed to read cp info from file:{self._cp_file_path_str}"
) from ose
except ValueError as e:
raise ParamError(message=f"cannot parse input cp session_ts:{lines[0]}") from e
Expand Down Expand Up @@ -346,11 +356,11 @@ def close(self) -> None:

def inner_close():
self._cp_file_handler.close()
Path(self._cp_file_path).unlink()
log.info(f"removed cp file:{self._cp_file_path} for query iterator")
self._cp_file_path.unlink()
log.info(f"removed cp file:{self._cp_file_path_str} for query iterator")

io_operation(
inner_close, f"failed to clear cp file:{self._cp_file_path} for query iterator"
inner_close, f"failed to clear cp file:{self._cp_file_path_str} for query iterator"
)


Expand Down

0 comments on commit f72a72b

Please sign in to comment.