Skip to content

Commit

Permalink
support symbol and split
Browse files Browse the repository at this point in the history
  • Loading branch information
carstenbauer committed Sep 25, 2024
1 parent 0c369f8 commit 85e968d
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 15 deletions.
2 changes: 1 addition & 1 deletion src/OhMyThreads.jl
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ const Split = ChunkSplitters.Split
const Consecutive = ChunkSplitters.Consecutive
const RoundRobin = ChunkSplitters.RoundRobin
export chunks, index_chunks
# export RoundRobin, Consecutive, Split # TODO: should we export this?
export RoundRobin, Consecutive, Split

using TaskLocalValues: TaskLocalValues
const TaskLocalValue = TaskLocalValues.TaskLocalValue
Expand Down
43 changes: 32 additions & 11 deletions src/schedulers.jl
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,10 @@ with other multithreaded code.
- `chunksize::Integer` (default not set)
* Specifies the desired chunk size (instead of the number of chunks).
* The options `chunksize` and `nchunks`/`ntasks` are **mutually exclusive** (only one may be a positive integer).
- `split::OhMyThreads.Split` (default `OhMyThreads.Consecutive()`):
- `split::Split` (default `Consecutive()`):
* Determines how the collection is divided into chunks (if chunking=true). By default, each chunk consists of contiguous elements and order is maintained.
* See [ChunkSplitters.jl](https://github.com/JuliaFolds2/ChunkSplitters.jl) for more details and available options.
* Beware that for `split=OhMyThreads.RoundRobin()` the order of elements isn't maintained and a reducer function must not only be associative but also **commutative**!
* Beware that for `split=RoundRobin()` the order of elements isn't maintained and a reducer function must not only be associative but also **commutative**!
- `chunking::Bool` (default `true`):
* Controls whether input elements are grouped into chunks (`true`) or not (`false`).
* For `chunking=false`, the arguments `nchunks`/`ntasks`, `chunksize`, and `split` are ignored and input elements are regarded as "chunks" as is. Hence, there will be one parallel task spawned per input element. Note that, depending on the input, this **might spawn many(!) tasks** and can be costly!
Expand All @@ -75,7 +75,7 @@ struct DynamicScheduler{C <: ChunkingMode} <: Scheduler
split::Split

function DynamicScheduler(threadpool::Symbol, nchunks::Integer, chunksize::Integer,
split::Split; chunking::Bool = true)
split::Union{Split, Symbol}; chunking::Bool = true)
if !(threadpool in (:default, :interactive))
throw(ArgumentError("threadpool must be either :default or :interactive"))
end
Expand All @@ -90,6 +90,13 @@ struct DynamicScheduler{C <: ChunkingMode} <: Scheduler
end
C = chunksize > 0 ? FixedSize : FixedCount
end
if split isa Symbol
if split in (:consecutive, :batch)
split = Consecutive()
elseif split in (:roundrobin, :scatter)
split = RoundRobin()
end
end
new{C}(threadpool, nchunks, chunksize, split)
end
end
Expand All @@ -100,7 +107,7 @@ function DynamicScheduler(;
ntasks::MaybeInteger = NotGiven(), # "alias" for nchunks
chunksize::MaybeInteger = NotGiven(),
chunking::Bool = true,
split::Split = Consecutive())
split::Union{Split, Symbol} = Consecutive())
if !chunking
nchunks = -1
chunksize = -1
Expand Down Expand Up @@ -152,17 +159,17 @@ Isn't well composable with other multithreaded code though.
- `chunking::Bool` (default `true`):
* Controls whether input elements are grouped into chunks (`true`) or not (`false`).
* For `chunking=false`, the arguments `nchunks`/`ntasks`, `chunksize`, and `split` are ignored and input elements are regarded as "chunks" as is. Hence, there will be one parallel task spawned per input element. Note that, depending on the input, this **might spawn many(!) tasks** and can be costly!
- `split::OhMyThreads.Split` (default `OhMyThreads.Consecutive()`):
- `split::Split` (default `Consecutive()`):
* Determines how the collection is divided into chunks. By default, each chunk consists of contiguous elements and order is maintained.
* See [ChunkSplitters.jl](https://github.com/JuliaFolds2/ChunkSplitters.jl) for more details and available options.
* Beware that for `split=OhMyThreads.RoundRobin()` the order of elements isn't maintained and a reducer function must not only be associative but also **commutative**!
* Beware that for `split=RoundRobin()` the order of elements isn't maintained and a reducer function must not only be associative but also **commutative**!
"""
struct StaticScheduler{C <: ChunkingMode} <: Scheduler
nchunks::Int
chunksize::Int
split::Split

function StaticScheduler(nchunks::Integer, chunksize::Integer, split::Split;
function StaticScheduler(nchunks::Integer, chunksize::Integer, split::Union{Split, Symbol};
chunking::Bool = true)
if !chunking
C = NoChunking
Expand All @@ -175,6 +182,13 @@ struct StaticScheduler{C <: ChunkingMode} <: Scheduler
end
C = chunksize > 0 ? FixedSize : FixedCount
end
if split isa Symbol
if split in (:consecutive, :batch)
split = Consecutive()
elseif split in (:roundrobin, :scatter)
split = RoundRobin()
end
end
new{C}(nchunks, chunksize, split)
end
end
Expand All @@ -184,7 +198,7 @@ function StaticScheduler(;
ntasks::MaybeInteger = NotGiven(), # "alias" for nchunks
chunksize::MaybeInteger = NotGiven(),
chunking::Bool = true,
split::Split = Consecutive())
split::Union{Split, Symbol} = Consecutive())
if !chunking
nchunks = -1
chunksize = -1
Expand Down Expand Up @@ -239,7 +253,7 @@ some additional overhead.
- `chunksize::Integer` (default not set)
* Specifies the desired chunk size (instead of the number of chunks).
* The options `chunksize` and `nchunks` are **mutually exclusive** (only one may be a positive integer).
- `split::OhMyThreads.Split` (default `OhMyThreads.RoundRobin()`):
- `split::Split` (default `RoundRobin()`):
* Determines how the collection is divided into chunks (if chunking=true).
* See [ChunkSplitters.jl](https://github.com/JuliaFolds2/ChunkSplitters.jl) for more details and available options.
"""
Expand All @@ -250,7 +264,7 @@ struct GreedyScheduler{C <: ChunkingMode} <: Scheduler
split::Split

function GreedyScheduler(ntasks::Int, nchunks::Integer, chunksize::Integer,
split::Split; chunking::Bool = false)
split::Union{Split, Symbol}; chunking::Bool = false)
ntasks > 0 || throw(ArgumentError("ntasks must be a positive integer"))
if !chunking
C = NoChunking
Expand All @@ -263,6 +277,13 @@ struct GreedyScheduler{C <: ChunkingMode} <: Scheduler
end
C = chunksize > 0 ? FixedSize : FixedCount
end
if split isa Symbol
if split in (:consecutive, :batch)
split = Consecutive()
elseif split in (:roundrobin, :scatter)
split = RoundRobin()
end
end
new{C}(ntasks, nchunks, chunksize, split)
end
end
Expand All @@ -272,7 +293,7 @@ function GreedyScheduler(;
nchunks::MaybeInteger = NotGiven(),
chunksize::MaybeInteger = NotGiven(),
chunking::Bool = false,
split::Split = RoundRobin())
split::Union{Split, Symbol} = RoundRobin())
if isgiven(nchunks) || isgiven(chunksize)
chunking = true
end
Expand Down
6 changes: 3 additions & 3 deletions test/runtests.jl
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ ChunkedGreedy(; kwargs...) = GreedyScheduler(; kwargs...)
StaticScheduler, DynamicScheduler, GreedyScheduler,
DynamicScheduler{OhMyThreads.Schedulers.NoChunking},
SerialScheduler, ChunkedGreedy)
@testset for split in (Consecutive(), RoundRobin())
@testset for split in (Consecutive(), RoundRobin(), :consecutive, :roundrobin)
for nchunks in (1, 2, 6)
if sched == GreedyScheduler
scheduler = sched(; ntasks = nchunks)
Expand All @@ -36,7 +36,7 @@ ChunkedGreedy(; kwargs...) = GreedyScheduler(; kwargs...)
end

kwargs = (; scheduler)
if (split == RoundRobin() ||
if (split in (RoundRobin(), :roundrobin) ||
sched (GreedyScheduler, ChunkedGreedy)) || op (vcat, *)
# scatter and greedy only works for commutative operators!
else
Expand All @@ -46,7 +46,7 @@ ChunkedGreedy(; kwargs...) = GreedyScheduler(; kwargs...)
@test treduce(op, f.(itrs...); init, kwargs...) ~ mapreduce_f_op_itr
end

split == RoundRobin() && continue
split in (RoundRobin(), :roundrobin) && continue
map_f_itr = map(f, itrs...)
@test all(tmap(f, Any, itrs...; kwargs...) .~ map_f_itr)
@test all(tcollect(Any, (f(x...) for x in collect(zip(itrs...))); kwargs...) .~ map_f_itr)
Expand Down

0 comments on commit 85e968d

Please sign in to comment.