fix: r concurrent handler #1295

Draft: wants to merge 3 commits into main

Conversation

@sorhawell (Collaborator) commented Nov 21, 2024

THIS PR is a DRAFT. DO NOT MERGE.

Not fully tested, but promising.

The R concurrent handler was reported to keep crashing for the rest of the R session if an R error is raised in an R function embedded in a polars query; reported in #1253.

Another edge case: starting a new r-polars query from within an R user function that is itself inside an r-polars query did not work well.

Solution:

  • ~~use once_cell (not the actual problem, but still a safer choice, with no undefined-behavior warnings in the docs)
  • instead of having a single shared concurrent handler, there is now a stack of concurrent handlers. Each sub-query will push a handler onto the stack.~~ It was all fine all along
  • if an error occurs, the handler stack will unwind and clean up, leaving no dangling communication channels (ThreadComs); a conceptual sketch follows the EDIT below

EDIT: I had looked up InitCell from the wrong crate :/ The actual docs say it is fine to mutate it later:
https://docs.rs/state/0.6.0/state/struct.InitCell.html
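
For illustration only, here is a minimal conceptual sketch of the cleanup-on-unwind idea, written in Python to match the examples later in this thread (the real handler lives on the Rust side as a ThreadCom). The names ConcurrentHandler, handler_stack and run_query are invented for this sketch and do not appear in the PR.

import queue

# Conceptual sketch only: a stand-in for the Rust-side ThreadCom handler.
# The point is that an error in the user function unwinds the stack and
# releases the channel instead of leaving it dangling.
handler_stack = []

class ConcurrentHandler:
    def __init__(self):
        self.channel = queue.Queue()  # stand-in for a ThreadCom channel

    def __enter__(self):
        handler_stack.append(self)
        return self

    def __exit__(self, exc_type, exc, tb):
        # always pop and clean up, even when the user function raised
        handler_stack.pop()
        self.channel = None
        return False  # propagate the error to the caller

def run_query(user_fn):
    with ConcurrentHandler() as handler:
        return user_fn(handler)

def bad_user_fn(handler):
    raise RuntimeError("error raised inside the embedded user function")

try:
    run_query(bad_user_fn)
except RuntimeError:
    pass

assert handler_stack == []  # nothing left dangling after the error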

@etiennebacher (Collaborator)

Hi @sorhawell and welcome back!

As you may have seen mentioned in some issues, we're currently rewriting r-polars to take advantage of rlang and savvy to have a better match with the structure of py-polars on the Rust side (among other things): https://github.com/eitsupi/neo-r-polars (it's easier to work on a separate repo but eventually it will be merged here).

Therefore it would be great if you could contribute there directly. From what I can see, your changes don't involve extendr-related code, so porting this to the rewrite should be doable (but I'm no expert on this part, so I'll let you assess that).

@sorhawell (Collaborator, Author) commented Nov 24, 2024

Hi @etiennebacher

Many thanks. I see you and eitsupi have surpassed me in contributions; you have put in a lot of hard work. I'd like to support any attempt towards a better polars experience, and maybe the neo-polars rewrite will be the best thing since sliced bread :)
My contributions will be small and scarce though, as I got caught up in family and job. Maybe I can help out with the ThreadCom stuff sometimes.

When do you plan to switch to neo-polars? What is your policy on patching the "r-polars" main branch in the meanwhile?

As it stands now, neo-polars is still in an early stage.

  • Should it be possible to call map_batches from within map_batches?
    If yes, ultimately I think there will be a need for multiple R sessions to avoid a deadlock like the one here:

[Screenshot: test from this PR]

It is a bit exotic to do, so maybe just say no :)
Making a new polars query within map_batches can be useful when needing to map multiple columns with an R user function: multiple columns can be packed into a single struct series and unnested into a DataFrame within map_batches (see the sketch below). Even if the inner polars query were allowed, calling map_batches again inside it might still be prohibited.
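
For reference, a hedged py-polars sketch of the struct-packing pattern described above (the r-polars version would follow the same shape); the column names and the user function are invented for the example.

import polars as pl

df = pl.DataFrame({"a": [1, 2, 3], "b": [10, 20, 30]})

def user_fn(s: pl.Series) -> pl.Series:
    # the struct series unnests into a DataFrame with columns "a" and "b"
    sub = s.struct.unnest()
    # the user function can now operate on several columns at once
    return sub["a"] + sub["b"]

out = df.select(
    pl.struct(["a", "b"]).map_batches(user_fn).alias("a_plus_b")
)
print(out)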

@etiennebacher (Collaborator) commented Nov 25, 2024

My contributions will be small and scarce though, as I got caught up in family and job. Maybe I can help out with the ThreadCom stuff sometimes.

No problem about that, it would be great to have someone experienced on those parts. I can focus on the things that are not very technically challenging, so it's good if you focus on the things where you have the most added value.

When do you plan to switch to neo-polars? What is your policy on patching the "r-polars" main branch in the meanwhile?

We don't have a clear timeline in mind. @eitsupi has been building the infrastructure since May, I think, and I started contributing in October. I think merging neo-r-polars in January is feasible, but of course that will depend on how much time everyone can put into it. @eitsupi, thoughts on that?

Regarding patching r-polars: I don't plan on investing a lot of time updating r-polars; I mostly focus on neo-r-polars. I don't mind making a new r-polars release before the big merge, but I won't push for it.

Should it be possible to call map_batches from within map_batches?

Does that work in py-polars? That's often the main argument for deciding whether or not we should support something. IMO that's not a priority and I'd rather leave it aside if it adds a lot of complexity. Having a clean implementation of map_batches() that we are sure doesn't crash or have weird side effects would already be nice ;)

@sorhawell (Collaborator, Author)

I would like to rewrite this PR to fix the current bugs with the smallest changes possible. That would provide value and build trust today.

py-polars does not explicitly have a "collect_in_background" like r-polars, where a query is offloaded to another OS thread via Rust; I made that up. py-polars also does not support running map_batches with a process-pool backend. However, Python supports threads natively, and nested use of map_batches will not deadlock because the threads can advance independently. Also, PyO3 (the Python counterpart of extendr/savvy) has GIL abstractions such as Python::with_gil and Python::acquire_gil that help a lot in making a clean, simple map_batches implementation. In R there is only one thread per session/process. In Python it is also fairly straightforward to offload a polars query to a Python thread without blocking other work or other queries. See the examples below.

I will think about whether there is a way to emulate threads enough to avoid deadlocks without resorting to an R process pool. Otherwise, the docs should state that nested queries have limited support.

I don't know of any way in R to run a polars query in the background without enabling that feature explicitly on the Rust side. Alternatively, the R user must resort to clusters for non-blocking queries, which is not ideal. Mirai is getting close.

import polars as pl
from concurrent.futures import ThreadPoolExecutor
import time


# Example 1: py-polars supports embedded (nested) queries
def f(s: pl.Series) -> pl.Series:
    s2 = s.to_frame().lazy().select(
        pl.col("a").map_batches(lambda s: s*3)
    ).collect().to_series(0)
    return s2 + 1 

pl.DataFrame({"a" : [1,2,3]}).select(
    pl.col("a").map_batches(f)
)

# Example 2: py-polars supports running a query "in the background".
# Python supports threads, and even with the GIL this is quite useful:
# while a thread is performing rust-polars work or sleeping, other Python threads can advance in the meanwhile.

# Sample DataFrame
df = pl.DataFrame({
    "name": ["Alice", "Bob", "Charlie", "David", "Eve"],
    "age": [25, 32, 29, 40, 23],
    "city": ["New York", "London", "Berlin", "Paris", "Madrid"]
})

# Function to simulate batch processing with sleep
def process_batch(s: pl.Series) -> pl.Series:
    print(f"Processing batch:\n{s}")
    time.sleep(2)  # Simulate delay
    return s * 2

# Function to execute Polars job
def job_with_map_batches(df: pl.DataFrame) -> pl.DataFrame:
    print("Starting job...")
    result = df.select(pl.col("age").map_batches(process_batch))
    print("Job finished!")
    return result

def job_without_map_batches(df):
    print("Starting job...")
    result = df
    print("Job finished!")
    return result

# Run multiple jobs concurrently
with ThreadPoolExecutor() as executor:
    future_1 = executor.submit(job_with_map_batches, df)
    future_2 = executor.submit(job_without_map_batches, df)

    # Get results
    result_2 = future_2.result()  # result 2 can be read before job 1 has finished
    print(result_2)
    result_1 = future_1.result()  # and ~2 seconds later result 1 finishes
    print(result_1)

@eitsupi (Collaborator) commented Nov 26, 2024

Thanks both!

I don't have a clear timeline either, but the current main branch is not compatible with R 4.5 (currently r-devel) and won't be unless someone works on it, so it's probably worth replacing it with the next branch around February.
Thanks to the large amount of work @etiennebacher has done, what's missing is fairly minimal.

In the meantime, I am happy to do minimal fixes or review someone else's PR.
I don't think I will be actively adding features.

As for map_batches, I wonder how necessary this feature is in R relative to the complexity.
In other words, since it is common in R to use something like lapply on a list, wouldn't it be enough to materialize a list once and then use lapply, without trying to do this within the framework of Polars?
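
To make that suggestion concrete, here is a hedged sketch in Python (matching the examples above; in R this would be roughly as.list() plus lapply()): pull the column out once and apply the user function outside Polars instead of routing it through map_batches. The column and variable names are invented for the example.

import polars as pl

df = pl.DataFrame({"a": [1, 2, 3]})

# materialize the column once, apply the user function outside Polars
values = df.get_column("a").to_list()
doubled = [v * 2 for v in values]

# put the result back as a new column
out = df.with_columns(pl.Series("a_doubled", doubled))
print(out)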
