From 98adfd26d847a3c1d1906acb38d55ff69a453df3 Mon Sep 17 00:00:00 2001 From: Carsten Bauer Date: Tue, 12 Mar 2024 16:21:10 +0100 Subject: [PATCH 01/17] tmp --- src/implementation.jl | 37 +++++++++++++++++++------- src/schedulers.jl | 60 +++++++++++++++++++++++++++---------------- 2 files changed, 66 insertions(+), 31 deletions(-) diff --git a/src/implementation.jl b/src/implementation.jl index 7bbb7325..80a93e3f 100644 --- a/src/implementation.jl +++ b/src/implementation.jl @@ -4,10 +4,12 @@ import OhMyThreads: treduce, tmapreduce, treducemap, tforeach, tmap, tmap!, tcol using OhMyThreads: chunks, @spawn, @spawnat, WithTaskLocals, promise_task_local using OhMyThreads.Tools: nthtid -using OhMyThreads: Scheduler, DynamicScheduler, StaticScheduler, GreedyScheduler, - SerialScheduler -using OhMyThreads.Schedulers: chunking_enabled, chunking_mode, ChunkingMode, NoChunking, - FixedSize, FixedCount +using OhMyThreads: Scheduler, + DynamicScheduler, StaticScheduler, GreedyScheduler, + SerialScheduler +using OhMyThreads.Schedulers: chunking_enabled, + chunking_mode, ChunkingMode, NoChunking, + FixedSize, FixedCount, scheduler_from_symbol using Base: @propagate_inbounds using Base.Threads: nthreads, @threads @@ -34,18 +36,35 @@ function _chunks(sched, arg; kwargs...) end end +const MaybeScheduler = Union{Nothing, Scheduler, Symbol} +const MaybeInteger = Union{Nothing, Integer} + function tmapreduce(f, op, Arrs...; - scheduler::Scheduler = DynamicScheduler(), + scheduler::MaybeScheduler = nothing, + ntasks::MaybeInteger = nothing, outputtype::Type = Any, mapreduce_kwargs...) min_kwarg_len = haskey(mapreduce_kwargs, :init) ? 1 : 0 if length(mapreduce_kwargs) > min_kwarg_len tmapreduce_kwargs_err(; mapreduce_kwargs...) end - if scheduler isa SerialScheduler + isnothing(ntasks) || ntasks > 0 || + throw(ArgumentError("ntasks must be a positive integer.")) + if scheduler isa Scheduler + isnothing(ntasks) || + throw(ArgumentError("providing an explicit scheduler as well as ntasks is currently not supported.")) + _scheduler = scheduler + elseif scheduler isa Symbol + _scheduler = scheduler_from_symbol(scheduler; ntasks) + else # scheduler == nothing + _scheduler = DynamicScheduler(; ntasks) + end + + @show _scheduler + if _scheduler isa SerialScheduler mapreduce(f, op, Arrs...; mapreduce_kwargs...) else - @noinline _tmapreduce(f, op, Arrs, outputtype, scheduler, mapreduce_kwargs) + @noinline _tmapreduce(f, op, Arrs, outputtype, _scheduler, mapreduce_kwargs) end end @@ -112,8 +131,8 @@ function _tmapreduce(f, args = map(A -> view(A, inds), Arrs) # Note, calling `promise_task_local` here is only safe because we're assuming that # Base.mapreduce isn't going to magically try to do multithreading on us... - @spawnat tid mapreduce( - promise_task_local(f), promise_task_local(op), args...; mapreduce_kwargs...) + @spawnat tid mapreduce(promise_task_local(f), promise_task_local(op), args...; + mapreduce_kwargs...) end # Note, calling `promise_task_local` here is only safe because we're assuming that # Base.mapreduce isn't going to magically try to do multithreading on us... diff --git a/src/schedulers.jl b/src/schedulers.jl index 92f41e3d..c970ba53 100644 --- a/src/schedulers.jl +++ b/src/schedulers.jl @@ -2,6 +2,8 @@ module Schedulers using Base.Threads: nthreads +const MaybeInteger = Union{Integer, Nothing} + """ Supertype for all available schedulers: @@ -39,20 +41,20 @@ with other multithreaded code. ## Keyword arguments: -- `nchunks::Integer` (default `2 * nthreads(threadpool)`): +- `nchunks::Integer` or `ntasks::Integer` (default `2 * nthreads(threadpool)`): * Determines the number of chunks (and thus also the number of parallel tasks). * Increasing `nchunks` can help with [load balancing](https://en.wikipedia.org/wiki/Load_balancing_(computing)), but at the expense of creating more overhead. For `nchunks <= nthreads()` there are not enough chunks for any load balancing. * Setting `nchunks < nthreads()` is an effective way to use only a subset of the available threads. - `chunksize::Integer` (default not set) * Specifies the desired chunk size (instead of the number of chunks). - * The options `chunksize` and `nchunks` are **mutually exclusive** (only one may be a positive integer). + * The options `chunksize` and `nchunks`/`ntasks` are **mutually exclusive** (only one may be a positive integer). - `split::Symbol` (default `:batch`): * Determines how the collection is divided into chunks (if chunking=true). By default, each chunk consists of contiguous elements and order is maintained. * See [ChunkSplitters.jl](https://github.com/JuliaFolds2/ChunkSplitters.jl) for more details and available options. * Beware that for `split=:scatter` the order of elements isn't maintained and a reducer function must not only be associative but also **commutative**! - `chunking::Bool` (default `true`): * Controls whether input elements are grouped into chunks (`true`) or not (`false`). - * For `chunking=false`, the arguments `nchunks`, `chunksize`, and `split` are ignored and input elements are regarded as "chunks" as is. Hence, there will be one parallel task spawned per input element. Note that, depending on the input, this **might spawn many(!) tasks** and can be costly! + * For `chunking=false`, the arguments `nchunks`/`ntasks`, `chunksize`, and `split` are ignored and input elements are regarded as "chunks" as is. Hence, there will be one parallel task spawned per input element. Note that, depending on the input, this **might spawn many(!) tasks** and can be costly! - `threadpool::Symbol` (default `:default`): * Possible options are `:default` and `:interactive`. * The high-priority pool `:interactive` should be used very carefully since tasks on this threadpool should not be allowed to run for a long time without `yield`ing as it can interfere with [heartbeat](https://en.wikipedia.org/wiki/Heartbeat_(computing)) processes. @@ -63,8 +65,8 @@ struct DynamicScheduler{C <: ChunkingMode} <: Scheduler chunksize::Int split::Symbol - function DynamicScheduler( - threadpool::Symbol, nchunks::Integer, chunksize::Integer, split::Symbol; chunking::Bool = true) + function DynamicScheduler(threadpool::Symbol, nchunks::Integer, chunksize::Integer, + split::Symbol; chunking::Bool = true) if !(threadpool in (:default, :interactive)) throw(ArgumentError("threadpool must be either :default or :interactive")) end @@ -72,10 +74,10 @@ struct DynamicScheduler{C <: ChunkingMode} <: Scheduler C = NoChunking else if !(nchunks > 0 || chunksize > 0) - throw(ArgumentError("Either nchunks or chunksize must be a positive integer (or chunking=false).")) + throw(ArgumentError("Either nchunks/ntasks or chunksize must be a positive integer (or chunking=false).")) end if nchunks > 0 && chunksize > 0 - throw(ArgumentError("nchunks and chunksize are mutually exclusive and only one of them may be a positive integer")) + throw(ArgumentError("nchunks/ntasks and chunksize are mutually exclusive and only one of them may be a positive integer")) end C = chunksize > 0 ? FixedSize : FixedCount end @@ -85,8 +87,9 @@ end function DynamicScheduler(; threadpool::Symbol = :default, - nchunks::Union{Integer, Nothing} = nothing, - chunksize::Union{Integer, Nothing} = nothing, + nchunks::MaybeInteger = nothing, + ntasks::MaybeInteger = nothing, # "alias" for nchunks + chunksize::MaybeInteger = nothing, chunking::Bool = true, split::Symbol = :batch) if !chunking @@ -94,11 +97,12 @@ function DynamicScheduler(; chunksize = -1 else # only choose nchunks default if chunksize hasn't been specified - if isnothing(nchunks) && isnothing(chunksize) + if isnothing(nchunks) && isnothing(chunksize) && isnothing(ntasks) nchunks = 2 * nthreads(threadpool) chunksize = -1 else - nchunks = isnothing(nchunks) ? -1 : nchunks + nchunks = !isnothing(nchunks) ? nchunks : + !isnothing(ntasks) ? ntasks : -1 chunksize = isnothing(chunksize) ? -1 : chunksize end end @@ -124,16 +128,16 @@ Isn't well composable with other multithreaded code though. ## Keyword arguments: -- `nchunks::Integer` (default `nthreads()`): +- `nchunks::Integer` or `ntasks::Integer` (default `nthreads()`): * Determines the number of chunks (and thus also the number of parallel tasks). * Setting `nchunks < nthreads()` is an effective way to use only a subset of the available threads. * For `nchunks > nthreads()` the chunks will be distributed to the available threads in a round-robin fashion. - `chunksize::Integer` (default not set) * Specifies the desired chunk size (instead of the number of chunks). - * The options `chunksize` and `nchunks` are **mutually exclusive** (only one may be non-zero). + * The options `chunksize` and `nchunks`/`ntasks` are **mutually exclusive** (only one may be non-zero). - `chunking::Bool` (default `true`): * Controls whether input elements are grouped into chunks (`true`) or not (`false`). - * For `chunking=false`, the arguments `nchunks`, `chunksize`, and `split` are ignored and input elements are regarded as "chunks" as is. Hence, there will be one parallel task spawned per input element. Note that, depending on the input, this **might spawn many(!) tasks** and can be costly! + * For `chunking=false`, the arguments `nchunks`/`ntasks`, `chunksize`, and `split` are ignored and input elements are regarded as "chunks" as is. Hence, there will be one parallel task spawned per input element. Note that, depending on the input, this **might spawn many(!) tasks** and can be costly! - `split::Symbol` (default `:batch`): * Determines how the collection is divided into chunks. By default, each chunk consists of contiguous elements and order is maintained. * See [ChunkSplitters.jl](https://github.com/JuliaFolds2/ChunkSplitters.jl) for more details and available options. @@ -144,16 +148,16 @@ struct StaticScheduler{C <: ChunkingMode} <: Scheduler chunksize::Int split::Symbol - function StaticScheduler( - nchunks::Integer, chunksize::Integer, split::Symbol; chunking::Bool = true) + function StaticScheduler(nchunks::Integer, chunksize::Integer, split::Symbol; + chunking::Bool = true) if !chunking C = NoChunking else if !(nchunks > 0 || chunksize > 0) - throw(ArgumentError("Either nchunks or chunksize must be a positive integer (or chunking=false).")) + throw(ArgumentError("Either nchunks/ntasks or chunksize must be a positive integer (or chunking=false).")) end if nchunks > 0 && chunksize > 0 - throw(ArgumentError("nchunks and chunksize are mutually exclusive and only one of them may be a positive integer")) + throw(ArgumentError("nchunks/ntasks and chunksize are mutually exclusive and only one of them may be a positive integer")) end C = chunksize > 0 ? FixedSize : FixedCount end @@ -162,8 +166,9 @@ struct StaticScheduler{C <: ChunkingMode} <: Scheduler end function StaticScheduler(; - nchunks::Union{Integer, Nothing} = nothing, - chunksize::Union{Integer, Nothing} = nothing, + nchunks::MaybeInteger = nothing, + ntasks::MaybeInteger = nothing, # "alias" for nchunks + chunksize::MaybeInteger = nothing, chunking::Bool = true, split::Symbol = :batch) if !chunking @@ -171,11 +176,12 @@ function StaticScheduler(; chunksize = -1 else # only choose nchunks default if chunksize hasn't been specified - if isnothing(nchunks) && isnothing(chunksize) + if isnothing(nchunks) && isnothing(chunksize) && isnothing(ntasks) nchunks = nthreads(:default) chunksize = -1 else - nchunks = isnothing(nchunks) ? -1 : nchunks + nchunks = !isnothing(nchunks) ? nchunks : + !isnothing(ntasks) ? ntasks : -1 chunksize = isnothing(chunksize) ? -1 : chunksize end end @@ -238,4 +244,14 @@ chunking_mode(::Type{SerialScheduler}) = NoChunking chunking_enabled(s::Scheduler) = chunking_enabled(typeof(s)) chunking_enabled(::Type{S}) where {S <: Scheduler} = chunking_mode(S) != NoChunking +scheduler_from_symbol(s::Symbol; kwargs...) = scheduler_from_symbol(Val{s}; kwargs...) +scheduler_from_symbol(::Type{Val{:static}}; kwargs...) = StaticScheduler(; kwargs...) +scheduler_from_symbol(::Type{Val{:dynamic}}; kwargs...) = DynamicScheduler(; kwargs...) +scheduler_from_symbol(::Type{Val{:greedy}}; kwargs...) = GreedyScheduler(; kwargs...) +scheduler_from_symbol(::Type{Val{:serial}}; kwargs...) = SerialScheduler(; kwargs...) +function scheduler_from_symbol(::Type{Val{T}}; kwargs...) where {T} + # fallback + throw(ArgumentError("unkown scheduler symbol :$T")) +end + end # module From 1b4c6e0530c022823ae1fcfe45b88caac03b63b1 Mon Sep 17 00:00:00 2001 From: Carsten Bauer Date: Tue, 12 Mar 2024 17:12:42 +0100 Subject: [PATCH 02/17] tmp generic --- src/implementation.jl | 110 ++++++++++++++++++++++++------------------ 1 file changed, 62 insertions(+), 48 deletions(-) diff --git a/src/implementation.jl b/src/implementation.jl index 80a93e3f..6119df59 100644 --- a/src/implementation.jl +++ b/src/implementation.jl @@ -26,13 +26,17 @@ function auto_disable_chunking_warning() "or pass in `collect(chunks(...))`.") end -function _chunks(sched, arg; kwargs...) +function _chunks(sched, arg) C = chunking_mode(sched) @assert C != NoChunking if C == FixedCount - chunks(arg; n = sched.nchunks, split = sched.split, kwargs...) + chunks(arg; + n = sched.nchunks, + split = sched.split)::ChunkSplitters.Chunk{typeof(arg), ChunkSplitters.FixedCount} elseif C == FixedSize - chunks(arg; size = sched.chunksize, split = sched.split, kwargs...) + chunks(arg; + size = sched.chunksize, + split = sched.split)::ChunkSplitters.Chunk{typeof(arg), ChunkSplitters.FixedSize} end end @@ -41,26 +45,20 @@ const MaybeInteger = Union{Nothing, Integer} function tmapreduce(f, op, Arrs...; scheduler::MaybeScheduler = nothing, - ntasks::MaybeInteger = nothing, outputtype::Type = Any, - mapreduce_kwargs...) - min_kwarg_len = haskey(mapreduce_kwargs, :init) ? 1 : 0 - if length(mapreduce_kwargs) > min_kwarg_len - tmapreduce_kwargs_err(; mapreduce_kwargs...) - end - isnothing(ntasks) || ntasks > 0 || - throw(ArgumentError("ntasks must be a positive integer.")) + init = nothing, + kwargs...) + mapreduce_kwargs = !isnothing(init) ? (; init = kwargs.init) : (;) if scheduler isa Scheduler - isnothing(ntasks) || - throw(ArgumentError("providing an explicit scheduler as well as ntasks is currently not supported.")) + isempty(kwargs) || scheduler_and_kwargs_err(; kwargs...) _scheduler = scheduler elseif scheduler isa Symbol - _scheduler = scheduler_from_symbol(scheduler; ntasks) + _scheduler = scheduler_from_symbol(scheduler; kwargs...) else # scheduler == nothing - _scheduler = DynamicScheduler(; ntasks) + _scheduler = DynamicScheduler(; kwargs...) end - @show _scheduler + # @show _scheduler if _scheduler isa SerialScheduler mapreduce(f, op, Arrs...; mapreduce_kwargs...) else @@ -68,8 +66,9 @@ function tmapreduce(f, op, Arrs...; end end -@noinline function tmapreduce_kwargs_err(; init = nothing, kwargs...) - error("got unsupported keyword arguments: $((;kwargs...,)) ") +@noinline function scheduler_and_kwargs_err(; kwargs...) + kwargstr = join(string.(keys(kwargs)), ", ") + throw(ArgumentError("Providing an explicit scheduler as well as direct keyword arguments (e.g. $(kwargstr)) is currently not supported.")) end treducemap(op, f, A...; kwargs...) = tmapreduce(f, op, A...; kwargs...) @@ -253,33 +252,44 @@ end function tmap(f, A::Union{AbstractArray, ChunkSplitters.Chunk}, _Arrs::AbstractArray...; - scheduler::Scheduler = DynamicScheduler(), + scheduler::MaybeScheduler = nothing, kwargs...) - if scheduler isa GreedyScheduler + if scheduler isa Scheduler + isempty(kwargs) || scheduler_and_kwargs_err(; kwargs...) + _scheduler = scheduler + elseif scheduler isa Symbol + _scheduler = scheduler_from_symbol(scheduler; kwargs...) + else # scheduler == nothing + _scheduler = DynamicScheduler(; kwargs...) + end + + if _scheduler isa GreedyScheduler error("Greedy scheduler isn't supported with `tmap` unless you provide an `OutputElementType` argument, since the greedy schedule requires a commutative reducing operator.") end - if chunking_enabled(scheduler) && hasfield(typeof(scheduler), :split) && - scheduler.split != :batch - error("Only `split == :batch` is supported because the parallel operation isn't commutative. (Scheduler: $scheduler)") + if chunking_enabled(_scheduler) && hasfield(typeof(_scheduler), :split) && + _scheduler.split != :batch + error("Only `split == :batch` is supported because the parallel operation isn't commutative. (Scheduler: $_scheduler)") end - if A isa ChunkSplitters.Chunk && chunking_enabled(scheduler) + if A isa ChunkSplitters.Chunk && chunking_enabled(_scheduler) auto_disable_chunking_warning() - if scheduler isa DynamicScheduler - scheduler = DynamicScheduler(; nchunks = 0, scheduler.threadpool) - elseif scheduler isa StaticScheduler - scheduler = StaticScheduler(; nchunks = 0) + if _scheduler isa DynamicScheduler + _scheduler = DynamicScheduler(; + threadpool = _scheduler.threadpool, + chunking = false) + elseif _scheduler isa StaticScheduler + _scheduler = StaticScheduler(; chunking = false) else error("Can't disable chunking for this scheduler?! Shouldn't be reached.", - scheduler) + _scheduler) end end Arrs = (A, _Arrs...) - if scheduler isa SerialScheduler + if _scheduler isa SerialScheduler map(f, Arrs...; kwargs...) else check_all_have_same_indices(Arrs) - @noinline _tmap(scheduler, f, A, _Arrs...; kwargs...) + @noinline _tmap(_scheduler, f, A, _Arrs...) end end @@ -287,8 +297,7 @@ end function _tmap(scheduler::DynamicScheduler{NoChunking}, f, A::AbstractArray, - _Arrs::AbstractArray...; - kwargs...) + _Arrs::AbstractArray...;) (; threadpool) = scheduler Arrs = (A, _Arrs...) tasks = map(eachindex(A)) do i @@ -305,8 +314,7 @@ end function _tmap(scheduler::DynamicScheduler{NoChunking}, f, A::ChunkSplitters.Chunk, - _Arrs::AbstractArray...; - kwargs...) + _Arrs::AbstractArray...) (; threadpool) = scheduler tasks = map(A) do idcs @spawn threadpool promise_task_local(f)(idcs) @@ -318,8 +326,7 @@ end function _tmap(scheduler::StaticScheduler{NoChunking}, f, A::ChunkSplitters.Chunk, - _Arrs::AbstractArray...; - kwargs...) + _Arrs::AbstractArray...) nt = nthreads() tasks = map(enumerate(A)) do (c, idcs) tid = @inbounds nthtid(mod1(c, nt)) @@ -332,8 +339,7 @@ end function _tmap(scheduler::StaticScheduler{NoChunking}, f, A::AbstractArray, - _Arrs::AbstractArray...; - kwargs...) + _Arrs::AbstractArray...;) Arrs = (A, _Arrs...) nt = nthreads() tasks = map(enumerate(A)) do (c, i) @@ -351,8 +357,7 @@ end function _tmap(scheduler::Scheduler, f, A::AbstractArray, - _Arrs::AbstractArray...; - kwargs...) + _Arrs::AbstractArray...) Arrs = (A, _Arrs...) idcs = collect(_chunks(scheduler, A)) reduction_f = append!! @@ -362,7 +367,7 @@ function _tmap(scheduler::Scheduler, map(f, args...) end end - v = tmapreduce(mapping_f, reduction_f, idcs; scheduler, kwargs...) + v = tmapreduce(mapping_f, reduction_f, idcs; scheduler) reshape(v, size(A)...) end @@ -370,14 +375,23 @@ end out, A::AbstractArray, _Arrs::AbstractArray...; - scheduler::Scheduler = DynamicScheduler(), + scheduler::MaybeScheduler = nothing, kwargs...) - if hasfield(typeof(scheduler), :split) && scheduler.split != :batch - error("Only `split == :batch` is supported because the parallel operation isn't commutative. (Scheduler: $scheduler)") + if scheduler isa Scheduler + isempty(kwargs) || scheduler_and_kwargs_err(; kwargs...) + _scheduler = scheduler + elseif scheduler isa Symbol + _scheduler = scheduler_from_symbol(scheduler; kwargs...) + else # scheduler == nothing + _scheduler = DynamicScheduler(; kwargs...) + end + + if hasfield(typeof(_scheduler), :split) && _scheduler.split != :batch + error("Only `split == :batch` is supported because the parallel operation isn't commutative. (Scheduler: $_scheduler)") end Arrs = (A, _Arrs...) - if scheduler isa SerialScheduler - map!(f, out, Arrs...; kwargs...) + if _scheduler isa SerialScheduler + map!(f, out, Arrs...) else @boundscheck check_all_have_same_indices((out, Arrs...)) mapping_f = maybe_rewrap(f) do f @@ -387,7 +401,7 @@ end out[i] = res end end - @noinline tforeach(mapping_f, eachindex(out); scheduler, kwargs...) + @noinline tforeach(mapping_f, eachindex(out); scheduler = _scheduler) out end end From 2224680cf4a40cde43f39d270aacc59c14495645 Mon Sep 17 00:00:00 2001 From: Carsten Bauer Date: Tue, 12 Mar 2024 18:48:29 +0100 Subject: [PATCH 03/17] tests passing --- src/implementation.jl | 39 +++++++++++++++++++-------------------- src/schedulers.jl | 6 ++++++ test/runtests.jl | 10 +++++----- 3 files changed, 30 insertions(+), 25 deletions(-) diff --git a/src/implementation.jl b/src/implementation.jl index 6119df59..11d6d3b1 100644 --- a/src/implementation.jl +++ b/src/implementation.jl @@ -1,22 +1,22 @@ module Implementation import OhMyThreads: treduce, tmapreduce, treducemap, tforeach, tmap, tmap!, tcollect - using OhMyThreads: chunks, @spawn, @spawnat, WithTaskLocals, promise_task_local using OhMyThreads.Tools: nthtid using OhMyThreads: Scheduler, - DynamicScheduler, StaticScheduler, GreedyScheduler, - SerialScheduler + DynamicScheduler, StaticScheduler, GreedyScheduler, + SerialScheduler using OhMyThreads.Schedulers: chunking_enabled, - chunking_mode, ChunkingMode, NoChunking, - FixedSize, FixedCount, scheduler_from_symbol + chunking_mode, ChunkingMode, NoChunking, + FixedSize, FixedCount, scheduler_from_symbol, NotGiven, + isgiven using Base: @propagate_inbounds using Base.Threads: nthreads, @threads - using BangBang: append!! - using ChunkSplitters: ChunkSplitters +const MaybeScheduler = Union{NotGiven, Scheduler, Symbol} + include("macro_impl.jl") function auto_disable_chunking_warning() @@ -32,29 +32,28 @@ function _chunks(sched, arg) if C == FixedCount chunks(arg; n = sched.nchunks, - split = sched.split)::ChunkSplitters.Chunk{typeof(arg), ChunkSplitters.FixedCount} + split = sched.split)::ChunkSplitters.Chunk{ + typeof(arg), ChunkSplitters.FixedCount} elseif C == FixedSize chunks(arg; size = sched.chunksize, - split = sched.split)::ChunkSplitters.Chunk{typeof(arg), ChunkSplitters.FixedSize} + split = sched.split)::ChunkSplitters.Chunk{ + typeof(arg), ChunkSplitters.FixedSize} end end -const MaybeScheduler = Union{Nothing, Scheduler, Symbol} -const MaybeInteger = Union{Nothing, Integer} - function tmapreduce(f, op, Arrs...; - scheduler::MaybeScheduler = nothing, + scheduler::MaybeScheduler = NotGiven(), outputtype::Type = Any, - init = nothing, + init = NotGiven(), kwargs...) - mapreduce_kwargs = !isnothing(init) ? (; init = kwargs.init) : (;) + mapreduce_kwargs = isgiven(init) ? (; init) : (;) if scheduler isa Scheduler isempty(kwargs) || scheduler_and_kwargs_err(; kwargs...) _scheduler = scheduler elseif scheduler isa Symbol _scheduler = scheduler_from_symbol(scheduler; kwargs...) - else # scheduler == nothing + else # default fallback _scheduler = DynamicScheduler(; kwargs...) end @@ -252,14 +251,14 @@ end function tmap(f, A::Union{AbstractArray, ChunkSplitters.Chunk}, _Arrs::AbstractArray...; - scheduler::MaybeScheduler = nothing, + scheduler::MaybeScheduler = NotGiven(), kwargs...) if scheduler isa Scheduler isempty(kwargs) || scheduler_and_kwargs_err(; kwargs...) _scheduler = scheduler elseif scheduler isa Symbol _scheduler = scheduler_from_symbol(scheduler; kwargs...) - else # scheduler == nothing + else # default fallback _scheduler = DynamicScheduler(; kwargs...) end @@ -375,14 +374,14 @@ end out, A::AbstractArray, _Arrs::AbstractArray...; - scheduler::MaybeScheduler = nothing, + scheduler::MaybeScheduler = NotGiven(), kwargs...) if scheduler isa Scheduler isempty(kwargs) || scheduler_and_kwargs_err(; kwargs...) _scheduler = scheduler elseif scheduler isa Symbol _scheduler = scheduler_from_symbol(scheduler; kwargs...) - else # scheduler == nothing + else # default fallback _scheduler = DynamicScheduler(; kwargs...) end diff --git a/src/schedulers.jl b/src/schedulers.jl index c970ba53..2c44ceb3 100644 --- a/src/schedulers.jl +++ b/src/schedulers.jl @@ -2,6 +2,12 @@ module Schedulers using Base.Threads: nthreads +# Used to indicate that a keyword argument has not been set by the user. +# We don't use Nothing because nothing maybe sometimes be a valid user input (e.g. for init) +struct NotGiven end +isgiven(::NotGiven) = false +isgiven(::T) where {T} = true + const MaybeInteger = Union{Integer, Nothing} """ diff --git a/test/runtests.jl b/test/runtests.jl index 3eee363b..49139cac 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -61,7 +61,7 @@ sets_to_test = [(~ = isapprox, f = sin ∘ *, op = +, end end end -end +end; @testset "ChunkSplitters.Chunk" begin x = rand(100) @@ -76,7 +76,7 @@ end @test isnothing(tforeach(x -> sin.(x), chnks; scheduler)) end end -end +end; @testset "macro API" begin # basic @@ -195,7 +195,7 @@ end @set reducer=+ C.x end) == 10*var -end +end; @testset "WithTaskLocals" begin let x = TaskLocalValue{Base.RefValue{Int}}(() -> Ref{Int}(0)), y = TaskLocalValue{Base.RefValue{Int}}(() -> Ref{Int}(0)) @@ -234,7 +234,7 @@ end @test @fetch(h()) == (4, 4) @test @fetch(h()) == (5, 5) end -end +end; @testset "chunking mode + chunksize option" begin for sched in (DynamicScheduler, StaticScheduler) @@ -262,6 +262,6 @@ end @test treduce(+, 1:10; scheduler) ≈ reduce(+, 1:10) end end -end +end; # Todo way more testing, and easier tests to deal with From 925ad62dd86b07febe6652718ceba3a0f2b6df8c Mon Sep 17 00:00:00 2001 From: Carsten Bauer Date: Tue, 12 Mar 2024 19:07:57 +0100 Subject: [PATCH 04/17] tests --- test/runtests.jl | 31 +++++++++++++++++++++++++++++++ 1 file changed, 31 insertions(+) diff --git a/test/runtests.jl b/test/runtests.jl index 49139cac..9892e477 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -264,4 +264,35 @@ end; end end; +@testset "top-level kwargs" begin + res_tmr = mapreduce(sin, +, 1:10000) + + # scheduler not given + @test tmapreduce(sin, +, 1:10000; ntasks=2) ≈ res_tmr + @test tmapreduce(sin, +, 1:10000; nchunks=2) ≈ res_tmr + @test tmapreduce(sin, +, 1:10000; split=:scatter) ≈ res_tmr + @test tmapreduce(sin, +, 1:10000; chunksize=2) ≈ res_tmr + @test tmapreduce(sin, +, 1:10000; chunking=false) ≈ res_tmr + + # scheduler isa Scheduler + @test tmapreduce(sin, +, 1:10000; scheduler=StaticScheduler()) ≈ res_tmr + @test_throws ArgumentError tmapreduce(sin, +, 1:10000; ntasks=2, scheduler=DynamicScheduler()) + @test_throws ArgumentError tmapreduce(sin, +, 1:10000; chunksize=2, scheduler=DynamicScheduler()) + @test_throws ArgumentError tmapreduce(sin, +, 1:10000; split=:scatter, scheduler=StaticScheduler()) + @test_throws ArgumentError tmapreduce(sin, +, 1:10000; ntasks=3, scheduler=SerialScheduler()) + + # scheduler isa Symbol + for s in (:dynamic, :static, :serial, :greedy) + @test tmapreduce(sin, +, 1:10000; scheduler=s) ≈ res_tmr + end + for s in (:dynamic, :static, :greedy) + @test tmapreduce(sin, +, 1:10000; ntasks=2, scheduler=s) ≈ res_tmr + end + for s in (:dynamic, :static) + @test tmapreduce(sin, +, 1:10000; chunksize=2, scheduler=s) ≈ res_tmr + @test tmapreduce(sin, +, 1:10000; chunking=false, scheduler=s) ≈ res_tmr + @test tmapreduce(sin, +, 1:10000; nchunks=3, scheduler=s) ≈ res_tmr + end +end; + # Todo way more testing, and easier tests to deal with From 045c40049fdee71530f41e594519dcea63301fb4 Mon Sep 17 00:00:00 2001 From: Carsten Bauer Date: Tue, 12 Mar 2024 20:34:23 +0100 Subject: [PATCH 05/17] macro API update --- src/macro_impl.jl | 82 ++++++++++++++++++++++------------------------- test/runtests.jl | 36 +++++++++++++++++++++ 2 files changed, 75 insertions(+), 43 deletions(-) diff --git a/src/macro_impl.jl b/src/macro_impl.jl index 74ce0287..87a9775a 100644 --- a/src/macro_impl.jl +++ b/src/macro_impl.jl @@ -27,35 +27,46 @@ function tasks_macro(forex) make_mapping_function = if isempty(tls_names) :(local function mapping_function($itvar,) - $(forbody) - end) + $(forbody) + end) else :(local mapping_function = WithTaskLocals(($(tls_names...),)) do ($(locals_names...),) - function mapping_function_local($itvar,) - $(forbody) - end - end) + function mapping_function_local($itvar,) + $(forbody) + end + end) end - q = if !isnothing(settings.reducer) + q = if isgiven(settings.reducer) quote $make_mapping_function - tmapreduce(mapping_function, $(settings.reducer), $(itrng); scheduler = $(settings.scheduler)) + tmapreduce(mapping_function, $(settings.reducer), + $(itrng)) end - elseif settings.collect + elseif isgiven(settings.collect) maybe_warn_useless_init(settings) quote $make_mapping_function - tmap(mapping_function, $(itrng); scheduler = $(settings.scheduler)) + tmap(mapping_function, $(itrng)) end else maybe_warn_useless_init(settings) quote $make_mapping_function - tforeach(mapping_function, $(itrng); scheduler = $(settings.scheduler)) + tforeach(mapping_function, $(itrng)) end end + # insert keyword arguments into the function call + kwexpr = :($(Expr(:parameters))) + if isgiven(settings.scheduler) + push!(kwexpr.args, Expr(:kw, :scheduler, settings.scheduler)) + end + for (k, v) in settings.kwargs + push!(kwexpr.args, Expr(:kw, k, v)) + end + insert!(q.args[4].args, 2, kwexpr) + # wrap everything in a let ... end block # and, potentially, define the `TaskLocalValue`s. result = :(let @@ -71,27 +82,17 @@ function tasks_macro(forex) end function maybe_warn_useless_init(settings) - !isnothing(settings.init) && + isgiven(settings.init) && @warn("The @set init = ... settings won't have any effect because no reduction is performed.") end Base.@kwdef mutable struct Settings - scheduler::Expr = :(DynamicScheduler()) - reducer::Union{Expr, Symbol, Nothing} = nothing - collect::Bool = false - init::Union{Expr, Symbol, Nothing} = nothing -end - -function _sym2scheduler(s) - if s == :dynamic - :(DynamicScheduler()) - elseif s == :static - :(StaticScheduler()) - elseif s == :greedy - :(GreedyScheduler()) - else - throw(ArgumentError("Unknown scheduler symbol.")) - end + # scheduler::Expr = :(DynamicScheduler()) + scheduler::Union{Expr, QuoteNode, NotGiven} = NotGiven() + reducer::Union{Expr, Symbol, NotGiven} = NotGiven() + collect::Union{Bool, NotGiven} = NotGiven() + init::Union{Expr, Symbol, NotGiven} = NotGiven() + kwargs::Vector{Pair{Symbol, Any}} = Pair{Symbol, Any}[] end function _maybe_handle_atlocal_block!(args) @@ -119,7 +120,7 @@ function _unfold_atlocal_block(ex) for x in tlsexprs localb, localn = _atlocal_assign_to_exprs(x) push!(locals_before, localb) - push!(locals_names, localn) + push!(locals_names, localn) end else throw(ErrorException("Wrong usage of @local. You must either provide a typed assignment or multiple typed assignments in a `begin ... end` block.")) @@ -159,7 +160,7 @@ function _maybe_handle_atset_block!(settings, args) end deleteat!(args, idcs) # check incompatible settings - if settings.collect && !isnothing(settings.reducer) + if isgiven(settings.collect) && settings.collect && isgiven(settings.reducer) throw(ArgumentError("Specifying both collect and reducer isn't supported.")) end end @@ -169,20 +170,15 @@ function _handle_atset_single_assign!(settings, ex) throw(ErrorException("Wrong usage of @set. Expected assignment, e.g. `scheduler = StaticScheduler()`.")) end sym = ex.args[1] - if !hasfield(Settings, sym) - throw(ArgumentError("Unknown setting \"$(sym)\". Must be ∈ $(fieldnames(Settings)).")) - end def = ex.args[2] - if sym == :collect && !(def isa Bool) - throw(ArgumentError("Setting collect can only be true or false.")) - #TODO support specifying the OutputElementType - end - def = if def isa QuoteNode - _sym2scheduler(def.value) - elseif def isa Bool - def + if hasfield(Settings, sym) + if sym == :collect && !(def isa Bool) + throw(ArgumentError("Setting collect can only be true or false.")) + #TODO support specifying the OutputElementType + end + def = def isa Bool ? def : esc(def) + setfield!(settings, sym, def) else - esc(def) + push!(settings.kwargs, sym => def) end - setfield!(settings, sym, def) end diff --git a/test/runtests.jl b/test/runtests.jl index 9892e477..72fe9741 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -141,6 +141,42 @@ end; i end) == (55.0 + 0.0im) + # top-level "kwargs" + @test @tasks(for i in 1:3 + @set scheduler=:static + @set ntasks=1 + i + end) |> isnothing + @test @tasks(for i in 1:3 + @set scheduler=:static + @set nchunks=2 + i + end) |> isnothing + @test @tasks(for i in 1:3 + @set scheduler=:dynamic + @set chunksize=2 + i + end) |> isnothing + @test @tasks(for i in 1:3 + @set scheduler=:dynamic + @set chunking=false + i + end) |> isnothing + @test_throws ArgumentError @tasks(for i in 1:3 + @set scheduler=DynamicScheduler() + @set chunking=false + i + end) + @test_throws MethodError @tasks(for i in 1:3 + @set scheduler=:serial + @set chunking=false + i + end) + @test_throws MethodError @tasks(for i in 1:3 + @set scheduler=:dynamic + @set asd=123 + i + end) # TaskLocalValue ntd = 2*Threads.nthreads() From 7c0d1a2305ffdbd17a43a14f7735e389dec379d4 Mon Sep 17 00:00:00 2001 From: Carsten Bauer Date: Tue, 12 Mar 2024 20:39:42 +0100 Subject: [PATCH 06/17] fix init bug --- src/macro_impl.jl | 3 +++ test/runtests.jl | 4 ++-- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/src/macro_impl.jl b/src/macro_impl.jl index 87a9775a..3d5ca1e2 100644 --- a/src/macro_impl.jl +++ b/src/macro_impl.jl @@ -62,6 +62,9 @@ function tasks_macro(forex) if isgiven(settings.scheduler) push!(kwexpr.args, Expr(:kw, :scheduler, settings.scheduler)) end + if isgiven(settings.init) + push!(kwexpr.args, Expr(:kw, :init, settings.init)) + end for (k, v) in settings.kwargs push!(kwexpr.args, Expr(:kw, k, v)) end diff --git a/test/runtests.jl b/test/runtests.jl index 72fe9741..df088ee4 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -132,14 +132,14 @@ end; init=0.0 end i - end) == 55.0 + end) === 55.0 @test @tasks(for i in 1:10 @set begin reducer=(+) init=0.0*im end i - end) == (55.0 + 0.0im) + end) === (55.0 + 0.0im) # top-level "kwargs" @test @tasks(for i in 1:3 From 83ec2899fc6e1443e4ce426123993d6d66ab77a1 Mon Sep 17 00:00:00 2001 From: Carsten Bauer Date: Tue, 12 Mar 2024 21:33:47 +0100 Subject: [PATCH 07/17] fixes + docstrings --- src/functions.jl | 97 +++++++++++++++++++++++++++++++++++++---------- src/macro_impl.jl | 4 +- src/macros.jl | 36 +++++++++++++----- src/schedulers.jl | 8 ++++ test/runtests.jl | 10 ++++- 5 files changed, 122 insertions(+), 33 deletions(-) diff --git a/src/functions.jl b/src/functions.jl index 4b767ca3..97436ac0 100644 --- a/src/functions.jl +++ b/src/functions.jl @@ -1,6 +1,6 @@ """ tmapreduce(f, op, A::AbstractArray...; - [scheduler::Scheduler = DynamicScheduler()], + [scheduler::Union{Scheduler, Symbol} = :dynamic], [outputtype::Type = Any], [init]) @@ -27,15 +27,23 @@ is the parallelized version of `sum(√, [1, 2, 3, 4, 5])` in the form ## Keyword arguments: -- `scheduler::Scheduler` (default `DynamicScheduler()`): determines how the computation is divided into parallel tasks and how these are scheduled. See [`Scheduler`](@ref) for more information. -- `outputtype::Type` (default `Any`): will work as the asserted output type of parallel calculations. We use [StableTasks.jl](https://github.com/MasonProtter/StableTasks.jl) to make setting this option unnecessary, but if you experience problems with type stability, you may be able to recover it with this keyword argument. -- `init`: forwarded to `mapreduce` for the task-local sequential parts of the calculation. +- `scheduler::Union{Scheduler, Symbol}` (default `:dynamic`): determines how the computation is divided into parallel tasks and how these are scheduled. See [`Scheduler`](@ref) for more information on the available schedulers. +- `outputtype::Type` (default `Any`): will work as the asserted output type of parallel calculations. We use [StableTasks.jl](https://github.com/JuliaFolds2/StableTasks.jl) to make setting this option unnecessary, but if you experience problems with type stability, you may be able to recover it with this keyword argument. +- `init`: initial value of the reduction. Will be forwarded to `mapreduce` for the task-local sequential parts of the calculation. + +In addition, `tmapreduce` accepts **all keyword arguments that are supported by the selected +scheduler**. They will simply be passed on to the corresponding `Scheduler` constructor. Example: +``` +tmapreduce(√, +, [1, 2, 3, 4, 5]; chunksize=2, scheduler=:static) +``` +However, to avoid ambiguity, this is currently **only supported for `scheduler::Symbol`** +(but not for `scheduler::Scheduler`). """ function tmapreduce end """ treducemap(op, f, A::AbstractArray...; - [scheduler::Scheduler = DynamicScheduler()], + [scheduler::Union{Scheduler, Symbol} = :dynamic], [outputtype::Type = Any], [init]) @@ -52,7 +60,7 @@ will get undefined results. ## Example: ``` -tmapreduce(√, +, [1, 2, 3, 4, 5]) +treducemap(+, √, [1, 2, 3, 4, 5]) ``` is the parallelized version of `sum(√, [1, 2, 3, 4, 5])` in the form @@ -63,15 +71,23 @@ is the parallelized version of `sum(√, [1, 2, 3, 4, 5])` in the form ## Keyword arguments: -- `scheduler::Scheduler` (default `DynamicScheduler()`): determines how the computation is divided into parallel tasks and how these are scheduled. See [`Scheduler`](@ref) for more information. -- `outputtype::Type` (default `Any`): will work as the asserted output type of parallel calculations. We use [StableTasks.jl](https://github.com/MasonProtter/StableTasks.jl) to make setting this option unnecessary, but if you experience problems with type stability, you may be able to recover it with this keyword argument. -- `init`: forwarded to `mapreduce` for the task-local sequential parts of the calculation. +- `scheduler::Union{Scheduler, Symbol}` (default `:dynamic`): determines how the computation is divided into parallel tasks and how these are scheduled. See [`Scheduler`](@ref) for more information on the available schedulers. +- `outputtype::Type` (default `Any`): will work as the asserted output type of parallel calculations. We use [StableTasks.jl](https://github.com/JuliaFolds2/StableTasks.jl) to make setting this option unnecessary, but if you experience problems with type stability, you may be able to recover it with this keyword argument. +- `init`: initial value of the reduction. Will be forwarded to `mapreduce` for the task-local sequential parts of the calculation. + +In addition, `treducemap` accepts **all keyword arguments that are supported by the selected +scheduler**. They will simply be passed on to the corresponding `Scheduler` constructor. Example: +``` +treducemap(+, √, [1, 2, 3, 4, 5]; chunksize=2, scheduler=:static) +``` +However, to avoid ambiguity, this is currently **only supported for `scheduler::Symbol`** +(but not for `scheduler::Scheduler`). """ function treducemap end """ treduce(op, A::AbstractArray...; - [scheduler::Scheduler = DynamicScheduler()], + [scheduler::Union{Scheduler, Symbol} = :dynamic], [outputtype::Type = Any], [init]) @@ -97,15 +113,23 @@ is the parallelized version of `sum([1, 2, 3, 4, 5])` in the form ## Keyword arguments: -- `scheduler::Scheduler` (default `DynamicScheduler()`): determines how the computation is divided into parallel tasks and how these are scheduled. See [`Scheduler`](@ref) for more information. -- `outputtype::Type` (default `Any`): will work as the asserted output type of parallel calculations. We use [StableTasks.jl](https://github.com/MasonProtter/StableTasks.jl) to make setting this option unnecessary, but if you experience problems with type stability, you may be able to recover it with this keyword argument. -- `init`: forwarded to `mapreduce` for the task-local sequential parts of the calculation. +- `scheduler::Union{Scheduler, Symbol}` (default `:dynamic`): determines how the computation is divided into parallel tasks and how these are scheduled. See [`Scheduler`](@ref) for more information on the available schedulers. +- `outputtype::Type` (default `Any`): will work as the asserted output type of parallel calculations. We use [StableTasks.jl](https://github.com/JuliaFolds2/StableTasks.jl) to make setting this option unnecessary, but if you experience problems with type stability, you may be able to recover it with this keyword argument. +- `init`: initial value of the reduction. Will be forwarded to `mapreduce` for the task-local sequential parts of the calculation. + +In addition, `treduce` accepts **all keyword arguments that are supported by the selected +scheduler**. They will simply be passed on to the corresponding `Scheduler` constructor. Example: +``` +treduce(+, [1, 2, 3, 4, 5]; chunksize=2, scheduler=:static) +``` +However, to avoid ambiguity, this is currently **only supported for `scheduler::Symbol`** +(but not for `scheduler::Scheduler`). """ function treduce end """ tforeach(f, A::AbstractArray...; - [schedule::Scheduler = DynamicScheduler()]) :: Nothing + [schedule::Union{Scheduler, Symbol} = :dynamic]) :: Nothing A multithreaded function like `Base.foreach`. Apply `f` to each element of `A` on multiple parallel tasks, and return `nothing`. I.e. it is the parallel equivalent of @@ -126,13 +150,23 @@ end ## Keyword arguments: -- `scheduler::Scheduler` (default `DynamicScheduler()`): determines how the computation is divided into parallel tasks and how these are scheduled. See [`Scheduler`](@ref) for more information. +- `scheduler::Union{Scheduler, Symbol}` (default `:dynamic`): determines how the computation is divided into parallel tasks and how these are scheduled. See [`Scheduler`](@ref) for more information on the available schedulers. + +In addition, `tforeach` accepts **all keyword arguments that are supported by the selected +scheduler**. They will simply be passed on to the corresponding `Scheduler` constructor. Example: +``` +tforeach(1:10; chunksize=2, scheduler=:static) do i + println(i^2) +end +``` +However, to avoid ambiguity, this is currently **only supported for `scheduler::Symbol`** +(but not for `scheduler::Scheduler`). """ function tforeach end """ tmap(f, [OutputElementType], A::AbstractArray...; - [schedule::Scheduler = DynamicScheduler()]) + [schedule::Union{Scheduler, Symbol} = :dynamic]) A multithreaded function like `Base.map`. Create a new container `similar` to `A` and fills it in parallel such that the `i`th element is equal to `f(A[i])`. @@ -149,26 +183,39 @@ tmap(sin, 1:10) ## Keyword arguments: -- `scheduler::Scheduler` (default `DynamicScheduler()`): determines how the computation is divided into parallel tasks and how these are scheduled. See [`Scheduler`](@ref) for more information. +- `scheduler::Union{Scheduler, Symbol}` (default `:dynamic`): determines how the computation is divided into parallel tasks and how these are scheduled. See [`Scheduler`](@ref) for more information on the available schedulers. + +In addition, `tmap` accepts **all keyword arguments that are supported by the selected +scheduler**. They will simply be passed on to the corresponding `Scheduler` constructor. Example: +``` +tmap(sin, 1:10; chunksize=2, scheduler=:static) +``` +However, to avoid ambiguity, this is currently **only supported for `scheduler::Symbol`** +(but not for `scheduler::Scheduler`). """ function tmap end """ tmap!(f, out, A::AbstractArray...; - [schedule::Scheduler = DynamicScheduler()]) + [schedule::Union{Scheduler, Symbol} = :dynamic]) A multithreaded function like `Base.map!`. In parallel on multiple tasks, this function assigns each element of `out[i] = f(A[i])` for each index `i` of `A` and `out`. ## Keyword arguments: -- `scheduler::Scheduler` (default `DynamicScheduler()`): determines how the computation is divided into parallel tasks and how these are scheduled. See [`Scheduler`](@ref) for more information. +- `scheduler::Union{Scheduler, Symbol}` (default `:dynamic`): determines how the computation is divided into parallel tasks and how these are scheduled. See [`Scheduler`](@ref) for more information on the available schedulers. + +In addition, `tmap!` accepts **all keyword arguments that are supported by the selected +scheduler**. They will simply be passed on to the corresponding `Scheduler` constructor. +However, to avoid ambiguity, this is currently **only supported for `scheduler::Symbol`** +(but not for `scheduler::Scheduler`). """ function tmap! end """ tcollect([OutputElementType], gen::Union{AbstractArray, Generator{<:AbstractArray}}; - [schedule::Scheduler = DynamicScheduler()]) + [schedule::Union{Scheduler, Symbol} = :dynamic]) A multithreaded function like `Base.collect`. Essentially just calls `tmap` on the generator function and inputs. @@ -185,6 +232,14 @@ tcollect(sin(i) for i in 1:10) ## Keyword arguments: -- `scheduler::Scheduler` (default `DynamicScheduler()`): determines how the computation is divided into parallel tasks and how these are scheduled. See [`Scheduler`](@ref) for more information. +- `scheduler::Union{Scheduler, Symbol}` (default `:dynamic`): determines how the computation is divided into parallel tasks and how these are scheduled. See [`Scheduler`](@ref) for more information on the available schedulers. + +In addition, `tcollect` accepts **all keyword arguments that are supported by the selected +scheduler**. They will simply be passed on to the corresponding `Scheduler` constructor. Example: +``` +tcollect(sin(i) for i in 1:10; chunksize=2, scheduler=:static) +``` +However, to avoid ambiguity, this is currently **only supported for `scheduler::Symbol`** +(but not for `scheduler::Scheduler`). """ function tcollect end diff --git a/src/macro_impl.jl b/src/macro_impl.jl index 5a71dae1..b4e74a58 100644 --- a/src/macro_impl.jl +++ b/src/macro_impl.jl @@ -140,7 +140,7 @@ is frozen for the full lifetime of the TLV, so and `eval` can't change the outco potential problems from the type being maximally narrow and then them trying to write a value of another type to it 3) the task local value is not user-observable. we never let the user inspect its type, unless they themselves are using `code____` tools to inspect the generated code, hence if inference changes and gives a more or less precise -type, there's no observable semantic changes, just performance increases or decreases. +type, there's no observable semantic changes, just performance increases or decreases. =# function _atlocal_assign_to_exprs(ex) left_ex = ex.args[1] @@ -198,6 +198,6 @@ function _handle_atset_single_assign!(settings, ex) def = def isa Bool ? def : esc(def) setfield!(settings, sym, def) else - push!(settings.kwargs, sym => def) + push!(settings.kwargs, sym => esc(def)) end end diff --git a/src/macros.jl b/src/macros.jl index 3b8629e5..87bdf2e1 100644 --- a/src/macros.jl +++ b/src/macros.jl @@ -35,17 +35,27 @@ end end ``` +```julia +@tasks for i in 1:100 + @set ntasks=4*nthreads() + # non-uniform work... +end +``` + ```julia @tasks for i in 1:5 @set scheduler=:static println("i=", i, " → ", threadid()) end - ``` + ```julia @tasks for i in 1:100 - @set scheduler=DynamicScheduler(; nchunks=4*nthreads()) - # non-uniform work... + @set begin + scheduler=:static + chunksize=10 + end + println("i=", i, " → ", threadid()) end ``` """ @@ -64,10 +74,18 @@ Multiple settings are supported, either as separate `@set` statements or via ## Settings -* `scheduler` (e.g. `scheduler=:static`): Can be either a [`Scheduler`](@ref) or a `Symbol` (e.g. `:dynamic` or `:static`) * `reducer` (e.g. `reducer=+`): Indicates that a reduction should be performed with the provided binary function. See [`tmapreduce`](@ref) for more information. * `collect` (e.g. `collect=true`): Indicates that results should be collected (similar to `map`). + +All other settings will be passed on to the underlying parallel functions (e.g. [tmapreduce](@ref)) +as keyword arguments. Hence, you may provide whatever these functions accept as +keyword arguments. Among others, this includes + +* `scheduler` (e.g. `scheduler=:static`): Can be either a [`Scheduler`](@ref) or a `Symbol` (e.g. `:dynamic`, `:static`, `:serial`, or `:greedy`). * `init` (e.g. `init=0.0`): Initial value to be used in a reduction (requires `reducer=...`). + +Settings like `ntasks`, `chunksize`, and `split` etc. can be used to tune the scheduling policy (if the selected scheduler supports it). + """ macro set(args...) error("The @set macro may only be used inside of a @tasks block.") @@ -78,29 +96,29 @@ end @local name = value @local name::T = value - + Can be used inside a `@tasks for ... end` block to specify [task-local values](@ref TLS) (TLV) via explicitly typed assignments. These values will be allocated once per task (rather than once per iteration) and can be re-used between different task-local iterations. - + There can only be a single `@local` block in a `@tasks for ... end` block. To specify multiple TLVs, use `@local begin ... end`. Compared to regular assignments, there are some limitations though, e.g. TLVs can't reference each other. ## Examples - + ```julia using OhMyThreads.Tools: taskid @tasks for i in 1:10 @set scheduler=DynamicScheduler(; nchunks=2) @local x = zeros(3) # TLV - + x .+= 1 println(taskid(), " -> ", x) end ``` - + ```julia @tasks for i in 1:10 @local begin diff --git a/src/schedulers.jl b/src/schedulers.jl index 2c44ceb3..f72687a1 100644 --- a/src/schedulers.jl +++ b/src/schedulers.jl @@ -37,6 +37,8 @@ function _chunkingstr(s::Scheduler) end """ + DynamicScheduler (aka :dynamic) + The default dynamic scheduler. Divides the given collection into chunks and then spawns a task per chunk to perform the requested operation in parallel. The tasks are assigned to threads by Julia's dynamic scheduler and are non-sticky, that is, @@ -123,6 +125,8 @@ function Base.show(io::IO, mime::MIME{Symbol("text/plain")}, s::DynamicScheduler end """ + StaticScheduler (aka :static) + A static low-overhead scheduler. Divides the given collection into chunks and then spawns a task per chunk to perform the requested operation in parallel. The tasks are statically assigned to threads up front and are made *sticky*, that is, @@ -202,6 +206,8 @@ function Base.show(io::IO, mime::MIME{Symbol("text/plain")}, s::StaticScheduler) end """ + GreedyScheduler (aka :greedy) + A greedy dynamic scheduler. The elements of the collection are first put into a `Channel` and then dynamic, non-sticky tasks are spawned to process channel content in parallel. @@ -234,6 +240,8 @@ function Base.show(io::IO, mime::MIME{Symbol("text/plain")}, s::GreedyScheduler) end """ + SerialScheduler (aka :serial) + A scheduler for turning off any multithreading and running the code in serial. It aims to make parallel functions like, e.g., `tmapreduce(sin, +, 1:100)` behave like their serial counterparts, e.g., `mapreduce(sin, +, 1:100)`. diff --git a/test/runtests.jl b/test/runtests.jl index b9058f8e..cc434ec2 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -211,17 +211,25 @@ end; @set reducer = (+) sum(C * x) end)() == 1800 - + # hygiene / escaping var = 3 sched = StaticScheduler() + sched_sym = :static data = rand(10) red = (a,b) -> a+b + n = 2 @test @tasks(for d in data @set scheduler=sched @set reducer=red var * d end) ≈ var * sum(data) + @test @tasks(for d in data + @set scheduler=sched_sym + @set ntasks=n + @set reducer=red + var * d + end) ≈ var * sum(data) struct SingleInt x::Int From 0fc7bdf8bf7d29088b357652bebb3ec9690f5a6c Mon Sep 17 00:00:00 2001 From: Carsten Bauer Date: Tue, 12 Mar 2024 21:43:07 +0100 Subject: [PATCH 08/17] changelog --- CHANGELOG.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index baa8db0c..cc1f6bab 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,10 +4,12 @@ OhMyThreads.jl Changelog Version 0.5.0 ------------- +- ![Feature][badge-feature] The parallel functions (e.g. tmapreduce etc.) now support `scheduler::Symbol` besides `scheduler::Scheduler`. To configure the selected scheduler (e.g. set `nchunks` etc.) one may now pass keyword arguments directly into the parallel functions (they will get passed on to the scheduler constructor). Example: `tmapreduce(sin, +, 1:10; chunksize=2, scheduler=:static)`. Analogous support has been added to the macro API: (Most) settings (`@set name = value`) will now be passed on to the parallel functions as keyword arguments (which then forward them to the scheduler constructor). Note that, to avoid ambiguity, we don't support this feature for `scheduler::Scheduler` but only for `scheduler::Symbol`. - ![Feature][badge-feature] Added a `SerialScheduler` that can be used to turn off any multithreading. - ![Feature][badge-feature] Added `OhMyThreads.WithTaskLocals` that represents a closure over `TaskLocalValues`, but can have those values materialized as an optimization (using `OhMyThreads.promise_task_local`) - ![Feature][badge-feature] In the case `nchunks > nthreads()`, the `StaticScheduler` now distributes chunks in a round-robin fashion (instead of either implicitly decreasing `nchunks` to `nthreads()` or throwing an error). - ![Feature][badge-feature] `@set init = ...` may now be used to specify an initial value for a reduction (only has an effect in conjuction with `@set reducer=...` and triggers a warning otherwise). +- ![Enhancement][badge-enhancement] `SerialScheduler` and `DynamicScheduler` now support the keyword argument `ntasks` as an alias for `nchunks`. - ![Enhancement][badge-enhancement] Made `@tasks` use `OhMyThreads.WithTaskLocals` automatically as an optimization. - ![Enhancement][badge-enhancement] Uses of `@local` within `@tasks` no-longer require users to declare the type of the task local value, it can be inferred automatically if a type is not provided. - ![BREAKING][badge-breaking] The `DynamicScheduler` (default) and the `StaticScheduler` now support a `chunksize` argument to specify the desired size of chunks instead of the number of chunks (`nchunks`). Note that `chunksize` and `nchunks` are mutually exclusive. (This is unlikely to break existing code but technically could because the type parameter has changed from `Bool` to `ChunkingMode`.) From 70b42ce8d6a13c595d3a143371d19bcbf05fb515 Mon Sep 17 00:00:00 2001 From: Carsten Bauer Date: Tue, 12 Mar 2024 22:02:52 +0100 Subject: [PATCH 09/17] dont allow ntasks and nchunks at the same time --- src/schedulers.jl | 6 ++++++ test/runtests.jl | 2 ++ 2 files changed, 8 insertions(+) diff --git a/src/schedulers.jl b/src/schedulers.jl index f72687a1..b2d6aeeb 100644 --- a/src/schedulers.jl +++ b/src/schedulers.jl @@ -109,6 +109,9 @@ function DynamicScheduler(; nchunks = 2 * nthreads(threadpool) chunksize = -1 else + if !isnothing(nchunks) && !isnothing(ntasks) + throw(ArgumentError("nchunks and ntasks are aliases and only one may be provided")) + end nchunks = !isnothing(nchunks) ? nchunks : !isnothing(ntasks) ? ntasks : -1 chunksize = isnothing(chunksize) ? -1 : chunksize @@ -190,6 +193,9 @@ function StaticScheduler(; nchunks = nthreads(:default) chunksize = -1 else + if !isnothing(nchunks) && !isnothing(ntasks) + throw(ArgumentError("nchunks and ntasks are aliases and only one may be provided")) + end nchunks = !isnothing(nchunks) ? nchunks : !isnothing(ntasks) ? ntasks : -1 chunksize = isnothing(chunksize) ? -1 : chunksize diff --git a/test/runtests.jl b/test/runtests.jl index cc434ec2..c5a97c5d 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -336,6 +336,8 @@ end; @test tmapreduce(sin, +, 1:10000; chunksize=2, scheduler=s) ≈ res_tmr @test tmapreduce(sin, +, 1:10000; chunking=false, scheduler=s) ≈ res_tmr @test tmapreduce(sin, +, 1:10000; nchunks=3, scheduler=s) ≈ res_tmr + @test tmapreduce(sin, +, 1:10000; ntasks=3, scheduler=s) ≈ res_tmr + @test_throws ArgumentError tmapreduce(sin, +, 1:10000; ntasks=3, nchunks=2, scheduler=s) ≈ res_tmr end end; From 33bb0e6c5f8e181fe118e7d01e89a166bfb4b247 Mon Sep 17 00:00:00 2001 From: Carsten Bauer Date: Tue, 12 Mar 2024 22:06:36 +0100 Subject: [PATCH 10/17] Nothing -> NotGiven in scheduler.jl --- src/schedulers.jl | 34 +++++++++++++++++----------------- 1 file changed, 17 insertions(+), 17 deletions(-) diff --git a/src/schedulers.jl b/src/schedulers.jl index b2d6aeeb..1009b703 100644 --- a/src/schedulers.jl +++ b/src/schedulers.jl @@ -8,7 +8,7 @@ struct NotGiven end isgiven(::NotGiven) = false isgiven(::T) where {T} = true -const MaybeInteger = Union{Integer, Nothing} +const MaybeInteger = Union{Integer, NotGiven} """ Supertype for all available schedulers: @@ -95,9 +95,9 @@ end function DynamicScheduler(; threadpool::Symbol = :default, - nchunks::MaybeInteger = nothing, - ntasks::MaybeInteger = nothing, # "alias" for nchunks - chunksize::MaybeInteger = nothing, + nchunks::MaybeInteger = NotGiven(), + ntasks::MaybeInteger = NotGiven(), # "alias" for nchunks + chunksize::MaybeInteger = NotGiven(), chunking::Bool = true, split::Symbol = :batch) if !chunking @@ -105,16 +105,16 @@ function DynamicScheduler(; chunksize = -1 else # only choose nchunks default if chunksize hasn't been specified - if isnothing(nchunks) && isnothing(chunksize) && isnothing(ntasks) + if !isgiven(nchunks) && !isgiven(chunksize) && !isgiven(ntasks) nchunks = 2 * nthreads(threadpool) chunksize = -1 else - if !isnothing(nchunks) && !isnothing(ntasks) + if isgiven(nchunks) && isgiven(ntasks) throw(ArgumentError("nchunks and ntasks are aliases and only one may be provided")) end - nchunks = !isnothing(nchunks) ? nchunks : - !isnothing(ntasks) ? ntasks : -1 - chunksize = isnothing(chunksize) ? -1 : chunksize + nchunks = isgiven(nchunks) ? nchunks : + isgiven(ntasks) ? ntasks : -1 + chunksize = isgiven(chunksize) ? chunksize : -1 end end DynamicScheduler(threadpool, nchunks, chunksize, split; chunking) @@ -179,9 +179,9 @@ struct StaticScheduler{C <: ChunkingMode} <: Scheduler end function StaticScheduler(; - nchunks::MaybeInteger = nothing, - ntasks::MaybeInteger = nothing, # "alias" for nchunks - chunksize::MaybeInteger = nothing, + nchunks::MaybeInteger = NotGiven(), + ntasks::MaybeInteger = NotGiven(), # "alias" for nchunks + chunksize::MaybeInteger = NotGiven(), chunking::Bool = true, split::Symbol = :batch) if !chunking @@ -189,16 +189,16 @@ function StaticScheduler(; chunksize = -1 else # only choose nchunks default if chunksize hasn't been specified - if isnothing(nchunks) && isnothing(chunksize) && isnothing(ntasks) + if !isgiven(nchunks) && !isgiven(chunksize) && !isgiven(ntasks) nchunks = nthreads(:default) chunksize = -1 else - if !isnothing(nchunks) && !isnothing(ntasks) + if isgiven(nchunks) && isgiven(ntasks) throw(ArgumentError("nchunks and ntasks are aliases and only one may be provided")) end - nchunks = !isnothing(nchunks) ? nchunks : - !isnothing(ntasks) ? ntasks : -1 - chunksize = isnothing(chunksize) ? -1 : chunksize + nchunks = isgiven(nchunks) ? nchunks : + isgiven(ntasks) ? ntasks : -1 + chunksize = isgiven(chunksize) ? chunksize : -1 end end StaticScheduler(nchunks, chunksize, split; chunking) From eb530ca6ce88409317790196f4f70dfba4b6becd Mon Sep 17 00:00:00 2001 From: Carsten Bauer Date: Wed, 13 Mar 2024 10:29:22 +0100 Subject: [PATCH 11/17] try to fix CI (despite bug) --- src/implementation.jl | 34 ++++++++++++---------------------- test/runtests.jl | 4 ++-- 2 files changed, 14 insertions(+), 24 deletions(-) diff --git a/src/implementation.jl b/src/implementation.jl index 11d6d3b1..5d87ccbe 100644 --- a/src/implementation.jl +++ b/src/implementation.jl @@ -42,12 +42,7 @@ function _chunks(sched, arg) end end -function tmapreduce(f, op, Arrs...; - scheduler::MaybeScheduler = NotGiven(), - outputtype::Type = Any, - init = NotGiven(), - kwargs...) - mapreduce_kwargs = isgiven(init) ? (; init) : (;) +function _scheduler_from_userinput(scheduler::MaybeScheduler; kwargs...) if scheduler isa Scheduler isempty(kwargs) || scheduler_and_kwargs_err(; kwargs...) _scheduler = scheduler @@ -56,6 +51,15 @@ function tmapreduce(f, op, Arrs...; else # default fallback _scheduler = DynamicScheduler(; kwargs...) end +end + +function tmapreduce(f, op, Arrs...; + scheduler::MaybeScheduler = NotGiven(), + outputtype::Type = Any, + init = NotGiven(), + kwargs...) + mapreduce_kwargs = isgiven(init) ? (; init) : (;) + _scheduler = _scheduler_from_userinput(scheduler; kwargs...) # @show _scheduler if _scheduler isa SerialScheduler @@ -253,14 +257,7 @@ function tmap(f, _Arrs::AbstractArray...; scheduler::MaybeScheduler = NotGiven(), kwargs...) - if scheduler isa Scheduler - isempty(kwargs) || scheduler_and_kwargs_err(; kwargs...) - _scheduler = scheduler - elseif scheduler isa Symbol - _scheduler = scheduler_from_symbol(scheduler; kwargs...) - else # default fallback - _scheduler = DynamicScheduler(; kwargs...) - end + _scheduler = _scheduler_from_userinput(scheduler; kwargs...) if _scheduler isa GreedyScheduler error("Greedy scheduler isn't supported with `tmap` unless you provide an `OutputElementType` argument, since the greedy schedule requires a commutative reducing operator.") @@ -376,14 +373,7 @@ end _Arrs::AbstractArray...; scheduler::MaybeScheduler = NotGiven(), kwargs...) - if scheduler isa Scheduler - isempty(kwargs) || scheduler_and_kwargs_err(; kwargs...) - _scheduler = scheduler - elseif scheduler isa Symbol - _scheduler = scheduler_from_symbol(scheduler; kwargs...) - else # default fallback - _scheduler = DynamicScheduler(; kwargs...) - end + _scheduler = _scheduler_from_userinput(scheduler; kwargs...) if hasfield(typeof(_scheduler), :split) && _scheduler.split != :batch error("Only `split == :batch` is supported because the parallel operation isn't commutative. (Scheduler: $_scheduler)") diff --git a/test/runtests.jl b/test/runtests.jl index c5a97c5d..70f5cabe 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -327,10 +327,10 @@ end; # scheduler isa Symbol for s in (:dynamic, :static, :serial, :greedy) - @test tmapreduce(sin, +, 1:10000; scheduler=s) ≈ res_tmr + @test tmapreduce(sin, +, 1:100_000; scheduler=s) ≈ res_tmr end for s in (:dynamic, :static, :greedy) - @test tmapreduce(sin, +, 1:10000; ntasks=2, scheduler=s) ≈ res_tmr + @test tmapreduce(sin, +, 1:100_000; ntasks=2, scheduler=s) ≈ res_tmr end for s in (:dynamic, :static) @test tmapreduce(sin, +, 1:10000; chunksize=2, scheduler=s) ≈ res_tmr From 736f2cc4dfadf05d6080d8231096a573dbe82ea7 Mon Sep 17 00:00:00 2001 From: Carsten Bauer Date: Wed, 13 Mar 2024 10:40:19 +0100 Subject: [PATCH 12/17] use init to fix CI --- test/runtests.jl | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/test/runtests.jl b/test/runtests.jl index 70f5cabe..9174800a 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -327,10 +327,10 @@ end; # scheduler isa Symbol for s in (:dynamic, :static, :serial, :greedy) - @test tmapreduce(sin, +, 1:100_000; scheduler=s) ≈ res_tmr + @test tmapreduce(sin, +, 1:10000; scheduler=s, init=0.0) ≈ res_tmr end for s in (:dynamic, :static, :greedy) - @test tmapreduce(sin, +, 1:100_000; ntasks=2, scheduler=s) ≈ res_tmr + @test tmapreduce(sin, +, 1:10000; ntasks=2, scheduler=s, init=0.0) ≈ res_tmr end for s in (:dynamic, :static) @test tmapreduce(sin, +, 1:10000; chunksize=2, scheduler=s) ≈ res_tmr From 3fcfc619b55dea5ebcf04658b3b3369c1ca03b78 Mon Sep 17 00:00:00 2001 From: Carsten Bauer Date: Wed, 13 Mar 2024 11:13:38 +0100 Subject: [PATCH 13/17] readme/index.md example --- README.md | 31 +++++++++++++++++-------------- docs/src/index.md | 31 +++++++++++++++++-------------- 2 files changed, 34 insertions(+), 28 deletions(-) diff --git a/README.md b/README.md index 612ef4ec..457e7e81 100644 --- a/README.md +++ b/README.md @@ -38,11 +38,13 @@ focus on [data parallelism](https://en.wikipedia.org/wiki/Data_parallelism), tha ## Example ```julia -using OhMyThreads +using OhMyThreads: tmapreduce, @tasks +using BenchmarkTools: @btime +using Base.Threads: nthreads # Variant 1: function API -function mc_parallel(N; kw...) - M = tmapreduce(+, 1:N; kw...) do i +function mc_parallel(N; ntasks=nthreads()) + M = tmapreduce(+, 1:N; ntasks) do i rand()^2 + rand()^2 < 1.0 end pi = 4 * M / N @@ -50,9 +52,12 @@ function mc_parallel(N; kw...) end # Variant 2: macro API -function mc_parallel_macro(N) +function mc_parallel_macro(N; ntasks=nthreads()) M = @tasks for i in 1:N - @set reducer=+ + @set begin + reducer=+ + ntasks=ntasks + end rand()^2 + rand()^2 < 1.0 end pi = 4 * M / N @@ -62,19 +67,17 @@ end N = 100_000_000 mc_parallel(N) # gives, e.g., 3.14159924 -using BenchmarkTools - -@show Threads.nthreads() # 5 in this example - -@btime mc_parallel($N; scheduler=DynamicScheduler(; nchunks=1)) # effectively using 1 thread -@btime mc_parallel($N) # using all 5 threads +@btime mc_parallel($N; ntasks=1) # use a single task (and hence a single thread) +@btime mc_parallel($N) # using all threads +@btime mc_parallel_macro($N) # using all threads ``` -Timings might be something like this: +With 5 threads, timings might be something like this: ``` -447.093 ms (7 allocations: 624 bytes) -89.401 ms (66 allocations: 5.72 KiB) +417.282 ms (14 allocations: 912 bytes) +83.578 ms (38 allocations: 3.08 KiB) +83.573 ms (38 allocations: 3.08 KiB) ``` (Check out the full [Parallel Monte Carlo](https://juliafolds2.github.io/OhMyThreads.jl/stable/literate/mc/mc/) example if you like.) diff --git a/docs/src/index.md b/docs/src/index.md index 91ae22bb..2018efcc 100644 --- a/docs/src/index.md +++ b/docs/src/index.md @@ -14,11 +14,13 @@ to add the package to your Julia environment. ### Basic example ```julia -using OhMyThreads +using OhMyThreads: tmapreduce, @tasks +using BenchmarkTools: @btime +using Base.Threads: nthreads # Variant 1: function API -function mc_parallel(N; kw...) - M = tmapreduce(+, 1:N; kw...) do i +function mc_parallel(N; ntasks=nthreads()) + M = tmapreduce(+, 1:N; ntasks) do i rand()^2 + rand()^2 < 1.0 end pi = 4 * M / N @@ -26,9 +28,12 @@ function mc_parallel(N; kw...) end # Variant 2: macro API -function mc_parallel_macro(N) +function mc_parallel_macro(N; ntasks=nthreads()) M = @tasks for i in 1:N - @set reducer=+ + @set begin + reducer=+ + ntasks=ntasks + end rand()^2 + rand()^2 < 1.0 end pi = 4 * M / N @@ -38,19 +43,17 @@ end N = 100_000_000 mc_parallel(N) # gives, e.g., 3.14159924 -using BenchmarkTools - -@show Threads.nthreads() # 5 in this example - -@btime mc_parallel($N; scheduler=DynamicScheduler(; nchunks=1)) # effectively using 1 thread -@btime mc_parallel($N) # using all 5 threads +@btime mc_parallel($N; ntasks=1) # use a single task (and hence a single thread) +@btime mc_parallel($N) # using all threads +@btime mc_parallel_macro($N) # using all threads ``` -Timings might be something like this: +With 5 threads, timings might be something like this: ``` -447.093 ms (7 allocations: 624 bytes) -89.401 ms (66 allocations: 5.72 KiB) +417.282 ms (14 allocations: 912 bytes) +83.578 ms (38 allocations: 3.08 KiB) +83.573 ms (38 allocations: 3.08 KiB) ``` (Check out the full [Parallel Monte Carlo](@ref) example if you like.) From f7eaab12a6867a6a04677b2a7b6220ecf88c9987 Mon Sep 17 00:00:00 2001 From: Carsten Bauer Date: Wed, 13 Mar 2024 11:24:48 +0100 Subject: [PATCH 14/17] default dynamic scheduler to ntasks=nthreads --- CHANGELOG.md | 1 + src/schedulers.jl | 4 ++-- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index cc1f6bab..7a6782d0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,6 +16,7 @@ Version 0.5.0 - ![Breaking][badge-breaking] `DynamicScheduler` and `StaticScheduler` don't support `nchunks=0` or `chunksize=0` any longer. Instead, chunking can now be turned off via an explicit new keyword argument `chunking=false`. - ![BREAKING][badge-breaking] Within a `@tasks` block, task-local values must from now on be defined via `@local` instead of `@init` (renamed). - ![BREAKING][badge-breaking] The (already deprecated) `SpawnAllScheduler` has been dropped. +- ![BREAKING][badge-breaking] The default value for `ntasks`/`nchunks` for `DynamicScheduler` has been changed from `2*nthreads()` to `nthreads()`. With the new value we now align with `@threads :dynamic`. The old value wasn't giving good load balancing anyways and choosing a higher value penalizes uniform use cases even more. To get the old behavior, set `nchunks=2*nthreads()`. Version 0.4.6 ------------- diff --git a/src/schedulers.jl b/src/schedulers.jl index 1009b703..bc87cc5f 100644 --- a/src/schedulers.jl +++ b/src/schedulers.jl @@ -49,7 +49,7 @@ with other multithreaded code. ## Keyword arguments: -- `nchunks::Integer` or `ntasks::Integer` (default `2 * nthreads(threadpool)`): +- `nchunks::Integer` or `ntasks::Integer` (default `nthreads(threadpool)`): * Determines the number of chunks (and thus also the number of parallel tasks). * Increasing `nchunks` can help with [load balancing](https://en.wikipedia.org/wiki/Load_balancing_(computing)), but at the expense of creating more overhead. For `nchunks <= nthreads()` there are not enough chunks for any load balancing. * Setting `nchunks < nthreads()` is an effective way to use only a subset of the available threads. @@ -106,7 +106,7 @@ function DynamicScheduler(; else # only choose nchunks default if chunksize hasn't been specified if !isgiven(nchunks) && !isgiven(chunksize) && !isgiven(ntasks) - nchunks = 2 * nthreads(threadpool) + nchunks = nthreads(threadpool) chunksize = -1 else if isgiven(nchunks) && isgiven(ntasks) From 7c04a3c171d93ed1b9bac5c30c7e9254a3e8b6df Mon Sep 17 00:00:00 2001 From: Carsten Bauer Date: Wed, 13 Mar 2024 12:02:41 +0100 Subject: [PATCH 15/17] doc/examples update --- docs/src/literate/integration/integration.jl | 3 ++- docs/src/literate/integration/integration.md | 19 ++++++++++--- docs/src/literate/juliaset/juliaset.jl | 8 +++--- docs/src/literate/juliaset/juliaset.md | 18 ++++++------- docs/src/literate/mc/mc.jl | 4 +-- docs/src/literate/mc/mc.md | 28 ++++++++++---------- docs/src/translation.md | 6 ++--- 7 files changed, 49 insertions(+), 37 deletions(-) diff --git a/docs/src/literate/integration/integration.jl b/docs/src/literate/integration/integration.jl index 0ec74dcb..8b5ef1d0 100644 --- a/docs/src/literate/integration/integration.jl +++ b/docs/src/literate/integration/integration.jl @@ -29,7 +29,6 @@ end # interval, as a multiple of the number of available Julia threads. using Base.Threads: nthreads -@show nthreads() N = nthreads() * 1_000_000 @@ -82,3 +81,5 @@ using BenchmarkTools # Because the problem is trivially parallel - all threads to the same thing and don't need # to communicate - we expect an ideal speedup of (close to) the number of available threads. + +nthreads() diff --git a/docs/src/literate/integration/integration.md b/docs/src/literate/integration/integration.md index 6d979b34..59bec287 100644 --- a/docs/src/literate/integration/integration.md +++ b/docs/src/literate/integration/integration.md @@ -46,13 +46,12 @@ interval, as a multiple of the number of available Julia threads. ````julia using Base.Threads: nthreads -@show nthreads() N = nthreads() * 1_000_000 ```` ```` -5000000 +10000000 ```` Calling `trapezoidal` we do indeed find the (approximate) value of $\pi$. @@ -101,6 +100,10 @@ end # end ```` +```` +trapezoidal_parallel (generic function with 1 method) +```` + First, we check the correctness of our parallel implementation. ````julia @@ -120,14 +123,22 @@ using BenchmarkTools ```` ```` - 12.782 ms (0 allocations: 0 bytes) - 2.563 ms (37 allocations: 3.16 KiB) + 24.348 ms (0 allocations: 0 bytes) + 2.457 ms (69 allocations: 6.05 KiB) ```` Because the problem is trivially parallel - all threads to the same thing and don't need to communicate - we expect an ideal speedup of (close to) the number of available threads. +````julia +nthreads() +```` + +```` +10 +```` + --- *This page was generated using [Literate.jl](https://github.com/fredrikekre/Literate.jl).* diff --git a/docs/src/literate/juliaset/juliaset.jl b/docs/src/literate/juliaset/juliaset.jl index 33b5c3ee..3a290792 100644 --- a/docs/src/literate/juliaset/juliaset.jl +++ b/docs/src/literate/juliaset/juliaset.jl @@ -111,12 +111,12 @@ img = zeros(Int, N, N) # the load balancing of the default dynamic scheduler. The latter divides the overall # workload into tasks that can then be dynamically distributed among threads to adjust the # per-thread load. We can try to fine tune and improve the load balancing further by -# increasing the `nchunks` parameter of the scheduler, that is, creating more and smaller -# tasks. +# increasing the `ntasks` parameter of the scheduler, that is, creating more tasks with +# smaller per-task workload. using OhMyThreads: DynamicScheduler -@btime compute_juliaset_parallel!($img; scheduler=DynamicScheduler(; nchunks=N)) samples=10 evals=3; +@btime compute_juliaset_parallel!($img; ntasks=N, scheduler=:dynamic) samples=10 evals=3; # Note that while this turns out to be a bit faster, it comes at the expense of much more # allocations. @@ -126,4 +126,4 @@ using OhMyThreads: DynamicScheduler using OhMyThreads: StaticScheduler -@btime compute_juliaset_parallel!($img; scheduler=StaticScheduler()) samples=10 evals=3; +@btime compute_juliaset_parallel!($img; scheduler=:static) samples=10 evals=3; diff --git a/docs/src/literate/juliaset/juliaset.md b/docs/src/literate/juliaset/juliaset.md index 1ffb385b..eb7e9003 100644 --- a/docs/src/literate/juliaset/juliaset.md +++ b/docs/src/literate/juliaset/juliaset.md @@ -121,9 +121,9 @@ img = zeros(Int, N, N) ```` ```` -nthreads() = 5 - 138.157 ms (0 allocations: 0 bytes) - 40.373 ms (67 allocations: 6.20 KiB) +nthreads() = 10 + 131.295 ms (0 allocations: 0 bytes) + 31.422 ms (68 allocations: 6.09 KiB) ```` @@ -135,17 +135,17 @@ As stated above, the per-pixel computation is non-uniform. Hence, we do benefit the load balancing of the default dynamic scheduler. The latter divides the overall workload into tasks that can then be dynamically distributed among threads to adjust the per-thread load. We can try to fine tune and improve the load balancing further by -increasing the `nchunks` parameter of the scheduler, that is, creating more and smaller -tasks. +increasing the `ntasks` parameter of the scheduler, that is, creating more tasks with +smaller per-task workload. ````julia using OhMyThreads: DynamicScheduler -@btime compute_juliaset_parallel!($img; scheduler=DynamicScheduler(; nchunks=N)) samples=10 evals=3; +@btime compute_juliaset_parallel!($img; ntasks=N, scheduler=:dynamic) samples=10 evals=3; ```` ```` - 31.751 ms (12011 allocations: 1.14 MiB) + 17.438 ms (12018 allocations: 1.11 MiB) ```` @@ -158,11 +158,11 @@ To quantify the impact of load balancing we can opt out of dynamic scheduling an ````julia using OhMyThreads: StaticScheduler -@btime compute_juliaset_parallel!($img; scheduler=StaticScheduler()) samples=10 evals=3; +@btime compute_juliaset_parallel!($img; scheduler=:static) samples=10 evals=3; ```` ```` - 63.147 ms (37 allocations: 3.26 KiB) + 30.097 ms (73 allocations: 6.23 KiB) ```` diff --git a/docs/src/literate/mc/mc.jl b/docs/src/literate/mc/mc.jl index 89bea82d..6a9abd37 100644 --- a/docs/src/literate/mc/mc.jl +++ b/docs/src/literate/mc/mc.jl @@ -74,8 +74,8 @@ using Base.Threads: nthreads using OhMyThreads: StaticScheduler -@btime mc_parallel($N) samples=10 evals=3; -@btime mc_parallel($N; scheduler = StaticScheduler()) samples=10 evals=3; +@btime mc_parallel($N; scheduler=:dynamic) samples=10 evals=3; # default +@btime mc_parallel($N; scheduler=:static) samples=10 evals=3; # ## Manual parallelization # diff --git a/docs/src/literate/mc/mc.md b/docs/src/literate/mc/mc.md index 5696e5d9..44506abb 100644 --- a/docs/src/literate/mc/mc.md +++ b/docs/src/literate/mc/mc.md @@ -34,7 +34,7 @@ mc(N) ```` ```` -3.14145748 +3.14171236 ```` ## Parallelization with `tmapreduce` @@ -69,7 +69,7 @@ mc_parallel(N) ```` ```` -3.14134792 +3.14156496 ```` Let's run a quick benchmark. @@ -86,9 +86,9 @@ using Base.Threads: nthreads ```` ```` -nthreads() = 5 - 317.745 ms (0 allocations: 0 bytes) - 88.384 ms (66 allocations: 5.72 KiB) +nthreads() = 10 + 301.636 ms (0 allocations: 0 bytes) + 41.864 ms (68 allocations: 5.81 KiB) ```` @@ -100,13 +100,13 @@ and compare the performance of static and dynamic scheduling (with default param ````julia using OhMyThreads: StaticScheduler -@btime mc_parallel($N) samples=10 evals=3; -@btime mc_parallel($N; scheduler=StaticScheduler()) samples=10 evals=3; +@btime mc_parallel($N; scheduler=:dynamic) samples=10 evals=3; # default +@btime mc_parallel($N; scheduler=:static) samples=10 evals=3; ```` ```` - 88.222 ms (66 allocations: 5.72 KiB) - 88.203 ms (36 allocations: 2.98 KiB) + 41.839 ms (68 allocations: 5.81 KiB) + 41.838 ms (68 allocations: 5.81 KiB) ```` @@ -121,7 +121,7 @@ simulation. Finally, we fetch the results and compute the average estimate for $ using OhMyThreads: @spawn, chunks function mc_parallel_manual(N; nchunks = nthreads()) - tasks = map(chunks(1:N; n = nchunks)) do idcs # TODO: replace by `tmap` once ready + tasks = map(chunks(1:N; n = nchunks)) do idcs @spawn mc(length(idcs)) end pi = sum(fetch, tasks) / nchunks @@ -132,7 +132,7 @@ mc_parallel_manual(N) ```` ```` -3.1414609999999996 +3.14180504 ```` And this is the performance: @@ -142,7 +142,7 @@ And this is the performance: ```` ```` - 64.042 ms (31 allocations: 2.80 KiB) + 30.224 ms (65 allocations: 5.70 KiB) ```` @@ -161,8 +161,8 @@ end samples=10 evals=3; ```` ```` - 88.041 ms (0 allocations: 0 bytes) - 63.427 ms (0 allocations: 0 bytes) + 41.750 ms (0 allocations: 0 bytes) + 30.148 ms (0 allocations: 0 bytes) ```` diff --git a/docs/src/translation.md b/docs/src/translation.md index f4775576..a5a28f7d 100644 --- a/docs/src/translation.md +++ b/docs/src/translation.md @@ -45,7 +45,7 @@ end # or -tforeach(1:10; scheduler=StaticScheduler()) do i +tforeach(1:10; scheduler=:static) do i println(i) end ``` @@ -62,13 +62,13 @@ end ```julia # OhMyThreads @tasks for i in 1:10 - @set scheduler=DynamicScheduler(; nchunks=0) # turn off chunking + @set chunking=false println(i) end # or -tforeach(1:10; scheduler=DynamicScheduler(; nchunks=0)) do i +tforeach(1:10; chunking=false) do i println(i) end ``` From f290096a97e54c3dd1d37bcf8ed7c68d125a3ded Mon Sep 17 00:00:00 2001 From: Carsten Bauer Date: Wed, 13 Mar 2024 13:39:40 +0100 Subject: [PATCH 16/17] update tls docs --- docs/src/literate/tls/tls.jl | 64 ++++++++-------------- docs/src/literate/tls/tls.md | 101 +++++++++++++---------------------- 2 files changed, 59 insertions(+), 106 deletions(-) diff --git a/docs/src/literate/tls/tls.jl b/docs/src/literate/tls/tls.jl index 43e1782c..20c77adc 100644 --- a/docs/src/literate/tls/tls.jl +++ b/docs/src/literate/tls/tls.jl @@ -172,8 +172,8 @@ using OhMyThreads: @tasks function matmulsums_tlv_macro(As, Bs; kwargs...) N = size(first(As), 1) - @tasks for i in eachindex(As,Bs) - @set collect=true + @tasks for i in eachindex(As, Bs) + @set collect = true @local C = Matrix{Float64}(undef, N, N) mul!(C, As[i], Bs[i]) sum(C) @@ -184,8 +184,8 @@ res_tlv_macro = matmulsums_tlv_macro(As, Bs) res ≈ res_tlv_macro # Here, `@local` expands to a pattern similar to the `TaskLocalValue` one above, although automatically -# infers that the object's type is `Matrix{Float64}`, and it carries some optimizations (see -# [`OhMyThreads.WithTaskLocals`](@ref)) which can make accessing task local values more efficient in +# infers that the object's type is `Matrix{Float64}`, and it carries some optimizations (see +# [`OhMyThreads.WithTaskLocals`](@ref)) which can make accessing task local values more efficient in # loops which take on the order of 100ns to complete. # # @@ -211,23 +211,7 @@ sleep(2) #hide # As we can see, `matmulsums_tlv` (and `matmulsums_tlv_macro`) isn't only convenient # but also efficient: It allocates much less memory than `matmulsums_naive` and is about on # par with the manual implementation. - -# #### Tuning the scheduling # -# Since the workload is uniform, we don't need load balancing. We can thus try to improve -# the performance and reduce the number of allocations by choosing the number of chunks -# (i.e. tasks) to match the number of Julia threads. Concretely, this -# amounts to passing in `DynamicScheduler(; nchunks=nthreads())`. If we further want to -# opt-out of dynamic scheduling alltogether, we can choose the `StaticScheduler()`. - -using OhMyThreads: DynamicScheduler, StaticScheduler - -@btime matmulsums_tlv( - $As, $Bs; scheduler = $(DynamicScheduler(; nchunks = nthreads()))); -@btime matmulsums_tlv($As, $Bs; scheduler = $(StaticScheduler())); - -# Interestingly, this doesn't always lead to speedups (maybe even a slight slowdown). - # # ## Per-thread allocation # @@ -285,13 +269,14 @@ res_nu ≈ res_pt_naive # ### The quick fix (with caveats) # # A simple solution for the task-migration issue is to opt-out of dynamic scheduling with -# the `StaticScheduler()`. This scheduler statically assigns tasks to threads -# upfront without any dynamic rescheduling (the tasks are sticky and won't migrate). +# `scheduler=:static` (or `scheduler=StaticScheduler()`). This scheduler statically +# assigns tasks to threads upfront without any dynamic rescheduling +# (the tasks are sticky and won't migrate). # function matmulsums_perthread_static(As, Bs) N = size(first(As), 1) Cs = [Matrix{Float64}(undef, N, N) for _ in 1:nthreads()] - tmap(As, Bs; scheduler = StaticScheduler()) do A, B + tmap(As, Bs; scheduler = :static) do A, B C = Cs[threadid()] mul!(C, A, B) sum(C) @@ -336,27 +321,21 @@ res_nu ≈ res_pt_channel # ### Benchmark # # Let's benchmark the variants above and compare them to the task-local implementation. -# We want to look at both `nchunks = nthreads()` and `nchunks > nthreads()`, the latter +# We want to look at both `ntasks = nthreads()` and `ntasks > nthreads()`, the latter # of which gives us dynamic load balancing. # -## no load balancing because nchunks == nthreads() -@btime matmulsums_tlv($As_nu, $Bs_nu; - scheduler = $(DynamicScheduler(; nchunks = nthreads()))); +## no load balancing because ntasks == nthreads() +@btime matmulsums_tlv($As_nu, $Bs_nu); @btime matmulsums_perthread_static($As_nu, $Bs_nu); -@btime matmulsums_perthread_channel($As_nu, $Bs_nu; - scheduler = $(DynamicScheduler(; nchunks = nthreads()))); +@btime matmulsums_perthread_channel($As_nu, $Bs_nu); -## load balancing because nchunks > nthreads() -@btime matmulsums_tlv($As_nu, $Bs_nu; - scheduler = $(DynamicScheduler(; nchunks = 2 * nthreads()))); -@btime matmulsums_perthread_channel($As_nu, $Bs_nu; - scheduler = $(DynamicScheduler(; nchunks = 2 * nthreads()))); +## load balancing because ntasks > nthreads() +@btime matmulsums_tlv($As_nu, $Bs_nu; ntasks = 2 * nthreads()); +@btime matmulsums_perthread_channel($As_nu, $Bs_nu; ntasks = 2 * nthreads()); -@btime matmulsums_tlv($As_nu, $Bs_nu; - scheduler = $(DynamicScheduler(; nchunks = 10 * nthreads()))); -@btime matmulsums_perthread_channel($As_nu, $Bs_nu; - scheduler = $(DynamicScheduler(; nchunks = 10 * nthreads()))); +@btime matmulsums_tlv($As_nu, $Bs_nu; ntasks = 10 * nthreads()); +@btime matmulsums_perthread_channel($As_nu, $Bs_nu; ntasks = 10 * nthreads()); # # Note that the runtime of `matmulsums_perthread_channel` improves with increasing number @@ -371,14 +350,15 @@ res_nu ≈ res_pt_channel # a limited number of tasks (e.g. `nthreads()`) with task-local buffers. # using OhMyThreads: tmapreduce + function matmulsums_perthread_channel_flipped(As, Bs; ntasks = nthreads()) N = size(first(As), 1) - chnl = Channel{Int}(length(As); spawn=true) do chnl + chnl = Channel{Int}(length(As); spawn = true) do chnl for i in 1:length(As) put!(chnl, i) end end - tmapreduce(vcat, 1:ntasks; scheduler = DynamicScheduler(; nchunks = 0)) do _ # we turn chunking off + tmapreduce(vcat, 1:ntasks; chunking=false) do _ # we turn chunking off local C = Matrix{Float64}(undef, N, N) map(chnl) do i # implicitly takes the values from the channel (parallel safe) A = As[i] @@ -408,7 +388,7 @@ sort(res_nu) ≈ sort(res_channel_flipped) # give [Bumper.jl](https://github.com/MasonProtter/Bumper.jl) a try. Essentially, it # allows you to *bring your own stacks*, that is, task-local bump allocators which you can # dynamically allocate memory to, and reset them at the end of a code block, just like -# Julia's stack. +# Julia's stack. # Be warned though that Bumper.jl is (1) a rather young package with (likely) some bugs # and (2) can easily lead to segfaults when used incorrectly. If you can live with the # risk, Bumper.jl is especially useful for causes we don't know ahead of time how large @@ -430,7 +410,7 @@ function matmulsums_bumper(As, Bs) end res_bumper = matmulsums_bumper(As, Bs); -sort(res_nu) ≈ sort(res_bumper) +sort(res) ≈ sort(res_bumper) @btime matmulsums_bumper($As, $Bs); diff --git a/docs/src/literate/tls/tls.md b/docs/src/literate/tls/tls.md index 2aa1140f..23894478 100644 --- a/docs/src/literate/tls/tls.md +++ b/docs/src/literate/tls/tls.md @@ -222,8 +222,8 @@ using OhMyThreads: @tasks function matmulsums_tlv_macro(As, Bs; kwargs...) N = size(first(As), 1) - @tasks for i in eachindex(As,Bs) - @set collect=true + @tasks for i in eachindex(As, Bs) + @set collect = true @local C = Matrix{Float64}(undef, N, N) mul!(C, As[i], Bs[i]) sum(C) @@ -239,8 +239,8 @@ true ```` Here, `@local` expands to a pattern similar to the `TaskLocalValue` one above, although automatically -infers that the object's type is `Matrix{Float64}`, and it carries some optimizations (see -[`OhMyThreads.WithTaskLocals`](@ref)) which can make accessing task local values more efficient in +infers that the object's type is `Matrix{Float64}`, and it carries some optimizations (see +[`OhMyThreads.WithTaskLocals`](@ref)) which can make accessing task local values more efficient in loops which take on the order of 100ns to complete. @@ -263,11 +263,11 @@ using BenchmarkTools ```` nthreads() = 10 - 49.077 ms (3 allocations: 518.17 KiB) - 32.658 ms (1691 allocations: 384.08 MiB) - 9.513 ms (200 allocations: 10.08 MiB) - 9.588 ms (236 allocations: 10.05 MiB) - 9.650 ms (239 allocations: 10.05 MiB) + 61.314 ms (3 allocations: 518.17 KiB) + 22.122 ms (1621 allocations: 384.06 MiB) + 7.620 ms (204 allocations: 10.08 MiB) + 7.702 ms (126 allocations: 5.03 MiB) + 7.600 ms (127 allocations: 5.03 MiB) ```` @@ -275,29 +275,6 @@ As we can see, `matmulsums_tlv` (and `matmulsums_tlv_macro`) isn't only convenie but also efficient: It allocates much less memory than `matmulsums_naive` and is about on par with the manual implementation. -#### Tuning the scheduling - -Since the workload is uniform, we don't need load balancing. We can thus try to improve -the performance and reduce the number of allocations by choosing the number of chunks -(i.e. tasks) to match the number of Julia threads. Concretely, this -amounts to passing in `DynamicScheduler(; nchunks=nthreads())`. If we further want to -opt-out of dynamic scheduling alltogether, we can choose the `StaticScheduler()`. - -````julia -using OhMyThreads: DynamicScheduler, StaticScheduler - -@btime matmulsums_tlv( - $As, $Bs; scheduler = $(DynamicScheduler(; nchunks = nthreads()))); -@btime matmulsums_tlv($As, $Bs; scheduler = $(StaticScheduler())); -```` - -```` - 9.561 ms (124 allocations: 5.03 MiB) - 9.618 ms (124 allocations: 5.03 MiB) - -```` - -Interestingly, this doesn't always lead to speedups (maybe even a slight slowdown). ## Per-thread allocation @@ -361,14 +338,15 @@ above, but you can't rely on it!) ### The quick fix (with caveats) A simple solution for the task-migration issue is to opt-out of dynamic scheduling with -the `StaticScheduler()`. This scheduler statically assigns tasks to threads -upfront without any dynamic rescheduling (the tasks are sticky and won't migrate). +`scheduler=:static` (or `scheduler=StaticScheduler()`). This scheduler statically +assigns tasks to threads upfront without any dynamic rescheduling +(the tasks are sticky and won't migrate). ````julia function matmulsums_perthread_static(As, Bs) N = size(first(As), 1) Cs = [Matrix{Float64}(undef, N, N) for _ in 1:nthreads()] - tmap(As, Bs; scheduler = StaticScheduler()) do A, B + tmap(As, Bs; scheduler = :static) do A, B C = Cs[threadid()] mul!(C, A, B) sum(C) @@ -423,37 +401,31 @@ true ### Benchmark Let's benchmark the variants above and compare them to the task-local implementation. -We want to look at both `nchunks = nthreads()` and `nchunks > nthreads()`, the latter +We want to look at both `ntasks = nthreads()` and `ntasks > nthreads()`, the latter of which gives us dynamic load balancing. ````julia -# no load balancing because nchunks == nthreads() -@btime matmulsums_tlv($As_nu, $Bs_nu; - scheduler = $(DynamicScheduler(; nchunks = nthreads()))); +# no load balancing because ntasks == nthreads() +@btime matmulsums_tlv($As_nu, $Bs_nu); @btime matmulsums_perthread_static($As_nu, $Bs_nu); -@btime matmulsums_perthread_channel($As_nu, $Bs_nu; - scheduler = $(DynamicScheduler(; nchunks = nthreads()))); +@btime matmulsums_perthread_channel($As_nu, $Bs_nu); -# load balancing because nchunks > nthreads() -@btime matmulsums_tlv($As_nu, $Bs_nu; - scheduler = $(DynamicScheduler(; nchunks = 2 * nthreads()))); -@btime matmulsums_perthread_channel($As_nu, $Bs_nu; - scheduler = $(DynamicScheduler(; nchunks = 2 * nthreads()))); +# load balancing because ntasks > nthreads() +@btime matmulsums_tlv($As_nu, $Bs_nu; ntasks = 2 * nthreads()); +@btime matmulsums_perthread_channel($As_nu, $Bs_nu; ntasks = 2 * nthreads()); -@btime matmulsums_tlv($As_nu, $Bs_nu; - scheduler = $(DynamicScheduler(; nchunks = 10 * nthreads()))); -@btime matmulsums_perthread_channel($As_nu, $Bs_nu; - scheduler = $(DynamicScheduler(; nchunks = 10 * nthreads()))); +@btime matmulsums_tlv($As_nu, $Bs_nu; ntasks = 10 * nthreads()); +@btime matmulsums_perthread_channel($As_nu, $Bs_nu; ntasks = 10 * nthreads()); ```` ```` - 149.095 ms (124 allocations: 5.03 MiB) - 175.355 ms (107 allocations: 5.02 MiB) - 148.470 ms (112 allocations: 5.02 MiB) - 137.638 ms (235 allocations: 10.05 MiB) - 135.293 ms (183 allocations: 5.04 MiB) - 124.591 ms (1116 allocations: 50.13 MiB) - 124.716 ms (744 allocations: 5.10 MiB) + 170.563 ms (126 allocations: 5.03 MiB) + 165.647 ms (108 allocations: 5.02 MiB) + 172.216 ms (114 allocations: 5.02 MiB) + 108.662 ms (237 allocations: 10.05 MiB) + 114.673 ms (185 allocations: 5.04 MiB) + 97.933 ms (1118 allocations: 50.13 MiB) + 96.868 ms (746 allocations: 5.10 MiB) ```` @@ -470,14 +442,15 @@ a limited number of tasks (e.g. `nthreads()`) with task-local buffers. ````julia using OhMyThreads: tmapreduce + function matmulsums_perthread_channel_flipped(As, Bs; ntasks = nthreads()) N = size(first(As), 1) - chnl = Channel{Int}(length(As); spawn=true) do chnl + chnl = Channel{Int}(length(As); spawn = true) do chnl for i in 1:length(As) put!(chnl, i) end end - tmapreduce(vcat, 1:ntasks; scheduler = DynamicScheduler(; nchunks = 0)) do _ # we turn chunking off + tmapreduce(vcat, 1:ntasks; chunking=false) do _ # we turn chunking off local C = Matrix{Float64}(undef, N, N) map(chnl) do i # implicitly takes the values from the channel (parallel safe) A = As[i] @@ -511,9 +484,9 @@ Quick benchmark: ```` ```` - 121.715 ms (163 allocations: 5.07 MiB) - 122.457 ms (267 allocations: 10.11 MiB) - 122.374 ms (1068 allocations: 50.37 MiB) + 94.389 ms (170 allocations: 5.07 MiB) + 94.580 ms (271 allocations: 10.10 MiB) + 94.768 ms (1071 allocations: 50.41 MiB) ```` @@ -546,13 +519,13 @@ function matmulsums_bumper(As, Bs) end res_bumper = matmulsums_bumper(As, Bs); -sort(res_nu) ≈ sort(res_bumper) +sort(res) ≈ sort(res_bumper) @btime matmulsums_bumper($As, $Bs); ```` ```` - 9.865 ms (254 allocations: 50.92 KiB) + 7.814 ms (134 allocations: 27.92 KiB) ```` From a9a128a3a102f82c3455c45af577110922bb8446 Mon Sep 17 00:00:00 2001 From: Carsten Bauer Date: Wed, 13 Mar 2024 17:22:09 +0100 Subject: [PATCH 17/17] collapse docs --- docs/make.jl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/make.jl b/docs/make.jl index f2718244..4cb90c74 100644 --- a/docs/make.jl +++ b/docs/make.jl @@ -28,7 +28,7 @@ makedocs(; ] ], repo = "https://github.com/JuliaFolds2/OhMyThreads.jl/blob/{commit}{path}#{line}", - format = Documenter.HTML(repolink = "https://github.com/JuliaFolds2/OhMyThreads.jl")) + format = Documenter.HTML(repolink = "https://github.com/JuliaFolds2/OhMyThreads.jl"; collapselevel = 1)) if ci @info "Deploying documentation to GitHub"