Add docs to pipeline builder
botirk38 committed Aug 26, 2024
1 parent 5e9268f commit 58a27ed
Showing 1 changed file with 70 additions and 4 deletions.
74 changes: 70 additions & 4 deletions huggingface_pipelines/builder.py
@@ -6,8 +6,8 @@
 
 from .audio import AudioToEmbeddingPipelineFactory  # type: ignore
 from .metric_analyzer import MetricAnalyzerPipelineFactory  # type: ignore
-from .pipeline import Pipeline  # type: ignore
-from .text import (  # type: ignore
+from .pipeline import Pipeline, PipelineFactory  # type: ignore
+from .text import (  # type: ignore[import]
     EmbeddingToTextPipelineFactory,
     TextSegmentationPipelineFactory,
     TextToEmbeddingPipelineFactory,
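This first hunk widens the .pipeline import so the factory registry in __init__ (next hunk) can be typed as Dict[SupportedOperation, PipelineFactory] rather than Dict[SupportedOperation, Any]. The diff does not show .pipeline itself; below is a minimal sketch of the interface the annotation assumes, where only the two class names come from the import and everything else is hypothetical:

from abc import ABC, abstractmethod
from typing import Any, Dict


class Pipeline(ABC):
    """Hypothetical base class: a configured, callable processing step."""

    @abstractmethod
    def __call__(self, dataset: Any) -> Any:
        """Process a dataset and return the transformed dataset."""


class PipelineFactory(ABC):
    """Hypothetical base class: builds a Pipeline from a config dict."""

    @abstractmethod
    def create_pipeline(self, config: Dict[str, Any]) -> Pipeline:
        """Construct a configured Pipeline instance."""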
@@ -26,11 +26,35 @@
 
 
 class PipelineBuilder:
+    """
+    A class for building and managing different types of processing pipelines.
+
+    This class provides methods to create pipelines for various operations such as
+    text-to-embedding, embedding-to-text, text segmentation, metric analysis, and
+    audio-to-embedding. It uses the Factory pattern to create specific pipeline
+    instances based on the operation type and configuration.
+
+    Attributes:
+        config_dir (Path): The directory containing configuration files for pipelines.
+        pipeline_factories (Dict[SupportedOperation, PipelineFactory]): A dictionary mapping
+            operations to their respective factory classes.
+
+    Args:
+        config_dir (Union[str, Path], optional): The directory containing configuration
+            files. Defaults to "huggingface_pipelines/datacards".
+
+    Example:
+        >>> builder = PipelineBuilder()
+        >>> text_to_embedding_pipeline = builder.create_pipeline("sonar", "text_to_embedding")
+        >>> processed_dataset = text_to_embedding_pipeline(input_dataset)
+    """
+
     def __init__(
         self, config_dir: Union[str, Path] = "huggingface_pipelines/datacards"
     ):
         self.config_dir = Path(config_dir)
-        self.pipeline_factories: Dict[SupportedOperation, Any] = {
+        self.pipeline_factories: Dict[SupportedOperation, PipelineFactory] = {
             "text_to_embedding": TextToEmbeddingPipelineFactory(),
             "embedding_to_text": EmbeddingToTextPipelineFactory(),
             "text_segmentation": TextSegmentationPipelineFactory(),
@@ -41,6 +65,23 @@ def __init__(
     def load_config(
         self, dataset_name: str, operation: SupportedOperation
     ) -> Dict[str, Any]:
+        """
+        Load the configuration for a specific dataset and operation.
+
+        This method reads the YAML configuration file for the specified dataset and operation.
+        The configuration is used to set up the PipelineConfig for the requested pipeline.
+
+        Args:
+            dataset_name (str): The name of the dataset.
+            operation (SupportedOperation): The type of operation to perform.
+
+        Returns:
+            Dict[str, Any]: The configuration dictionary.
+
+        Raises:
+            FileNotFoundError: If the configuration file is not found.
+        """
         config_file = self.config_dir / f"{dataset_name}/{operation}.yaml"
         try:
             with open(config_file, "r") as f:
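Because load_config raises FileNotFoundError when no datacard exists for the dataset/operation pair, callers can guard the lookup. A minimal sketch, reusing the builder from the previous example:

try:
    config = builder.load_config("sonar", "text_to_embedding")
except FileNotFoundError as err:
    # No my_configs/sonar/text_to_embedding.yaml on disk; report and move on.
    print(f"Missing pipeline config: {err}")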
@@ -54,10 +95,35 @@ def load_config
     def create_pipeline(
         self, dataset_name: str, operation: SupportedOperation
     ) -> Pipeline:
+        """
+        Create a pipeline for a specific dataset and operation.
+
+        This method uses the appropriate PipelineFactory to create a Pipeline instance
+        based on the specified operation and configuration. The created Pipeline
+        adheres to the abstract Pipeline class structure and uses a PipelineConfig
+        for its configuration.
+
+        Args:
+            dataset_name (str): The name of the dataset.
+            operation (SupportedOperation): The type of operation to perform.
+
+        Returns:
+            Pipeline: The created pipeline instance, which can be called with a dataset.
+
+        Raises:
+            ValueError: If the operation is not supported.
+
+        Example:
+            >>> builder = PipelineBuilder()
+            >>> text_to_embedding_pipeline = builder.create_pipeline("dataset_name", "text_to_embedding")
+            >>> processed_dataset = text_to_embedding_pipeline(input_dataset)
+            >>> audio_to_embedding_pipeline = builder.create_pipeline("dataset_name", "audio_to_embedding")
+            >>> processed_audio_dataset = audio_to_embedding_pipeline(input_audio_dataset)
+        """
         if operation not in self.pipeline_factories:
             raise ValueError(
                 f"Unsupported operation: {operation}. Supported operations are: {', '.join(self.pipeline_factories.keys())}"
             )
 
         config = self.load_config(dataset_name, operation)
         return self.pipeline_factories[operation].create_pipeline(config)
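End to end, create_pipeline validates the operation against the factory registry, loads the matching YAML config, and delegates construction to the registered factory. A usage sketch; input_dataset is a placeholder for whatever dataset object the concrete pipeline accepts:

from huggingface_pipelines.builder import PipelineBuilder

builder = PipelineBuilder()

try:
    pipeline = builder.create_pipeline("sonar", "text_to_embedding")
except ValueError as err:
    # Raised when no factory is registered for the requested operation.
    print(err)
else:
    processed_dataset = pipeline(input_dataset)  # input_dataset supplied by the caller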
