diff --git a/CHANGELOG.md b/CHANGELOG.md index 99cc506d..46bbd08c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,8 @@ OhMyThreads.jl Changelog Version 0.5.0 ------------- +- ![Feature][badge-feature] In the case `nchunks > nthreads()`, the `StaticScheduler` now distributes chunks in a round-robin fashion (instead of either implicitly decreasing `nchunks` to `nthreads()` or throwing an error). +- ![Feature][badge-feature] The `DynamicScheduler` (default) and the `StaticScheduler` now support a `chunksize` argument to specify the desired size of chunks instead of the number of chunks (`nchunks`). Note that `chunksize` and `nchunks` are mutually exclusive. - ![Feature][badge-feature] `@set init = ...` may now be used to specify an initial value for a reduction (only has an effect in conjunction with `@set reducer=...` and triggers a warning otherwise). - ![BREAKING][badge-breaking] Within a `@tasks` block, task-local values must from now on be defined via `@local` instead of `@init` (renamed). - ![BREAKING][badge-breaking] The (already deprecated) `SpawnAllScheduler` has been dropped. 
diff --git a/Project.toml b/Project.toml index 0c88eae9..b1a8b521 100644 --- a/Project.toml +++ b/Project.toml @@ -11,7 +11,7 @@ TaskLocalValues = "ed4db957-447d-4319-bfb6-7fa9ae7ecf34" [compat] BangBang = "0.4" -ChunkSplitters = "2.1" +ChunkSplitters = "2.3" StableTasks = "0.1.5" TaskLocalValues = "0.1" julia = "1.6" diff --git a/src/implementation.jl b/src/implementation.jl index ef6d2736..54a0906b 100644 --- a/src/implementation.jl +++ b/src/implementation.jl @@ -5,7 +5,8 @@ import OhMyThreads: treduce, tmapreduce, treducemap, tforeach, tmap, tmap!, tcol using OhMyThreads: chunks, @spawn, @spawnat using OhMyThreads.Tools: nthtid using OhMyThreads: Scheduler, DynamicScheduler, StaticScheduler, GreedyScheduler -using OhMyThreads.Schedulers: chunking_enabled +using OhMyThreads.Schedulers: chunking_enabled, chunking_mode, ChunkingMode, NoChunking, + FixedSize, FixedCount using Base: @propagate_inbounds using Base.Threads: nthreads, @threads @@ -18,10 +19,20 @@ include("macro_impl.jl") function auto_disable_chunking_warning() @warn("You passed in a `ChunkSplitters.Chunk` but also a scheduler that has "* "chunking enabled. Will turn off internal chunking to proceed.\n"* - "To avoid this warning, try to turn off chunking (`nchunks=0`) "* + "To avoid this warning, try to turn off chunking (`nchunks=0` and `chunksize=0`) "* "or pass in `collect(chunks(...))`.") end +function _chunks(sched, arg; kwargs...) + C = chunking_mode(sched) + @assert C != NoChunking + if C == FixedCount + chunks(arg; n = sched.nchunks, split = sched.split, kwargs...) + elseif C == FixedSize + chunks(arg; size = sched.chunksize, split = sched.split, kwargs...) 
+ end +end + function tmapreduce(f, op, Arrs...; scheduler::Scheduler = DynamicScheduler(), outputtype::Type = Any, @@ -46,10 +57,10 @@ function _tmapreduce(f, ::Type{OutputType}, scheduler::DynamicScheduler, mapreduce_kwargs)::OutputType where {OutputType} - (; nchunks, split, threadpool) = scheduler + (; threadpool) = scheduler check_all_have_same_indices(Arrs) if chunking_enabled(scheduler) - tasks = map(chunks(first(Arrs); n = nchunks, split)) do inds + tasks = map(_chunks(scheduler, first(Arrs))) do inds args = map(A -> view(A, inds), Arrs) @spawn threadpool mapreduce(f, op, args...; $mapreduce_kwargs...) end @@ -85,24 +96,18 @@ function _tmapreduce(f, ::Type{OutputType}, scheduler::StaticScheduler, mapreduce_kwargs) where {OutputType} - (; nchunks, split) = scheduler + nt = nthreads() check_all_have_same_indices(Arrs) if chunking_enabled(scheduler) - n = min(nthreads(), nchunks) # We could implement strategies, like round-robin, in the future - tasks = map(enumerate(chunks(first(Arrs); n, split))) do (c, inds) - tid = @inbounds nthtid(c) + tasks = map(enumerate(_chunks(scheduler, first(Arrs)))) do (c, inds) + tid = @inbounds nthtid(mod1(c, nt)) args = map(A -> view(A, inds), Arrs) @spawnat tid mapreduce(f, op, args...; mapreduce_kwargs...) end mapreduce(fetch, op, tasks) else - if length(first(Arrs)) > nthreads() - error("You have disabled chunking but provided an input with more then " * - "`nthreads()` elements. This is not supported for `StaticScheduler`.") - end - n = min(nthreads(), nchunks) # We could implement strategies, like round-robin, in the future tasks = map(enumerate(eachindex(first(Arrs)))) do (c, i) - tid = @inbounds nthtid(c) + tid = @inbounds nthtid(mod1(c, nt)) args = map(A -> @inbounds(A[i]), Arrs) @spawnat tid f(args...) 
end @@ -120,12 +125,9 @@ function _tmapreduce(f, chunking_enabled(scheduler) && auto_disable_chunking_warning() check_all_have_same_indices(Arrs) chnks = only(Arrs) - if length(chnks) > nthreads() - error("You provided a `ChunkSplitters.Chunk` with more than `nthreads()` chunks " * - "as input, which is not supported by the `StaticScheduler`.") - end + nt = nthreads() tasks = map(enumerate(chnks)) do (c, idcs) - tid = @inbounds nthtid(c) + tid = @inbounds nthtid(mod1(c, nt)) @spawnat tid f(idcs) end mapreduce(fetch, op, tasks; mapreduce_kwargs...) @@ -216,8 +218,8 @@ function tmap(f, _tmap(scheduler, f, A, _Arrs...; kwargs...) end -# w/o chunking (DynamicScheduler{false}): AbstractArray -function _tmap(scheduler::DynamicScheduler{false}, +# w/o chunking (DynamicScheduler{NoChunking}): AbstractArray +function _tmap(scheduler::DynamicScheduler{NoChunking}, f, A::AbstractArray, _Arrs::AbstractArray...; @@ -234,8 +236,8 @@ function _tmap(scheduler::DynamicScheduler{false}, reshape(v, size(A)...) end -# w/o chunking (DynamicScheduler{false}): ChunkSplitters.Chunk -function _tmap(scheduler::DynamicScheduler{false}, +# w/o chunking (DynamicScheduler{NoChunking}): ChunkSplitters.Chunk +function _tmap(scheduler::DynamicScheduler{NoChunking}, f, A::ChunkSplitters.Chunk, _Arrs::AbstractArray...; @@ -247,18 +249,15 @@ function _tmap(scheduler::DynamicScheduler{false}, map(fetch, tasks) end -# w/o chunking (StaticScheduler{false}): ChunkSplitters.Chunk -function _tmap(scheduler::StaticScheduler{false}, +# w/o chunking (StaticScheduler{NoChunking}): ChunkSplitters.Chunk +function _tmap(scheduler::StaticScheduler{NoChunking}, f, A::ChunkSplitters.Chunk, _Arrs::AbstractArray...; kwargs...) 
- if length(A) > nthreads() - error("You provided a `ChunkSplitters.Chunk` with more than `nthreads()` chunks " * - "as input, which is not supported by the `StaticScheduler`.") - end + nt = nthreads() tasks = map(enumerate(A)) do (c, idcs) - tid = @inbounds nthtid(c) + tid = @inbounds nthtid(mod1(c, nt)) @spawnat tid f(idcs) end map(fetch, tasks) @@ -271,7 +270,7 @@ function _tmap(scheduler::Scheduler, _Arrs::AbstractArray...; kwargs...) Arrs = (A, _Arrs...) - idcs = collect(chunks(A; n = scheduler.nchunks)) + idcs = collect(_chunks(scheduler, A)) reduction_f = append!! v = tmapreduce(reduction_f, idcs; scheduler, kwargs...) do inds args = map(A -> @view(A[inds]), Arrs) diff --git a/src/schedulers.jl b/src/schedulers.jl index 7a66278a..9bd80add 100644 --- a/src/schedulers.jl +++ b/src/schedulers.jl @@ -11,6 +11,22 @@ Supertype for all available schedulers: """ abstract type Scheduler end +abstract type ChunkingMode end +struct NoChunking <: ChunkingMode end +struct FixedCount <: ChunkingMode end +struct FixedSize <: ChunkingMode end + +function _chunkingstr(s::Scheduler) + C = chunking_mode(s) + if C == FixedCount + cstr = "fixed count ($(s.nchunks)), :$(s.split)" + elseif C == FixedSize + cstr = "fixed size ($(s.chunksize)), :$(s.split)" + elseif C == NoChunking + cstr = "none" + end +end + """ The default dynamic scheduler. Divides the given collection into chunks and then spawns a task per chunk to perform the requested operation in parallel. @@ -26,29 +42,74 @@ with other multithreaded code. * Determines the number of chunks (and thus also the number of parallel tasks). * Increasing `nchunks` can help with [load balancing](https://en.wikipedia.org/wiki/Load_balancing_(computing)), but at the expense of creating more overhead. For `nchunks <= nthreads()` there are not enough chunks for any load balancing. * Setting `nchunks < nthreads()` is an effective way to use only a subset of the available threads. 
- * Setting `nchunks = 0` turns off the internal chunking entirely (a task is spawned for each element). Note that, depending on the input, this scheduler **might spawn many(!) tasks** and can be + * Setting `nchunks = 0` (and `chunksize = 0`) turns off the internal chunking entirely (a task is spawned for each element). Note that, depending on the input, this scheduler **might spawn many(!) tasks** and can be very costly! +- `chunksize::Integer` (default `0`) + * Specifies the desired chunk size (instead of the number of chunks). + * The options `chunksize` and `nchunks` are **mutually exclusive** (only one may be non-zero). - `split::Symbol` (default `:batch`): -* Determines how the collection is divided into chunks. By default, each chunk consists of contiguous elements and order is maintained. -* See [ChunkSplitters.jl](https://github.com/JuliaFolds2/ChunkSplitters.jl) for more details and available options. -* Beware that for `split=:scatter` the order of elements isn't maintained and a reducer function must not only be associative but also **commutative**! + * Determines how the collection is divided into chunks. By default, each chunk consists of contiguous elements and order is maintained. + * See [ChunkSplitters.jl](https://github.com/JuliaFolds2/ChunkSplitters.jl) for more details and available options. + * Beware that for `split=:scatter` the order of elements isn't maintained and a reducer function must not only be associative but also **commutative**! - `threadpool::Symbol` (default `:default`): * Possible options are `:default` and `:interactive`. * The high-priority pool `:interactive` should be used very carefully since tasks on this threadpool should not be allowed to run for a long time without `yield`ing as it can interfere with [heartbeat](https://en.wikipedia.org/wiki/Heartbeat_(computing)) processes. 
""" -Base.@kwdef struct DynamicScheduler{C} <: Scheduler - threadpool::Symbol = :default - nchunks::Int = 2 * nthreads(threadpool) # a multiple of nthreads to enable load balancing - split::Symbol = :batch - - function DynamicScheduler(threadpool::Symbol, nchunks::Integer, split::Symbol) - threadpool in (:default, :interactive) || +struct DynamicScheduler{C <: ChunkingMode} <: Scheduler + threadpool::Symbol + nchunks::Int + chunksize::Int + split::Symbol + + function DynamicScheduler( + threadpool::Symbol, nchunks::Integer, chunksize::Integer, split::Symbol) + if !(threadpool in (:default, :interactive)) throw(ArgumentError("threadpool must be either :default or :interactive")) - nchunks >= 0 || + end + if nchunks < 0 throw(ArgumentError("nchunks must be a positive integer (or zero).")) - C = !(nchunks == 0) # type parameter indicates whether chunking is enabled - new{C}(threadpool, nchunks, split) + end + if chunksize < 0 + throw(ArgumentError("chunksize must be a positive integer (or zero).")) + end + if nchunks != 0 && chunksize != 0 + throw(ArgumentError("nchunks and chunksize are mutually exclusive and only one of them may be non-zero")) + end + if nchunks == 0 && chunksize == 0 + C = NoChunking + elseif chunksize != 0 + C = FixedSize + else + C = FixedCount + end + new{C}(threadpool, nchunks, chunksize, split) + end +end + +function DynamicScheduler(; + threadpool::Symbol = :default, + nchunks::Union{Integer, Nothing} = nothing, + chunksize::Union{Integer, Nothing} = nothing, + split::Symbol = :batch) + if isnothing(nchunks) + # only choose nchunks default if chunksize hasn't been specified + if isnothing(chunksize) + nchunks = 2 * nthreads(threadpool) + else + nchunks = 0 + end + end + if isnothing(chunksize) + chunksize = 0 end + DynamicScheduler(threadpool, nchunks, chunksize, split) +end + +function Base.show(io::IO, mime::MIME{Symbol("text/plain")}, s::DynamicScheduler) + print("DynamicScheduler", "\n") + cstr = _chunkingstr(s) + println(io, "├ Chunking: 
", cstr) + print(io, "└ Threadpool: ", s.threadpool) end """ @@ -66,22 +127,66 @@ Isn't well composable with other multithreaded code though. - `nchunks::Integer` (default `nthreads()`): * Determines the number of chunks (and thus also the number of parallel tasks). * Setting `nchunks < nthreads()` is an effective way to use only a subset of the available threads. - * Currently, `nchunks > nthreads()` **isn't officialy supported** but, for now, will fall back to `nchunks = nthreads()`. + * For `nchunks > nthreads()` the chunks will be distributed to the available threads in a round-robin fashion. + * Setting `nchunks = 0` (and `chunksize = 0`) turns off the internal chunking entirely (a task is spawned for each element). Note that, depending on the input, this scheduler **might spawn many(!) tasks** and can be + very costly! +- `chunksize::Integer` (default `0`) + * Specifies the desired chunk size (instead of the number of chunks). + * The options `chunksize` and `nchunks` are **mutually exclusive** (only one may be non-zero). - `split::Symbol` (default `:batch`): * Determines how the collection is divided into chunks. By default, each chunk consists of contiguous elements and order is maintained. * See [ChunkSplitters.jl](https://github.com/JuliaFolds2/ChunkSplitters.jl) for more details and available options. * Beware that for `split=:scatter` the order of elements isn't maintained and a reducer function must not only be associative but also **commutative**! 
""" -Base.@kwdef struct StaticScheduler{C} <: Scheduler - nchunks::Int = nthreads() - split::Symbol = :batch +struct StaticScheduler{C <: ChunkingMode} <: Scheduler + nchunks::Int + chunksize::Int + split::Symbol - function StaticScheduler(nchunks::Integer, split::Symbol) - nchunks >= 0 || + function StaticScheduler(nchunks::Integer, chunksize::Integer, split::Symbol) + if nchunks < 0 throw(ArgumentError("nchunks must be a positive integer (or zero).")) - C = !(nchunks == 0) # type parameter indicates whether chunking is enabled - new{C}(nchunks, split) + end + if chunksize < 0 + throw(ArgumentError("chunksize must be a positive integer (or zero).")) + end + if nchunks != 0 && chunksize != 0 + throw(ArgumentError("nchunks and chunksize are mutually exclusive and only one of them may be non-zero")) + end + if nchunks == 0 && chunksize == 0 + C = NoChunking + elseif chunksize != 0 + C = FixedSize + else + C = FixedCount + end + new{C}(nchunks, chunksize, split) + end +end + +function StaticScheduler(; + nchunks::Union{Integer, Nothing} = nothing, + chunksize::Union{Integer, Nothing} = nothing, + split::Symbol = :batch) + if isnothing(nchunks) + # only choose nchunks default if chunksize hasn't been specified + if isnothing(chunksize) + nchunks = nthreads(:default) + else + nchunks = 0 + end + end + if isnothing(chunksize) + chunksize = 0 end + StaticScheduler(nchunks, chunksize, split) +end + +function Base.show(io::IO, mime::MIME{Symbol("text/plain")}, s::StaticScheduler) + print("StaticScheduler", "\n") + cstr = _chunkingstr(s) + println(io, "├ Chunking: ", cstr) + print(io, "└ Threadpool: default") end """ @@ -110,9 +215,18 @@ Base.@kwdef struct GreedyScheduler <: Scheduler end end +function Base.show(io::IO, mime::MIME{Symbol("text/plain")}, s::GreedyScheduler) + print("GreedyScheduler", "\n") + println(io, "├ Num. 
tasks: ", s.ntasks) + print(io, "└ Threadpool: default") +end + +chunking_mode(s::Scheduler) = chunking_mode(typeof(s)) +chunking_mode(::Type{DynamicScheduler{C}}) where {C} = C +chunking_mode(::Type{StaticScheduler{C}}) where {C} = C +chunking_mode(::Type{GreedyScheduler}) = NoChunking + chunking_enabled(s::Scheduler) = chunking_enabled(typeof(s)) -chunking_enabled(::Type{DynamicScheduler{C}}) where {C} = C -chunking_enabled(::Type{StaticScheduler{C}}) where {C} = C -chunking_enabled(::Type{GreedyScheduler}) = false +chunking_enabled(::Type{S}) where {S <: Scheduler} = chunking_mode(S) != NoChunking end # module diff --git a/test/runtests.jl b/test/runtests.jl index 2bc13c2b..94501f4a 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -13,12 +13,12 @@ sets_to_test = [(~ = isapprox, f = sin ∘ *, op = +, for (; ~, f, op, itrs, init) in sets_to_test @testset "f=$f, op=$op, itrs::$(typeof(itrs))" begin @testset for sched in ( - StaticScheduler, DynamicScheduler, GreedyScheduler, DynamicScheduler{false}) + StaticScheduler, DynamicScheduler, GreedyScheduler, DynamicScheduler{OhMyThreads.Schedulers.NoChunking}) @testset for split in (:batch, :scatter) for nchunks in (1, 2, 6) if sched == GreedyScheduler scheduler = sched(; ntasks = nchunks) - elseif sched == DynamicScheduler{false} + elseif sched == DynamicScheduler{OhMyThreads.Schedulers.NoChunking} scheduler = DynamicScheduler(; nchunks = 0) else scheduler = sched(; nchunks, split) @@ -192,4 +192,43 @@ end end) == 10*var end +@testset "chunking mode + chunksize option" begin + @test DynamicScheduler(; chunksize=2) isa DynamicScheduler + @test StaticScheduler(; chunksize=2) isa StaticScheduler + + @test OhMyThreads.Schedulers.chunking_mode(DynamicScheduler(; chunksize=2)) == OhMyThreads.Schedulers.FixedSize + @test OhMyThreads.Schedulers.chunking_mode(DynamicScheduler(; nchunks=2)) == OhMyThreads.Schedulers.FixedCount + @test OhMyThreads.Schedulers.chunking_mode(DynamicScheduler(; nchunks=0, chunksize=0)) == 
OhMyThreads.Schedulers.NoChunking + @test OhMyThreads.Schedulers.chunking_mode(DynamicScheduler(; nchunks=0)) == OhMyThreads.Schedulers.NoChunking + @test OhMyThreads.Schedulers.chunking_enabled(DynamicScheduler(; chunksize=2)) == true + @test OhMyThreads.Schedulers.chunking_enabled(DynamicScheduler(; nchunks=2)) == true + @test OhMyThreads.Schedulers.chunking_enabled(DynamicScheduler(; nchunks=0, chunksize=0)) == false + @test OhMyThreads.Schedulers.chunking_enabled(DynamicScheduler(; nchunks=0)) == false + + @test OhMyThreads.Schedulers.chunking_mode(StaticScheduler(; chunksize=2)) == OhMyThreads.Schedulers.FixedSize + @test OhMyThreads.Schedulers.chunking_mode(StaticScheduler(; nchunks=2)) == OhMyThreads.Schedulers.FixedCount + @test OhMyThreads.Schedulers.chunking_mode(StaticScheduler(; nchunks=0, chunksize=0)) == OhMyThreads.Schedulers.NoChunking + @test OhMyThreads.Schedulers.chunking_mode(StaticScheduler(; nchunks=0)) == OhMyThreads.Schedulers.NoChunking + @test OhMyThreads.Schedulers.chunking_enabled(StaticScheduler(; chunksize=2)) == true + @test OhMyThreads.Schedulers.chunking_enabled(StaticScheduler(; nchunks=2)) == true + @test OhMyThreads.Schedulers.chunking_enabled(StaticScheduler(; nchunks=0, chunksize=0)) == false + @test OhMyThreads.Schedulers.chunking_enabled(StaticScheduler(; nchunks=0)) == false + + @test_throws ArgumentError DynamicScheduler(; nchunks=2, chunksize=3) + @test_throws ArgumentError StaticScheduler(; nchunks=2, chunksize=3) + + let scheduler = DynamicScheduler(; chunksize=2) + @test tmapreduce(sin, +, 1:10; scheduler) ≈ mapreduce(sin, +, 1:10) + @test tmap(sin, 1:10; scheduler) ≈ map(sin, 1:10) + @test isnothing(tforeach(sin, 1:10; scheduler)) + @test treduce(+, 1:10; scheduler) ≈ reduce(+, 1:10) + end + let scheduler = StaticScheduler(; chunksize=2) + @test tmapreduce(sin, +, 1:10; scheduler) ≈ mapreduce(sin, +, 1:10) + @test tmap(sin, 1:10; scheduler) ≈ map(sin, 1:10) + @test isnothing(tforeach(sin, 1:10; scheduler)) + @test 
treduce(+, 1:10; scheduler) ≈ reduce(+, 1:10) + end +end + # Todo way more testing, and easier tests to deal with