Disabling Chunking: unsupport nchunks=0 in favor of chunking=false #71

Merged · 2 commits · Mar 7, 2024
7 changes: 4 additions & 3 deletions CHANGELOG.md
@@ -4,11 +4,12 @@ OhMyThreads.jl Changelog
Version 0.5.0
-------------

- ![feature][badge-feature] Added a `SerialScheduler` that can be used to turn off any multithreading.
- ![feature][badge-feature] Added `OhMyThreads.WithTaskLocals` that represents a closure over `TaskLocalValues`, but can have those values materialized as an optimization (using `OhMyThreads.promise_task_local`)
- ![Breaking][badge-breaking] `DynamicScheduler` and `StaticScheduler` don't support `nchunks=0` or `chunksize=0` any longer. Instead, chunking can now be turned off via an explicit new keyword argument `chunking=false`.
- ![Feature][badge-feature] Added a `SerialScheduler` that can be used to turn off any multithreading.
- ![Feature][badge-feature] Added `OhMyThreads.WithTaskLocals` that represents a closure over `TaskLocalValues`, but can have those values materialized as an optimization (using `OhMyThreads.promise_task_local`)
- ![Enhancement][badge-enhancement] Made `@tasks` use `OhMyThreads.WithTaskLocals` automatically as an optimization.
- ![Feature][badge-feature] In the case `nchunks > nthreads()`, the `StaticScheduler` now distributes chunks in a round-robin fashion (instead of either implicitly decreasing `nchunks` to `nthreads()` or throwing an error).
- ![Feature][badge-feature] The `DynamicScheduler` (default) and the `StaticScheduler` now support a `chunksize` argument to specify the desired size of chunks instead of the number of chunks (`nchunks`). Note that `chunksize` and `nchunks` are mutually exclusive.
- ![BREAKING][badge-breaking] The `DynamicScheduler` (default) and the `StaticScheduler` now support a `chunksize` argument to specify the desired size of chunks instead of the number of chunks (`nchunks`). Note that `chunksize` and `nchunks` are mutually exclusive. (This is unlikely to break existing code but technically could because the type parameter has changed from `Bool` to `ChunkingMode`.)
- ![Feature][badge-feature] `@set init = ...` may now be used to specify an initial value for a reduction (only has an effect in conjunction with `@set reducer=...` and triggers a warning otherwise).
- ![BREAKING][badge-breaking] Within a `@tasks` block, task-local values must from now on be defined via `@local` instead of `@init` (renamed).
- ![BREAKING][badge-breaking] The (already deprecated) `SpawnAllScheduler` has been dropped.
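The breaking `chunking=false` change above amounts to the following migration (a sketch; assumes OhMyThreads v0.5.0 is installed):

```julia
using OhMyThreads: tmap, DynamicScheduler

# Pre-0.5.0: chunking was disabled implicitly via nchunks = 0.
# DynamicScheduler(; nchunks = 0)          # now throws an ArgumentError

# 0.5.0 and later: chunking is disabled via an explicit keyword.
scheduler = DynamicScheduler(; chunking = false)

# One parallel task is spawned per input element (can be costly for large inputs!).
tmap(sin, 1:8; scheduler)
```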
19 changes: 19 additions & 0 deletions src/implementation.jl
@@ -309,6 +309,25 @@ function _tmap(scheduler::StaticScheduler{NoChunking},
map(fetch, tasks)
end

# w/o chunking (StaticScheduler{NoChunking}): AbstractArray
function _tmap(scheduler::StaticScheduler{NoChunking},
f,
A::AbstractArray,
_Arrs::AbstractArray...;
kwargs...)
Arrs = (A, _Arrs...)
nt = nthreads()
tasks = map(enumerate(A)) do (c, i)
tid = @inbounds nthtid(mod1(c, nt))
@spawnat tid begin
args = map(A -> A[i], Arrs)
promise_task_local(f)(args...)
end
end
v = map(fetch, tasks)
reshape(v, size(A)...)
end
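The `mod1(c, nt)` expression above implements the round-robin thread assignment: the `c`-th task is pinned to the `mod1(c, nt)`-th thread, wrapping around once all threads are used. A quick plain-Julia illustration (no OhMyThreads required):

```julia
# With nt = 4 threads, tasks 1..10 are assigned round-robin:
nt = 4
assignment = [mod1(c, nt) for c in 1:10]
# assignment == [1, 2, 3, 4, 1, 2, 3, 4, 1, 2]
```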

# w/ chunking
function _tmap(scheduler::Scheduler,
f,
2 changes: 1 addition & 1 deletion src/macro_impl.jl
@@ -40,7 +40,7 @@ function tasks_macro(forex)
q = if !isnothing(settings.reducer)
quote
$make_mapping_function
            tmapreduce(mapping_function, $(settings.reducer), $(itrng); scheduler = $(settings.scheduler))
end
elseif settings.collect
maybe_warn_useless_init(settings)
99 changes: 49 additions & 50 deletions src/schedulers.jl
@@ -43,15 +43,16 @@ with other multithreaded code.
* Determines the number of chunks (and thus also the number of parallel tasks).
* Increasing `nchunks` can help with [load balancing](https://en.wikipedia.org/wiki/Load_balancing_(computing)), but at the expense of creating more overhead. For `nchunks <= nthreads()` there are not enough chunks for any load balancing.
* Setting `nchunks < nthreads()` is an effective way to use only a subset of the available threads.
* Setting `nchunks = 0` (and `chunksize = 0`) turns off the internal chunking entirely (a task is spawned for each element). Note that, depending on the input, this scheduler **might spawn many(!) tasks** and can be very costly!
- `chunksize::Integer` (default `0`)
- `chunksize::Integer` (default not set)
* Specifies the desired chunk size (instead of the number of chunks).
* The options `chunksize` and `nchunks` are **mutually exclusive** (only one may be non-zero).
* The options `chunksize` and `nchunks` are **mutually exclusive** (only one may be a positive integer).
- `split::Symbol` (default `:batch`):
* Determines how the collection is divided into chunks. By default, each chunk consists of contiguous elements and order is maintained.
* Determines how the collection is divided into chunks (if `chunking=true`). By default, each chunk consists of contiguous elements and order is maintained.
* See [ChunkSplitters.jl](https://github.com/JuliaFolds2/ChunkSplitters.jl) for more details and available options.
* Beware that for `split=:scatter` the order of elements isn't maintained and a reducer function must not only be associative but also **commutative**!
- `chunking::Bool` (default `true`):
* Controls whether input elements are grouped into chunks (`true`) or not (`false`).
* For `chunking=false`, the arguments `nchunks`, `chunksize`, and `split` are ignored and input elements are regarded as "chunks" as is. Hence, there will be one parallel task spawned per input element. Note that, depending on the input, this **might spawn many(!) tasks** and can be costly!
- `threadpool::Symbol` (default `:default`):
* Possible options are `:default` and `:interactive`.
* The high-priority pool `:interactive` should be used very carefully since tasks on this threadpool should not be allowed to run for a long time without `yield`ing as it can interfere with [heartbeat](https://en.wikipedia.org/wiki/Heartbeat_(computing)) processes.
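The option combinations documented above can be sketched as follows (assumes OhMyThreads v0.5.0; `nchunks` and `chunksize` remain mutually exclusive):

```julia
using OhMyThreads: DynamicScheduler

DynamicScheduler()                    # default: nchunks = 2 * nthreads(:default)
DynamicScheduler(; nchunks = 16)      # fixed number of chunks (helps load balancing)
DynamicScheduler(; chunksize = 100)   # fixed chunk size instead of chunk count
DynamicScheduler(; chunking = false)  # no chunking: one task per input element

# Mutually exclusive options throw:
# DynamicScheduler(; nchunks = 2, chunksize = 3)  # ArgumentError
```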
@@ -63,25 +64,20 @@ struct DynamicScheduler{C <: ChunkingMode} <: Scheduler
split::Symbol

function DynamicScheduler(
threadpool::Symbol, nchunks::Integer, chunksize::Integer, split::Symbol)
threadpool::Symbol, nchunks::Integer, chunksize::Integer, split::Symbol; chunking::Bool = true)
if !(threadpool in (:default, :interactive))
throw(ArgumentError("threadpool must be either :default or :interactive"))
end
if nchunks < 0
throw(ArgumentError("nchunks must be a positive integer (or zero)."))
end
if chunksize < 0
throw(ArgumentError("chunksize must be a positive integer (or zero)."))
end
if nchunks != 0 && chunksize != 0
throw(ArgumentError("nchunks and chunksize are mutually exclusive and only one of them may be non-zero"))
end
if nchunks == 0 && chunksize == 0
if !chunking
C = NoChunking
elseif chunksize != 0
C = FixedSize
else
C = FixedCount
if !(nchunks > 0 || chunksize > 0)
throw(ArgumentError("Either nchunks or chunksize must be a positive integer (or chunking=false)."))
end
if nchunks > 0 && chunksize > 0
throw(ArgumentError("nchunks and chunksize are mutually exclusive and only one of them may be a positive integer"))
end
C = chunksize > 0 ? FixedSize : FixedCount
end
new{C}(threadpool, nchunks, chunksize, split)
end
@@ -91,19 +87,22 @@ function DynamicScheduler(;
threadpool::Symbol = :default,
nchunks::Union{Integer, Nothing} = nothing,
chunksize::Union{Integer, Nothing} = nothing,
chunking::Bool = true,
split::Symbol = :batch)
if isnothing(nchunks)
if !chunking
nchunks = -1
chunksize = -1
else
# only choose nchunks default if chunksize hasn't been specified
if isnothing(chunksize)
if isnothing(nchunks) && isnothing(chunksize)
nchunks = 2 * nthreads(threadpool)
chunksize = -1
else
nchunks = 0
nchunks = isnothing(nchunks) ? -1 : nchunks
chunksize = isnothing(chunksize) ? -1 : chunksize
end
end
if isnothing(chunksize)
chunksize = 0
end
DynamicScheduler(threadpool, nchunks, chunksize, split)
DynamicScheduler(threadpool, nchunks, chunksize, split; chunking)
end

function Base.show(io::IO, mime::MIME{Symbol("text/plain")}, s::DynamicScheduler)
@@ -129,11 +128,12 @@ Isn't well composable with other multithreaded code though.
* Determines the number of chunks (and thus also the number of parallel tasks).
* Setting `nchunks < nthreads()` is an effective way to use only a subset of the available threads.
* For `nchunks > nthreads()` the chunks will be distributed to the available threads in a round-robin fashion.
* Setting `nchunks = 0` (and `chunksize = 0`) turns off the internal chunking entirely (a task is spawned for each element). Note that, depending on the input, this scheduler **might spawn many(!) tasks** and can be very costly!
- `chunksize::Integer` (default `0`)
- `chunksize::Integer` (default not set)
* Specifies the desired chunk size (instead of the number of chunks).
* The options `chunksize` and `nchunks` are **mutually exclusive** (only one may be non-zero).
- `chunking::Bool` (default `true`):
* Controls whether input elements are grouped into chunks (`true`) or not (`false`).
* For `chunking=false`, the arguments `nchunks`, `chunksize`, and `split` are ignored and input elements are regarded as "chunks" as is. Hence, there will be one parallel task spawned per input element. Note that, depending on the input, this **might spawn many(!) tasks** and can be costly!
- `split::Symbol` (default `:batch`):
* Determines how the collection is divided into chunks. By default, each chunk consists of contiguous elements and order is maintained.
* See [ChunkSplitters.jl](https://github.com/JuliaFolds2/ChunkSplitters.jl) for more details and available options.
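A usage sketch of the `chunking=false` mode that this PR adds for `StaticScheduler` (one task per element, pinned round-robin to the available threads; element order and array shape are preserved):

```julia
using OhMyThreads: tmap, StaticScheduler

scheduler = StaticScheduler(; chunking = false)
tmap(x -> x^2, [1, 2, 3, 4]; scheduler)  # == [1, 4, 9, 16]
```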
@@ -144,22 +144,18 @@ struct StaticScheduler{C <: ChunkingMode} <: Scheduler
chunksize::Int
split::Symbol

function StaticScheduler(nchunks::Integer, chunksize::Integer, split::Symbol)
if nchunks < 0
throw(ArgumentError("nchunks must be a positive integer (or zero)."))
end
if chunksize < 0
throw(ArgumentError("chunksize must be a positive integer (or zero)."))
end
if nchunks != 0 && chunksize != 0
throw(ArgumentError("nchunks and chunksize are mutually exclusive and only one of them may be non-zero"))
end
if nchunks == 0 && chunksize == 0
function StaticScheduler(
nchunks::Integer, chunksize::Integer, split::Symbol; chunking::Bool = true)
if !chunking
C = NoChunking
elseif chunksize != 0
C = FixedSize
else
C = FixedCount
if !(nchunks > 0 || chunksize > 0)
throw(ArgumentError("Either nchunks or chunksize must be a positive integer (or chunking=false)."))
end
if nchunks > 0 && chunksize > 0
throw(ArgumentError("nchunks and chunksize are mutually exclusive and only one of them may be a positive integer"))
end
C = chunksize > 0 ? FixedSize : FixedCount
end
new{C}(nchunks, chunksize, split)
end
@@ -168,19 +164,22 @@ end
function StaticScheduler(;
nchunks::Union{Integer, Nothing} = nothing,
chunksize::Union{Integer, Nothing} = nothing,
chunking::Bool = true,
split::Symbol = :batch)
if isnothing(nchunks)
if !chunking
nchunks = -1
chunksize = -1
else
# only choose nchunks default if chunksize hasn't been specified
if isnothing(chunksize)
if isnothing(nchunks) && isnothing(chunksize)
nchunks = nthreads(:default)
chunksize = -1
else
nchunks = 0
nchunks = isnothing(nchunks) ? -1 : nchunks
chunksize = isnothing(chunksize) ? -1 : chunksize
end
end
if isnothing(chunksize)
chunksize = 0
end
StaticScheduler(nchunks, chunksize, split)
StaticScheduler(nchunks, chunksize, split; chunking)
end

function Base.show(io::IO, mime::MIME{Symbol("text/plain")}, s::StaticScheduler)
57 changes: 23 additions & 34 deletions test/runtests.jl
@@ -22,7 +22,7 @@ sets_to_test = [(~ = isapprox, f = sin ∘ *, op = +,
if sched == GreedyScheduler
scheduler = sched(; ntasks = nchunks)
elseif sched == DynamicScheduler{OhMyThreads.Schedulers.NoChunking}
scheduler = DynamicScheduler(; nchunks = 0)
scheduler = DynamicScheduler(; chunking=false)
elseif sched == SerialScheduler
scheduler = SerialScheduler()
else
@@ -66,7 +66,7 @@ end
@testset "ChunkSplitters.Chunk" begin
x = rand(100)
chnks = OhMyThreads.chunks(x; n = Threads.nthreads())
for scheduler in (DynamicScheduler(; nchunks = 0), StaticScheduler(; nchunks = 0))
for scheduler in (DynamicScheduler(; chunking=false), StaticScheduler(; chunking=false))
@testset "$scheduler" begin
@test tmap(x -> sin.(x), chnks; scheduler) ≈ map(x -> sin.(x), chnks)
@test tmapreduce(x -> sin.(x), vcat, chnks; scheduler) ≈
@@ -237,41 +237,30 @@ end
end

@testset "chunking mode + chunksize option" begin
@test DynamicScheduler(; chunksize=2) isa DynamicScheduler
@test StaticScheduler(; chunksize=2) isa StaticScheduler
for sched in (DynamicScheduler, StaticScheduler)
@test sched() isa sched
@test sched(; chunksize=2) isa sched

@test OhMyThreads.Schedulers.chunking_mode(DynamicScheduler(; chunksize=2)) == OhMyThreads.Schedulers.FixedSize
@test OhMyThreads.Schedulers.chunking_mode(DynamicScheduler(; nchunks=2)) == OhMyThreads.Schedulers.FixedCount
@test OhMyThreads.Schedulers.chunking_mode(DynamicScheduler(; nchunks=0, chunksize=0)) == OhMyThreads.Schedulers.NoChunking
@test OhMyThreads.Schedulers.chunking_mode(DynamicScheduler(; nchunks=0)) == OhMyThreads.Schedulers.NoChunking
@test OhMyThreads.Schedulers.chunking_enabled(DynamicScheduler(; chunksize=2)) == true
@test OhMyThreads.Schedulers.chunking_enabled(DynamicScheduler(; nchunks=2)) == true
@test OhMyThreads.Schedulers.chunking_enabled(DynamicScheduler(; nchunks=0, chunksize=0)) == false
@test OhMyThreads.Schedulers.chunking_enabled(DynamicScheduler(; nchunks=0)) == false
@test OhMyThreads.Schedulers.chunking_mode(sched(; chunksize=2)) == OhMyThreads.Schedulers.FixedSize
@test OhMyThreads.Schedulers.chunking_mode(sched(; nchunks=2)) == OhMyThreads.Schedulers.FixedCount
@test OhMyThreads.Schedulers.chunking_mode(sched(; chunking=false)) == OhMyThreads.Schedulers.NoChunking
@test OhMyThreads.Schedulers.chunking_mode(sched(; nchunks=2, chunksize=4, chunking=false)) == OhMyThreads.Schedulers.NoChunking
@test OhMyThreads.Schedulers.chunking_mode(sched(; nchunks=-2, chunksize=-4, split=:whatever, chunking=false)) == OhMyThreads.Schedulers.NoChunking
@test OhMyThreads.Schedulers.chunking_enabled(sched(; chunksize=2)) == true
@test OhMyThreads.Schedulers.chunking_enabled(sched(; nchunks=2)) == true
@test OhMyThreads.Schedulers.chunking_enabled(sched(; nchunks=-2, chunksize=-4, chunking=false)) == false
@test OhMyThreads.Schedulers.chunking_enabled(sched(; nchunks=2, chunksize=4, chunking=false)) == false

@test OhMyThreads.Schedulers.chunking_mode(StaticScheduler(; chunksize=2)) == OhMyThreads.Schedulers.FixedSize
@test OhMyThreads.Schedulers.chunking_mode(StaticScheduler(; nchunks=2)) == OhMyThreads.Schedulers.FixedCount
@test OhMyThreads.Schedulers.chunking_mode(StaticScheduler(; nchunks=0, chunksize=0)) == OhMyThreads.Schedulers.NoChunking
@test OhMyThreads.Schedulers.chunking_mode(StaticScheduler(; nchunks=0)) == OhMyThreads.Schedulers.NoChunking
@test OhMyThreads.Schedulers.chunking_enabled(StaticScheduler(; chunksize=2)) == true
@test OhMyThreads.Schedulers.chunking_enabled(StaticScheduler(; nchunks=2)) == true
@test OhMyThreads.Schedulers.chunking_enabled(StaticScheduler(; nchunks=0, chunksize=0)) == false
@test OhMyThreads.Schedulers.chunking_enabled(StaticScheduler(; nchunks=0)) == false
@test_throws ArgumentError sched(; nchunks=2, chunksize=3)
@test_throws ArgumentError sched(; nchunks=0, chunksize=0)
@test_throws ArgumentError sched(; nchunks=-2, chunksize=-3)

@test_throws ArgumentError DynamicScheduler(; nchunks=2, chunksize=3)
@test_throws ArgumentError StaticScheduler(; nchunks=2, chunksize=3)

let scheduler = DynamicScheduler(; chunksize=2)
@test tmapreduce(sin, +, 1:10; scheduler) ≈ mapreduce(sin, +, 1:10)
@test tmap(sin, 1:10; scheduler) ≈ map(sin, 1:10)
@test isnothing(tforeach(sin, 1:10; scheduler))
@test treduce(+, 1:10; scheduler) ≈ reduce(+, 1:10)
end
let scheduler = StaticScheduler(; chunksize=2)
@test tmapreduce(sin, +, 1:10; scheduler) ≈ mapreduce(sin, +, 1:10)
@test tmap(sin, 1:10; scheduler) ≈ map(sin, 1:10)
@test isnothing(tforeach(sin, 1:10; scheduler))
@test treduce(+, 1:10; scheduler) ≈ reduce(+, 1:10)
let scheduler = sched(; chunksize=2)
@test tmapreduce(sin, +, 1:10; scheduler) ≈ mapreduce(sin, +, 1:10)
@test tmap(sin, 1:10; scheduler) ≈ map(sin, 1:10)
@test isnothing(tforeach(sin, 1:10; scheduler))
@test treduce(+, 1:10; scheduler) ≈ reduce(+, 1:10)
end
end
end
