Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SpawnAllScheduler #46

Merged
merged 5 commits into from
Feb 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
OhMyThreads.jl Changelog
=========================

Version 0.4.1
-------------

- ![Feature][badge-feature] Added a new, simple `SpawnAllScheduler` that spawns a task per input element (can be a lot of tasks!).


Version 0.4.0
-------------

Expand Down
1 change: 1 addition & 0 deletions docs/src/refs/api.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ Scheduler
DynamicScheduler
StaticScheduler
GreedyScheduler
SpawnAllScheduler
```

## Non-Exported
Expand Down
2 changes: 1 addition & 1 deletion docs/src/translation.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ end

```julia
# OhMyThreads
tforeach(1:10; scheduler=DynamicScheduler(; nchunks=10)) do i
tforeach(1:10; scheduler=SpawnAllScheduler()) do i
println(i)
end
```
Expand Down
5 changes: 3 additions & 2 deletions src/OhMyThreads.jl
Original file line number Diff line number Diff line change
Expand Up @@ -197,10 +197,11 @@ function tcollect end

include("tools.jl")
include("schedulers.jl")
using .Schedulers: Scheduler, DynamicScheduler, StaticScheduler, GreedyScheduler
using .Schedulers: Scheduler,
DynamicScheduler, StaticScheduler, GreedyScheduler, SpawnAllScheduler
include("implementation.jl")

export treduce, tmapreduce, treducemap, tmap, tmap!, tforeach, tcollect
export Scheduler, DynamicScheduler, StaticScheduler, GreedyScheduler
export Scheduler, DynamicScheduler, StaticScheduler, GreedyScheduler, SpawnAllScheduler

end # module OhMyThreads
57 changes: 51 additions & 6 deletions src/implementation.jl
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@ import OhMyThreads: treduce, tmapreduce, treducemap, tforeach, tmap, tmap!, tcol

using OhMyThreads: StableTasks, chunks, @spawn, @spawnat
using OhMyThreads.Tools: nthtid
using OhMyThreads: Scheduler, DynamicScheduler, StaticScheduler, GreedyScheduler
using OhMyThreads: Scheduler,
DynamicScheduler, StaticScheduler, GreedyScheduler, SpawnAllScheduler
using Base: @propagate_inbounds
using Base.Threads: nthreads, @threads

Expand Down Expand Up @@ -43,6 +44,22 @@ function _tmapreduce(f,
mapreduce(fetch, op, tasks)
end

# SpawnAllScheduler
#
# Spawns one task per index of the first array in `Arrs` (no chunking). Each task
# calls `f` on the elements found at that index in every array; the task results
# are then fetched and combined with `op`. `mapreduce_kwargs` are forwarded to the
# final `mapreduce`, and the result is converted to `OutputType` by the return
# annotation.
function _tmapreduce(f,
        op,
        Arrs,
        ::Type{OutputType},
        scheduler::SpawnAllScheduler,
        mapreduce_kwargs)::OutputType where {OutputType}
    pool = scheduler.threadpool
    # NOTE(review): the `@inbounds` below presumably relies on this check having
    # validated that all arrays share the same indices -- confirm in its definition.
    check_all_have_same_indices(Arrs)
    spawned = map(eachindex(first(Arrs))) do idx
        # Elements are gathered here, before the task is created; only the call
        # to `f` runs inside the spawned task.
        elems = map(arr -> @inbounds(arr[idx]), Arrs)
        @spawn pool f(elems...)
    end
    return mapreduce(fetch, op, spawned; mapreduce_kwargs...)
end

# GreedyScheduler
function _tmapreduce(f,
op,
Expand Down Expand Up @@ -124,15 +141,43 @@ function tmap(f,
kwargs...)
if scheduler isa GreedyScheduler
error("Greedy scheduler isn't supported with `tmap` unless you provide an `OutputElementType` argument, since the greedy schedule requires a commutative reducing operator.")
end
(; nchunks, split) = scheduler
if split != :batch
elseif hasfield(typeof(scheduler), :split) && scheduler.split != :batch
error("Only `split == :batch` is supported because the parallel operation isn't commutative. (Scheduler: $scheduler)")
end

Arrs = (A, _Arrs...)
check_all_have_same_indices(Arrs)
chunk_idcs = collect(chunks(A; n = nchunks))
v = tmapreduce(append!!, chunk_idcs; scheduler, kwargs...) do inds
_tmap(scheduler, f, A, _Arrs...; kwargs...)
end

# SpawnAllScheduler
#
# Spawns one task per index of `A` on the scheduler's threadpool. Each task reads
# the element at that index from `A` and from every array in `_Arrs`, then calls
# `f` on them. The fetched results are returned reshaped to `size(A)`.
# `kwargs` are accepted but not used by this method.
function _tmap(scheduler::SpawnAllScheduler,
        f,
        A::AbstractArray,
        _Arrs::AbstractArray...;
        kwargs...)
    pool = scheduler.threadpool
    inputs = (A, _Arrs...)
    tasks = map(eachindex(A)) do idx
        @spawn pool begin
            # Indexing happens inside the task, together with the call to `f`.
            elems = map(arr -> arr[idx], inputs)
            f(elems...)
        end
    end
    results = map(fetch, tasks)
    return reshape(results, size(A)...)
end

# All other schedulers
function _tmap(scheduler::Scheduler,
f,
A::AbstractArray,
_Arrs::AbstractArray...;
kwargs...)
Arrs = (A, _Arrs...)
idcs = collect(chunks(A; n = scheduler.nchunks))
reduction_f = append!!
v = tmapreduce(reduction_f, idcs; scheduler, kwargs...) do inds
args = map(A -> @view(A[inds]), Arrs)
map(f, args...)
end
Expand Down
28 changes: 28 additions & 0 deletions src/schedulers.jl
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ Supertype for all available schedulers:
* [`DynamicScheduler`](@ref): default dynamic scheduler
* [`StaticScheduler`](@ref): low-overhead static scheduler
* [`GreedyScheduler`](@ref): greedy load-balancing scheduler
* [`SpawnAllScheduler`](@ref): `@spawn` one task per element
"""
abstract type Scheduler end

Expand Down Expand Up @@ -102,4 +103,31 @@ Base.@kwdef struct GreedyScheduler <: Scheduler
end
end

"""
A scheduler without internal chunking: it spawns one task for every input element and
runs the requested operation in parallel across those tasks. Julia's dynamic scheduler
assigns the tasks to threads; the tasks are non-sticky, i.e. they may migrate between
threads.

**Warning:** the number of spawned tasks grows with the size of the input, so this
scheduler **can create a very large number of tasks(!)** and may be very costly.

Behaves essentially like `DynamicScheduler(; nchunks=nelements)`, but uses a simpler and
potentially more efficient implementation.

## Keyword arguments:

- `threadpool::Symbol` (default `:default`):
    * Accepted values are `:default` and `:interactive`; anything else throws an `ArgumentError`.
    * Use the high-priority `:interactive` pool with great care: tasks on this threadpool should not run for a long time without `yield`ing, since that can interfere with [heartbeat](https://en.wikipedia.org/wiki/Heartbeat_(computing)) processes.
"""
Base.@kwdef struct SpawnAllScheduler <: Scheduler
    threadpool::Symbol = :default

    # Inner constructor: reject any threadpool other than the two supported ones.
    function SpawnAllScheduler(threadpool::Symbol)
        if !(threadpool in (:default, :interactive))
            throw(ArgumentError("threadpool must be either :default or :interactive"))
        end
        return new(threadpool)
    end
end

end # module
4 changes: 3 additions & 1 deletion test/runtests.jl
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,13 @@ sets_to_test = [
@testset "Basics" begin
for (; ~, f, op, itrs, init) ∈ sets_to_test
@testset "f=$f, op=$op, itrs::$(typeof(itrs))" begin
@testset for sched ∈ (StaticScheduler, DynamicScheduler, GreedyScheduler)
@testset for sched ∈ (StaticScheduler, DynamicScheduler, GreedyScheduler, SpawnAllScheduler)
@testset for split ∈ (:batch, :scatter)
for nchunks ∈ (1, 2, 6)
if sched == GreedyScheduler
scheduler = sched(; ntasks=nchunks)
elseif sched == SpawnAllScheduler
scheduler = sched()
else
scheduler = sched(; nchunks, split)
end
Expand Down
Loading