Skip to content

stochastic-sisyphus/adv_data_processing_pipeline

Repository files navigation

Advanced Data Processing Pipeline

This project implements a sophisticated data processing pipeline using Python, designed to handle large-scale data processing tasks efficiently. The pipeline includes various stages such as data loading, cleaning, transformation, analysis, and visualization.

This package is available on PyPI, and you can view it at the provided URL: https://pypi.org/project/advanced-data-processing/.

PyPI version

Features

  • Flexible data loading from various sources (CSV, Excel, JSON, Parquet, SQL databases, APIs, S3)
  • Efficient data cleaning and preprocessing using Dask for large datasets
  • Advanced data transformation techniques (scaling, encoding, feature engineering)
  • Text analytics capabilities (sentiment analysis, summarization)
  • Named Entity Recognition (NER) for extracting entities from text data
  • Topic modeling for uncovering latent topics in text corpora
  • Data visualization tools for exploratory data analysis
  • Feature selection and dimensionality reduction techniques
  • Integration with machine learning models for predictive analytics
  • Robust error handling and logging mechanisms
  • Configurable pipeline steps via YAML configuration files
  • Distributed processing and caching for improved performance
  • Automatic feature engineering
  • Handling of imbalanced datasets
  • Automatic hyperparameter tuning
  • Automatic model checkpointing and experiment tracking
  • Integrated GPU support with fallback to CPU
  • Comprehensive logging and monitoring
  • Model explainability tools (SHAP, LIME)
  • Data drift detection and monitoring
  • API endpoint generation for model serving
  • Model versioning and registry
  • Automated model documentation generation
  • CI/CD pipeline integration
  • Containerization support with Docker
  • Efficient batch processing with automatic memory optimization
  • GPU acceleration support via PyTorch
  • Advanced performance monitoring and optimization

Requirements

See requirements.txt for a full list of dependencies. Key libraries include:

  • pandas>=1.3.0
  • dask[complete]>=2022.1.0
  • scikit-learn>=1.0.0
  • numpy>=1.20.0
  • psutil>=5.9.0
  • torch>=2.0.0
  • mlflow>=2.6.0
  • optuna>=3.0.0
  • wandb>=0.15.0
  • ray>=2.6.0

Installation

You can install the package directly from PyPI:

pip install advanced-data-processing

Alternatively, if you are working from a local clone of the repository:

Install the required dependencies:

pip install -r requirements.txt

To build and install the package locally:

pip install -e .

For development installation with all features:

pip install -r requirements-dev.txt

Usage

Basic Usage

To use the package in another Python project:

from advanced_data_processing import process_data, load_data, clean_data

# Use the functions as needed
data = load_data("path/to/your/data.csv")
cleaned_data = clean_data(data)
processed_data = process_data(cleaned_data, steps=['transform', 'feature_engineering'])

Configuration

Configure your pipeline in config.yaml:

source: 'path/to/your/data.csv'
steps: ['load', 'clean', 'transform']
output_file: 'path/to/output.csv'
# Add other configuration parameters as needed

The config.yaml file should include the following parameters:

  • source: Path to the input data file
  • steps: List of processing steps to execute
  • output_file: Path for the processed output file
  • file_type: Type of the input file (e.g., 'csv', 'json', 'parquet')
  • text_column: Name of the column containing text data (for text analytics)
  • model_type: Type of model to use for predictive analytics

Command-line Usage

Run the pipeline from the command line:

adp --config config.yaml

Or:

python data_processing/main.py --config config.yaml

Command-line Arguments

You can customize the pipeline execution with various command-line arguments:

  • --resume: Resume from a saved pipeline state
  • --plugins: Load custom plugins (specify paths to plugin files)
  • --n_workers: Number of workers for parallel processing
  • --scheduler_address: Address of the Dask scheduler for distributed processing
  • --visualize: Generate visualizations
  • --analyze_text: Perform text analytics
  • --use_cache: Use cached results
  • --generate_report: Generate a comprehensive report
  • --auto_feature_engineering: Perform automatic feature engineering
  • --handle_imbalanced: Handle imbalanced datasets
  • --auto_tune: Perform automatic hyperparameter tuning

Examples

Generate visualizations:

python data_processing/main.py --config config.yaml --visualize

Perform text analytics:

python data_processing/main.py --config config.yaml --analyze_text

Use cached results and generate a report:

python data_processing/main.py --config config.yaml --use_cache --generate_report

Perform automatic feature engineering and handle imbalanced data:

python data_processing/main.py --config config.yaml --auto_feature_engineering --handle_imbalanced

Advanced Features

Custom Plugins

You can extend the pipeline's functionality using custom plugins:

  1. Create a Python file with your custom function(s).
  2. Use the --plugins argument to specify the path to your plugin file(s) when running the pipeline.

Resuming from a Saved State

You can resume the pipeline from a previously saved state using the --resume option:

python data_processing/main.py --config config.yaml --resume pipeline_state_step_name.pkl

Distributed Processing

This pipeline uses Dask for distributed processing. You can specify the number of workers or provide a Dask scheduler address:

python data_processing/main.py --config config.yaml --n_workers 4

or

python data_processing/main.py --config config.yaml --scheduler_address tcp://scheduler-address:8786

You can also set a memory limit for Dask workers:

python data_processing/main.py --config config.yaml --n_workers 4 --memory_limit 4GB

Caching and Intermediate Results

To use caching and save intermediate results:

python data_processing/main.py --config config.yaml --use_cache --save_intermediate --intermediate_path ./intermediate/

Automatic Hyperparameter Tuning

To perform automatic hyperparameter tuning for machine learning models:

python data_processing/main.py --config config.yaml --auto_tune

Customizing the Pipeline

The pipeline can be customized for different types of datasets by modifying the configuration file. Here are some examples:

For Time-Series Data:

feature_engineering:
  create_time_features: true
  time_column: 'timestamp'

data_transformation:
  numerical_features:
    - 'value'
    - 'year'
    - 'month'
    - 'day'
  categorical_features:
    - 'day_of_week'
  scaling_method: 'minmax'

For NLP Data:

feature_engineering:
  create_text_features: true
  text_columns:
    - 'text_content'

data_transformation:
  text_features:
    - 'text_content'
  text_vectorization_method: 'tfidf'

For Tabular Data:

feature_engineering:
  create_polynomial_features: true
  create_interaction_features: true

data_transformation:
  numerical_features:
    - 'feature1'
    - 'feature2'
  categorical_features:
    - 'category1'
    - 'category2'
  scaling_method: 'standard'
  encoding_method: 'onehot'

Pipeline Steps

The main processing steps are defined in the process_data function. These include:

Data Loading

The pipeline supports loading data from various sources.

Data Cleaning

Data cleaning operations include handling missing values, outliers, and duplicates.

Data Transformation

The pipeline offers various data transformation techniques.

Feature Engineering

Automatic feature engineering is supported.

Handling Imbalanced Data

The pipeline can handle imbalanced datasets.

Error Handling

Robust error handling is implemented throughout the pipeline.

Comprehensive Report

To generate a comprehensive report of the data processing steps and results, use the --generate_report flag:

python data_processing/main.py --config config.yaml --generate_report

The report includes:

  • Configuration details
  • Completed processing steps
  • Data shape and types
  • Summary statistics
  • Output file location

The report is saved as 'pipeline_report.txt' in the project directory.

Example Usage

Here's a detailed example of how to use the pipeline:

from advanced_data_processing import process_data, load_config

# Load configuration
config = load_config('config.yaml')

# Process data
processed_data = process_data('path/to/your/data.csv', config=config)

# Save processed data
processed_data.to_csv('processed_data.csv', index=False)

To run the pipeline from the command line with all options:

python main.py --config config.yaml --output processed_data.csv --visualize --analyze_text --extract_entities --model_topics --select_features --reduce_dimensions --validate_schema --summary_stats --auto_feature_engineering --handle_imbalanced --auto_tune

Contributing

Contributions to improve the pipeline are welcome. Please follow these steps:

  1. Fork the repository
  2. Create a new branch (git checkout -b feature-branch)
  3. Make your changes and commit (git commit -am 'Add some feature')
  4. Push to the branch (git push origin feature-branch)
  5. Create a new Pull Request

License

This project is licensed under the MIT License - see the LICENSE file for details.

Documentation

For more detailed usage instructions and examples, please refer to the full documentation here.

Tutorials and Examples

Time Series Analysis Example

About

No description, website, or topics provided.

Resources

License

Stars

Watchers

Forks

Packages

No packages published