diff --git a/docs/src/refs/api.md b/docs/src/refs/api.md index 6c5a8817..6cdd422e 100644 --- a/docs/src/refs/api.md +++ b/docs/src/refs/api.md @@ -28,7 +28,6 @@ Scheduler DynamicScheduler StaticScheduler GreedyScheduler -SpawnAllScheduler ``` ## Non-Exported diff --git a/docs/src/translation.md b/docs/src/translation.md index 435af9a2..efcfbef4 100644 --- a/docs/src/translation.md +++ b/docs/src/translation.md @@ -47,7 +47,7 @@ end ```julia # OhMyThreads -tforeach(1:10; scheduler=SpawnAllScheduler()) do i +tforeach(1:10; scheduler=DynamicScheduler(; nchunks=0)) do i println(i) end ``` diff --git a/src/implementation.jl b/src/implementation.jl index 6e58403a..8f528721 100644 --- a/src/implementation.jl +++ b/src/implementation.jl @@ -5,7 +5,7 @@ import OhMyThreads: treduce, tmapreduce, treducemap, tforeach, tmap, tmap!, tcol using OhMyThreads: StableTasks, chunks, @spawn, @spawnat using OhMyThreads.Tools: nthtid using OhMyThreads: Scheduler, - chunking_enabled, DynamicScheduler, StaticScheduler, GreedyScheduler, SpawnAllScheduler + chunking_enabled, DynamicScheduler, StaticScheduler, GreedyScheduler using Base: @propagate_inbounds using Base.Threads: nthreads, @threads @@ -75,22 +75,6 @@ function _tmapreduce(f, mapreduce(fetch, op, tasks; mapreduce_kwargs...) end -# SpawnAllScheduler: AbstractArray -function _tmapreduce(f, - op, - Arrs, - ::Type{OutputType}, - scheduler::SpawnAllScheduler, - mapreduce_kwargs)::OutputType where {OutputType} - (; threadpool) = scheduler - check_all_have_same_indices(Arrs) - tasks = map(eachindex(first(Arrs))) do i - args = map(A -> @inbounds(A[i]), Arrs) - @spawn threadpool f(args...) - end - mapreduce(fetch, op, tasks; mapreduce_kwargs...) -end - # StaticScheduler function _tmapreduce(f, op, @@ -191,29 +175,35 @@ function tmap(f, _tmap(scheduler, f, A, _Arrs...; kwargs...) end -# w/o chunking (SpawnAllScheduler, DynamicScheduler{false}) -function _tmap(scheduler::Union{SpawnAllScheduler, DynamicScheduler{false}}, +# w/o chunking (DynamicScheduler{false}): AbstractArray +function _tmap(scheduler::DynamicScheduler{false}, f, - A::Union{AbstractArray, ChunkSplitters.Chunk}, + A::AbstractArray, _Arrs::AbstractArray...; kwargs...) (; threadpool) = scheduler - if A isa ChunkSplitters.Chunk - tasks = map(A) do idcs - @spawn threadpool f(idcs) - end - v = map(fetch, tasks) - else - Arrs = (A, _Arrs...) - tasks = map(eachindex(A)) do i - @spawn threadpool begin - args = map(A -> A[i], Arrs) - f(args...) - end + Arrs = (A, _Arrs...) + tasks = map(eachindex(A)) do i + @spawn threadpool begin + args = map(A -> A[i], Arrs) + f(args...) end - v = map(fetch, tasks) - reshape(v, size(A)...) end + v = map(fetch, tasks) + reshape(v, size(A)...) +end + +# w/o chunking (DynamicScheduler{false}): ChunkSplitters.Chunk +function _tmap(scheduler::DynamicScheduler{false}, + f, + A::ChunkSplitters.Chunk, + _Arrs::AbstractArray...; + kwargs...) + (; threadpool) = scheduler + tasks = map(A) do idcs + @spawn threadpool f(idcs) + end + v = map(fetch, tasks) end # w/ chunking diff --git a/src/schedulers.jl b/src/schedulers.jl index 97653259..bbf14cdf 100644 --- a/src/schedulers.jl +++ b/src/schedulers.jl @@ -8,7 +8,6 @@ Supertype for all available schedulers: * [`DynamicScheduler`](@ref): default dynamic scheduler * [`StaticScheduler`](@ref): low-overhead static scheduler * [`GreedyScheduler`](@ref): greedy load-balancing scheduler -* [`SpawnAllScheduler`](@ref): `@spawn` one task per element """ abstract type Scheduler end @@ -114,33 +113,8 @@ end chunking_enabled(::GreedyScheduler) = false -""" -A scheduler that spawns a task per element (i.e. there is no internal chunking) to perform -the requested operation in parallel. The tasks are assigned to threads by Julia's dynamic -scheduler and are non-sticky, that is, they can migrate between threads. - -Note that, depending on the input, this scheduler **might spawn many(!) tasks** and can be -very costly! - -Essentially the same as `DynamicScheduler(; nchunks=nelements)`, but with a simpler, -potentially more efficient implementation. - -## Keyword arguments: - -- `threadpool::Symbol` (default `:default`): - * Possible options are `:default` and `:interactive`. - * The high-priority pool `:interactive` should be used very carefully since tasks on this threadpool should not be allowed to run for a long time without `yield`ing as it can interfere with [heartbeat](https://en.wikipedia.org/wiki/Heartbeat_(computing)) processes. -""" -Base.@kwdef struct SpawnAllScheduler <: Scheduler - threadpool::Symbol = :default - - function SpawnAllScheduler(threadpool::Symbol) - threadpool in (:default, :interactive) || - throw(ArgumentError("threadpool must be either :default or :interactive")) - new(threadpool) - end -end - -chunking_enabled(::SpawnAllScheduler) = false +@deprecate SpawnAllScheduler(args...; kwargs...) DynamicScheduler(args...; + nchunks = 0, + kwargs...) end # module diff --git a/test/runtests.jl b/test/runtests.jl index 0db19233..a8e56f9b 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -11,13 +11,13 @@ sets_to_test = [ @testset "Basics" begin for (; ~, f, op, itrs, init) ∈ sets_to_test @testset "f=$f, op=$op, itrs::$(typeof(itrs))" begin - @testset for sched ∈ (StaticScheduler, DynamicScheduler, GreedyScheduler, SpawnAllScheduler) + @testset for sched ∈ (StaticScheduler, DynamicScheduler, GreedyScheduler, DynamicScheduler{false}) @testset for split ∈ (:batch, :scatter) for nchunks ∈ (1, 2, 6) if sched == GreedyScheduler scheduler = sched(; ntasks=nchunks) - elseif sched == SpawnAllScheduler - scheduler = sched() + elseif sched == DynamicScheduler{false} + scheduler = DynamicScheduler(; nchunks=0) else scheduler = sched(; nchunks, split) end