diff --git a/src/implementation.jl b/src/implementation.jl index 96748c9e..2e05e4e0 100644 --- a/src/implementation.jl +++ b/src/implementation.jl @@ -152,12 +152,12 @@ function _tmapreduce(f, mapreduce(fetch, promise_task_local(op), tasks; mapreduce_kwargs...) end -# GreedyScheduler +# GreedyScheduler w/o chunking function _tmapreduce(f, op, Arrs, ::Type{OutputType}, - scheduler::GreedyScheduler, + scheduler::GreedyScheduler{NoChunking}, mapreduce_kwargs)::OutputType where {OutputType} ntasks_desired = scheduler.ntasks if Base.IteratorSize(first(Arrs)) isa Base.SizeUnknown @@ -185,6 +185,39 @@ function _tmapreduce(f, mapreduce(fetch, promise_task_local(op), tasks; mapreduce_kwargs...) end +# GreedyScheduler w/ chunking +function _tmapreduce(f, + op, + Arrs, + ::Type{OutputType}, + scheduler::GreedyScheduler, + mapreduce_kwargs)::OutputType where {OutputType} + if Base.IteratorSize(first(Arrs)) isa Base.SizeUnknown + throw(ArgumentError("SizeUnkown iterators in combination with a greedy scheduler and chunking are currently not supported.")) + end + check_all_have_same_indices(Arrs) + chnks = _chunks(scheduler, first(Arrs)) + ntasks_desired = scheduler.ntasks + ntasks = min(length(chnks), ntasks_desired) + + ch = Channel{typeof(first(chnks))}(length(chnks); spawn = true) do ch + for args in chnks + put!(ch, args) + end + end + tasks = map(1:ntasks) do _ + # Note, calling `promise_task_local` here is only safe because we're assuming that + # Base.mapreduce isn't going to magically try to do multithreading on us... + @spawn mapreduce(promise_task_local(op), ch; mapreduce_kwargs...) do inds + args = map(A -> view(A, inds), Arrs) + mapreduce(promise_task_local(f), promise_task_local(op), args...) + end + end + # Note, calling `promise_task_local` here is only safe because we're assuming that + # Base.mapreduce isn't going to magically try to do multithreading on us... + mapreduce(fetch, promise_task_local(op), tasks; mapreduce_kwargs...) +end + function check_all_have_same_indices(Arrs) let A = first(Arrs), Arrs = Arrs[2:end] if !all(B -> eachindex(A) == eachindex(B), Arrs) diff --git a/src/schedulers.jl b/src/schedulers.jl index 7c4dc962..8e22e122 100644 --- a/src/schedulers.jl +++ b/src/schedulers.jl @@ -191,7 +191,7 @@ end """ A greedy dynamic scheduler. The elements of the collection are first put into a `Channel` -and then dynamic, non-sticky tasks are spawned to process channel content in parallel. +and then dynamic, non-sticky tasks are spawned to process the channel content in parallel. Note that elements are processed in a non-deterministic order, and thus a potential reducing function **must** be [commutative](https://en.wikipedia.org/wiki/Commutative_property) in @@ -204,20 +204,74 @@ some additional overhead. - `ntasks::Int` (default `nthreads()`): * Determines the number of parallel tasks to be spawned. - * Setting `nchunks < nthreads()` is an effective way to use only a subset of the available threads. + * Setting `ntasks < nthreads()` is an effective way to use only a subset of the available threads. +- `chunking::Bool` (default `false`): + * Controls whether input elements are grouped into chunks (`true`) or not (`false`) before put into the channel. This can improve the performance especially if there are many iterations each of which are computationally cheap. + * If `nchunks` or `chunksize` are explicitly specified, `chunking` will be automatically set to `true`. +- `nchunks::Integer` (default `4 * nthreads()`): + * Determines the number of chunks (that will eventually be put into the channel). + * Increasing `nchunks` can help with [load balancing](https://en.wikipedia.org/wiki/Load_balancing_(computing)). For `nchunks <= nthreads()` there are not enough chunks for any load balancing. +- `chunksize::Integer` (default not set) + * Specifies the desired chunk size (instead of the number of chunks). + * The options `chunksize` and `nchunks` are **mutually exclusive** (only one may be a positive integer). +- `split::Symbol` (default `:scatter`): + * Determines how the collection is divided into chunks (if chunking=true). + * See [ChunkSplitters.jl](https://github.com/JuliaFolds2/ChunkSplitters.jl) for more details and available options. """ -Base.@kwdef struct GreedyScheduler <: Scheduler - ntasks::Int = nthreads() +struct GreedyScheduler{C <: ChunkingMode} <: Scheduler + ntasks::Int + nchunks::Int + chunksize::Int + split::Symbol - function GreedyScheduler(ntasks::Int) + function GreedyScheduler(ntasks::Int, nchunks::Integer, chunksize::Integer, + split::Symbol; chunking::Bool = false) ntasks > 0 || throw(ArgumentError("ntasks must be a positive integer")) - new(ntasks) + if !chunking + C = NoChunking + else + if !(nchunks > 0 || chunksize > 0) + throw(ArgumentError("Either nchunks or chunksize must be a positive integer (or chunking=false).")) + end + if nchunks > 0 && chunksize > 0 + throw(ArgumentError("nchunks and chunksize are mutually exclusive and only one of them may be a positive integer")) + end + C = chunksize > 0 ? FixedSize : FixedCount + end + new{C}(ntasks, nchunks, chunksize, split) end end +function GreedyScheduler(; + ntasks::Integer = nthreads(), + nchunks::Union{Integer, Nothing} = nothing, + chunksize::Union{Integer, Nothing} = nothing, + chunking::Bool = false, + split::Symbol = :scatter) + if !isnothing(nchunks) || !isnothing(chunksize) + chunking = true + end + if !chunking + nchunks = -1 + chunksize = -1 + else + # only choose nchunks default if chunksize hasn't been specified + if isnothing(nchunks) && isnothing(chunksize) + nchunks = 4 * nthreads(:default) + chunksize = -1 + else + nchunks = isnothing(nchunks) ? -1 : nchunks + chunksize = isnothing(chunksize) ? -1 : chunksize + end + end + GreedyScheduler(ntasks, nchunks, chunksize, split; chunking) +end + function Base.show(io::IO, mime::MIME{Symbol("text/plain")}, s::GreedyScheduler) print("GreedyScheduler", "\n") println(io, "├ Num. tasks: ", s.ntasks) + cstr = _chunkingstr(s) + println(io, "├ Chunking: ", cstr) print(io, "└ Threadpool: default") end @@ -232,7 +286,7 @@ end chunking_mode(s::Scheduler) = chunking_mode(typeof(s)) chunking_mode(::Type{DynamicScheduler{C}}) where {C} = C chunking_mode(::Type{StaticScheduler{C}}) where {C} = C -chunking_mode(::Type{GreedyScheduler}) = NoChunking +chunking_mode(::Type{GreedyScheduler{C}}) where {C} = C chunking_mode(::Type{SerialScheduler}) = NoChunking chunking_enabled(s::Scheduler) = chunking_enabled(typeof(s))