
Investigate polars and numba in aggregations #370

Open
samlamont opened this issue Jan 13, 2025 · 5 comments

@samlamont
Collaborator

samlamont commented Jan 13, 2025

Some research notes: we may have already explored this, but aggregations in polars can take UDFs that access multiple dataframe columns. Polars can perform operations lazily and supports method chaining. It seems like it could fit into our current framework fairly easily as an alternate backend to pyspark when horizontal scaling is not needed. (API?)

We can also take advantage of numba decorators in the metric functions to potentially improve query performance (in our current framework and/or with polars).

I haven't done any benchmarking, but wanted to add some notes here.

Example:

import numpy as np
import polars as pl
from numba import float64, guvectorize

EV_DIR = "test_evaluation/dataset/joined_timeseries/**/*.parquet"
pl_df = pl.scan_parquet(EV_DIR, glob=True) # lazy loading

@guvectorize([(float64[:], float64[:], float64[:])], '(n),(n)->()')
def kling_gupta_efficiency_guvect(p, s, result):
    """Kling-Gupta Efficiency (2009)."""
    if np.std(s) == 0 or np.std(p) == 0:
        result[:] = np.nan
        return

    # Pearson correlation coefficient
    linear_correlation = np.corrcoef(s, p)[0, 1]

    # Relative variability
    relative_variability = np.std(s) / np.std(p)

    # Relative mean
    relative_mean = np.mean(s) / np.mean(p)

    # Scaled Euclidean distance (scaling factors all set to 1.0)
    euclidean_distance = np.sqrt(
        (1.0 * (linear_correlation - 1.0)) ** 2.0 +
        (1.0 * (relative_variability - 1.0)) ** 2.0 +
        (1.0 * (relative_mean - 1.0)) ** 2.0
    )
    result[:] = 1.0 - euclidean_distance

# Calculating KGE, grouping by `primary_location_id`:
pl_df.group_by("primary_location_id").agg(
    pl.struct(["primary_value", "secondary_value"])
    .map_batches(
        lambda combined: kling_gupta_efficiency_guvect(
            combined.struct.field("primary_value"),
            combined.struct.field("secondary_value"),
        )
    )
    .alias("kling_gupta_efficiency")
).collect()

We can use a wrapper around the functions to implement numba in our current pyspark framework:

import pandas as pd

def kling_gupta_efficiency(p: pd.Series, s: pd.Series) -> float:
    """Kling-Gupta Efficiency (2009)."""
    return float(kling_gupta_efficiency_guvect(p.values, s.values))
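
For example (a rough sketch only, not our actual API; this assumes a Spark 3.x grouped-aggregate pandas UDF, and the path and column names are just illustrative), the wrapper could be registered like:

import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf

spark = SparkSession.builder.getOrCreate()

# Series-to-scalar (grouped aggregate) pandas UDF that delegates to the
# guvectorized KGE function defined above.
@pandas_udf("double")
def kge_udf(p: pd.Series, s: pd.Series) -> float:
    return float(kling_gupta_efficiency_guvect(p.values, s.values))

# Illustrative dataset path and column names.
sdf = spark.read.parquet("test_evaluation/dataset/joined_timeseries")
sdf.groupBy("primary_location_id").agg(
    kge_udf("primary_value", "secondary_value").alias("kling_gupta_efficiency")
).show()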
@mgdenno
Contributor

mgdenno commented Jan 14, 2025

This is an interesting idea. It would be good to do a quick test on the 40-yr retro that is in TEEHR-HUB and see how the performance compares to PySpark. I assume it would be a lot faster, but it would be good to understand how much.

@mgdenno
Contributor

mgdenno commented Jan 14, 2025

For some reason I thought Polars did not support multi-column aggregates, but I guess I was wrong.

@samlamont
Collaborator Author

For some reason I thought Polars did not support multi-column aggregates, but I guess I was wrong.

I thought the same thing and just kind of stumbled on this. The multiple-column approach is listed in the UDF section of the Rust docs rather than in the aggregation section, so it's easy to miss. I'm also not sure when this was added.

I did some query testing in teehr-hub (large instance), calculating KGE grouped by primary_location_id (notebook: /data/playground/slamont/polars-test-query-comparison.ipynb):

e1_camels_daily_streamflow

  • teehr: 8 s
  • polars: 9 ms

e2_camels_hourly_streamflow

  • teehr: 21 s
  • polars: 6 s

40-yr retrospective (pointing polars to playground/mdenno/40-yr-retrospective/dataset/joined/**/*.parquet)

  • teehr: ~2 min 5 sec (best of 3)
  • polars: ~ 2 min 30 sec (best of 3)

Polars is generally faster on the smaller datasets, but it seems to rely more heavily on memory; PySpark was more efficient on the 40-yr retrospective. I had to set streaming=True for polars on the 40-yr run to avoid running out of memory (I'm not sure exactly what this does, and the documentation is limited).
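
Roughly, the 40-yr polars run was the same query as in the first comment, just pointed at that path and collected with streaming=True (sketch below; as I understand it, the streaming engine processes the scan in batches rather than materializing everything in memory, which is why it avoided the out-of-memory error):

import polars as pl

# Sketch of the 40-yr streaming run: same lazy KGE aggregation as above,
# but collected with streaming=True so polars works through the parquet
# files in batches rather than holding everything in memory.
lf = pl.scan_parquet(
    "playground/mdenno/40-yr-retrospective/dataset/joined/**/*.parquet",
    glob=True,
)
kge_by_location = (
    lf.group_by("primary_location_id")
    .agg(
        pl.struct(["primary_value", "secondary_value"])
        .map_batches(
            lambda combined: kling_gupta_efficiency_guvect(
                combined.struct.field("primary_value"),
                combined.struct.field("secondary_value"),
            )
        )
        .alias("kling_gupta_efficiency")
    )
    .collect(streaming=True)
)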

Also, using a numba function for KGE in pyspark did not improve performance (it was actually a bit slower); it could be that the KGE function is already effectively vectorized through numpy.

@mgdenno
Contributor

mgdenno commented Jan 18, 2025

This is great. Have you given thought to whether/how we could support multiple backends? It would be really awesome to have a faster option. I'm assuming polars also supports grouping and filtering, so it seems like all the pieces are there. I think pandera supports polars too.
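
Just thinking out loud, a backend option could be as simple as a dispatch layer over the metric queries. Purely a hypothetical sketch; none of these names are TEEHR's actual API:

from typing import Any, Literal


def _kge_polars(data: Any, group_by: str) -> Any:
    # Placeholder for the lazy polars group_by/agg/collect query shown above.
    raise NotImplementedError


def _kge_pyspark(data: Any, group_by: str) -> Any:
    # Placeholder for the existing pyspark groupBy/agg implementation.
    raise NotImplementedError


def kling_gupta_efficiency_query(
    data: Any, group_by: str, backend: Literal["pyspark", "polars"] = "pyspark"
) -> Any:
    """Run the same metric aggregation on whichever backend is requested."""
    if backend == "polars":
        return _kge_polars(data, group_by)
    return _kge_pyspark(data, group_by)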

@christophertubbs

I've had good luck with polars in the past and found it to be far more effective than pandas.

I did notice that it seemed to have a lot of 'gotchas': not necessarily things that break functionality, but there are 'wrong' ways to do things. Breaking up operations into easily readable chunks, using your own functions, and mixing in other libraries (aside from numba) can end up slowing you down.

An implementation of KGE I was last toying around with ended up looking like:

import polars

# `data` is the input DataFrame, `identifier` is the name of the output KGE
# column, and the *_COLUMN constants are reusable expressions (see below).

beta_label: str = "beta"
beta_column = polars.col(beta_label)

gamma_label: str = "gamma"
gamma_column = polars.col(gamma_label)

# KGE = 1 - sqrt((r - 1)^2 + (beta - 1)^2 + (gamma - 1)^2)
kge: polars.Expr = (
    1 - ((CORRELATION_COLUMN - 1) ** 2 + (beta_column - 1) ** 2 + (gamma_column - 1) ** 2).sqrt()
).alias(identifier)

# Add the intermediate beta/gamma columns, then the KGE column itself.
intermediary_data: polars.DataFrame = data.with_columns(
    (MEAN_SIMULATION_COLUMN / MEAN_CONTROL_COLUMN).alias(beta_label),
    (SIMULATION_STANDARD_DEVIATION_COLUMN / CONTROL_STANDARD_DEVIATION_COLUMN).alias(gamma_label),
)
intermediary_data = intermediary_data.with_columns(kge)

CORRELATION_COLUMN, MEAN_SIMULATION_COLUMN, MEAN_CONTROL_COLUMN, SIMULATION_STANDARD_DEVIATION_COLUMN, and CONTROL_STANDARD_DEVIATION_COLUMN are common expressions defined as constants due to their reuse.

It very much wants you to use polars.Expr instead of normal functions. Polars approaches evaluation like a SQL database: you give it instructions, and it takes its own time to figure out how best to optimize them. By stacking expressions prior to evaluation, it finds clever ways to organize the data and perform the calculations.
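
A toy example of what I mean (hypothetical columns; explain() prints the plan polars settles on after optimization):

import polars

# Stack expressions on a LazyFrame, then let polars plan the whole query at once.
lf = polars.LazyFrame(
    {"location": ["a", "a", "b"], "obs": [1.0, 2.0, 3.0], "sim": [1.1, 1.9, 3.2]}
)
query = (
    lf.with_columns((polars.col("sim") / polars.col("obs")).alias("ratio"))
    .filter(polars.col("ratio") > 1.0)
    .group_by("location")
    .agg(polars.col("ratio").mean())
)
print(query.explain())  # optimized plan, e.g. projection/predicate pushdown
result = query.collect()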

I forgot what the gotchas were with numba, but I remember running into issues with data types not being compatible. That might have been due to me not being used to it.
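
For what it's worth, casting inputs up front to match the declared signature usually sidesteps that (a small sketch, assuming the float64 signature from the example above; the values are just placeholders):

import numpy as np

# Cast inputs to the float64 dtype declared in the guvectorize signature
# before calling, to avoid dtype-resolution surprises.
p = np.asarray([1, 2, 3], dtype=np.float64)
s = np.asarray([1.1, 1.9, 3.2], dtype=np.float64)
kge_value = kling_gupta_efficiency_guvect(p, s)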

I performed some experiments on the retrospective and the bottleneck, by far, was data retrieval.
