From 4dd6d6be96349a37a4d9407bf256ea872868fb1b Mon Sep 17 00:00:00 2001
From: guilhermebodin
Date: Mon, 11 Nov 2024 15:51:59 -0300
Subject: [PATCH 1/3] return results in all processes in pmap

---
 Project.toml          |  2 +-
 src/pmap.jl           |  6 +++++-
 test/test_pmap_mpi.jl | 10 ++++------
 3 files changed, 10 insertions(+), 8 deletions(-)

diff --git a/Project.toml b/Project.toml
index 3021adb..9ee33c9 100644
--- a/Project.toml
+++ b/Project.toml
@@ -1,7 +1,7 @@
 name = "JobQueueMPI"
 uuid = "32d208e1-246e-420c-b6ff-18b71b410923"
 authors = ["pedroripper "]
-version = "0.1.0"
+version = "0.1.1"
 
 [deps]
 MPI = "da04e1cc-30fd-572f-bb4f-1f8673147195"

diff --git a/src/pmap.jl b/src/pmap.jl
index 7e285b3..c429bc8 100644
--- a/src/pmap.jl
+++ b/src/pmap.jl
@@ -55,11 +55,15 @@ function pmap(f::Function, jobs::Vector, data_defined_in_process = nothing)
             end
             send_termination_message()
             mpi_barrier()
+            result = MPI.bcast(result, controller_rank(), MPI.COMM_WORLD)
+            mpi_barrier()
             return result
         else
            _p_map_workers_loop(f, data_defined_in_process)
            mpi_barrier()
-           return nothing
+           result = MPI.bcast(result, controller_rank(), MPI.COMM_WORLD)
+           mpi_barrier()
+           return result
         end
     else
         for (i, job) in enumerate(jobs)

diff --git a/test/test_pmap_mpi.jl b/test/test_pmap_mpi.jl
index d46a222..ee34e53 100644
--- a/test/test_pmap_mpi.jl
+++ b/test/test_pmap_mpi.jl
@@ -21,12 +21,10 @@ end
 sum_100_answer = JQM.pmap(sum_100, collect(1:10))
 divisors_answer = JQM.pmap(get_divisors, collect(1:10))
 
-if JQM.is_controller_process()
-    @testset "pmap MPI" begin
-        @test sum_100_answer == [101, 102, 103, 104, 105, 106, 107, 108, 109, 110]
-        @test divisors_answer ==
-            [[1], [1, 2], [1, 3], [1, 2, 4], [1, 5], [1, 2, 3, 6], [1, 7], [1, 2, 4, 8], [1, 3, 9], [1, 2, 5, 10]]
-    end
+@testset "pmap MPI" begin
+    @test sum_100_answer == [101, 102, 103, 104, 105, 106, 107, 108, 109, 110]
+    @test divisors_answer ==
+        [[1], [1, 2], [1, 3], [1, 2, 4], [1, 5], [1, 2, 3, 6], [1, 7], [1, 2, 4, 8], [1, 3, 9], [1, 2, 5, 10]]
 end
 
 JQM.mpi_finalize()

From e7c4945ceebecffd121668ce1f16d82a394fa69b Mon Sep 17 00:00:00 2001
From: guilhermebodin
Date: Mon, 11 Nov 2024 18:33:15 -0300
Subject: [PATCH 2/3] return result in all processes

---
 src/pmap.jl | 28 ++++++++++++++++++++++------
 1 file changed, 22 insertions(+), 6 deletions(-)

diff --git a/src/pmap.jl b/src/pmap.jl
index c429bc8..07c46da 100644
--- a/src/pmap.jl
+++ b/src/pmap.jl
@@ -19,7 +19,12 @@ function _p_map_workers_loop(f, data_defined_in_process)
 end
 
 """
-    pmap(f, jobs, data_defined_in_process = nothing)
+    pmap(
+        f::Function,
+        jobs::Vector,
+        data_defined_in_process = nothing;
+        return_result_in_all_processes::Bool = false
+    )
 
 Parallel map function that works with MPI. If the function is called in parallel, it will
 distribute the jobs to the workers and collect the results. If the function is called in
@@ -28,10 +33,17 @@ serial, it will just map the function to the jobs.
 
 The function `f` should take one argument, which is the message to be processed.
 If `data_defined_in_process` is not `nothing`, the function `f` should take two arguments, the first one being `data_defined_in_process`.
 
+If `return_result_in_all_processes` is set to `true`, the result will be broadcast to all processes.
+
 The controller process will return the answer in the same order as the jobs were given. The workers will
 return nothing.
""" -function pmap(f::Function, jobs::Vector, data_defined_in_process = nothing) +function pmap( + f::Function, + jobs::Vector, + data_defined_in_process = nothing; + return_result_in_all_processes::Bool = false +) result = Vector{Any}(undef, length(jobs)) if is_running_in_parallel() mpi_barrier() @@ -55,14 +67,18 @@ function pmap(f::Function, jobs::Vector, data_defined_in_process = nothing) end send_termination_message() mpi_barrier() - result = MPI.bcast(result, controller_rank(), MPI.COMM_WORLD) - mpi_barrier() + if return_result_in_all_processes + result = MPI.bcast(result, controller_rank(), MPI.COMM_WORLD) + mpi_barrier() + end return result else _p_map_workers_loop(f, data_defined_in_process) mpi_barrier() - result = MPI.bcast(result, controller_rank(), MPI.COMM_WORLD) - mpi_barrier() + if return_result_in_all_processes + result = MPI.bcast(result, controller_rank(), MPI.COMM_WORLD) + mpi_barrier() + end return result end else From 59daa8df7614f6ea612c38549277ae324d399af5 Mon Sep 17 00:00:00 2001 From: guilhermebodin Date: Mon, 11 Nov 2024 18:46:36 -0300 Subject: [PATCH 3/3] updates to docs and added a new option to not broadcast results --- README.md | 4 +- docs/make.jl | 4 +- docs/src/index.md | 94 +++++++++++++++++++++++++++++++++++++ src/pmap.jl | 8 ++-- test/test_pmap_mpi.jl | 2 +- test/test_pmap_mpi_optim.jl | 9 ++-- 6 files changed, 107 insertions(+), 14 deletions(-) diff --git a/README.md b/README.md index 8621cc6..60e8200 100644 --- a/README.md +++ b/README.md @@ -6,8 +6,8 @@ [codecov-img]: https://codecov.io/gh/psrenergy/JobQueueMPI.jl/coverage.svg?branch=master [codecov-url]: https://codecov.io/gh/psrenergy/JobQueueMPI.jl?branch=master -| **Build Status** | **Coverage** | -|:-----------------:|:-----------------:| +| **Build Status** | **Coverage** | **Documentation** | +|:-----------------:|:-----------------:|:-----------------:| | [![Build Status][build-img]][build-url] | [![Codecov branch][codecov-img]][codecov-url] |[![](https://img.shields.io/badge/docs-latest-blue.svg)](https://psrenergy.github.io/JobQueueMPI.jl/dev/) diff --git a/docs/make.jl b/docs/make.jl index 2ed9096..d8fa7ed 100644 --- a/docs/make.jl +++ b/docs/make.jl @@ -12,10 +12,10 @@ makedocs(; authors = "psrenergy", pages = [ "Home" => "index.md", - ] + ], ) deploydocs(; repo = "github.com/psrenergy/JobQueueMPI.jl.git", push_preview = true, -) \ No newline at end of file +) diff --git a/docs/src/index.md b/docs/src/index.md index 4f71c24..679d5ff 100644 --- a/docs/src/index.md +++ b/docs/src/index.md @@ -19,6 +19,100 @@ JobQueueMPI.jl has the following components: - `Controller`: The controller is responsible for managing the jobs and the workers. It keeps track of the jobs that have been sent and received and sends the jobs to the available workers. - `Worker`: The worker is responsible for executing the jobs. It receives the jobs from the controller, executes them, and sends the results back to the controller. +Users can call functions to compute jobs in parallel in two ways: + - Building a function and using a `pmap` implementation that will put the function in the job queue and send it to the workers. +```julia +using JobQueueMPI + +function sum_100(value) + return value + 100 +end + +sum_100_answer = JobQueueMPI.pmap(sum_100, collect(1:10)) +``` + - Building the jobs and sending them to workers explicitly. There are examples of this structure in the test folder. This way is much more flexible than the first one, but it requires more code and knowledge about how MPI works. 
+ +```julia +using JobQueueMPI + +mutable struct Message + value::Int + vector_idx::Int +end + +all_jobs_done(controller) = JQM.is_job_queue_empty(controller) && !JQM.any_pending_jobs(controller) + +function sum_100(message::Message) + message.value += 100 + return message +end + +function update_data(new_data, message::Message) + idx = message.vector_idx + value = message.value + return new_data[idx] = value +end + +function workers_loop() + if JQM.is_worker_process() + worker = JQM.Worker() + while true + job = JQM.receive_job(worker) + message = JQM.get_message(job) + if message == JQM.TerminationMessage() + break + end + return_message = sum_100(message) + JQM.send_job_answer_to_controller(worker, return_message) + end + exit(0) + end +end + +function job_queue(data) + JQM.mpi_init() + JQM.mpi_barrier() + + T = eltype(data) + N = length(data) + + if JQM.is_controller_process() + new_data = Array{T}(undef, N) + + controller = JQM.Controller(JQM.num_workers()) + + for i in eachindex(data) + message = Message(data[i], i) + JQM.add_job_to_queue!(controller, message) + end + + while !all_jobs_done(controller) + if !JQM.is_job_queue_empty(controller) + JQM.send_jobs_to_any_available_workers(controller) + end + if JQM.any_pending_jobs(controller) + job_answer = JQM.check_for_job_answers(controller) + if !isnothing(job_answer) + message = JQM.get_message(job_answer) + update_data(new_data, message) + end + end + end + + JQM.send_termination_message() + + return new_data + end + workers_loop() + JQM.mpi_barrier() + JQM.mpi_finalize() + return nothing +end + +data = collect(1:10) +new_data = job_queue(data) +``` + ## API ```@docs diff --git a/src/pmap.jl b/src/pmap.jl index 07c46da..ceb8315 100644 --- a/src/pmap.jl +++ b/src/pmap.jl @@ -39,10 +39,10 @@ The controller process will return the answer in the same order as the jobs were return nothing. 
""" function pmap( - f::Function, - jobs::Vector, - data_defined_in_process = nothing; - return_result_in_all_processes::Bool = false + f::Function, + jobs::Vector, + data_defined_in_process = nothing; + return_result_in_all_processes::Bool = true, ) result = Vector{Any}(undef, length(jobs)) if is_running_in_parallel() diff --git a/test/test_pmap_mpi.jl b/test/test_pmap_mpi.jl index ee34e53..6c5274d 100644 --- a/test/test_pmap_mpi.jl +++ b/test/test_pmap_mpi.jl @@ -24,7 +24,7 @@ divisors_answer = JQM.pmap(get_divisors, collect(1:10)) @testset "pmap MPI" begin @test sum_100_answer == [101, 102, 103, 104, 105, 106, 107, 108, 109, 110] @test divisors_answer == - [[1], [1, 2], [1, 3], [1, 2, 4], [1, 5], [1, 2, 3, 6], [1, 7], [1, 2, 4, 8], [1, 3, 9], [1, 2, 5, 10]] + [[1], [1, 2], [1, 3], [1, 2, 4], [1, 5], [1, 2, 3, 6], [1, 7], [1, 2, 4, 8], [1, 3, 9], [1, 2, 5, 10]] end JQM.mpi_finalize() diff --git a/test/test_pmap_mpi_optim.jl b/test/test_pmap_mpi_optim.jl index 10f48c6..c681191 100644 --- a/test/test_pmap_mpi_optim.jl +++ b/test/test_pmap_mpi_optim.jl @@ -8,19 +8,18 @@ JQM.mpi_init() N = 5 data = collect(1:N) function g(x, i, data) - return i * (x[i] - 2 * i) ^ 2 + data[i] + return i * (x[i] - 2 * i)^2 + data[i] end x0 = zeros(N) list_i = collect(1:N) fake_input = Int[] # ignored - let is_done = false if JQM.is_controller_process() ret = optimize(x0, NelderMead()) do x MPI.bcast(is_done, MPI.COMM_WORLD) - g_x = JQM.pmap((v)->g(v[1], v[2], data), [(x, i) for i in list_i]) + g_x = JQM.pmap((v) -> g(v[1], v[2], data), [(x, i) for i in list_i]) return sum(g_x) end # tell workers to stop calling pmap @@ -42,11 +41,11 @@ let if is_done break end - JQM.pmap((v)->g(v[1], v[2], data), fake_input) + JQM.pmap((v) -> g(v[1], v[2], data), fake_input) end end end JQM.mpi_barrier() -JQM.mpi_finalize() \ No newline at end of file +JQM.mpi_finalize()