Enhance data processing pipeline
Add logging, type hints, and docstrings to various functions and classes in the `adv_data_processing` module.

The same set of changes applies to each of the following areas: **Batch Processing**, **Data Cleaning**, **Configuration Management**, **Data Preprocessing**, **Data Transformation**, **Dimensionality Reduction**, and **Feature Engineering**:

  - Add logging.
  - Add type hints for all functions and classes.
  - Add docstrings for all functions and classes.
  - Remove unused imports.

A minimal sketch of the resulting pattern is shown below.
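The sketch is illustrative only: the function and its names are hypothetical, not taken from the codebase. It shows the shape of the change in each touched file: a module-level logger, type hints, a docstring, and a log statement.

```python
import logging
from typing import List

logger = logging.getLogger(__name__)

def count_columns(columns: List[str]) -> int:
    """
    Count the columns in a dataset description.

    Args:
        columns (List[str]): Column names.

    Returns:
        int: Number of columns.
    """
    count = len(columns)
    logger.info(f"Dataset has {count} columns")
    return count
```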
stochastic-sisyphus committed Dec 10, 2024
1 parent 28900d4 commit 23f67ff
Showing 29 changed files with 1,506 additions and 133 deletions.
45 changes: 45 additions & 0 deletions README.md
@@ -362,3 +362,48 @@ For more detailed usage instructions and examples, please refer to the full docu
## Tutorials and Examples

### Time Series Analysis Example

## Environment Setup Instructions

To set up the environment for this project, follow these steps:

1. Clone the repository:
```bash
git clone https://github.com/stochastic-sisyphus/adv_data_processing_pipeline.git
cd adv_data_processing_pipeline
```

2. Create a virtual environment:
```bash
python -m venv venv
source venv/bin/activate # On Windows use `venv\Scripts\activate`
```
3. Install the required dependencies:
```bash
pip install -r requirements.txt
```
4. For development, install additional dependencies:
```bash
pip install -r requirements-dev.txt
```
## Contribution Guidelines
We welcome contributions to the Advanced Data Processing Pipeline project. To contribute, please follow these guidelines:
1. Fork the repository and create a new branch for your feature or bugfix.
2. Write clear, concise commit messages.
3. Ensure that your code follows the project's coding standards and passes all tests.
4. Submit a pull request with a detailed description of your changes.
## Changelog
### Version 0.2.6
- Added environment setup instructions to the README.
- Added contribution guidelines to the README.
- Added changelog to the README.
- Added performance reports and pipeline diagrams.
- Added baseline model metrics for comparison.
91 changes: 80 additions & 11 deletions adv_data_processing/batch_processing.py
@@ -2,12 +2,13 @@

from typing import Iterator, Tuple, Optional, List
import numpy as np
import pandas as pd
import torch
from torch.utils.data import Dataset, DataLoader
import math
import psutil
from dataclasses import dataclass
import logging

logger = logging.getLogger(__name__)

@dataclass
class BatchConfig:
@@ -19,39 +20,92 @@ class BatchConfig:

class TabularDataset(Dataset):
"""Dataset wrapper for tabular data."""
def __init__(self, features: np.ndarray, targets: Optional[np.ndarray] = None):
def __init__(self, features: np.ndarray, targets: Optional[np.ndarray] = None) -> None:
"""
Initialize the dataset with features and optional targets.
Args:
features (np.ndarray): Feature data.
targets (Optional[np.ndarray], optional): Target data. Defaults to None.
"""
self.features = torch.FloatTensor(features)
self.targets = torch.FloatTensor(targets) if targets is not None else None

def __len__(self) -> int:
"""
Return the number of samples in the dataset.
Returns:
int: Number of samples.
"""
return len(self.features)

def __getitem__(self, idx: int) -> Tuple[torch.Tensor, Optional[torch.Tensor]]:
"""
Get a sample and its target from the dataset.
Args:
idx (int): Index of the sample.
Returns:
Tuple[torch.Tensor, Optional[torch.Tensor]]: Feature and target tensors.
"""
if self.targets is not None:
return self.features[idx], self.targets[idx]
return self.features[idx], None

def get_optimal_batch_size(data_size: int, sample_size_bytes: int) -> int:
"""Calculate optimal batch size based on available memory."""
"""
Calculate optimal batch size based on available memory.
Args:
data_size (int): Total number of samples.
sample_size_bytes (int): Size of a single sample in bytes.
Returns:
int: Optimal batch size.
"""
available_memory = psutil.virtual_memory().available
target_memory_usage = available_memory * 0.7 # Use 70% of available memory

batch_size = int(target_memory_usage / (sample_size_bytes * 2)) # Factor of 2 for safety
logger.info(f"Calculated optimal batch size: {batch_size}")
return min(batch_size, data_size)

def get_memory_efficiency_score(batch_size: int, features_shape: Tuple[int, ...]) -> float:
"""Calculate memory efficiency score for given batch size."""
"""
Calculate memory efficiency score for given batch size.
Args:
batch_size (int): Batch size.
features_shape (Tuple[int, ...]): Shape of the feature data.
Returns:
float: Memory efficiency score.
"""
sample_size = np.prod(features_shape) * 4 # 4 bytes per float32
batch_memory = sample_size * batch_size
available_memory = psutil.virtual_memory().available
return 1 - (batch_memory / available_memory)
score = 1 - (batch_memory / available_memory)
logger.info(f"Memory efficiency score for batch size {batch_size}: {score}")
return score

def optimize_batch_configuration(
data_shape: Tuple[int, ...],
target_memory_usage: float = 0.7,
max_workers: int = 8
) -> BatchConfig:
"""Optimize batch processing configuration based on system resources."""
"""
Optimize batch processing configuration based on system resources.
Args:
data_shape (Tuple[int, ...]): Shape of the data.
target_memory_usage (float, optional): Target memory usage as a fraction. Defaults to 0.7.
max_workers (int, optional): Maximum number of workers. Defaults to 8.
Returns:
BatchConfig: Optimized batch configuration.
"""
cpu_count = psutil.cpu_count(logical=False)
num_workers = min(cpu_count - 1, max_workers)

@@ -60,29 +114,44 @@ def optimize_batch_configuration(
np.prod(data_shape[1:]) * 4
)

return BatchConfig(
config = BatchConfig(
batch_size=optimal_batch_size,
num_workers=num_workers,
pin_memory=torch.cuda.is_available(),
prefetch_factor=2
)
logger.info(f"Optimized batch configuration: {config}")
return config

def create_data_loader(
features: np.ndarray,
targets: Optional[np.ndarray] = None,
batch_size: Optional[int] = None,
num_workers: int = 4
) -> DataLoader:
"""Create a DataLoader with optimal batch size."""
"""
Create a DataLoader with optimal batch size.
Args:
features (np.ndarray): Feature data.
targets (Optional[np.ndarray], optional): Target data. Defaults to None.
batch_size (Optional[int], optional): Batch size. Defaults to None.
num_workers (int, optional): Number of workers. Defaults to 4.
Returns:
DataLoader: DataLoader for the dataset.
"""
if batch_size is None:
sample_size = features[0].nbytes
batch_size = get_optimal_batch_size(len(features), sample_size)

dataset = TabularDataset(features, targets)
return DataLoader(
data_loader = DataLoader(
dataset,
batch_size=batch_size,
num_workers=num_workers,
pin_memory=True,
shuffle=True
)
logger.info(f"Created DataLoader with batch size {batch_size} and {num_workers} workers")
return data_loader
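For reference, a minimal usage sketch of the helpers above, assuming the module is importable as `adv_data_processing.batch_processing` with the names shown in this diff; the array shapes and the loop body are illustrative placeholders.

```python
import numpy as np
from adv_data_processing.batch_processing import (
    create_data_loader,
    optimize_batch_configuration,
)

# Illustrative data: 1,000 samples, 20 float32 features, one scalar target each.
features = np.random.rand(1000, 20).astype(np.float32)
targets = np.random.rand(1000).astype(np.float32)

# Derive a batch configuration from the data shape and system resources,
# then build a single-process DataLoader with that batch size.
config = optimize_batch_configuration(features.shape)
loader = create_data_loader(features, targets, batch_size=config.batch_size, num_workers=0)

for batch_features, batch_targets in loader:
    pass  # a training or inference step would go here
```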
91 changes: 82 additions & 9 deletions adv_data_processing/cleaning.py
@@ -8,7 +8,16 @@
logger = logging.getLogger(__name__)

def validate_data(df: dd.DataFrame, schema: Dict[str, Any]) -> bool:
"""Validate the dataframe against the given schema."""
"""
Validate the dataframe against the given schema.
Args:
df (dd.DataFrame): The dataframe to validate.
schema (Dict[str, Any]): The schema to validate against.
Returns:
bool: True if the dataframe is valid, False otherwise.
"""
try:
for column, rules in schema.items():
if 'type' in rules and df[column].dtype != rules['type']:
@@ -25,7 +34,16 @@ def validate_data(df: dd.DataFrame, schema: Dict[str, Any]) -> bool:
return False

def clean_data(df: dd.DataFrame, config: Dict[str, Any]) -> dd.DataFrame:
"""Clean the dataframe based on the provided configuration."""
"""
Clean the dataframe based on the provided configuration.
Args:
df (dd.DataFrame): The dataframe to clean.
config (Dict[str, Any]): The configuration for cleaning.
Returns:
dd.DataFrame: The cleaned dataframe.
"""
try:
if not validate_data(df, config.get('schema', {})):
raise ValueError("Data validation failed")
@@ -41,7 +59,17 @@ def clean_data(df: dd.DataFrame, config: Dict[str, Any]) -> dd.DataFrame:
raise

def handle_missing_values(df: dd.DataFrame, column: str, strategy: Dict[str, str]) -> dd.DataFrame:
"""Handle missing values based on the provided strategy."""
"""
Handle missing values based on the provided strategy.
Args:
df (dd.DataFrame): The dataframe to handle missing values in.
column (str): The column to handle missing values in.
strategy (Dict[str, str]): The strategy for handling missing values.
Returns:
dd.DataFrame: The dataframe with missing values handled.
"""
try:
if df[column].isnull().any().compute():
if strategy.get('missing') == 'drop':
@@ -60,7 +88,17 @@ def handle_missing_values(df: dd.DataFrame, column: str, strategy: Dict[str, str
raise

def handle_outliers(df: dd.DataFrame, columns: List[str], method: str = 'iqr') -> dd.DataFrame:
"""Handle outliers using the specified method."""
"""
Handle outliers using the specified method.
Args:
df (dd.DataFrame): The dataframe to handle outliers in.
columns (List[str]): The columns to handle outliers in.
method (str, optional): The method to use for handling outliers. Defaults to 'iqr'.
Returns:
dd.DataFrame: The dataframe with outliers handled.
"""
for column in columns:
if method == 'iqr':
Q1 = df[column].quantile(0.25)
@@ -76,15 +114,33 @@ def handle_outliers(df: dd.DataFrame, columns: List[str], method: str = 'iqr') -
return df

def remove_duplicates(df: dd.DataFrame) -> dd.DataFrame:
"""Remove duplicate rows from the dataframe."""
"""
Remove duplicate rows from the dataframe.
Args:
df (dd.DataFrame): The dataframe to remove duplicates from.
Returns:
dd.DataFrame: The dataframe with duplicates removed.
"""
try:
return df.drop_duplicates()
except Exception as e:
logger.error(f"Error removing duplicates: {str(e)}")
raise

def handle_outliers_zscore(df: dd.DataFrame, columns: List[str], threshold: float = 3) -> dd.DataFrame:
"""Handle outliers using the Z-score method."""
"""
Handle outliers using the Z-score method.
Args:
df (dd.DataFrame): The dataframe to handle outliers in.
columns (List[str]): The columns to handle outliers in.
threshold (float, optional): The Z-score threshold. Defaults to 3.
Returns:
dd.DataFrame: The dataframe with outliers handled.
"""
try:
for col in columns:
z_scores = (df[col] - df[col].mean()) / df[col].std()
@@ -95,7 +151,16 @@ def handle_outliers_zscore(df: dd.DataFrame, columns: List[str], threshold: floa
raise

def convert_datatypes(df: dd.DataFrame, dtype_dict: Dict[str, str]) -> dd.DataFrame:
"""Convert column datatypes based on the provided dictionary."""
"""
Convert column datatypes based on the provided dictionary.
Args:
df (dd.DataFrame): The dataframe to convert datatypes in.
dtype_dict (Dict[str, str]): The dictionary of column datatypes to convert.
Returns:
dd.DataFrame: The dataframe with datatypes converted.
"""
try:
for col, dtype in dtype_dict.items():
df[col] = df[col].astype(dtype)
@@ -105,7 +170,16 @@ def convert_datatypes(df: dd.DataFrame, dtype_dict: Dict[str, str]) -> dd.DataFr
raise

def impute_missing_values(df: dd.DataFrame, strategies: Dict[str, str]) -> dd.DataFrame:
"""Impute missing values using specified strategies."""
"""
Impute missing values using specified strategies.
Args:
df (dd.DataFrame): The dataframe to impute missing values in.
strategies (Dict[str, str]): The strategies for imputing missing values.
Returns:
dd.DataFrame: The dataframe with missing values imputed.
"""
try:
for col, strategy in strategies.items():
if strategy == 'mean':
@@ -120,4 +194,3 @@ def impute_missing_values(df: dd.DataFrame, strategies: Dict[str, str]) -> dd.Da
except Exception as e:
logger.error(f"Error imputing missing values: {str(e)}")
raise
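For reference, a rough usage sketch of the cleaning helpers above, assuming the module is importable as `adv_data_processing.cleaning`; the dataframe, columns, and strategy values are illustrative and rely only on branches visible in this diff (`'mean'` imputation, duplicate removal, IQR outlier handling).

```python
import pandas as pd
import dask.dataframe as dd
from adv_data_processing.cleaning import (
    remove_duplicates,
    impute_missing_values,
    handle_outliers,
)

# Illustrative frame with a duplicate row, a missing value, and an outlier.
pdf = pd.DataFrame({
    "age": [25.0, None, 31.0, 31.0, 120.0],
    "income": [52_000.0, 61_000.0, 58_000.0, 58_000.0, 990_000.0],
})
df = dd.from_pandas(pdf, npartitions=1)

df = remove_duplicates(df)
df = impute_missing_values(df, {"age": "mean"})             # 'mean' branch is visible above
df = handle_outliers(df, ["age", "income"], method="iqr")   # or handle_outliers_zscore(...)
print(df.compute())
```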

5 changes: 3 additions & 2 deletions adv_data_processing/config.py
@@ -1,4 +1,3 @@

"""Configuration management for the data processing pipeline."""

import os
@@ -26,6 +25,7 @@ def from_yaml(cls, config_path: str) -> 'PipelineConfig':
try:
with open(config_path, 'r') as f:
config = yaml.safe_load(f)
logger.info(f"Loaded config from {config_path}")
return cls(**config)
except Exception as e:
logger.error(f"Failed to load config from {config_path}: {str(e)}")
@@ -42,7 +42,8 @@ def validate_config(config: PipelineConfig) -> bool:
for section in required_sections:
if not hasattr(config, section):
raise ValueError(f"Missing required config section: {section}")
logger.info("Config validation successful")
return True
except Exception as e:
logger.error(f"Config validation failed: {str(e)}")
raise
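For reference, a small sketch of how the configuration helpers above would be used, assuming `PipelineConfig` and `validate_config` are importable from `adv_data_processing.config`; `pipeline.yaml` is a placeholder path, and its top-level keys must match the fields of `PipelineConfig`, which this diff shows only in part.

```python
from adv_data_processing.config import PipelineConfig, validate_config

# Placeholder path; the YAML's top-level keys must match PipelineConfig's fields.
config = PipelineConfig.from_yaml("pipeline.yaml")

# validate_config returns True on success and raises if a required section is missing.
if validate_config(config):
    print("Pipeline configuration loaded and validated")
```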