Add parquet export function
romainsacchi committed Jun 13, 2024
1 parent 866c68c commit e8bd7f1
Showing 2 changed files with 82 additions and 35 deletions.
9 changes: 9 additions & 0 deletions pathways/pathways.py
@@ -31,6 +31,7 @@
    load_numpy_array_from_disk,
    load_units_conversion,
    resize_scenario_data,
    export_results_to_parquet,
)


@@ -506,3 +507,11 @@ def _fill_in_result_array(self, results: dict):

    def display_results(self, cutoff: float = 0.001) -> xr.DataArray:
        return display_results(self.lca_results, cutoff=cutoff)

    def export_results(self, filename: str = None) -> None:
        """
        Export the non-zero LCA results to a gzip-compressed parquet file.

        :param filename: The name of the file to save the results to. If None,
            a timestamped filename is generated.
        :return: None
        """
        export_results_to_parquet(self.lca_results, filename)
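A minimal usage sketch of the new method. The `Pathways` constructor argument and the prior calculation step are illustrative assumptions, not shown in this diff; only `export_results()` is added by this commit.

    from pathways import Pathways

    # Hypothetical setup: the datapackage path and calculate() call are
    # assumptions for illustration.
    p = Pathways(datapackage="datapackage.json")
    p.calculate()  # populates p.lca_results

    # Writes the non-zero results to "my_results.gzip"
    p.export_results(filename="my_results")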
108 changes: 73 additions & 35 deletions pathways/utils.py
@@ -10,20 +10,21 @@

import csv
import logging
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Set, Tuple, Union

import numpy as np
import pandas as pd
import xarray as xr
import yaml
from premise.geomap import Geomap

from .filesystem_constants import DATA_DIR, DIR_CACHED_DB, USER_LOGS_DIR

CLASSIFICATIONS = DATA_DIR / "activities_classifications.yaml"
UNITS_CONVERSION = DATA_DIR / "units_conversion.yaml"


logging.basicConfig(
    level=logging.DEBUG,
    filename=USER_LOGS_DIR / "pathways.log",  # Log file to save the entries
@@ -105,16 +106,16 @@ def harmonize_units(scenario: xr.DataArray, variables: list) -> xr.DataArray:
    )
    # multiply scenario by conversion factors
    scenario.loc[dict(variables=variables)] *= conversion_factors[
        :, np.newaxis, np.newaxis
    ]
    # update units
    scenario.attrs["units"] = {var: "EJ/yr" for var in variables}

    return scenario
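As context for the broadcasting above, a standalone sketch with made-up dimensions (variables x regions x years); the array sizes and factor values are illustrative only:

    import numpy as np

    # made-up scenario data: 2 variables, 3 regions, 4 years
    data = np.ones((2, 3, 4))
    factors = np.array([3.6, 1.0])  # one conversion factor per variable (illustrative)

    # two trailing axes let the per-variable factors broadcast over regions and years
    data *= factors[:, np.newaxis, np.newaxis]
    print(data[0].min(), data[1].max())  # 3.6 1.0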


def get_unit_conversion_factors(
    scenario_unit: dict, dataset_unit: list, unit_mapping: dict
) -> np.ndarray:
    """
    Get the unit conversion factors for a given scenario unit and dataset unit.
@@ -139,15 +140,15 @@ def load_units_conversion() -> dict:


def create_lca_results_array(
    methods: Union[List[str], None],
    years: List[int],
    regions: List[str],
    locations: List[str],
    models: List[str],
    scenarios: List[str],
    classifications: dict,
    mapping: dict,
    use_distributions: bool = False,
) -> xr.DataArray:
    """
    Create an xarray DataArray to store Life Cycle Assessment (LCA) results.
@@ -216,10 +217,47 @@ def create_lca_results_array(
    return xr.DataArray(np.zeros(dims), coords=coords, dims=list(coords.keys()))


def export_results_to_parquet(lca_results: xr.DataArray, filepath: str = None) -> None:
    """
    Export the non-zero LCA results to a gzip-compressed parquet file.

    :param lca_results: Xarray DataArray with LCA results.
    :param filepath: The path to the parquet file. If None, a timestamped
        filename is generated.
    :return: None
    """
    if filepath is None:
        filepath = f"results_{datetime.now().strftime('%Y%m%d_%H%M%S')}.gzip"
    else:
        filepath = f"{filepath}.gzip"

    # Step 1: Flatten the DataArray into a 1D array
    flattened_data = lca_results.values.flatten()

    # Step 2: Find the indices of non-zero values
    non_zero_indices = np.nonzero(flattened_data)[0]

    # Step 3: Extract non-zero values
    non_zero_values = flattened_data[non_zero_indices]

    # Step 4: Map the flat indices back to multi-dimensional coordinates
    original_shape = lca_results.shape
    coords = np.array(np.unravel_index(non_zero_indices, original_shape)).T

    # Step 5: Create a pandas DataFrame with one column per dimension,
    # plus the non-zero values
    coord_names = list(lca_results.dims)
    coord_values = {
        dim: lca_results.coords[dim].values[coords[:, i]]
        for i, dim in enumerate(coord_names)
    }

    df = pd.DataFrame(coord_values)
    df["value"] = non_zero_values
    df.to_parquet(path=filepath, compression="gzip")

    print(f"Results exported to {filepath}")


def display_results(
    lca_results: Union[xr.DataArray, None],
    cutoff: float = 0.001,
    interpolate: bool = False,
) -> xr.DataArray:
    """
    Display the LCA results.
@@ -290,12 +328,12 @@ def clean_cache_directory():


def resize_scenario_data(
    scenario_data: xr.DataArray,
    model: List[str],
    scenario: List[str],
    region: List[str],
    year: List[int],
    variables: List[str],
) -> xr.DataArray:
    """
    Resize the scenario data to the given scenario, year, region, and variables.
@@ -334,10 +372,10 @@


def get_activity_indices(
    activities: List[Tuple[str, str, str, str]],
    technosphere_index: Dict[Tuple[str, str, str, str], Any],
    geo: Geomap,
    debug: bool = False,
) -> List[int]:
    """
    Fetch the indices of activities in the technosphere matrix, optimized for efficiency.
@@ -383,7 +421,7 @@


def fetch_indices(
    mapping: dict, regions: list, variables: list, technosphere_index: dict, geo: Geomap
) -> dict:
    """
    Fetch the indices for the given activities in the technosphere matrix.
@@ -441,7 +479,7 @@


def fetch_inventories_locations(
    technosphere_indices: Dict[str, Tuple[str, str, str]]
) -> List[str]:
    """
    Fetch the locations of the inventories.
@@ -479,7 +517,7 @@ def csv_to_dict(filename: str) -> dict[int, tuple[str, ...]]:


def check_unclassified_activities(
    technosphere_indices: dict, classifications: dict
) -> List:
    """
    Check if there are activities in the technosphere matrix that are not in the classifications.
@@ -501,7 +539,7 @@


def _group_technosphere_indices(
    technosphere_indices: dict, group_by, group_values: list
) -> dict:
    """
    Generalized function to group technosphere indices by an arbitrary attribute (category, location, etc.).
@@ -555,7 +593,7 @@ def gather_filters(current_level: Dict, combined_filters: Dict[str, Set[str]]) -


def get_combined_filters(
    filters: Dict, paths: List[List[str]]
) -> Tuple[Dict[str, List[str]], Dict[str, List[str]]]:
    """
    Traverse the filters dictionary to get combined filter criteria based on multiple paths.
@@ -598,10 +636,10 @@


def apply_filters(
    technosphere_inds: Dict[Tuple[str, str, str, str], int],
    filters: Dict[str, List[str]],
    exceptions: Dict[str, List[str]],
    paths: List[List[str]],  # Add paths as an argument
) -> Tuple[List[int], List[int], Dict[str, Set[str]], Dict[str, Set[str]]]:
    """
    Apply the filters to the database and return a list of indices and exceptions,
