Run in parallel #8

Open -- wants to merge 1 commit into dev
Conversation

@skrawcz (Collaborator) commented Aug 6, 2024

Updates to use parallelizable & collect
@Roy-Kid (Contributor) commented Aug 9, 2024

Hi @skrawcz,
The mapper-worker-reducer pattern works well with the MultiThreadingExecutor, but it does not work with the MultiProcessingExecutor. The error is cannot pickle 'module' object. Here is a minimal reproduction:

# worker.py and mapper.py are standalone files; their contents are shown in
# the string literals after each import.
import worker
"""
# worker.py
def double(a: int) -> int:
    return a * 2
"""

import mapper
"""
# mapper.py
from hamilton.htypes import Parallelizable, Collect
from typing import Any

def mapper(
    drivers: list,
    inputs: list,
    final_vars: list = [],
) -> Parallelizable[dict]:
    for dr, input_ in zip(drivers, inputs):
        yield {
            "dr": dr,
            "final_vars": final_vars or dr.list_available_variables(),
            "input": input_,
        }


def worker(mapper: dict) -> dict:
    _dr = mapper["dr"]
    _inputs = mapper["input"]
    _final_var = mapper["final_vars"]
    return _dr.execute(final_vars=_final_var, inputs=_inputs)


def reducer(worker: Collect[dict]) -> Any:
    return worker
"""
from hamilton import driver
from hamilton.execution import executors

drivers = []
inputs = []
for i in range(4):
    dr = driver.Builder().with_modules(worker).build()
    drivers.append(dr)
    inputs.append({'a': i})


dr = (
    driver.Builder()
    .with_modules(mapper)
    .enable_dynamic_execution(allow_experimental_mode=True)
    .with_local_executor(executors.SynchronousLocalTaskExecutor())
    .with_remote_executor(executors.MultiProcessingExecutor(8))
    .build()
)
dr.execute(
    final_vars=["reducer"],
    inputs={"drivers": drivers, "inputs": inputs, "final_vars": ['double']},
)

I have no clue why it needs to pickle a module, since I don't use any module inside any of the functions. I'm trying to read the source code, because I think it is quite well organized and good material for learning multiprocessing, but I'm afraid I don't have the time to work the problem out myself. Could you help me with it?

@skrawcz (Collaborator, Author) commented Aug 9, 2024

Yes, multiprocessing has this issue. I'll take a look in a bit -- or if you have the stack trace, could you post it? Note: multiprocessing will do a better job if the files are explicit standalone modules -- it looks like they are in your example, but I'm putting that here just to double check.

Otherwise I would try installing Ray and using the RayTaskExecutor instead. It should do a better job of serialization. E.g. https://github.com/DAGWorks-Inc/hamilton/blob/main/examples/parallelism/file_processing/run.py#L36
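For context, the error itself is the standard pickle limitation on module objects: if anything shipped to the worker processes (e.g. the Driver objects passed in the inputs, which presumably hold a reference to the worker module) contains a module, serialization fails. A minimal illustration outside of Hamilton:

import math
import pickle

# Module objects cannot be pickled, so any object graph that references one
# will fail to serialize when handed to a multiprocessing worker.
try:
    pickle.dumps(math)
except TypeError as e:
    print(e)  # cannot pickle 'module' object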

@skrawcz
Copy link
Collaborator Author

skrawcz commented Aug 9, 2024

e.g. we probably want to avoid this:

temp_module = ad_hoc_utils.create_temporary_module(
    mapper,
    dag_result,
    reducer,
    module_name="start_tasks_mapper_reducer",
)

and instead have this be a real Python module -- multiprocessing might be able to handle that better.
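E.g. something along these lines (a rough sketch; the file and module name are just illustrative), so the spawned worker processes can re-import the functions by name instead of unpickling a temporary in-memory module:

# start_tasks_mapper_reducer.py would be a real file on disk containing
# mapper, dag_result, and reducer, rather than a module built ad hoc in memory.
import start_tasks_mapper_reducer

from hamilton import driver
from hamilton.execution import executors

if __name__ == "__main__":
    dr = (
        driver.Builder()
        .with_modules(start_tasks_mapper_reducer)
        .enable_dynamic_execution(allow_experimental_mode=True)
        .with_remote_executor(executors.MultiProcessingExecutor(8))
        .build()
    )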

@Roy-Kid (Contributor) commented Aug 9, 2024

Thanks! I read the docstring of create_temporary_module; that's why I put the modules in separate files.
I've left my office, so I can't share the stack trace right now, but the code is standalone and you can reproduce it with copy & paste.

Ray is a good option; I will consider it :)

@skrawcz (Collaborator, Author) commented Aug 10, 2024

Switching to Ray works, it seems -- I haven't tracked down what is causing the SERDE issue for multiprocessing, but Ray does a better job there.

from hamilton import driver
from hamilton.execution import executors
import worker
import mapper
import ray
from hamilton.plugins import h_ray

if __name__ == '__main__':
    drivers = []
    inputs = []
    for i in range(4):
        dr = driver.Builder().with_modules(worker).build()
        drivers.append(dr)
        inputs.append({'a': i})


    dr = (
        driver.Builder()
        .with_modules(mapper)
        .enable_dynamic_execution(allow_experimental_mode=True)
        # .with_local_executor(executors.SynchronousLocalTaskExecutor())
        # .with_remote_executor(executors.MultiProcessingExecutor(8))
        .with_remote_executor(h_ray.RayTaskExecutor(8))
        .build()
    )
    dr.execute(
        final_vars=["reducer"],
        inputs={"drivers": drivers, "inputs": inputs, "final_vars": ['double']},
    )

@Roy-Kid (Contributor) commented Aug 10, 2024

Yes! I also got it working yesterday. pickle is always trouble; whenever I run into a pickle-related problem I can never fix it.
