Skip to content

Commit

Permalink
deprecate SpawnAllScheduler in favor of DynamicScheduler(; nchunks=0)
Browse files Browse the repository at this point in the history
  • Loading branch information
carstenbauer committed Feb 10, 2024
1 parent fd47f2b commit d12eef0
Show file tree
Hide file tree
Showing 5 changed files with 31 additions and 68 deletions.
1 change: 0 additions & 1 deletion docs/src/refs/api.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ Scheduler
DynamicScheduler
StaticScheduler
GreedyScheduler
SpawnAllScheduler
```

## Non-Exported
Expand Down
2 changes: 1 addition & 1 deletion docs/src/translation.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ end

```julia
# OhMyThreads
tforeach(1:10; scheduler=SpawnAllScheduler()) do i
tforeach(1:10; scheduler=DynamicScheduler(; nchunks=0)) do i
println(i)
end
```
Expand Down
58 changes: 24 additions & 34 deletions src/implementation.jl
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import OhMyThreads: treduce, tmapreduce, treducemap, tforeach, tmap, tmap!, tcol
using OhMyThreads: StableTasks, chunks, @spawn, @spawnat
using OhMyThreads.Tools: nthtid
using OhMyThreads: Scheduler,
chunking_enabled, DynamicScheduler, StaticScheduler, GreedyScheduler, SpawnAllScheduler
chunking_enabled, DynamicScheduler, StaticScheduler, GreedyScheduler
using Base: @propagate_inbounds
using Base.Threads: nthreads, @threads

Expand Down Expand Up @@ -75,22 +75,6 @@ function _tmapreduce(f,
mapreduce(fetch, op, tasks; mapreduce_kwargs...)
end

# SpawnAllScheduler: AbstractArray
function _tmapreduce(f,
op,
Arrs,
::Type{OutputType},
scheduler::SpawnAllScheduler,
mapreduce_kwargs)::OutputType where {OutputType}
(; threadpool) = scheduler
check_all_have_same_indices(Arrs)
tasks = map(eachindex(first(Arrs))) do i
args = map(A -> @inbounds(A[i]), Arrs)
@spawn threadpool f(args...)
end
mapreduce(fetch, op, tasks; mapreduce_kwargs...)
end

# StaticScheduler
function _tmapreduce(f,
op,
Expand Down Expand Up @@ -191,29 +175,35 @@ function tmap(f,
_tmap(scheduler, f, A, _Arrs...; kwargs...)
end

# w/o chunking (SpawnAllScheduler, DynamicScheduler{false})
function _tmap(scheduler::Union{SpawnAllScheduler, DynamicScheduler{false}},
# w/o chunking (DynamicScheduler{false}): AbstractArray
function _tmap(scheduler::DynamicScheduler{false},
f,
A::Union{AbstractArray, ChunkSplitters.Chunk},
A::AbstractArray,
_Arrs::AbstractArray...;
kwargs...)
(; threadpool) = scheduler
if A isa ChunkSplitters.Chunk
tasks = map(A) do idcs
@spawn threadpool f(idcs)
end
v = map(fetch, tasks)
else
Arrs = (A, _Arrs...)
tasks = map(eachindex(A)) do i
@spawn threadpool begin
args = map(A -> A[i], Arrs)
f(args...)
end
Arrs = (A, _Arrs...)
tasks = map(eachindex(A)) do i
@spawn threadpool begin
args = map(A -> A[i], Arrs)
f(args...)
end
v = map(fetch, tasks)
reshape(v, size(A)...)
end
v = map(fetch, tasks)
reshape(v, size(A)...)
end

# w/o chunking (DynamicScheduler{false}): ChunkSplitters.Chunk
function _tmap(scheduler::DynamicScheduler{false},
f,
A::ChunkSplitters.Chunk,
_Arrs::AbstractArray...;
kwargs...)
(; threadpool) = scheduler
tasks = map(A) do idcs
@spawn threadpool f(idcs)
end
v = map(fetch, tasks)
end

# w/ chunking
Expand Down
32 changes: 3 additions & 29 deletions src/schedulers.jl
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ Supertype for all available schedulers:
* [`DynamicScheduler`](@ref): default dynamic scheduler
* [`StaticScheduler`](@ref): low-overhead static scheduler
* [`GreedyScheduler`](@ref): greedy load-balancing scheduler
* [`SpawnAllScheduler`](@ref): `@spawn` one task per element
"""
abstract type Scheduler end

Expand Down Expand Up @@ -114,33 +113,8 @@ end

chunking_enabled(::GreedyScheduler) = false

"""
A scheduler that spawns a task per element (i.e. there is no internal chunking) to perform
the requested operation in parallel. The tasks are assigned to threads by Julia's dynamic
scheduler and are non-sticky, that is, they can migrate between threads.
Note that, depending on the input, this scheduler **might spawn many(!) tasks** and can be
very costly!
Essentially the same as `DynamicScheduler(; nchunks=nelements)`, but with a simpler,
potentially more efficient implementation.
## Keyword arguments:
- `threadpool::Symbol` (default `:default`):
* Possible options are `:default` and `:interactive`.
* The high-priority pool `:interactive` should be used very carefully since tasks on this threadpool should not be allowed to run for a long time without `yield`ing as it can interfere with [heartbeat](https://en.wikipedia.org/wiki/Heartbeat_(computing)) processes.
"""
Base.@kwdef struct SpawnAllScheduler <: Scheduler
threadpool::Symbol = :default

function SpawnAllScheduler(threadpool::Symbol)
threadpool in (:default, :interactive) ||
throw(ArgumentError("threadpool must be either :default or :interactive"))
new(threadpool)
end
end

chunking_enabled(::SpawnAllScheduler) = false
@deprecate SpawnAllScheduler(args...; kwargs...) DynamicScheduler(args...;
nchunks = 0,
kwargs...)

end # module
6 changes: 3 additions & 3 deletions test/runtests.jl
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,13 @@ sets_to_test = [
@testset "Basics" begin
for (; ~, f, op, itrs, init) sets_to_test
@testset "f=$f, op=$op, itrs::$(typeof(itrs))" begin
@testset for sched (StaticScheduler, DynamicScheduler, GreedyScheduler, SpawnAllScheduler)
@testset for sched (StaticScheduler, DynamicScheduler, GreedyScheduler, DynamicScheduler{false})
@testset for split (:batch, :scatter)
for nchunks (1, 2, 6)
if sched == GreedyScheduler
scheduler = sched(; ntasks=nchunks)
elseif sched == SpawnAllScheduler
scheduler = sched()
elseif sched == DynamicScheduler{false}
scheduler = DynamicScheduler(; nchunks=0)
else
scheduler = sched(; nchunks, split)
end
Expand Down

0 comments on commit d12eef0

Please sign in to comment.