chunksize option (#68)
* chunksize option for DynamicScheduler + ChunkingMode

* chunking_mode

* adjust implementations + basic tests

* changelog

* pretty printing of scheduler

* drop unnecessary lines

* round robin assignment for static scheduler and nchunks > nthreads

* changelog update

* make StaticScheduler support chunksize

* changelog + docstring updates

* fix typo
carstenbauer authored Mar 6, 2024
1 parent 7fc5c0a commit 0ed78b4
Showing 5 changed files with 213 additions and 59 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -4,6 +4,8 @@ OhMyThreads.jl Changelog
Version 0.5.0
-------------

- ![Feature][badge-feature] In the case `nchunks > nthreads()`, the `StaticScheduler` now distributes chunks in a round-robin fashion (instead of either implicitly decreasing `nchunks` to `nthreads()` or throwing an error).
- ![Feature][badge-feature] The `DynamicScheduler` (default) and the `StaticScheduler` now support a `chunksize` argument to specify the desired size of chunks instead of the number of chunks (`nchunks`). Note that `chunksize` and `nchunks` are mutually exclusive.
- ![Feature][badge-feature] `@set init = ...` may now be used to specify an initial value for a reduction (only has an effect in conjunction with `@set reducer=...` and triggers a warning otherwise).
- ![BREAKING][badge-breaking] Within a `@tasks` block, task-local values must from now on be defined via `@local` instead of `@init` (renamed).
- ![BREAKING][badge-breaking] The (already deprecated) `SpawnAllScheduler` has been dropped.
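For illustration only (not part of this commit's diff), a minimal sketch of how the two new behaviours described above might be used; it assumes the exported `tmap`/`tmapreduce` API and the scheduler keyword arguments documented further down in this commit:

```julia
using OhMyThreads
using Base.Threads: nthreads

# Fixed chunk *size* instead of a fixed chunk *count* (the two options are mutually exclusive):
tmapreduce(sin, +, 1:10_000; scheduler = DynamicScheduler(; chunksize = 500))

# StaticScheduler with nchunks > nthreads(): chunks are assigned to threads round-robin.
tmap(x -> x^2, 1:1_000; scheduler = StaticScheduler(; nchunks = 4 * nthreads()))
```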
2 changes: 1 addition & 1 deletion Project.toml
@@ -11,7 +11,7 @@ TaskLocalValues = "ed4db957-447d-4319-bfb6-7fa9ae7ecf34"

[compat]
BangBang = "0.4"
ChunkSplitters = "2.1"
ChunkSplitters = "2.3"
StableTasks = "0.1.5"
TaskLocalValues = "0.1"
julia = "1.6"
61 changes: 30 additions & 31 deletions src/implementation.jl
@@ -5,7 +5,8 @@ import OhMyThreads: treduce, tmapreduce, treducemap, tforeach, tmap, tmap!, tcol
using OhMyThreads: chunks, @spawn, @spawnat
using OhMyThreads.Tools: nthtid
using OhMyThreads: Scheduler, DynamicScheduler, StaticScheduler, GreedyScheduler
using OhMyThreads.Schedulers: chunking_enabled
using OhMyThreads.Schedulers: chunking_enabled, chunking_mode, ChunkingMode, NoChunking,
FixedSize, FixedCount
using Base: @propagate_inbounds
using Base.Threads: nthreads, @threads

@@ -18,10 +19,20 @@ include("macro_impl.jl")
function auto_disable_chunking_warning()
@warn("You passed in a `ChunkSplitters.Chunk` but also a scheduler that has "*
"chunking enabled. Will turn off internal chunking to proceed.\n"*
"To avoid this warning, try to turn off chunking (`nchunks=0`) "*
"To avoid this warning, try to turn off chunking (`nchunks=0` and `chunksize=0`) "*
"or pass in `collect(chunks(...))`.")
end

function _chunks(sched, arg; kwargs...)
C = chunking_mode(sched)
@assert C != NoChunking
if C == FixedCount
chunks(arg; n = sched.nchunks, split = sched.split, kwargs...)
elseif C == FixedSize
chunks(arg; size = sched.chunksize, split = sched.split, kwargs...)
end
end
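As an illustrative aside (not part of the diff), the two ChunkSplitters.jl modes that `_chunks` dispatches between behave roughly as follows; the exact partition boundaries are up to ChunkSplitters (>= 2.3, per the compat bump above):

```julia
using ChunkSplitters: chunks

collect(chunks(1:10; n = 3))     # FixedCount: e.g. [1:4, 5:7, 8:10]
collect(chunks(1:10; size = 4))  # FixedSize:  e.g. [1:4, 5:8, 9:10]
```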

function tmapreduce(f, op, Arrs...;
scheduler::Scheduler = DynamicScheduler(),
outputtype::Type = Any,
@@ -46,10 +57,10 @@ function _tmapreduce(f,
::Type{OutputType},
scheduler::DynamicScheduler,
mapreduce_kwargs)::OutputType where {OutputType}
(; nchunks, split, threadpool) = scheduler
(; threadpool) = scheduler
check_all_have_same_indices(Arrs)
if chunking_enabled(scheduler)
tasks = map(chunks(first(Arrs); n = nchunks, split)) do inds
tasks = map(_chunks(scheduler, first(Arrs))) do inds
args = map(A -> view(A, inds), Arrs)
@spawn threadpool mapreduce(f, op, args...; $mapreduce_kwargs...)
end
@@ -85,24 +96,18 @@ function _tmapreduce(f,
::Type{OutputType},
scheduler::StaticScheduler,
mapreduce_kwargs) where {OutputType}
(; nchunks, split) = scheduler
nt = nthreads()
check_all_have_same_indices(Arrs)
if chunking_enabled(scheduler)
n = min(nthreads(), nchunks) # We could implement strategies, like round-robin, in the future
tasks = map(enumerate(chunks(first(Arrs); n, split))) do (c, inds)
tid = @inbounds nthtid(c)
tasks = map(enumerate(_chunks(scheduler, first(Arrs)))) do (c, inds)
tid = @inbounds nthtid(mod1(c, nt))
args = map(A -> view(A, inds), Arrs)
@spawnat tid mapreduce(f, op, args...; mapreduce_kwargs...)
end
mapreduce(fetch, op, tasks)
else
if length(first(Arrs)) > nthreads()
error("You have disabled chunking but provided an input with more then " *
"`nthreads()` elements. This is not supported for `StaticScheduler`.")
end
n = min(nthreads(), nchunks) # We could implement strategies, like round-robin, in the future
tasks = map(enumerate(eachindex(first(Arrs)))) do (c, i)
tid = @inbounds nthtid(c)
tid = @inbounds nthtid(mod1(c, nt))
args = map(A -> @inbounds(A[i]), Arrs)
@spawnat tid f(args...)
end
@@ -120,12 +125,9 @@ function _tmapreduce(f,
chunking_enabled(scheduler) && auto_disable_chunking_warning()
check_all_have_same_indices(Arrs)
chnks = only(Arrs)
if length(chnks) > nthreads()
error("You provided a `ChunkSplitters.Chunk` with more than `nthreads()` chunks " *
"as input, which is not supported by the `StaticScheduler`.")
end
nt = nthreads()
tasks = map(enumerate(chnks)) do (c, idcs)
tid = @inbounds nthtid(c)
tid = @inbounds nthtid(mod1(c, nt))
@spawnat tid f(idcs)
end
mapreduce(fetch, op, tasks; mapreduce_kwargs...)
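The `nthtid(mod1(c, nt))` pattern used above is what implements the round-robin assignment: chunk `c` is pinned to the `mod1(c, nt)`-th thread of the pool. A tiny standalone sketch of the index mapping (illustrative only):

```julia
nt = 4                         # pretend nthreads() == 4
[mod1(c, nt) for c in 1:10]    # [1, 2, 3, 4, 1, 2, 3, 4, 1, 2]
```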
@@ -216,8 +218,8 @@ function tmap(f,
_tmap(scheduler, f, A, _Arrs...; kwargs...)
end

# w/o chunking (DynamicScheduler{false}): AbstractArray
function _tmap(scheduler::DynamicScheduler{false},
# w/o chunking (DynamicScheduler{NoChunking}): AbstractArray
function _tmap(scheduler::DynamicScheduler{NoChunking},
f,
A::AbstractArray,
_Arrs::AbstractArray...;
@@ -234,8 +236,8 @@ function _tmap(scheduler::DynamicScheduler{false},
reshape(v, size(A)...)
end

# w/o chunking (DynamicScheduler{false}): ChunkSplitters.Chunk
function _tmap(scheduler::DynamicScheduler{false},
# w/o chunking (DynamicScheduler{NoChunking}): ChunkSplitters.Chunk
function _tmap(scheduler::DynamicScheduler{NoChunking},
f,
A::ChunkSplitters.Chunk,
_Arrs::AbstractArray...;
@@ -247,18 +249,15 @@ function _tmap(scheduler::DynamicScheduler{false},
map(fetch, tasks)
end

# w/o chunking (StaticScheduler{false}): ChunkSplitters.Chunk
function _tmap(scheduler::StaticScheduler{false},
# w/o chunking (StaticScheduler{NoChunking}): ChunkSplitters.Chunk
function _tmap(scheduler::StaticScheduler{NoChunking},
f,
A::ChunkSplitters.Chunk,
_Arrs::AbstractArray...;
kwargs...)
if length(A) > nthreads()
error("You provided a `ChunkSplitters.Chunk` with more than `nthreads()` chunks " *
"as input, which is not supported by the `StaticScheduler`.")
end
nt = nthreads()
tasks = map(enumerate(A)) do (c, idcs)
tid = @inbounds nthtid(c)
tid = @inbounds nthtid(mod1(c, nt))
@spawnat tid f(idcs)
end
map(fetch, tasks)
@@ -271,7 +270,7 @@ function _tmap(scheduler::Scheduler,
_Arrs::AbstractArray...;
kwargs...)
Arrs = (A, _Arrs...)
idcs = collect(chunks(A; n = scheduler.nchunks))
idcs = collect(_chunks(scheduler, A))
reduction_f = append!!
v = tmapreduce(reduction_f, idcs; scheduler, kwargs...) do inds
args = map(A -> @view(A[inds]), Arrs)
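To connect the `NoChunking` code paths above with user-facing behaviour, here is an illustrative sketch (not part of the diff) of passing pre-computed chunks directly, with the scheduler's internal chunking turned off as suggested by the warning earlier in this file:

```julia
using OhMyThreads
using OhMyThreads: chunks
using Base.Threads: nthreads

x = rand(10_000)
# Disable internal chunking (nchunks = 0 and chunksize = 0) when passing a Chunk directly,
# otherwise the auto-disable warning is emitted.
s = DynamicScheduler(; nchunks = 0, chunksize = 0)
tmapreduce(inds -> sum(view(x, inds)), +, chunks(x; n = nthreads()); scheduler = s)
```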
164 changes: 139 additions & 25 deletions src/schedulers.jl
@@ -11,6 +11,22 @@ Supertype for all available schedulers:
"""
abstract type Scheduler end

abstract type ChunkingMode end
struct NoChunking <: ChunkingMode end
struct FixedCount <: ChunkingMode end
struct FixedSize <: ChunkingMode end

function _chunkingstr(s::Scheduler)
C = chunking_mode(s)
if C == FixedCount
cstr = "fixed count ($(s.nchunks)), :$(s.split)"
elseif C == FixedSize
cstr = "fixed size ($(s.chunksize)), :$(s.split)"
elseif C == NoChunking
cstr = "none"
end
end

"""
The default dynamic scheduler. Divides the given collection into chunks and
then spawns a task per chunk to perform the requested operation in parallel.
@@ -26,29 +42,74 @@ with other multithreaded code.
* Determines the number of chunks (and thus also the number of parallel tasks).
* Increasing `nchunks` can help with [load balancing](https://en.wikipedia.org/wiki/Load_balancing_(computing)), but at the expense of creating more overhead. For `nchunks <= nthreads()` there are not enough chunks for any load balancing.
* Setting `nchunks < nthreads()` is an effective way to use only a subset of the available threads.
* Setting `nchunks = 0` turns off the internal chunking entirely (a task is spawned for each element). Note that, depending on the input, this scheduler **might spawn many(!) tasks** and can be
* Setting `nchunks = 0` (and `chunksize = 0`) turns off the internal chunking entirely (a task is spawned for each element). Note that, depending on the input, this scheduler **might spawn many(!) tasks** and can be
very costly!
- `chunksize::Integer` (default `0`)
* Specifies the desired chunk size (instead of the number of chunks).
* The options `chunksize` and `nchunks` are **mutually exclusive** (only one may be non-zero).
- `split::Symbol` (default `:batch`):
* Determines how the collection is divided into chunks. By default, each chunk consists of contiguous elements and order is maintained.
* See [ChunkSplitters.jl](https://github.com/JuliaFolds2/ChunkSplitters.jl) for more details and available options.
* Beware that for `split=:scatter` the order of elements isn't maintained and a reducer function must not only be associative but also **commutative**!
* Determines how the collection is divided into chunks. By default, each chunk consists of contiguous elements and order is maintained.
* See [ChunkSplitters.jl](https://github.com/JuliaFolds2/ChunkSplitters.jl) for more details and available options.
* Beware that for `split=:scatter` the order of elements isn't maintained and a reducer function must not only be associative but also **commutative**!
- `threadpool::Symbol` (default `:default`):
* Possible options are `:default` and `:interactive`.
* The high-priority pool `:interactive` should be used very carefully since tasks on this threadpool should not be allowed to run for a long time without `yield`ing as it can interfere with [heartbeat](https://en.wikipedia.org/wiki/Heartbeat_(computing)) processes.
"""
Base.@kwdef struct DynamicScheduler{C} <: Scheduler
threadpool::Symbol = :default
nchunks::Int = 2 * nthreads(threadpool) # a multiple of nthreads to enable load balancing
split::Symbol = :batch

function DynamicScheduler(threadpool::Symbol, nchunks::Integer, split::Symbol)
threadpool in (:default, :interactive) ||
struct DynamicScheduler{C <: ChunkingMode} <: Scheduler
threadpool::Symbol
nchunks::Int
chunksize::Int
split::Symbol

function DynamicScheduler(
threadpool::Symbol, nchunks::Integer, chunksize::Integer, split::Symbol)
if !(threadpool in (:default, :interactive))
throw(ArgumentError("threadpool must be either :default or :interactive"))
nchunks >= 0 ||
end
if nchunks < 0
throw(ArgumentError("nchunks must be a positive integer (or zero)."))
C = !(nchunks == 0) # type parameter indicates whether chunking is enabled
new{C}(threadpool, nchunks, split)
end
if chunksize < 0
throw(ArgumentError("chunksize must be a positive integer (or zero)."))
end
if nchunks != 0 && chunksize != 0
throw(ArgumentError("nchunks and chunksize are mutually exclusive and only one of them may be non-zero"))
end
if nchunks == 0 && chunksize == 0
C = NoChunking
elseif chunksize != 0
C = FixedSize
else
C = FixedCount
end
new{C}(threadpool, nchunks, chunksize, split)
end
end

function DynamicScheduler(;
threadpool::Symbol = :default,
nchunks::Union{Integer, Nothing} = nothing,
chunksize::Union{Integer, Nothing} = nothing,
split::Symbol = :batch)
if isnothing(nchunks)
# only choose nchunks default if chunksize hasn't been specified
if isnothing(chunksize)
nchunks = 2 * nthreads(threadpool)
else
nchunks = 0
end
end
if isnothing(chunksize)
chunksize = 0
end
DynamicScheduler(threadpool, nchunks, chunksize, split)
end

function Base.show(io::IO, mime::MIME{Symbol("text/plain")}, s::DynamicScheduler)
print("DynamicScheduler", "\n")
cstr = _chunkingstr(s)
println(io, "├ Chunking: ", cstr)
print(io, "└ Threadpool: ", s.threadpool)
end

"""
@@ -66,22 +127,66 @@ Isn't well composable with other multithreaded code though.
- `nchunks::Integer` (default `nthreads()`):
* Determines the number of chunks (and thus also the number of parallel tasks).
* Setting `nchunks < nthreads()` is an effective way to use only a subset of the available threads.
* Currently, `nchunks > nthreads()` **isn't officialy supported** but, for now, will fall back to `nchunks = nthreads()`.
* For `nchunks > nthreads()` the chunks will be distributed to the available threads in a round-robin fashion.
* Setting `nchunks = 0` (and `chunksize = 0`) turns off the internal chunking entirely (a task is spawned for each element). Note that, depending on the input, this scheduler **might spawn many(!) tasks** and can be
very costly!
- `chunksize::Integer` (default `0`)
* Specifies the desired chunk size (instead of the number of chunks).
* The options `chunksize` and `nchunks` are **mutually exclusive** (only one may be non-zero).
- `split::Symbol` (default `:batch`):
* Determines how the collection is divided into chunks. By default, each chunk consists of contiguous elements and order is maintained.
* See [ChunkSplitters.jl](https://github.com/JuliaFolds2/ChunkSplitters.jl) for more details and available options.
* Beware that for `split=:scatter` the order of elements isn't maintained and a reducer function must not only be associative but also **commutative**!
"""
Base.@kwdef struct StaticScheduler{C} <: Scheduler
nchunks::Int = nthreads()
split::Symbol = :batch
struct StaticScheduler{C <: ChunkingMode} <: Scheduler
nchunks::Int
chunksize::Int
split::Symbol

function StaticScheduler(nchunks::Integer, split::Symbol)
nchunks >= 0 ||
function StaticScheduler(nchunks::Integer, chunksize::Integer, split::Symbol)
if nchunks < 0
throw(ArgumentError("nchunks must be a positive integer (or zero)."))
C = !(nchunks == 0) # type parameter indicates whether chunking is enabled
new{C}(nchunks, split)
end
if chunksize < 0
throw(ArgumentError("chunksize must be a positive integer (or zero)."))
end
if nchunks != 0 && chunksize != 0
throw(ArgumentError("nchunks and chunksize are mutually exclusive and only one of them may be non-zero"))
end
if nchunks == 0 && chunksize == 0
C = NoChunking
elseif chunksize != 0
C = FixedSize
else
C = FixedCount
end
new{C}(nchunks, chunksize, split)
end
end

function StaticScheduler(;
nchunks::Union{Integer, Nothing} = nothing,
chunksize::Union{Integer, Nothing} = nothing,
split::Symbol = :batch)
if isnothing(nchunks)
# only choose nchunks default if chunksize hasn't been specified
if isnothing(chunksize)
nchunks = nthreads(:default)
else
nchunks = 0
end
end
if isnothing(chunksize)
chunksize = 0
end
StaticScheduler(nchunks, chunksize, split)
end

function Base.show(io::IO, mime::MIME{Symbol("text/plain")}, s::StaticScheduler)
print("StaticScheduler", "\n")
cstr = _chunkingstr(s)
println(io, "├ Chunking: ", cstr)
print(io, "└ Threadpool: default")
end

"""
@@ -110,9 +215,18 @@ Base.@kwdef struct GreedyScheduler <: Scheduler
end
end

function Base.show(io::IO, mime::MIME{Symbol("text/plain")}, s::GreedyScheduler)
print("GreedyScheduler", "\n")
println(io, "├ Num. tasks: ", s.ntasks)
print(io, "└ Threadpool: default")
end

chunking_mode(s::Scheduler) = chunking_mode(typeof(s))
chunking_mode(::Type{DynamicScheduler{C}}) where {C} = C
chunking_mode(::Type{StaticScheduler{C}}) where {C} = C
chunking_mode(::Type{GreedyScheduler}) = NoChunking

chunking_enabled(s::Scheduler) = chunking_enabled(typeof(s))
chunking_enabled(::Type{DynamicScheduler{C}}) where {C} = C
chunking_enabled(::Type{StaticScheduler{C}}) where {C} = C
chunking_enabled(::Type{GreedyScheduler}) = false
chunking_enabled(::Type{S}) where {S <: Scheduler} = chunking_mode(S) != NoChunking

end # module
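Finally, an illustrative sketch (not part of the diff) of the trait queries defined at the end of `schedulers.jl`; `OhMyThreads.Schedulers` is the internal submodule that `implementation.jl` imports above:

```julia
using OhMyThreads: DynamicScheduler, GreedyScheduler
using OhMyThreads.Schedulers: chunking_mode, chunking_enabled

chunking_mode(DynamicScheduler(; chunksize = 2))   # FixedSize
chunking_mode(GreedyScheduler())                   # NoChunking
chunking_enabled(DynamicScheduler(; nchunks = 0))  # false
```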