diff --git a/sciphi/config/generation_settings/book_draft_settings.yml b/sciphi/config/generation_settings/book_draft_settings.yml index bd27f24..7e23338 100644 --- a/sciphi/config/generation_settings/book_draft_settings.yml +++ b/sciphi/config/generation_settings/book_draft_settings.yml @@ -9,6 +9,7 @@ num_threads_per_proc: null num_processes: 1 process_num: 0 filter_existing_books: true +batch_size: 2 # llm config llm_provider: openai diff --git a/sciphi/examples/library_of_phi/generate_textbook.py b/sciphi/examples/library_of_phi/generate_textbook.py index c9eb59c..23e8c79 100644 --- a/sciphi/examples/library_of_phi/generate_textbook.py +++ b/sciphi/examples/library_of_phi/generate_textbook.py @@ -2,10 +2,9 @@ import logging import multiprocessing import os -from typing import Generator, Tuple +from typing import Any, Generator, Tuple, Union import fire -from tqdm import tqdm from sciphi.core.utils import get_data_dir from sciphi.examples.helpers import ( @@ -99,19 +98,20 @@ def _load_configuration(self, config_path: str, cli_args: dict): raise ValueError( "Set do_rag to `False`, or provide a RAG server rag_url, rag_username, and rag_password." ) - + self.yml_pointer = config.batch_size self.config = config - def initialize_processing(self, textbook_output_name: str) -> None: + def get_writer(self, textbook_output_name: str) -> CompositeWriter: """Set up and return the output writer.""" output_path = os.path.join( self.config.data_dir, self.config.output_dir, textbook_output_name ) if not os.path.exists(os.path.dirname(output_path)): os.makedirs(os.path.dirname(output_path)) - self.logger.info(f"Saving textbook to {output_path}") - self.writer = TextbookContentGenerator.CompositeWriter(output_path) - self.writer.raw_writer.write(TextbookContentGenerator.AI_DISCLAIMER) + self.logger.info( + f"Saving textbook at {textbook_output_name} to {output_path}" + ) + return TextbookContentGenerator.CompositeWriter(output_path) def dry_run(self) -> None: """ @@ -123,79 +123,96 @@ def dry_run(self) -> None: if not self.llm_provider: raise ValueError("Invalid LLM provider configuration.") - def run(self) -> None: + def initialize_generators( + self, yml_file_paths_chunk + ) -> list[list[Union[Generator, Any]]]: + """Initialize generators based on the yaml files.""" + generators = [] + for yml_file_path in yml_file_paths_chunk[0 : self.config.batch_size]: + yml_config = load_yaml_file(yml_file_path) + textbook_output_name = self.config.textbook or os.path.basename( + yml_file_path + ).replace(".yaml", "") + writer = self.get_writer(textbook_output_name) + generators.append( + [self.process_book_elements(yml_config), None, writer] + ) + return generators + + def process_single_generator(self, generator, current_completion, writer): + """Process a single generator and fetch completions.""" + textbook, current_prompt, prompt_type, chapter, __ = generator.send( + current_completion + ) + self.logger.debug("-" * 200) + self.logger.debug(f"Current Prompt:\n{current_prompt}\n\n") + current_completion = with_retry( + lambda: self.llm_provider.get_completion(current_prompt) + ) + + self.logger.debug(f"Current Completion:\n{current_completion}\n\n") + self.logger.debug("-" * 200) + if prompt_type == "foreword": + writer.raw_writer.write( + f"{TextbookContentGenerator.AI_DISCLAIMER}\n# {textbook}\n{current_completion}\n" + ) + else: + writer.raw_writer.write(f"{current_completion}\n") + writer.jsonl_writer.write( + [ + { + "prompt": current_prompt, + "completion": current_completion, + "type": prompt_type, + } + ] + ) + return current_completion + + def get_next_generator(self, yml_file_paths_chunk: list[str]): + """Get a generator for the next yml file.""" + if self.yml_pointer < len(yml_file_paths_chunk): + yml_file_path = yml_file_paths_chunk[self.yml_pointer] + textbook_output_name = os.path.basename(yml_file_path).replace( + ".yaml", "" + ) + yml_config = load_yaml_file(yml_file_path) + self.yml_pointer += 1 + writer = self.get_writer(textbook_output_name) + return [self.process_book_elements(yml_config), None, writer] + return None + + def run(self): """Run the draft book generation process.""" yml_file_paths_chunk = self.config_manager.get_yml_file_paths( self.logger ) + generators = self.initialize_generators(yml_file_paths_chunk) - if self.config.num_threads_per_proc > 1: - self.logger.debug( - f"Process {self.config.process_num} is processing {len(yml_file_paths_chunk)} files" - ) + while generators: + exhausted_generators = [] - pool = multiprocessing.Pool( - processes=self.config.num_threads_per_proc - ) - with tqdm( - total=len(yml_file_paths_chunk), desc="Processing files" - ) as pbar: - for _ in pool.imap( - self.process_yml_file, yml_file_paths_chunk - ): - pbar.update(1) - - pool.close() - pool.join() - else: - for yml_file_path in tqdm( - yml_file_paths_chunk, desc="Processing files" + for i, (generator, current_completion, writer) in enumerate( + generators ): - self.process_yml_file(yml_file_path) - - def process_yml_file(self, yml_file_path: str) -> None: - """Process a single YAML file to generate textbook content.""" - textbook_output_name = self.config.textbook or os.path.basename( - yml_file_path - ).replace(".yaml", "") - self.initialize_processing(textbook_output_name) - yml_config = load_yaml_file(yml_file_path) - - generator = self.process_book_elements(yml_config) - current_completion = None - try: - while True: - textbook, current_prompt, prompt_type, _, __ = generator.send( - current_completion - ) - self.logger.debug("-" * 200) - self.logger.debug(f"Current Prompt:\n{current_prompt}\n\n") - current_completion = with_retry( - lambda: self.llm_provider.get_completion(current_prompt) - ) - self.logger.debug( - f"Current Completion:\n{current_completion}\n\n" - ) - self.logger.debug("-" * 200) + try: + updated_completion = self.process_single_generator( + generator, current_completion, writer + ) + generators[i][1] = updated_completion + except StopIteration: + exhausted_generators.append(i) - self.writer.raw_writer.write( - f"# {textbook}\n{current_completion}\n" - ) - self.writer.jsonl_writer.write( - [ - { - "prompt": current_prompt, - "completion": current_completion, - "type": prompt_type, - } - ] - ) + # Remove exhausted generators and add new ones if there are still unprocessed yml files + for index in reversed(exhausted_generators): + del generators[index] - except StopIteration: - pass + new_generator = self.get_next_generator(yml_file_paths_chunk) + if new_generator: + generators.append(new_generator) def process_book_elements( - self, config: dict # , prev_completion: str = None + self, config: dict ) -> Generator[Tuple[str, str, str, str], None, None]: """Process the elements of a textbook configuration.""" prev_chapter_config = None