Improve MapReduceJob and replace Futures interface (in e.g. TMCS) with joblib #387

Closed
AnesBenmerzoug opened this issue Jul 31, 2023 · 9 comments

@AnesBenmerzoug
Collaborator

AnesBenmerzoug commented Jul 31, 2023

Thanks to a new feature in joblib 1.3.0, namely the option to return results as a generator instead of waiting for all of them to finish, I think we can get rid of the concurrent.futures interface and replace it with joblib.

The main loop for truncated_montecarlo_shapley can be rewritten using joblib as follows:

from joblib import delayed, effective_n_jobs, Parallel


def truncated_montecarlo_shapley(
    u: Utility,
    *,
    done: StoppingCriterion,
    truncation: TruncationPolicy,
    n_jobs: int = 1,
) -> ValuationResult:
    algorithm = "truncated_montecarlo_shapley"
    # The effective number of jobs that can run concurrently
    n_jobs = effective_n_jobs(n_jobs)
    # This determines the total number of submitted jobs
    # including the ones that are running
    n_submitted_jobs = 2 * n_jobs

    accumulated_result = ValuationResult.zeros(algorithm=algorithm)

    with Parallel(n_jobs=n_jobs, return_as="generator") as parallel:
        while not done(accumulated_result):
            delayed_calls = [
                delayed(_permutation_montecarlo_one_step)(u, truncation, algorithm)
                for _ in range(n_submitted_jobs)
            ]
            for result in parallel(delayed_calls):
                accumulated_result += result
                if done(accumulated_result):
                    break
    return accumulated_result

This should be almost the same as the version with the concurrent.futures interface, but with far less custom code.
We can rewrite the MapReduceJob class to use this pattern instead and have the default stopping criterion be a dummy one.
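
A rough sketch of what this could look like (assuming, purely for illustration, a constructor taking map and reduce callables plus an optional done criterion; this is not the current MapReduceJob signature):

from joblib import delayed, effective_n_jobs, Parallel


class MapReduceJob:
    def __init__(self, map_func, reduce_func, *, done=None, n_jobs=1):
        self.map_func = map_func
        self.reduce_func = reduce_func
        # Dummy default criterion: never stop early, i.e. consume all jobs.
        self.done = done if done is not None else (lambda result: False)
        self.n_jobs = effective_n_jobs(n_jobs)

    def __call__(self, inputs):
        results = []
        with Parallel(n_jobs=self.n_jobs, return_as="generator") as parallel:
            for result in parallel(delayed(self.map_func)(x) for x in inputs):
                results.append(result)
                if self.done(self.reduce_func(results)):
                    break
        return self.reduce_func(results)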

We could take some inspiration from this recent open PR in joblib to implement a similar, but not quite the same, pattern.

Configuration

Users could configure the backend using joblib's context managers parallel_backend or the newer and more flexible parallel_config:

import joblib

with joblib.parallel_config(backend="loky"):
    truncated_montecarlo_shapley(u, done=done, truncation=truncation)

For ray, the user would initialize it explicitly if needed; otherwise it will be done automatically:

import joblib
import ray

ray.init()

with joblib.parallel_config(backend="ray"):
    truncated_montecarlo_shapley(u, done=done, truncation=truncation)

This would resolve #360 (handled by joblib's parallel_config function) and allow us to resolve #385.

Worker Signalling Mechanism

To make this change even more worthwhile, it should also resolve #363.

For this we can use an Event synchronization primitive.
I think this is a better approach than relying on yet another service, e.g. redis or a queue, because it is simpler.

Here's how we could do it for the different backends:

  • For joblib, we can either use the one in the multiprocessing package or the one that comes with Loky (the default joblib backend); a usage sketch follows after this list.

  • For ray, there is no Event class and we would have to implement one using an Actor:

    import ray

    @ray.remote
    class Event:
        def __init__(self):
            self._flag = False

        def is_set(self):
            return self._flag

        def set(self):
            self._flag = True

        def clear(self):
            self._flag = False
  • For dask, there is an Event that we can use.
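
As a usage sketch for the joblib option above: a multiprocessing.Manager event proxy is picklable, so it can be passed to loky workers. The _one_step wrapper and the cancel_event argument are illustrative names, not existing pyDVL code; the other names are reused from the TMCS snippet in the description. A more complete version would also check the event inside the truncation policy.

from multiprocessing import Manager

from joblib import delayed, Parallel


def _one_step(u, truncation, algorithm, cancel_event):
    # Workers check the event before doing any work and return an
    # empty result once it has been set.
    if cancel_event.is_set():
        return ValuationResult.zeros(algorithm=algorithm)
    return _permutation_montecarlo_one_step(u, truncation, algorithm)


with Manager() as manager:
    cancel_event = manager.Event()
    accumulated_result = ValuationResult.zeros(algorithm=algorithm)
    with Parallel(n_jobs=n_jobs, return_as="generator") as parallel:
        for result in parallel(
            delayed(_one_step)(u, truncation, algorithm, cancel_event)
            for _ in range(n_submitted_jobs)
        ):
            accumulated_result += result
            if done(accumulated_result):
                # Signal the still-running workers so they can stop early.
                cancel_event.set()
                break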

Cancelling Running Jobs

Unfortunately, loky, the default joblib backend, does not terminate workers when we exit the Parallel context manager because it tries to reuse worker processes. We may not want that, because the individual tasks may take a long time to finish and could cause a memory leak.

Other backends do not suffer from this issue.

There are two ways to resolve this:

  • Subclass the LokyBackend and make its terminate method actually stop the worker processes (this idea comes from this stackoverflow answer):

    from joblib._parallel_backends import LokyBackend

    class TerminatingLokyBackend(LokyBackend):
        def terminate(self):
            if self._workers is not None:
                # With kill_workers=True, joblib would terminate the remaining
                # workers and their descendants "brutally" using SIGKILL.
                self._workers.terminate(kill_workers=False)
                self._workers = None
            self.reset_batch_stats()

    We can then register this backend and set it as the default using joblib's register_parallel_backend (see the usage sketch after this list):

    import joblib
    
    joblib.register_parallel_backend("terminating_loky", TerminatingLokyBackend, make_default=True)
  • Explicitly call the LokyBackend's abort_everything method once the stopping criterion is met.

    from joblib import Parallel
    
    with Parallel() as parallel:
        # Computation
        ...
        # Should be called before exiting the context
        parallel._backend.abort_everything(ensure_ready=False)
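
As the usage sketch referenced above: since register_parallel_backend is called with make_default=True, no change is needed at the call site, but the backend could also be selected explicitly (assuming the TerminatingLokyBackend registration shown above has already run):

import joblib

with joblib.parallel_config(backend="terminating_loky"):
    truncated_montecarlo_shapley(u, done=done, truncation=truncation)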

EDIT: Added more details and split it into different sections.

@AnesBenmerzoug AnesBenmerzoug added enhancement New feature or request breaking-change Changes to the API labels Jul 31, 2023
@AnesBenmerzoug AnesBenmerzoug self-assigned this Jul 31, 2023
@AnesBenmerzoug
Collaborator Author

@mdbenito, opinions? I know I took a long while to write the RayExecutor code, but I think it will just get in the way in the future (pun not intended).

@AnesBenmerzoug AnesBenmerzoug changed the title Replace Futures interface in e.g. TMCS with joblib Improve MapReduceJob and replace Futures interface (in e.g. TMCS) with joblib Aug 3, 2023
@AnesBenmerzoug
Collaborator Author

@mdbenito, @schroedk, @Xuzzo I have updated the description with more details and split it into different sections.
What do you think? The only thing I am not sure about is how this fits in MapReduceJob and whether we should try to make it fit at all.

@fcharras

fcharras commented Aug 8, 2023

Hello, I'm the contributor who proposed joblib/joblib#1485. I'd be interested to hear more feedback about real-world use cases for these kinds of features in joblib.

Would you mind giving more context: what features would you like to see in joblib that could help you with said use cases?

Regarding joblib/joblib#1485, there are some limitations that might prevent it from being extended or adapted much further than what the example already shows:

  • fine-grained submission of new tasks from the outputs of previous tasks, e.g. where callbacks could arbitrarily submit several tasks or none, does not seem possible to achieve
  • the example feels a bit hacky and does not show inner mechanisms that are important to know beforehand to avoid deadlock patterns

Beware that the dask backend does not support return_as="generator" yet, and I don't think the ray backend does either (in theory there's no blocker, it just needs some developer time to adapt them).

Do you think an Executor-like interface in joblib, similar to concurrent.futures (i.e. with futures, wait, etc.), with joblib's nice features (ability to swap backends, error surfacing, ...), would be more interesting for this kind of use case?

@AnesBenmerzoug
Collaborator Author

Hello @fcharras, thanks for taking interest in this issue.

The pattern we use in pyDVL is a MapReduce-like pattern in which we want to be able to submit several jobs (Map) that may take a while to finish, then combine the results (Reduce) and check a stopping criterion. If the criterion is satisfied then we do not submit more jobs, and we ideally terminate any currently running jobs because they may take a long time to finish and we do not want that.

The pattern implemented in joblib/joblib#1485 can fit our use case:

  • We only submit one new task for each completed task.
  • I didn't know there was a risk of ending up in deadlocks (other than the ones described in the implementation's comments and description).

I didn't know that other backends didn't support returning the results as a generator out of the box; that's kind of a blocker for this issue.

We currently use the Executor interface from concurrent.futures only for one of the algorithms: we have a custom RayExecutor class for working with ray, and we use the executor implementation from loky (get_reusable_executor).

We are considering fully switching to joblib because it would simplify our code and remove the need to handle the configuration for the different backends ourselves. If there were a builtin Executor-like interface in joblib that allowed us to do the same and supported task cancellation, that would be perfect for us.
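
For context, a minimal sketch of the Futures-based pattern described above, which this issue proposes to replace (an illustrative reconstruction, not pyDVL's actual implementation; names like u, done, truncation, algorithm, n_jobs and n_submitted_jobs are reused from the snippet in the description):

from concurrent.futures import FIRST_COMPLETED, wait

from loky import get_reusable_executor

executor = get_reusable_executor(max_workers=n_jobs)
accumulated_result = ValuationResult.zeros(algorithm=algorithm)
# Keep a constant pool of in-flight jobs and refill it as results arrive.
pending = {
    executor.submit(_permutation_montecarlo_one_step, u, truncation, algorithm)
    for _ in range(n_submitted_jobs)
}
while pending and not done(accumulated_result):
    completed, pending = wait(pending, return_when=FIRST_COMPLETED)
    for future in completed:
        accumulated_result += future.result()
    while len(pending) < n_submitted_jobs and not done(accumulated_result):
        pending.add(
            executor.submit(_permutation_montecarlo_one_step, u, truncation, algorithm)
        )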

@fcharras

fcharras commented Aug 8, 2023

TY for the added information.

Indeed, as of 1.3 backends do not support return_as out of the box: backends must be adapted to a change of logic (results are retrieved in callback threads, as opposed to being retrieved in the main thread before 1.3 or with non-adapted backends). Feel free to open an issue to track support for other backends; we are looking for this type of feedback.

For patterns such as joblib/joblib#1485, or the map-reduce you're describing, the only risk (to my knowledge) is indeed the one that is already documented in the PR example: a callback thread can't be asked to wait for completion of pending jobs in other workers. It can be worked around by pre-running enough tasks at the start and ensuring that a cycle in the feedback loop creates as many tasks as it consumed results.

Since it's implicit, maybe joblib shouldn't encourage this kind of pattern; maybe an Executor-like interface could be considered instead, if it can be seen to fit within joblib's scope.

@mdbenito mdbenito modified the milestones: v0.8.0, v0.9.0 Aug 19, 2023
@kosmitive
Contributor

@AnesBenmerzoug

  • Vote for the shortened TMCS version.
  • Vote for configuring backend by context manager.

What about something like this?

from contextlib import contextmanager

import joblib

@contextmanager
def Parallel(*args, **kwargs):
    with joblib.Parallel(*args, **kwargs) as parallel:
        yield parallel
        # Abort any still-running jobs before leaving the context
        parallel._backend.abort_everything(ensure_ready=False)

MapReduce is in general a problem due to the StoppingCriterion, so we should actually try to get rid of MapReduce for Shapley-like algorithms.

@mdbenito
Collaborator

MapReduce is in general a problem due to the StoppingCriterion, so we should actually try to get rid of MapReduce for Shapley-like algorithms.

Agreed. A central dispatcher is also simpler for handling randomness (although we already have the seed sequences). Unless there is some use case I'm forgetting, we can think of dropping MapReduce.

@fcharras

I didn't know that other backends didn't support returning the results as a generator out of the box; that's kind of a blocker for this issue.

@AnesBenmerzoug FYI, support for the dask backend was just merged; it will be available from the next release on.

For support with the ray backend I've also opened a PR at the ray project, ray-project/ray#41028; whether it is merged depends on whether the ray maintainers notice / are interested in the feature. But the diff is trivial; in fact, users can unlock the feature already by adding one attribute to the RayBackend class.

I think with those two on top, all major backends are now addressed?

@schroedk
Collaborator

schroedk commented Sep 9, 2024

This is outdated due to the changes in #558; will close.

@schroedk schroedk closed this as completed Sep 9, 2024