diff --git a/utils/generate_api.py b/utils/generate_api.py index 8b02e1be..0a92cb4a 100644 --- a/utils/generate_api.py +++ b/utils/generate_api.py @@ -177,35 +177,35 @@ def dump(self) -> None: with open( "opensearchpy/_async/client/plugins.py", "r+", encoding="utf-8" ) as file: - content = file.read() - content = content.replace( + file_content = file.read() + file_content = file_content.replace( "super().__init__(client)\n", f"super().__init__(client)\n\n self.{self.namespace} = {self.namespace_new}Client(client)", # pylint: disable=line-too-long 1, ) - content = content.replace( + file_content = file_content.replace( "from .client import Client", f"from ..plugins.{self.namespace} import {self.namespace_new}Client\nfrom .client import Client", # pylint: disable=line-too-long 1, ) - content = content.replace( + file_content = file_content.replace( "class PluginsClient(NamespacedClient):\n", f"class PluginsClient(NamespacedClient): \n {self.namespace}: Any\n", # pylint: disable=line-too-long 1, ) - content = content.replace( + file_content = file_content.replace( "plugins = [", f'plugins = [\n "{self.namespace}",\n' ) file.seek(0) - file.write(content) + file.write(file_content) file.truncate() else: with open( "opensearchpy/_async/client/__init__.py", "r+", encoding="utf-8" ) as file: - content = file.read() - file_content = content.replace( + file_content = file.read() + file_content = file_content.replace( "# namespaced clients for compatibility with API names", f"# namespaced clients for compatibility with API names\n self.{self.namespace} = {self.namespace_new}Client(self)", # pylint: disable=line-too-long 1, @@ -231,27 +231,27 @@ def dump(self) -> None: # Identifying the insertion point for the "THIS CODE IS AUTOMATICALLY GENERATED" header. if os.path.exists(self.filepath): with open(self.filepath, encoding="utf-8") as file: - content = file.read() - if header_separator in content: + file_content = file.read() + if header_separator in file_content: update_header = False header_end_position = ( - content.find(header_separator) + len(header_separator) + 2 + file_content.find(header_separator) + len(header_separator) + 2 ) - header_position = content.rfind("\n", 0, header_end_position) + 1 - if license_header_end_1 in content: - if license_header_end_2 in content: + header_position = file_content.rfind("\n", 0, header_end_position) + 1 + if license_header_end_1 in file_content: + if license_header_end_2 in file_content: position = ( - content.find(license_header_end_2) + file_content.find(license_header_end_2) + len(license_header_end_2) + 2 ) else: position = ( - content.find(license_header_end_1) + file_content.find(license_header_end_1) + len(license_header_end_1) + 2 ) - license_position = content.rfind("\n", 0, position) + 1 + license_position = file_content.rfind("\n", 0, position) + 1 current_script_folder = os.path.dirname(os.path.abspath(__file__)) generated_file_header_path = os.path.join( @@ -272,49 +272,47 @@ def dump(self) -> None: if "from " + utils + " import" not in line ) - with open(self.filepath, "w", encoding="utf-8") as file: - if update_header is True: - file.write( - self.header[:license_position] - + "\n" - + header_content - + "\n\n" - + "#replace_token#\n" - + self.header[license_position:] - ) - else: - file.write( - self.header[:header_position] - + "\n" - + "#replace_token#\n" - + self.header[header_position:] - ) - for api in self._apis: - file.write(api.to_python()) + file_content = "" + if update_header is True: + file_content += ( + self.header[:license_position] + + "\n" + + header_content + + "\n\n" + + "#replace_token#\n" + + self.header[license_position:] + ) + else: + file_content += ( + self.header[:header_position] + + "\n" + + "#replace_token#\n" + + self.header[header_position:] + ) + for api in self._apis: + file_content += api.to_python() # Generating imports for each module utils_imports = "" - file_content = "" - with open(self.filepath, encoding="utf-8") as file: - content = file.read() - keywords = [ - "SKIP_IN_PATH", - "_normalize_hosts", - "_escape", - "_make_path", - "query_params", - "_bulk_body", - "_base64_auth_header", - "NamespacedClient", - "AddonClient", - ] - present_keywords = [keyword for keyword in keywords if keyword in content] - - if present_keywords: - utils_imports = "from " + utils + " import" - result = f"{utils_imports} {', '.join(present_keywords)}" - utils_imports = result - file_content = content.replace("#replace_token#", utils_imports) + + keywords = [ + "SKIP_IN_PATH", + "_normalize_hosts", + "_escape", + "_make_path", + "query_params", + "_bulk_body", + "_base64_auth_header", + "NamespacedClient", + "AddonClient", + ] + present_keywords = [keyword for keyword in keywords if keyword in file_content] + + if present_keywords: + utils_imports = "from " + utils + " import" + result = f"{utils_imports} {', '.join(present_keywords)}" + utils_imports = result + file_content = file_content.replace("#replace_token#", utils_imports) with open(self.filepath, "w", encoding="utf-8") as file: file.write(file_content)