diff --git a/Project.toml b/Project.toml index 3021adb..9ee33c9 100644 --- a/Project.toml +++ b/Project.toml @@ -1,7 +1,7 @@ name = "JobQueueMPI" uuid = "32d208e1-246e-420c-b6ff-18b71b410923" authors = ["pedroripper "] -version = "0.1.0" +version = "0.1.1" [deps] MPI = "da04e1cc-30fd-572f-bb4f-1f8673147195" diff --git a/README.md b/README.md index 8621cc6..60e8200 100644 --- a/README.md +++ b/README.md @@ -6,8 +6,8 @@ [codecov-img]: https://codecov.io/gh/psrenergy/JobQueueMPI.jl/coverage.svg?branch=master [codecov-url]: https://codecov.io/gh/psrenergy/JobQueueMPI.jl?branch=master -| **Build Status** | **Coverage** | -|:-----------------:|:-----------------:| +| **Build Status** | **Coverage** | **Documentation** | +|:-----------------:|:-----------------:|:-----------------:| | [![Build Status][build-img]][build-url] | [![Codecov branch][codecov-img]][codecov-url] |[![](https://img.shields.io/badge/docs-latest-blue.svg)](https://psrenergy.github.io/JobQueueMPI.jl/dev/) diff --git a/docs/make.jl b/docs/make.jl index 2ed9096..d8fa7ed 100644 --- a/docs/make.jl +++ b/docs/make.jl @@ -12,10 +12,10 @@ makedocs(; authors = "psrenergy", pages = [ "Home" => "index.md", - ] + ], ) deploydocs(; repo = "github.com/psrenergy/JobQueueMPI.jl.git", push_preview = true, -) \ No newline at end of file +) diff --git a/docs/src/index.md b/docs/src/index.md index 4f71c24..679d5ff 100644 --- a/docs/src/index.md +++ b/docs/src/index.md @@ -19,6 +19,100 @@ JobQueueMPI.jl has the following components: - `Controller`: The controller is responsible for managing the jobs and the workers. It keeps track of the jobs that have been sent and received and sends the jobs to the available workers. - `Worker`: The worker is responsible for executing the jobs. It receives the jobs from the controller, executes them, and sends the results back to the controller. +Users can call functions to compute jobs in parallel in two ways: + - Building a function and using a `pmap` implementation that will put the function in the job queue and send it to the workers. +```julia +using JobQueueMPI + +function sum_100(value) + return value + 100 +end + +sum_100_answer = JobQueueMPI.pmap(sum_100, collect(1:10)) +``` + - Building the jobs and sending them to workers explicitly. There are examples of this structure in the test folder. This way is much more flexible than the first one, but it requires more code and knowledge about how MPI works. + +```julia +using JobQueueMPI + +mutable struct Message + value::Int + vector_idx::Int +end + +all_jobs_done(controller) = JQM.is_job_queue_empty(controller) && !JQM.any_pending_jobs(controller) + +function sum_100(message::Message) + message.value += 100 + return message +end + +function update_data(new_data, message::Message) + idx = message.vector_idx + value = message.value + return new_data[idx] = value +end + +function workers_loop() + if JQM.is_worker_process() + worker = JQM.Worker() + while true + job = JQM.receive_job(worker) + message = JQM.get_message(job) + if message == JQM.TerminationMessage() + break + end + return_message = sum_100(message) + JQM.send_job_answer_to_controller(worker, return_message) + end + exit(0) + end +end + +function job_queue(data) + JQM.mpi_init() + JQM.mpi_barrier() + + T = eltype(data) + N = length(data) + + if JQM.is_controller_process() + new_data = Array{T}(undef, N) + + controller = JQM.Controller(JQM.num_workers()) + + for i in eachindex(data) + message = Message(data[i], i) + JQM.add_job_to_queue!(controller, message) + end + + while !all_jobs_done(controller) + if !JQM.is_job_queue_empty(controller) + JQM.send_jobs_to_any_available_workers(controller) + end + if JQM.any_pending_jobs(controller) + job_answer = JQM.check_for_job_answers(controller) + if !isnothing(job_answer) + message = JQM.get_message(job_answer) + update_data(new_data, message) + end + end + end + + JQM.send_termination_message() + + return new_data + end + workers_loop() + JQM.mpi_barrier() + JQM.mpi_finalize() + return nothing +end + +data = collect(1:10) +new_data = job_queue(data) +``` + ## API ```@docs diff --git a/src/pmap.jl b/src/pmap.jl index 7e285b3..ceb8315 100644 --- a/src/pmap.jl +++ b/src/pmap.jl @@ -19,7 +19,12 @@ function _p_map_workers_loop(f, data_defined_in_process) end """ - pmap(f, jobs, data_defined_in_process = nothing) + pmap( + f::Function, + jobs::Vector, + data_defined_in_process = nothing; + return_result_in_all_processes::Bool = false + ) Parallel map function that works with MPI. If the function is called in parallel, it will distribute the jobs to the workers and collect the results. If the function is called in @@ -28,10 +33,17 @@ serial, it will just map the function to the jobs. The function `f` should take one argument, which is the message to be processed. If `data_defined_in_process` is not `nothing`, the function `f` should take two arguments, the first one being `data_defined_in_process`. +The `return_result_in_all_processes` argument is used to broadcast the result to all processes. If set to `true`. + The controller process will return the answer in the same order as the jobs were given. The workers will return nothing. """ -function pmap(f::Function, jobs::Vector, data_defined_in_process = nothing) +function pmap( + f::Function, + jobs::Vector, + data_defined_in_process = nothing; + return_result_in_all_processes::Bool = true, +) result = Vector{Any}(undef, length(jobs)) if is_running_in_parallel() mpi_barrier() @@ -55,11 +67,19 @@ function pmap(f::Function, jobs::Vector, data_defined_in_process = nothing) end send_termination_message() mpi_barrier() + if return_result_in_all_processes + result = MPI.bcast(result, controller_rank(), MPI.COMM_WORLD) + mpi_barrier() + end return result else _p_map_workers_loop(f, data_defined_in_process) mpi_barrier() - return nothing + if return_result_in_all_processes + result = MPI.bcast(result, controller_rank(), MPI.COMM_WORLD) + mpi_barrier() + end + return result end else for (i, job) in enumerate(jobs) diff --git a/test/test_pmap_mpi.jl b/test/test_pmap_mpi.jl index d46a222..6c5274d 100644 --- a/test/test_pmap_mpi.jl +++ b/test/test_pmap_mpi.jl @@ -21,12 +21,10 @@ end sum_100_answer = JQM.pmap(sum_100, collect(1:10)) divisors_answer = JQM.pmap(get_divisors, collect(1:10)) -if JQM.is_controller_process() - @testset "pmap MPI" begin - @test sum_100_answer == [101, 102, 103, 104, 105, 106, 107, 108, 109, 110] - @test divisors_answer == - [[1], [1, 2], [1, 3], [1, 2, 4], [1, 5], [1, 2, 3, 6], [1, 7], [1, 2, 4, 8], [1, 3, 9], [1, 2, 5, 10]] - end +@testset "pmap MPI" begin + @test sum_100_answer == [101, 102, 103, 104, 105, 106, 107, 108, 109, 110] + @test divisors_answer == + [[1], [1, 2], [1, 3], [1, 2, 4], [1, 5], [1, 2, 3, 6], [1, 7], [1, 2, 4, 8], [1, 3, 9], [1, 2, 5, 10]] end JQM.mpi_finalize() diff --git a/test/test_pmap_mpi_optim.jl b/test/test_pmap_mpi_optim.jl index 10f48c6..c681191 100644 --- a/test/test_pmap_mpi_optim.jl +++ b/test/test_pmap_mpi_optim.jl @@ -8,19 +8,18 @@ JQM.mpi_init() N = 5 data = collect(1:N) function g(x, i, data) - return i * (x[i] - 2 * i) ^ 2 + data[i] + return i * (x[i] - 2 * i)^2 + data[i] end x0 = zeros(N) list_i = collect(1:N) fake_input = Int[] # ignored - let is_done = false if JQM.is_controller_process() ret = optimize(x0, NelderMead()) do x MPI.bcast(is_done, MPI.COMM_WORLD) - g_x = JQM.pmap((v)->g(v[1], v[2], data), [(x, i) for i in list_i]) + g_x = JQM.pmap((v) -> g(v[1], v[2], data), [(x, i) for i in list_i]) return sum(g_x) end # tell workers to stop calling pmap @@ -42,11 +41,11 @@ let if is_done break end - JQM.pmap((v)->g(v[1], v[2], data), fake_input) + JQM.pmap((v) -> g(v[1], v[2], data), fake_input) end end end JQM.mpi_barrier() -JQM.mpi_finalize() \ No newline at end of file +JQM.mpi_finalize()