Support more complicated workflows #16

Open
LivInTheLookingGlass opened this issue May 20, 2021 · 2 comments
Labels
enhancement New feature or request help wanted Extra attention is needed

Comments

@LivInTheLookingGlass
Owner

It would be useful if the idea of a job could be abstracted a little. To this end, I propose that a job become a collection of "tasks". Each task must declare which other tasks it depends on, and it is deferred until all of them have completed.

Because a job's tasks stay together, shuffling jobs will not cause a dependent task to cross host boundaries. It also allows some amount of shared memory between tasks, though of course some limitations will need to be imposed on its usage.

So a generic version of this might look like:

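from dataclasses import dataclass
from multiprocessing.pool import AsyncResult

@dataclass
class Job:
  tasks: list[Task]
  # Maps each task to the tasks that must finish before it can start.
  dependencies: dict[Task, list[Task]]

  def __post_init__(self):
    # Pending results, keyed by the task that was submitted.
    self.callbacks: dict[Task, AsyncResult] = {}
    # State shared between the tasks of this job.
    self.context = {}

  def execute(self, pool):
    while not self.done():
      for task in self.tasks:
        # Start a task once its dependencies are done and it is not already running.
        if task.ready() and task.inactive():
          # apply_async() takes its positional arguments as a tuple.
          self.callbacks[task] = pool.apply_async(task.execute, (self.context,))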
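
The sketch above leaves ready() and done() undefined. One possible way to fill them in is the standard library's graphlib.TopologicalSorter, which tracks which tasks have had all their prerequisites completed. The following is only a rough illustration under some assumptions: the function name run_in_dependency_order, the task names, and the use of plain callables instead of Task objects are all hypothetical, and a ThreadPool stands in for the real pool so the example runs anywhere (it exposes the same apply_async() call).

```python
from graphlib import TopologicalSorter
from multiprocessing.pool import ThreadPool


def run_in_dependency_order(dependencies, functions, pool):
    """Run every task once its prerequisites have finished.

    dependencies: maps a task name to the names it depends on.
    functions: maps a task name to a callable taking the shared context.
    Both mappings are hypothetical stand-ins for Task objects.
    """
    sorter = TopologicalSorter(dependencies)
    sorter.prepare()  # raises graphlib.CycleError on circular dependencies
    context = {}      # results so far, readable by later tasks
    pending = {}      # task name -> AsyncResult, kept by the submitter

    while sorter.is_active():
        # Submit every task whose prerequisites are all complete.
        for name in sorter.get_ready():
            # Each task receives a snapshot of the context.
            pending[name] = pool.apply_async(functions[name], (dict(context),))
        # Wait for one submitted task, record its result,
        # and let the sorter unlock its dependents.
        name, result = next(iter(pending.items()))
        context[name] = result.get()
        del pending[name]
        sorter.done(name)
    return context


def load(ctx):
    return [1, 2, 3]


def double(ctx):
    return [x * 2 for x in ctx["load"]]


def total(ctx):
    return sum(ctx["double"])


functions = {"load": load, "double": double, "total": total}
dependencies = {"load": [], "double": ["load"], "total": ["double"]}

with ThreadPool(2) as pool:
    results = run_in_dependency_order(dependencies, functions, pool)
print(results)
```

All apply_async() calls here are made by the submitting side, so no AsyncResult ever has to leave it. Waiting on the first pending task rather than whichever finishes first keeps the loop short, at the cost of some idle time.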
    
@LivInTheLookingGlass LivInTheLookingGlass added enhancement New feature or request help wanted Extra attention is needed labels May 20, 2021
@LivInTheLookingGlass
Owner Author

One question: how does apply_async() actually work? Are nested calls (a task calling apply_async() itself) even supported, or do they all need to be made from the main process?

If the former, we can keep using the map API fairly easily. If the latter, that might be a challenge.

@LivInTheLookingGlass
Owner Author

Findings: AsyncResult objects cannot be passed between processes. This means it's necessary to implement this with Queues or shared memory somehow.
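
A minimal sketch of the queue-based direction, under stated assumptions: workers report (name, ok, value) messages on a queue, so nothing outside the submitter ever needs an AsyncResult. The helper names run_and_report and collect and the task names are hypothetical. A ThreadPool and queue.Queue are used so the example runs anywhere; with a process Pool, the queue would presumably need to be a multiprocessing.Manager().Queue(), since a plain multiprocessing.Queue cannot be passed as a task argument.

```python
import queue
from multiprocessing.pool import ThreadPool


def run_and_report(name, func, args, results):
    """Run func in a worker and report the outcome on the queue.

    Failures are reported too, so the collector never waits forever
    on a task that raised.
    """
    try:
        results.put((name, True, func(*args)))
    except Exception as exc:
        results.put((name, False, repr(exc)))


def collect(pool, results, tasks):
    """Submit tasks and gather one message per task from the queue.

    tasks: maps a hypothetical task name to (callable, args).
    """
    for name, (func, args) in tasks.items():
        pool.apply_async(run_and_report, (name, func, args, results))
    outcome = {}
    for _ in tasks:
        name, ok, value = results.get()
        outcome[name] = (ok, value)
    return outcome


tasks = {
    "square": (pow, (7, 2)),
    "total": (sum, ([1, 2, 3],)),
    "bad": (int, ("not a number",)),  # raises ValueError inside the worker
}

with ThreadPool(2) as pool:
    outcome = collect(pool, queue.Queue(), tasks)
print(outcome)
```

Because messages carry the task name, they can arrive in any order, and whichever process reads the queue can decide which dependent tasks to start next.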
