diff --git a/pymilvus/orm/iterator.py b/pymilvus/orm/iterator.py index 8cc974a0a..6d54ded9e 100644 --- a/pymilvus/orm/iterator.py +++ b/pymilvus/orm/iterator.py @@ -145,13 +145,13 @@ def __seek_to_offset(self): def __init_cp_file_handler(self) -> bool: mode = "w" - if Path(self._cp_file_path).exists(): + if self._cp_file_path.exists(): mode = "r+" try: - self._cp_file_handler = Path(self._cp_file_path).open(mode) # noqa: SIM115 + self._cp_file_handler = self._cp_file_path.open(mode) except OSError as ose: raise MilvusException( - message=f"Failed to open cp file for iterator:{self._cp_file_path}" + message=f"Failed to open cp file for iterator:{self._cp_file_path_str}" ) from ose return mode == "r+" @@ -163,18 +163,26 @@ def __save_mvcc_ts(self): self._cp_file_handler.writelines(str(self._session_ts) + "\n") def __save_pk_cursor(self): - if self._need_save_cp: + if self._need_save_cp and self._next_id is not None: + if not self._cp_file_path.exists(): + self._cp_file_handler.close() + self._cp_file_handler = self._cp_file_path.open("w") + self._buffer_cursor_lines_number = 0 + self.__save_mvcc_ts() + log.warning( + "iterator cp file is not existed any more, recreate for iteration, " + "do not remove this file manually!" + ) if self._buffer_cursor_lines_number >= 100: self._cp_file_handler.seek(0) self._cp_file_handler.truncate() log.info( "cursor lines in cp file has exceeded 100 lines, truncate the file and rewrite" ) - self._cp_file_handler.writelines(str(self._session_ts) + "\n") - assert_info(self._next_id is not None, "next_id should not be None when saving cp") + self._buffer_cursor_lines_number = 0 self._cp_file_handler.writelines(str(self._next_id) + "\n") self._cp_file_handler.flush() - self._buffer_cursor_lines_number = 0 + self._buffer_cursor_lines_number += 1 def __check_set_reduce_stop_for_best(self): if self._kwargs.get(REDUCE_STOP_FOR_BEST, True): @@ -226,13 +234,15 @@ def __setup_ts_by_request(self): def __set_up_ts_cp(self): self._buffer_cursor_lines_number = 0 - self._cp_file_path = self._kwargs.get(ITERATOR_SESSION_CP_FILE, None) + self._cp_file_path_str = self._kwargs.get(ITERATOR_SESSION_CP_FILE, None) + self._cp_file_path = None # no input cp_file, set up mvccTs by query request - if self._cp_file_path is None: + if self._cp_file_path_str is None: self._need_save_cp = False self.__setup_ts_by_request() else: self._need_save_cp = True + self._cp_file_path = Path(self._cp_file_path_str) if not self.__init_cp_file_handler(): # input cp file is empty, set up mvccTs by query request self.__setup_ts_by_request() @@ -244,7 +254,7 @@ def __set_up_ts_cp(self): line_count = len(lines) if line_count < 2: raise ParamError( - message=f"input cp file:{self._cp_file_path} should contain " + message=f"input cp file:{self._cp_file_path_str} should contain " f"at least two lines, but only:{line_count} lines" ) self._session_ts = int(lines[0]) @@ -254,7 +264,7 @@ def __set_up_ts_cp(self): self._next_id = lines[self._buffer_cursor_lines_number].strip() except OSError as ose: raise MilvusException( - message=f"Failed to read cp info from file:{self._cp_file_path}" + message=f"Failed to read cp info from file:{self._cp_file_path_str}" ) from ose except ValueError as e: raise ParamError(message=f"cannot parse input cp session_ts:{lines[0]}") from e @@ -346,11 +356,11 @@ def close(self) -> None: def inner_close(): self._cp_file_handler.close() - Path(self._cp_file_path).unlink() - log.info(f"removed cp file:{self._cp_file_path} for query iterator") + self._cp_file_path.unlink() + log.info(f"removed cp file:{self._cp_file_path_str} for query iterator") io_operation( - inner_close, f"failed to clear cp file:{self._cp_file_path} for query iterator" + inner_close, f"failed to clear cp file:{self._cp_file_path_str} for query iterator" )