diff --git a/CHANGELOG.md b/CHANGELOG.md index ca7ba5a8..4f9b3e3b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,11 +4,12 @@ OhMyThreads.jl Changelog Version 0.5.0 ------------- -- ![feature][badge-feature] Added a `SerialScheduler` that can be used to turn off any multithreading. -- ![feature][badge-feature] Added `OhMyThreads.WithTaskLocals` that represents a closure over `TaskLocalValues`, but can have those values materialized as an optimization (using `OhMyThreads.promise_task_local`) +- ![BREAKING][badge-breaking] `DynamicScheduler` and `StaticScheduler` don't support `nchunks=0` or `chunksize=0` any longer. Instead, chunking can now be turned off via an explicit new keyword argument `chunking=false`. +- ![Feature][badge-feature] Added a `SerialScheduler` that can be used to turn off any multithreading. +- ![Feature][badge-feature] Added `OhMyThreads.WithTaskLocals` that represents a closure over `TaskLocalValues`, but can have those values materialized as an optimization (using `OhMyThreads.promise_task_local`) - ![Enhancement][badge-enhancement] Made `@tasks` use `OhMyThreads.WithTaskLocals` automatically as an optimization. - ![Feature][badge-feature] In the case `nchunks > nthreads()`, the `StaticScheduler` now distributes chunks in a round-robin fashion (instead of either implicitly decreasing `nchunks` to `nthreads()` or throwing an error). -- ![Feature][badge-feature] The `DynamicScheduler` (default) and the `StaticScheduler` now support a `chunksize` argument to specify the desired size of chunks instead of the number of chunks (`nchunks`). Note that `chunksize` and `nchunks` are mutually exclusive. +- ![BREAKING][badge-breaking] The `DynamicScheduler` (default) and the `StaticScheduler` now support a `chunksize` argument to specify the desired size of chunks instead of the number of chunks (`nchunks`). Note that `chunksize` and `nchunks` are mutually exclusive. 
(This is unlikely to break existing code but technically could because the type parameter has changed from `Bool` to `ChunkingMode`.) - ![Feature][badge-feature] `@set init = ...` may now be used to specify an initial value for a reduction (only has an effect in conjuction with `@set reducer=...` and triggers a warning otherwise). - ![BREAKING][badge-breaking] Within a `@tasks` block, task-local values must from now on be defined via `@local` instead of `@init` (renamed). - ![BREAKING][badge-breaking] The (already deprecated) `SpawnAllScheduler` has been dropped. diff --git a/src/implementation.jl b/src/implementation.jl index bfef209a..96748c9e 100644 --- a/src/implementation.jl +++ b/src/implementation.jl @@ -309,6 +309,25 @@ function _tmap(scheduler::StaticScheduler{NoChunking}, map(fetch, tasks) end +# w/o chunking (StaticScheduler{NoChunking}): AbstractArray +function _tmap(scheduler::StaticScheduler{NoChunking}, + f, + A::AbstractArray, + _Arrs::AbstractArray...; + kwargs...) + Arrs = (A, _Arrs...) + nt = nthreads() + tasks = map(enumerate(A)) do (c, i) + tid = @inbounds nthtid(mod1(c, nt)) + @spawnat tid begin + args = map(A -> A[i], Arrs) + promise_task_local(f)(args...) + end + end + v = map(fetch, tasks) + reshape(v, size(A)...) +end + # w/ chunking function _tmap(scheduler::Scheduler, f, diff --git a/src/macro_impl.jl b/src/macro_impl.jl index 8097eeef..74ce0287 100644 --- a/src/macro_impl.jl +++ b/src/macro_impl.jl @@ -40,7 +40,7 @@ function tasks_macro(forex) q = if !isnothing(settings.reducer) quote $make_mapping_function - tmapreduce(mapping_function, $(settings.reducer), $(itrng); scheduler = $(settings.scheduler)) + tmapreduce(mapping_function, $(settings.reducer), $(itrng); scheduler = $(settings.scheduler)) end elseif settings.collect maybe_warn_useless_init(settings) diff --git a/src/schedulers.jl b/src/schedulers.jl index 597298f5..7c4dc962 100644 --- a/src/schedulers.jl +++ b/src/schedulers.jl @@ -43,15 +43,16 @@ with other multithreaded code. 
* Determines the number of chunks (and thus also the number of parallel tasks). * Increasing `nchunks` can help with [load balancing](https://en.wikipedia.org/wiki/Load_balancing_(computing)), but at the expense of creating more overhead. For `nchunks <= nthreads()` there are not enough chunks for any load balancing. * Setting `nchunks < nthreads()` is an effective way to use only a subset of the available threads. - * Setting `nchunks = 0` (and `chunksize = 0`) turns off the internal chunking entirely (a task is spawned for each element). Note that, depending on the input, this scheduler **might spawn many(!) tasks** and can be - very costly! -- `chunksize::Integer` (default `0`) +- `chunksize::Integer` (default not set) * Specifies the desired chunk size (instead of the number of chunks). - * The options `chunksize` and `nchunks` are **mutually exclusive** (only one may be non-zero). + * The options `chunksize` and `nchunks` are **mutually exclusive** (only one may be a positive integer). - `split::Symbol` (default `:batch`): - * Determines how the collection is divided into chunks. By default, each chunk consists of contiguous elements and order is maintained. + * Determines how the collection is divided into chunks (if chunking=true). By default, each chunk consists of contiguous elements and order is maintained. * See [ChunkSplitters.jl](https://github.com/JuliaFolds2/ChunkSplitters.jl) for more details and available options. * Beware that for `split=:scatter` the order of elements isn't maintained and a reducer function must not only be associative but also **commutative**! +- `chunking::Bool` (default `true`): + * Controls whether input elements are grouped into chunks (`true`) or not (`false`). + * For `chunking=false`, the arguments `nchunks`, `chunksize`, and `split` are ignored and input elements are regarded as "chunks" as is. Hence, there will be one parallel task spawned per input element. Note that, depending on the input, this **might spawn many(!) 
tasks** and can be costly! - `threadpool::Symbol` (default `:default`): * Possible options are `:default` and `:interactive`. * The high-priority pool `:interactive` should be used very carefully since tasks on this threadpool should not be allowed to run for a long time without `yield`ing as it can interfere with [heartbeat](https://en.wikipedia.org/wiki/Heartbeat_(computing)) processes. @@ -63,25 +64,20 @@ struct DynamicScheduler{C <: ChunkingMode} <: Scheduler split::Symbol function DynamicScheduler( - threadpool::Symbol, nchunks::Integer, chunksize::Integer, split::Symbol) + threadpool::Symbol, nchunks::Integer, chunksize::Integer, split::Symbol; chunking::Bool = true) if !(threadpool in (:default, :interactive)) throw(ArgumentError("threadpool must be either :default or :interactive")) end - if nchunks < 0 - throw(ArgumentError("nchunks must be a positive integer (or zero).")) - end - if chunksize < 0 - throw(ArgumentError("chunksize must be a positive integer (or zero).")) - end - if nchunks != 0 && chunksize != 0 - throw(ArgumentError("nchunks and chunksize are mutually exclusive and only one of them may be non-zero")) - end - if nchunks == 0 && chunksize == 0 + if !chunking C = NoChunking - elseif chunksize != 0 - C = FixedSize else - C = FixedCount + if !(nchunks > 0 || chunksize > 0) + throw(ArgumentError("Either nchunks or chunksize must be a positive integer (or chunking=false).")) + end + if nchunks > 0 && chunksize > 0 + throw(ArgumentError("nchunks and chunksize are mutually exclusive and only one of them may be a positive integer")) + end + C = chunksize > 0 ? 
FixedSize : FixedCount end new{C}(threadpool, nchunks, chunksize, split) end @@ -91,19 +87,22 @@ function DynamicScheduler(; threadpool::Symbol = :default, nchunks::Union{Integer, Nothing} = nothing, chunksize::Union{Integer, Nothing} = nothing, + chunking::Bool = true, split::Symbol = :batch) - if isnothing(nchunks) + if !chunking + nchunks = -1 + chunksize = -1 + else # only choose nchunks default if chunksize hasn't been specified - if isnothing(chunksize) + if isnothing(nchunks) && isnothing(chunksize) nchunks = 2 * nthreads(threadpool) + chunksize = -1 else - nchunks = 0 + nchunks = isnothing(nchunks) ? -1 : nchunks + chunksize = isnothing(chunksize) ? -1 : chunksize end end - if isnothing(chunksize) - chunksize = 0 - end - DynamicScheduler(threadpool, nchunks, chunksize, split) + DynamicScheduler(threadpool, nchunks, chunksize, split; chunking) end function Base.show(io::IO, mime::MIME{Symbol("text/plain")}, s::DynamicScheduler) @@ -129,11 +128,12 @@ Isn't well composable with other multithreaded code though. * Determines the number of chunks (and thus also the number of parallel tasks). * Setting `nchunks < nthreads()` is an effective way to use only a subset of the available threads. * For `nchunks > nthreads()` the chunks will be distributed to the available threads in a round-robin fashion. - * Setting `nchunks = 0` (and `chunksize = 0`) turns off the internal chunking entirely (a task is spawned for each element). Note that, depending on the input, this scheduler **might spawn many(!) tasks** and can be - very costly! -- `chunksize::Integer` (default `0`) +- `chunksize::Integer` (default not set) * Specifies the desired chunk size (instead of the number of chunks). * The options `chunksize` and `nchunks` are **mutually exclusive** (only one may be non-zero). +- `chunking::Bool` (default `true`): + * Controls whether input elements are grouped into chunks (`true`) or not (`false`). 
+ * For `chunking=false`, the arguments `nchunks`, `chunksize`, and `split` are ignored and input elements are regarded as "chunks" as is. Hence, there will be one parallel task spawned per input element. Note that, depending on the input, this **might spawn many(!) tasks** and can be costly! - `split::Symbol` (default `:batch`): * Determines how the collection is divided into chunks. By default, each chunk consists of contiguous elements and order is maintained. * See [ChunkSplitters.jl](https://github.com/JuliaFolds2/ChunkSplitters.jl) for more details and available options. @@ -144,22 +144,18 @@ struct StaticScheduler{C <: ChunkingMode} <: Scheduler chunksize::Int split::Symbol - function StaticScheduler(nchunks::Integer, chunksize::Integer, split::Symbol) - if nchunks < 0 - throw(ArgumentError("nchunks must be a positive integer (or zero).")) - end - if chunksize < 0 - throw(ArgumentError("chunksize must be a positive integer (or zero).")) - end - if nchunks != 0 && chunksize != 0 - throw(ArgumentError("nchunks and chunksize are mutually exclusive and only one of them may be non-zero")) - end - if nchunks == 0 && chunksize == 0 + function StaticScheduler( + nchunks::Integer, chunksize::Integer, split::Symbol; chunking::Bool = true) + if !chunking C = NoChunking - elseif chunksize != 0 - C = FixedSize else - C = FixedCount + if !(nchunks > 0 || chunksize > 0) + throw(ArgumentError("Either nchunks or chunksize must be a positive integer (or chunking=false).")) + end + if nchunks > 0 && chunksize > 0 + throw(ArgumentError("nchunks and chunksize are mutually exclusive and only one of them may be a positive integer")) + end + C = chunksize > 0 ? 
FixedSize : FixedCount end new{C}(nchunks, chunksize, split) end @@ -168,19 +164,22 @@ end function StaticScheduler(; nchunks::Union{Integer, Nothing} = nothing, chunksize::Union{Integer, Nothing} = nothing, + chunking::Bool = true, split::Symbol = :batch) - if isnothing(nchunks) + if !chunking + nchunks = -1 + chunksize = -1 + else # only choose nchunks default if chunksize hasn't been specified - if isnothing(chunksize) + if isnothing(nchunks) && isnothing(chunksize) nchunks = nthreads(:default) + chunksize = -1 else - nchunks = 0 + nchunks = isnothing(nchunks) ? -1 : nchunks + chunksize = isnothing(chunksize) ? -1 : chunksize end end - if isnothing(chunksize) - chunksize = 0 - end - StaticScheduler(nchunks, chunksize, split) + StaticScheduler(nchunks, chunksize, split; chunking) end function Base.show(io::IO, mime::MIME{Symbol("text/plain")}, s::StaticScheduler) diff --git a/test/runtests.jl b/test/runtests.jl index 05819e55..3eee363b 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -22,7 +22,7 @@ sets_to_test = [(~ = isapprox, f = sin ∘ *, op = +, if sched == GreedyScheduler scheduler = sched(; ntasks = nchunks) elseif sched == DynamicScheduler{OhMyThreads.Schedulers.NoChunking} - scheduler = DynamicScheduler(; nchunks = 0) + scheduler = DynamicScheduler(; chunking=false) elseif sched == SerialScheduler scheduler = SerialScheduler() else @@ -66,7 +66,7 @@ end @testset "ChunkSplitters.Chunk" begin x = rand(100) chnks = OhMyThreads.chunks(x; n = Threads.nthreads()) - for scheduler in (DynamicScheduler(; nchunks = 0), StaticScheduler(; nchunks = 0)) + for scheduler in (DynamicScheduler(; chunking=false), StaticScheduler(; chunking=false)) @testset "$scheduler" begin @test tmap(x -> sin.(x), chnks; scheduler) ≈ map(x -> sin.(x), chnks) @test tmapreduce(x -> sin.(x), vcat, chnks; scheduler) ≈ @@ -237,41 +237,30 @@ end end @testset "chunking mode + chunksize option" begin - @test DynamicScheduler(; chunksize=2) isa DynamicScheduler - @test StaticScheduler(; 
chunksize=2) isa StaticScheduler + for sched in (DynamicScheduler, StaticScheduler) + @test sched() isa sched + @test sched(; chunksize=2) isa sched - @test OhMyThreads.Schedulers.chunking_mode(DynamicScheduler(; chunksize=2)) == OhMyThreads.Schedulers.FixedSize - @test OhMyThreads.Schedulers.chunking_mode(DynamicScheduler(; nchunks=2)) == OhMyThreads.Schedulers.FixedCount - @test OhMyThreads.Schedulers.chunking_mode(DynamicScheduler(; nchunks=0, chunksize=0)) == OhMyThreads.Schedulers.NoChunking - @test OhMyThreads.Schedulers.chunking_mode(DynamicScheduler(; nchunks=0)) == OhMyThreads.Schedulers.NoChunking - @test OhMyThreads.Schedulers.chunking_enabled(DynamicScheduler(; chunksize=2)) == true - @test OhMyThreads.Schedulers.chunking_enabled(DynamicScheduler(; nchunks=2)) == true - @test OhMyThreads.Schedulers.chunking_enabled(DynamicScheduler(; nchunks=0, chunksize=0)) == false - @test OhMyThreads.Schedulers.chunking_enabled(DynamicScheduler(; nchunks=0)) == false + @test OhMyThreads.Schedulers.chunking_mode(sched(; chunksize=2)) == OhMyThreads.Schedulers.FixedSize + @test OhMyThreads.Schedulers.chunking_mode(sched(; nchunks=2)) == OhMyThreads.Schedulers.FixedCount + @test OhMyThreads.Schedulers.chunking_mode(sched(; chunking=false)) == OhMyThreads.Schedulers.NoChunking + @test OhMyThreads.Schedulers.chunking_mode(sched(; nchunks=2, chunksize=4, chunking=false)) == OhMyThreads.Schedulers.NoChunking + @test OhMyThreads.Schedulers.chunking_mode(sched(; nchunks=-2, chunksize=-4, split=:whatever, chunking=false)) == OhMyThreads.Schedulers.NoChunking + @test OhMyThreads.Schedulers.chunking_enabled(sched(; chunksize=2)) == true + @test OhMyThreads.Schedulers.chunking_enabled(sched(; nchunks=2)) == true + @test OhMyThreads.Schedulers.chunking_enabled(sched(; nchunks=-2, chunksize=-4, chunking=false)) == false + @test OhMyThreads.Schedulers.chunking_enabled(sched(; nchunks=2, chunksize=4, chunking=false)) == false - @test 
OhMyThreads.Schedulers.chunking_mode(StaticScheduler(; chunksize=2)) == OhMyThreads.Schedulers.FixedSize - @test OhMyThreads.Schedulers.chunking_mode(StaticScheduler(; nchunks=2)) == OhMyThreads.Schedulers.FixedCount - @test OhMyThreads.Schedulers.chunking_mode(StaticScheduler(; nchunks=0, chunksize=0)) == OhMyThreads.Schedulers.NoChunking - @test OhMyThreads.Schedulers.chunking_mode(StaticScheduler(; nchunks=0)) == OhMyThreads.Schedulers.NoChunking - @test OhMyThreads.Schedulers.chunking_enabled(StaticScheduler(; chunksize=2)) == true - @test OhMyThreads.Schedulers.chunking_enabled(StaticScheduler(; nchunks=2)) == true - @test OhMyThreads.Schedulers.chunking_enabled(StaticScheduler(; nchunks=0, chunksize=0)) == false - @test OhMyThreads.Schedulers.chunking_enabled(StaticScheduler(; nchunks=0)) == false + @test_throws ArgumentError sched(; nchunks=2, chunksize=3) + @test_throws ArgumentError sched(; nchunks=0, chunksize=0) + @test_throws ArgumentError sched(; nchunks=-2, chunksize=-3) - @test_throws ArgumentError DynamicScheduler(; nchunks=2, chunksize=3) - @test_throws ArgumentError StaticScheduler(; nchunks=2, chunksize=3) - - let scheduler = DynamicScheduler(; chunksize=2) - @test tmapreduce(sin, +, 1:10; scheduler) ≈ mapreduce(sin, +, 1:10) - @test tmap(sin, 1:10; scheduler) ≈ map(sin, 1:10) - @test isnothing(tforeach(sin, 1:10; scheduler)) - @test treduce(+, 1:10; scheduler) ≈ reduce(+, 1:10) - end - let scheduler = StaticScheduler(; chunksize=2) - @test tmapreduce(sin, +, 1:10; scheduler) ≈ mapreduce(sin, +, 1:10) - @test tmap(sin, 1:10; scheduler) ≈ map(sin, 1:10) - @test isnothing(tforeach(sin, 1:10; scheduler)) - @test treduce(+, 1:10; scheduler) ≈ reduce(+, 1:10) + let scheduler = sched(; chunksize=2) + @test tmapreduce(sin, +, 1:10; scheduler) ≈ mapreduce(sin, +, 1:10) + @test tmap(sin, 1:10; scheduler) ≈ map(sin, 1:10) + @test isnothing(tforeach(sin, 1:10; scheduler)) + @test treduce(+, 1:10; scheduler) ≈ reduce(+, 1:10) + end end end